Skip to content

Commit 347c98c

Browse files
authored
Move BaseSensorOperator to TaskSDK definitions (#48244)
closes #45743
1 parent ee97610 commit 347c98c

File tree

39 files changed

+1284
-2291
lines changed

39 files changed

+1284
-2291
lines changed

airflow-core/src/airflow/api_fastapi/execution_api/routes/task_reschedules.py

+3-4
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
from typing import Annotated
2121
from uuid import UUID
2222

23-
from fastapi import HTTPException, Query, status
23+
from fastapi import Query, status
2424
from sqlalchemy import select
2525

2626
from airflow.api_fastapi.common.db.common import SessionDep
@@ -39,7 +39,8 @@
3939
@router.get("/{task_instance_id}/start_date")
4040
def get_start_date(
4141
task_instance_id: UUID, session: SessionDep, try_number: Annotated[int, Query()] = 1
42-
) -> UtcDateTime:
42+
) -> UtcDateTime | None:
43+
"""Get the first reschedule date if found, None if no records exist."""
4344
start_date = session.scalar(
4445
select(TaskReschedule)
4546
.where(
@@ -50,7 +51,5 @@ def get_start_date(
5051
.with_only_columns(TaskReschedule.start_date)
5152
.limit(1)
5253
)
53-
if start_date is None:
54-
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND)
5554

5655
return start_date

airflow-core/src/airflow/models/baseoperator.py

-10
Original file line numberDiff line numberDiff line change
@@ -386,16 +386,6 @@ def pre_execute(self, context: Any):
386386
logger=self.log,
387387
).run(context)
388388

389-
def execute(self, context: Context) -> Any:
390-
"""
391-
Derive when creating an operator.
392-
393-
Context is the same dictionary used as when rendering jinja templates.
394-
395-
Refer to get_template_context for more context.
396-
"""
397-
raise NotImplementedError()
398-
399389
@apply_lineage
400390
def post_execute(self, context: Any, result: Any = None):
401391
"""

airflow-core/src/airflow/models/taskinstance.py

+10
Original file line numberDiff line numberDiff line change
@@ -2324,6 +2324,16 @@ def get_failed_dep_statuses(self, dep_context: DepContext | None = None, session
23242324
if TYPE_CHECKING:
23252325
assert isinstance(self.task, BaseOperator)
23262326

2327+
if not hasattr(self.task, "deps"):
2328+
# These deps are not on BaseOperator since they are only needed and evaluated
2329+
# in the scheduler and not needed at the Runtime.
2330+
from airflow.serialization.serialized_objects import SerializedBaseOperator
2331+
2332+
serialized_op = SerializedBaseOperator.deserialize_operator(
2333+
SerializedBaseOperator.serialize_operator(self.task)
2334+
)
2335+
setattr(self.task, "deps", serialized_op.deps) # type: ignore[union-attr]
2336+
23272337
dep_context = dep_context or DepContext()
23282338
for dep in dep_context.deps | self.task.deps:
23292339
for dep_status in dep.get_dep_statuses(self, session, dep_context):

airflow-core/src/airflow/sensors/README.md

-24
This file was deleted.

airflow-core/src/airflow/sensors/__init__.py

+1
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626

2727
from airflow.utils.deprecation_tools import add_deprecated_classes
2828

29+
# TODO: Add definition from Task SDK here and remove `base.py` file
2930
__deprecated_classes = {
3031
"python":{
3132
"PythonSensor": "airflow.providers.standard.sensors.python.PythonSensor",

0 commit comments

Comments
 (0)