diff --git a/airflow/api_fastapi/core_api/datamodels/dag_report.py b/airflow/api_fastapi/core_api/datamodels/dag_report.py new file mode 100644 index 0000000000000..aeab3f91af91b --- /dev/null +++ b/airflow/api_fastapi/core_api/datamodels/dag_report.py @@ -0,0 +1,40 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from __future__ import annotations + +from datetime import timedelta + +from airflow.api_fastapi.core_api.base import BaseModel + + +class DagReportResponse(BaseModel): + """DAG Report serializer for responses.""" + + file: str + duration: timedelta + dag_num: int + task_num: int + dags: str + warning_num: int + + +class DagReportCollectionResponse(BaseModel): + """DAG Report Collection serializer for responses.""" + + dag_reports: list[DagReportResponse] + total_entries: int diff --git a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml index ed0ce173bc55b..5eb90ce235da6 100644 --- a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml +++ b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml @@ -2588,6 +2588,50 @@ paths: application/json: schema: $ref: '#/components/schemas/HTTPValidationError' + /public/dagReports: + get: + tags: + - DagReport + summary: Get Dag Report + description: Get DAG report. + operationId: get_dag_report + parameters: + - name: subdir + in: query + required: true + schema: + type: string + title: Subdir + responses: + '200': + description: Successful Response + content: + application/json: + schema: {} + '401': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + description: Unauthorized + '403': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + description: Forbidden + '400': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + description: Bad Request + '422': + description: Validation Error + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPValidationError' /public/config: get: tags: diff --git a/airflow/api_fastapi/core_api/routes/public/__init__.py b/airflow/api_fastapi/core_api/routes/public/__init__.py index 2a7c81782a9da..89465177845d3 100644 --- a/airflow/api_fastapi/core_api/routes/public/__init__.py +++ b/airflow/api_fastapi/core_api/routes/public/__init__.py @@ -26,6 +26,7 @@ from airflow.api_fastapi.core_api.routes.public.config import config_router from airflow.api_fastapi.core_api.routes.public.connections import connections_router from airflow.api_fastapi.core_api.routes.public.dag_parsing import dag_parsing_router +from airflow.api_fastapi.core_api.routes.public.dag_report import dag_report_router from airflow.api_fastapi.core_api.routes.public.dag_run import dag_run_router from airflow.api_fastapi.core_api.routes.public.dag_sources import dag_sources_router from airflow.api_fastapi.core_api.routes.public.dag_stats import dag_stats_router @@ -60,6 +61,7 @@ authenticated_router.include_router(dag_run_router) authenticated_router.include_router(dag_sources_router) authenticated_router.include_router(dag_stats_router) +authenticated_router.include_router(dag_report_router) authenticated_router.include_router(config_router) authenticated_router.include_router(dag_warning_router) authenticated_router.include_router(dags_router) diff --git a/airflow/api_fastapi/core_api/routes/public/dag_report.py b/airflow/api_fastapi/core_api/routes/public/dag_report.py new file mode 100644 index 0000000000000..03dfd5aad9e78 --- /dev/null +++ b/airflow/api_fastapi/core_api/routes/public/dag_report.py @@ -0,0 +1,56 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from __future__ import annotations + +import os +from typing import cast + +from fastapi import HTTPException, status + +from airflow import settings +from airflow.api_fastapi.common.router import AirflowRouter +from airflow.api_fastapi.core_api.datamodels.dag_report import ( + DagReportCollectionResponse, + DagReportResponse, +) +from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc +from airflow.models.dagbag import DagBag + +dag_report_router = AirflowRouter(tags=["DagReport"], prefix="/dagReports") + + +@dag_report_router.get( + "", + responses=create_openapi_http_exception_doc( + [ + status.HTTP_400_BAD_REQUEST, + ] + ), +) +def get_dag_report( + subdir: str, +): + """Get DAG report.""" + fullpath = os.path.normpath(subdir) + if not fullpath.startswith(settings.DAGS_FOLDER): + raise HTTPException(status.HTTP_400_BAD_REQUEST, "subdir should be subpath of DAGS_FOLDER settings") + dagbag = DagBag(fullpath) + return DagReportCollectionResponse( + dag_reports=cast(list[DagReportResponse], dagbag.dagbag_stats), + total_entries=len(dagbag.dagbag_stats), + ) diff --git a/airflow/ui/openapi-gen/queries/common.ts b/airflow/ui/openapi-gen/queries/common.ts index b6cf77099005b..6e6a4c480460b 100644 --- a/airflow/ui/openapi-gen/queries/common.ts +++ b/airflow/ui/openapi-gen/queries/common.ts @@ -7,6 +7,7 @@ import { ConfigService, ConnectionService, DagParsingService, + DagReportService, DagRunService, DagService, DagSourceService, @@ -647,6 +648,22 @@ export const UseDagStatsServiceGetDagStatsKeyFn = ( } = {}, queryKey?: Array, ) => [useDagStatsServiceGetDagStatsKey, ...(queryKey ?? [{ dagIds }])]; +export type DagReportServiceGetDagReportDefaultResponse = Awaited< + ReturnType +>; +export type DagReportServiceGetDagReportQueryResult< + TData = DagReportServiceGetDagReportDefaultResponse, + TError = unknown, +> = UseQueryResult; +export const useDagReportServiceGetDagReportKey = "DagReportServiceGetDagReport"; +export const UseDagReportServiceGetDagReportKeyFn = ( + { + subdir, + }: { + subdir: string; + }, + queryKey?: Array, +) => [useDagReportServiceGetDagReportKey, ...(queryKey ?? [{ subdir }])]; export type DagWarningServiceListDagWarningsDefaultResponse = Awaited< ReturnType >; diff --git a/airflow/ui/openapi-gen/queries/prefetch.ts b/airflow/ui/openapi-gen/queries/prefetch.ts index 82db0eccdedeb..975347cfe9cb5 100644 --- a/airflow/ui/openapi-gen/queries/prefetch.ts +++ b/airflow/ui/openapi-gen/queries/prefetch.ts @@ -6,6 +6,7 @@ import { BackfillService, ConfigService, ConnectionService, + DagReportService, DagRunService, DagService, DagSourceService, @@ -882,6 +883,26 @@ export const prefetchUseDagStatsServiceGetDagStats = ( queryKey: Common.UseDagStatsServiceGetDagStatsKeyFn({ dagIds }), queryFn: () => DagStatsService.getDagStats({ dagIds }), }); +/** + * Get Dag Report + * Get DAG report. + * @param data The data for the request. + * @param data.subdir + * @returns unknown Successful Response + * @throws ApiError + */ +export const prefetchUseDagReportServiceGetDagReport = ( + queryClient: QueryClient, + { + subdir, + }: { + subdir: string; + }, +) => + queryClient.prefetchQuery({ + queryKey: Common.UseDagReportServiceGetDagReportKeyFn({ subdir }), + queryFn: () => DagReportService.getDagReport({ subdir }), + }); /** * List Dag Warnings * Get a list of DAG warnings. diff --git a/airflow/ui/openapi-gen/queries/queries.ts b/airflow/ui/openapi-gen/queries/queries.ts index a43172d73c7ca..decc943fecb7f 100644 --- a/airflow/ui/openapi-gen/queries/queries.ts +++ b/airflow/ui/openapi-gen/queries/queries.ts @@ -7,6 +7,7 @@ import { ConfigService, ConnectionService, DagParsingService, + DagReportService, DagRunService, DagService, DagSourceService, @@ -1070,6 +1071,32 @@ export const useDagStatsServiceGetDagStats = < queryFn: () => DagStatsService.getDagStats({ dagIds }) as TData, ...options, }); +/** + * Get Dag Report + * Get DAG report. + * @param data The data for the request. + * @param data.subdir + * @returns unknown Successful Response + * @throws ApiError + */ +export const useDagReportServiceGetDagReport = < + TData = Common.DagReportServiceGetDagReportDefaultResponse, + TError = unknown, + TQueryKey extends Array = unknown[], +>( + { + subdir, + }: { + subdir: string; + }, + queryKey?: TQueryKey, + options?: Omit, "queryKey" | "queryFn">, +) => + useQuery({ + queryKey: Common.UseDagReportServiceGetDagReportKeyFn({ subdir }, queryKey), + queryFn: () => DagReportService.getDagReport({ subdir }) as TData, + ...options, + }); /** * List Dag Warnings * Get a list of DAG warnings. diff --git a/airflow/ui/openapi-gen/queries/suspense.ts b/airflow/ui/openapi-gen/queries/suspense.ts index f56b431a4c537..4e67c0f459c36 100644 --- a/airflow/ui/openapi-gen/queries/suspense.ts +++ b/airflow/ui/openapi-gen/queries/suspense.ts @@ -6,6 +6,7 @@ import { BackfillService, ConfigService, ConnectionService, + DagReportService, DagRunService, DagService, DagSourceService, @@ -1049,6 +1050,32 @@ export const useDagStatsServiceGetDagStatsSuspense = < queryFn: () => DagStatsService.getDagStats({ dagIds }) as TData, ...options, }); +/** + * Get Dag Report + * Get DAG report. + * @param data The data for the request. + * @param data.subdir + * @returns unknown Successful Response + * @throws ApiError + */ +export const useDagReportServiceGetDagReportSuspense = < + TData = Common.DagReportServiceGetDagReportDefaultResponse, + TError = unknown, + TQueryKey extends Array = unknown[], +>( + { + subdir, + }: { + subdir: string; + }, + queryKey?: TQueryKey, + options?: Omit, "queryKey" | "queryFn">, +) => + useSuspenseQuery({ + queryKey: Common.UseDagReportServiceGetDagReportKeyFn({ subdir }, queryKey), + queryFn: () => DagReportService.getDagReport({ subdir }) as TData, + ...options, + }); /** * List Dag Warnings * Get a list of DAG warnings. diff --git a/airflow/ui/openapi-gen/requests/services.gen.ts b/airflow/ui/openapi-gen/requests/services.gen.ts index 7f888aca34da8..d5d605f31bd45 100644 --- a/airflow/ui/openapi-gen/requests/services.gen.ts +++ b/airflow/ui/openapi-gen/requests/services.gen.ts @@ -93,6 +93,8 @@ import type { GetDagSourceResponse, GetDagStatsData, GetDagStatsResponse, + GetDagReportData, + GetDagReportResponse, ListDagWarningsData, ListDagWarningsResponse, GetDagsData, @@ -1520,6 +1522,32 @@ export class DagStatsService { } } +export class DagReportService { + /** + * Get Dag Report + * Get DAG report. + * @param data The data for the request. + * @param data.subdir + * @returns unknown Successful Response + * @throws ApiError + */ + public static getDagReport(data: GetDagReportData): CancelablePromise { + return __request(OpenAPI, { + method: "GET", + url: "/public/dagReports", + query: { + subdir: data.subdir, + }, + errors: { + 400: "Bad Request", + 401: "Unauthorized", + 403: "Forbidden", + 422: "Validation Error", + }, + }); + } +} + export class DagWarningService { /** * List Dag Warnings diff --git a/airflow/ui/openapi-gen/requests/types.gen.ts b/airflow/ui/openapi-gen/requests/types.gen.ts index 81925913fba42..3e766e12f13e3 100644 --- a/airflow/ui/openapi-gen/requests/types.gen.ts +++ b/airflow/ui/openapi-gen/requests/types.gen.ts @@ -1800,6 +1800,12 @@ export type GetDagStatsData = { export type GetDagStatsResponse = DagStatsCollectionResponse; +export type GetDagReportData = { + subdir: string; +}; + +export type GetDagReportResponse = unknown; + export type ListDagWarningsData = { dagId?: string | null; limit?: number; @@ -3469,6 +3475,33 @@ export type $OpenApiTs = { }; }; }; + "/public/dagReports": { + get: { + req: GetDagReportData; + res: { + /** + * Successful Response + */ + 200: unknown; + /** + * Bad Request + */ + 400: HTTPExceptionResponse; + /** + * Unauthorized + */ + 401: HTTPExceptionResponse; + /** + * Forbidden + */ + 403: HTTPExceptionResponse; + /** + * Validation Error + */ + 422: HTTPValidationError; + }; + }; + }; "/public/dagWarnings": { get: { req: ListDagWarningsData; diff --git a/tests/api_fastapi/core_api/routes/public/test_dag_report.py b/tests/api_fastapi/core_api/routes/public/test_dag_report.py new file mode 100644 index 0000000000000..4a72d16c2df54 --- /dev/null +++ b/tests/api_fastapi/core_api/routes/public/test_dag_report.py @@ -0,0 +1,124 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +import os +from unittest.mock import patch + +import pytest + +from airflow.utils.file import list_py_file_paths + +from tests_common.test_utils.config import conf_vars +from tests_common.test_utils.db import clear_db_dags + +pytestmark = pytest.mark.db_test + +TEST_DAG_FOLDER = os.environ["AIRFLOW__CORE__DAGS_FOLDER"] +TEST_DAG_FOLDER_WITH_SUBDIR = f"{TEST_DAG_FOLDER}/subdir2" +TEST_DAG_FOLDER_INVALID = "/invalid/path" +TEST_DAG_FOLDER_INVALID_2 = "/root/airflow/tests/dags/" + + +def get_corresponding_dag_file_count(dir: str, include_examples: bool = True) -> int: + return len(list_py_file_paths(directory=dir)) + ( + len(list_py_file_paths("/opt/airflow/airflow/example_dags")) if include_examples else 0 + ) + + +class TestDagReportEndpoint: + @pytest.fixture(autouse=True) + def setup(self) -> None: + self.clear_db() + + def teardown_method(self) -> None: + self.clear_db() + + def clear_db(self): + clear_db_dags() + + @pytest.mark.parametrize( + "subdir,include_example,expected_total_entries", + [ + pytest.param( + TEST_DAG_FOLDER, + True, + get_corresponding_dag_file_count(TEST_DAG_FOLDER), + id="dag_path_with_example", + ), + pytest.param( + TEST_DAG_FOLDER, + False, + get_corresponding_dag_file_count(TEST_DAG_FOLDER, False), + id="dag_path_without_example", + ), + pytest.param( + TEST_DAG_FOLDER_WITH_SUBDIR, + True, + get_corresponding_dag_file_count(TEST_DAG_FOLDER_WITH_SUBDIR), + id="dag_path_subdir_with_example", + ), + pytest.param( + TEST_DAG_FOLDER_WITH_SUBDIR, + False, + get_corresponding_dag_file_count(TEST_DAG_FOLDER_WITH_SUBDIR, False), + id="dag_path_subdir_without_example", + ), + ], + ) + def test_should_response_200(self, test_client, subdir, include_example, expected_total_entries): + with conf_vars({("core", "load_examples"): str(include_example)}): + response = test_client.get("/public/dagReports", params={"subdir": subdir}) + assert response.status_code == 200 + response_json = response.json() + assert response_json["total_entries"] == expected_total_entries + + def test_should_response_200_with_empty_dagbag(self, test_client): + # the constructor of DagBag will call `collect_dags` method and store the result in `dagbag_stats` + def _mock_collect_dags(self, *args, **kwargs): + self.dagbag_stats = [] + + with patch("airflow.models.dagbag.DagBag.collect_dags", _mock_collect_dags): + response = test_client.get("/public/dagReports", params={"subdir": TEST_DAG_FOLDER}) + assert response.status_code == 200 + assert response.json() == {"dag_reports": [], "total_entries": 0} + + @pytest.mark.parametrize( + "subdir", + [ + pytest.param(TEST_DAG_FOLDER_INVALID, id="invalid_dag_path"), + pytest.param(TEST_DAG_FOLDER_INVALID_2, id="invalid_dag_path_2"), + ], + ) + def test_should_response_400(self, test_client, subdir): + response = test_client.get("/public/dagReports", params={"subdir": subdir}) + assert response.status_code == 400 + assert response.json() == {"detail": "subdir should be subpath of DAGS_FOLDER settings"} + + def test_should_response_422(self, test_client): + response = test_client.get("/public/dagReports") + assert response.status_code == 422 + assert response.json() == { + "detail": [ + { + "input": None, + "loc": ["query", "subdir"], + "msg": "Field required", + "type": "missing", + } + ] + }