Skip to content

Commit

Permalink
AIP-84 Migrate /object/grid_data from views to FastAPI (#44332)
Browse files Browse the repository at this point in the history
* Include grid endpoint to FastAPI

* Finalise the endpoint other than dynamically created tasks and address comments

* Finalise the endpoint with dynamically mapped tasks and include unit test

* Use SessionDep for session param

* Create service/grid.py and move methods, adjust usage of SortParam, use SortParam rather than order_expressions, remove unnecessary version check, include additional checks for MappedTaskGroups, Move include upstream and downstream to parameters.py, remove unrelated comment and include task and dagrun notes

* Fix unit test according to adjustments

* Include missing MappedTaskGroup parent check to MappedOperator

* Remove remaining version check

* Move service/ to services/ui/, make sort_param unique for dag_run_sort_param, remove unreachable statement from parameters.py::_transform_ti_states, make include upstream/downstream Annotated optional and include new test for upstream/downstream

* Fix SortParam creation

* Revert changes in QueryLastDagRunStateFilter

* include missing task_count to parent_node for recursive taskgroups, fix loop order for calculating overall_state, fix sorting and include user to pass it as parameter and include new test for order_by

* Rename num_runs to limit for consistency, make base_date filter range query

* Fix task_count and states for nested task_groups, add alias to run_id, change types of Pydantic models

* Rebase and rerun pre-commit

* Change GridTaskInstanceSummary state to TaskInstanceState object

* Fix setting state in GridTaskInstanceSummary, change name states to child_states to prevent confusion

* Select all model columns, move priority to paramteres as state_priority, decide the order_by with first element of timetable.run_ordering, add __or__ method to SortParam, calculate overall_ti_state after proper additions in child_states, add more test cases around order_by, limit, filtering such as logical_date_gte/lte, make test 3 depth nested task group

* Fix run_type and state param not working due to naming and include remaining filter in unit test, make tests use params and parametrize test methods

* Move SortParam not provided comparison logic to view

* Remove forgotten code piece

* Remove None from param definition, adjust the log of wrong DagRun type in parameters and test
  • Loading branch information
bugraoz93 authored Dec 23, 2024
1 parent 7002966 commit 43d7c1c
Show file tree
Hide file tree
Showing 18 changed files with 4,781 additions and 16 deletions.
52 changes: 52 additions & 0 deletions airflow/api_fastapi/common/parameters.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
from airflow.typing_compat import Self
from airflow.utils import timezone
from airflow.utils.state import DagRunState, TaskInstanceState
from airflow.utils.types import DagRunType

if TYPE_CHECKING:
from sqlalchemy.sql import ColumnElement, Select
Expand Down Expand Up @@ -528,6 +529,32 @@ def _transform_dag_run_states(states: Iterable[str] | None) -> list[DagRunState
),
]


def _transform_dag_run_types(types: list[str] | None) -> list[DagRunType | None] | None:
try:
if not types:
return None
return [None if run_type in ("none", None) else DagRunType(run_type) for run_type in types]
except ValueError:
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
detail=f"Invalid value for run type. Valid values are {', '.join(DagRunType)}",
)


QueryDagRunRunTypesFilter = Annotated[
FilterParam[list[str]],
Depends(
filter_param_factory(
attribute=DagRun.run_type,
_type=list[str],
filter_option=FilterOptionEnum.ANY_EQUAL,
default_factory=list,
transform_callable=_transform_dag_run_types,
)
),
]

# DAGTags
QueryDagTagPatternSearch = Annotated[
_SearchParam, Depends(search_param_factory(DagTag.name, "tag_name_pattern"))
Expand Down Expand Up @@ -601,3 +628,28 @@ def _transform_ti_states(states: list[str] | None) -> list[TaskInstanceState | N
QueryVariableKeyPatternSearch = Annotated[
_SearchParam, Depends(search_param_factory(Variable.key, "variable_key_pattern"))
]


# UI Shared
def _optional_boolean(value: bool | None) -> bool | None:
return value if value is not None else False


QueryIncludeUpstream = Annotated[Union[bool], AfterValidator(_optional_boolean)]
QueryIncludeDownstream = Annotated[Union[bool], AfterValidator(_optional_boolean)]

state_priority: list[None | TaskInstanceState] = [
TaskInstanceState.FAILED,
TaskInstanceState.UPSTREAM_FAILED,
TaskInstanceState.UP_FOR_RETRY,
TaskInstanceState.UP_FOR_RESCHEDULE,
TaskInstanceState.QUEUED,
TaskInstanceState.SCHEDULED,
TaskInstanceState.DEFERRED,
TaskInstanceState.RUNNING,
TaskInstanceState.RESTARTING,
None,
TaskInstanceState.SUCCESS,
TaskInstanceState.SKIPPED,
TaskInstanceState.REMOVED,
]
62 changes: 62 additions & 0 deletions airflow/api_fastapi/core_api/datamodels/ui/grid.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from __future__ import annotations

from datetime import datetime
from uuid import UUID

from pydantic import BaseModel, Field

from airflow.utils.state import DagRunState, TaskInstanceState
from airflow.utils.types import DagRunType


class GridTaskInstanceSummary(BaseModel):
"""Task Instance Summary model for the Grid UI."""

task_id: str
try_number: int
start_date: datetime | None
end_date: datetime | None
queued_dttm: datetime | None
child_states: dict[str, int] | None
task_count: int
state: TaskInstanceState | None
note: str | None


class GridDAGRunwithTIs(BaseModel):
"""DAG Run model for the Grid UI."""

run_id: str = Field(serialization_alias="dag_run_id", validation_alias="run_id")
queued_at: datetime | None
start_date: datetime | None
end_date: datetime | None
state: DagRunState
run_type: DagRunType
data_interval_start: datetime | None
data_interval_end: datetime | None
version_number: UUID | None
note: str | None
task_instances: list[GridTaskInstanceSummary]


class GridResponse(BaseModel):
"""Response model for the Grid UI."""

dag_runs: list[GridDAGRunwithTIs]
Loading

0 comments on commit 43d7c1c

Please sign in to comment.