Skip to content

Commit

Permalink
AIP-81 Add DAG Report API (#45393)
Browse files Browse the repository at this point in the history
* AIP-81 Add DAG Report API

* Fix CodeQL security issue

* Refactor based on code review

- check subdir startwith DAGS_FOLDER
- fix test detail

* Fix test detail

* Fix list_py_file_paths in test

* Normalize input path
  • Loading branch information
jason810496 authored Jan 21, 2025
1 parent 2d9ab54 commit d151ab9
Show file tree
Hide file tree
Showing 11 changed files with 419 additions and 0 deletions.
40 changes: 40 additions & 0 deletions airflow/api_fastapi/core_api/datamodels/dag_report.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from __future__ import annotations

from datetime import timedelta

from airflow.api_fastapi.core_api.base import BaseModel


class DagReportResponse(BaseModel):
"""DAG Report serializer for responses."""

file: str
duration: timedelta
dag_num: int
task_num: int
dags: str
warning_num: int


class DagReportCollectionResponse(BaseModel):
"""DAG Report Collection serializer for responses."""

dag_reports: list[DagReportResponse]
total_entries: int
44 changes: 44 additions & 0 deletions airflow/api_fastapi/core_api/openapi/v1-generated.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2588,6 +2588,50 @@ paths:
application/json:
schema:
$ref: '#/components/schemas/HTTPValidationError'
/public/dagReports:
get:
tags:
- DagReport
summary: Get Dag Report
description: Get DAG report.
operationId: get_dag_report
parameters:
- name: subdir
in: query
required: true
schema:
type: string
title: Subdir
responses:
'200':
description: Successful Response
content:
application/json:
schema: {}
'401':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Unauthorized
'403':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Forbidden
'400':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Bad Request
'422':
description: Validation Error
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPValidationError'
/public/config:
get:
tags:
Expand Down
2 changes: 2 additions & 0 deletions airflow/api_fastapi/core_api/routes/public/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
from airflow.api_fastapi.core_api.routes.public.config import config_router
from airflow.api_fastapi.core_api.routes.public.connections import connections_router
from airflow.api_fastapi.core_api.routes.public.dag_parsing import dag_parsing_router
from airflow.api_fastapi.core_api.routes.public.dag_report import dag_report_router
from airflow.api_fastapi.core_api.routes.public.dag_run import dag_run_router
from airflow.api_fastapi.core_api.routes.public.dag_sources import dag_sources_router
from airflow.api_fastapi.core_api.routes.public.dag_stats import dag_stats_router
Expand Down Expand Up @@ -60,6 +61,7 @@
authenticated_router.include_router(dag_run_router)
authenticated_router.include_router(dag_sources_router)
authenticated_router.include_router(dag_stats_router)
authenticated_router.include_router(dag_report_router)
authenticated_router.include_router(config_router)
authenticated_router.include_router(dag_warning_router)
authenticated_router.include_router(dags_router)
Expand Down
56 changes: 56 additions & 0 deletions airflow/api_fastapi/core_api/routes/public/dag_report.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from __future__ import annotations

import os
from typing import cast

from fastapi import HTTPException, status

from airflow import settings
from airflow.api_fastapi.common.router import AirflowRouter
from airflow.api_fastapi.core_api.datamodels.dag_report import (
DagReportCollectionResponse,
DagReportResponse,
)
from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc
from airflow.models.dagbag import DagBag

dag_report_router = AirflowRouter(tags=["DagReport"], prefix="/dagReports")


@dag_report_router.get(
"",
responses=create_openapi_http_exception_doc(
[
status.HTTP_400_BAD_REQUEST,
]
),
)
def get_dag_report(
subdir: str,
):
"""Get DAG report."""
fullpath = os.path.normpath(subdir)
if not fullpath.startswith(settings.DAGS_FOLDER):
raise HTTPException(status.HTTP_400_BAD_REQUEST, "subdir should be subpath of DAGS_FOLDER settings")
dagbag = DagBag(fullpath)
return DagReportCollectionResponse(
dag_reports=cast(list[DagReportResponse], dagbag.dagbag_stats),
total_entries=len(dagbag.dagbag_stats),
)
17 changes: 17 additions & 0 deletions airflow/ui/openapi-gen/queries/common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import {
ConfigService,
ConnectionService,
DagParsingService,
DagReportService,
DagRunService,
DagService,
DagSourceService,
Expand Down Expand Up @@ -647,6 +648,22 @@ export const UseDagStatsServiceGetDagStatsKeyFn = (
} = {},
queryKey?: Array<unknown>,
) => [useDagStatsServiceGetDagStatsKey, ...(queryKey ?? [{ dagIds }])];
export type DagReportServiceGetDagReportDefaultResponse = Awaited<
ReturnType<typeof DagReportService.getDagReport>
>;
export type DagReportServiceGetDagReportQueryResult<
TData = DagReportServiceGetDagReportDefaultResponse,
TError = unknown,
> = UseQueryResult<TData, TError>;
export const useDagReportServiceGetDagReportKey = "DagReportServiceGetDagReport";
export const UseDagReportServiceGetDagReportKeyFn = (
{
subdir,
}: {
subdir: string;
},
queryKey?: Array<unknown>,
) => [useDagReportServiceGetDagReportKey, ...(queryKey ?? [{ subdir }])];
export type DagWarningServiceListDagWarningsDefaultResponse = Awaited<
ReturnType<typeof DagWarningService.listDagWarnings>
>;
Expand Down
21 changes: 21 additions & 0 deletions airflow/ui/openapi-gen/queries/prefetch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import {
BackfillService,
ConfigService,
ConnectionService,
DagReportService,
DagRunService,
DagService,
DagSourceService,
Expand Down Expand Up @@ -882,6 +883,26 @@ export const prefetchUseDagStatsServiceGetDagStats = (
queryKey: Common.UseDagStatsServiceGetDagStatsKeyFn({ dagIds }),
queryFn: () => DagStatsService.getDagStats({ dagIds }),
});
/**
* Get Dag Report
* Get DAG report.
* @param data The data for the request.
* @param data.subdir
* @returns unknown Successful Response
* @throws ApiError
*/
export const prefetchUseDagReportServiceGetDagReport = (
queryClient: QueryClient,
{
subdir,
}: {
subdir: string;
},
) =>
queryClient.prefetchQuery({
queryKey: Common.UseDagReportServiceGetDagReportKeyFn({ subdir }),
queryFn: () => DagReportService.getDagReport({ subdir }),
});
/**
* List Dag Warnings
* Get a list of DAG warnings.
Expand Down
27 changes: 27 additions & 0 deletions airflow/ui/openapi-gen/queries/queries.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import {
ConfigService,
ConnectionService,
DagParsingService,
DagReportService,
DagRunService,
DagService,
DagSourceService,
Expand Down Expand Up @@ -1070,6 +1071,32 @@ export const useDagStatsServiceGetDagStats = <
queryFn: () => DagStatsService.getDagStats({ dagIds }) as TData,
...options,
});
/**
* Get Dag Report
* Get DAG report.
* @param data The data for the request.
* @param data.subdir
* @returns unknown Successful Response
* @throws ApiError
*/
export const useDagReportServiceGetDagReport = <
TData = Common.DagReportServiceGetDagReportDefaultResponse,
TError = unknown,
TQueryKey extends Array<unknown> = unknown[],
>(
{
subdir,
}: {
subdir: string;
},
queryKey?: TQueryKey,
options?: Omit<UseQueryOptions<TData, TError>, "queryKey" | "queryFn">,
) =>
useQuery<TData, TError>({
queryKey: Common.UseDagReportServiceGetDagReportKeyFn({ subdir }, queryKey),
queryFn: () => DagReportService.getDagReport({ subdir }) as TData,
...options,
});
/**
* List Dag Warnings
* Get a list of DAG warnings.
Expand Down
27 changes: 27 additions & 0 deletions airflow/ui/openapi-gen/queries/suspense.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import {
BackfillService,
ConfigService,
ConnectionService,
DagReportService,
DagRunService,
DagService,
DagSourceService,
Expand Down Expand Up @@ -1049,6 +1050,32 @@ export const useDagStatsServiceGetDagStatsSuspense = <
queryFn: () => DagStatsService.getDagStats({ dagIds }) as TData,
...options,
});
/**
* Get Dag Report
* Get DAG report.
* @param data The data for the request.
* @param data.subdir
* @returns unknown Successful Response
* @throws ApiError
*/
export const useDagReportServiceGetDagReportSuspense = <
TData = Common.DagReportServiceGetDagReportDefaultResponse,
TError = unknown,
TQueryKey extends Array<unknown> = unknown[],
>(
{
subdir,
}: {
subdir: string;
},
queryKey?: TQueryKey,
options?: Omit<UseQueryOptions<TData, TError>, "queryKey" | "queryFn">,
) =>
useSuspenseQuery<TData, TError>({
queryKey: Common.UseDagReportServiceGetDagReportKeyFn({ subdir }, queryKey),
queryFn: () => DagReportService.getDagReport({ subdir }) as TData,
...options,
});
/**
* List Dag Warnings
* Get a list of DAG warnings.
Expand Down
28 changes: 28 additions & 0 deletions airflow/ui/openapi-gen/requests/services.gen.ts
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,8 @@ import type {
GetDagSourceResponse,
GetDagStatsData,
GetDagStatsResponse,
GetDagReportData,
GetDagReportResponse,
ListDagWarningsData,
ListDagWarningsResponse,
GetDagsData,
Expand Down Expand Up @@ -1520,6 +1522,32 @@ export class DagStatsService {
}
}

export class DagReportService {
/**
* Get Dag Report
* Get DAG report.
* @param data The data for the request.
* @param data.subdir
* @returns unknown Successful Response
* @throws ApiError
*/
public static getDagReport(data: GetDagReportData): CancelablePromise<GetDagReportResponse> {
return __request(OpenAPI, {
method: "GET",
url: "/public/dagReports",
query: {
subdir: data.subdir,
},
errors: {
400: "Bad Request",
401: "Unauthorized",
403: "Forbidden",
422: "Validation Error",
},
});
}
}

export class DagWarningService {
/**
* List Dag Warnings
Expand Down
Loading

0 comments on commit d151ab9

Please sign in to comment.