Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove AIP-44 from models/taskinstance #44510

Closed
wants to merge 9 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 5 additions & 7 deletions airflow/cli/commands/task_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ def _get_ti(
)

ti_or_none = dag_run.get_task_instance(task.task_id, map_index=map_index, session=session)
ti: TaskInstance | TaskInstancePydantic
ti: TaskInstance
if ti_or_none is None:
if not create_if_necessary:
raise TaskInstanceNotFound(
Expand All @@ -249,9 +249,7 @@ def _get_ti(
return ti, dr_created


def _run_task_by_selected_method(
args, dag: DAG, ti: TaskInstance | TaskInstancePydantic
) -> None | TaskReturnCode:
def _run_task_by_selected_method(args, dag: DAG, ti: TaskInstance) -> None | TaskReturnCode:
"""
Run the task based on a mode.

Expand Down Expand Up @@ -308,7 +306,7 @@ def _run_task_by_executor(args, dag: DAG, ti: TaskInstance) -> None:
executor.end()


def _run_task_by_local_task_job(args, ti: TaskInstance | TaskInstancePydantic) -> TaskReturnCode | None:
def _run_task_by_local_task_job(args, ti: TaskInstance) -> TaskReturnCode | None:
"""Run LocalTaskJob, which monitors the raw task execution process."""
job_runner = LocalTaskJobRunner(
job=Job(dag_id=ti.dag_id),
Expand Down Expand Up @@ -354,7 +352,7 @@ def _extract_external_executor_id(args) -> str | None:


@contextmanager
def _move_task_handlers_to_root(ti: TaskInstance | TaskInstancePydantic) -> Generator[None, None, None]:
def _move_task_handlers_to_root(ti: TaskInstance) -> Generator[None, None, None]:
"""
Move handlers for task logging to root logger.

Expand All @@ -381,7 +379,7 @@ def _move_task_handlers_to_root(ti: TaskInstance | TaskInstancePydantic) -> Gene


@contextmanager
def _redirect_stdout_to_ti_log(ti: TaskInstance | TaskInstancePydantic) -> Generator[None, None, None]:
def _redirect_stdout_to_ti_log(ti: TaskInstance) -> Generator[None, None, None]:
"""
Redirect stdout to ti logger.

Expand Down
3 changes: 1 addition & 2 deletions airflow/jobs/local_task_job_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@

from airflow.jobs.job import Job
from airflow.models.taskinstance import TaskInstance
from airflow.serialization.pydantic.taskinstance import TaskInstancePydantic

SIGSEGV_MESSAGE = """
******************************************* Received SIGSEGV *******************************************
Expand Down Expand Up @@ -83,7 +82,7 @@ class LocalTaskJobRunner(BaseJobRunner, LoggingMixin):
def __init__(
self,
job: Job,
task_instance: TaskInstance | TaskInstancePydantic,
task_instance: TaskInstance,
ignore_all_deps: bool = False,
ignore_depends_on_past: bool = False,
wait_for_past_depends_before_skipping: bool = False,
Expand Down
6 changes: 2 additions & 4 deletions airflow/models/renderedtifields.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@
from sqlalchemy.sql import FromClause

from airflow.models import Operator
from airflow.models.taskinstance import TaskInstance, TaskInstancePydantic
from airflow.models.taskinstance import TaskInstance


def get_serialized_template_fields(task: Operator):
Expand Down Expand Up @@ -173,9 +173,7 @@ def _update_runtime_evaluated_template_fields(

@classmethod
@provide_session
def get_templated_fields(
cls, ti: TaskInstance | TaskInstancePydantic, session: Session = NEW_SESSION
) -> dict | None:
def get_templated_fields(cls, ti: TaskInstance, session: Session = NEW_SESSION) -> dict | None:
"""
Get templated field for a TaskInstance from the RenderedTaskInstanceFields table.

Expand Down
5 changes: 2 additions & 3 deletions airflow/models/skipmixin.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
from airflow.models.operator import Operator
from airflow.sdk.definitions.node import DAGNode
from airflow.serialization.pydantic.dag_run import DagRunPydantic
from airflow.serialization.pydantic.taskinstance import TaskInstancePydantic

# The key used by SkipMixin to store XCom data.
XCOM_SKIPMIXIN_KEY = "skipmixin_key"
Expand Down Expand Up @@ -153,7 +152,7 @@ def _skip(

def skip_all_except(
self,
ti: TaskInstance | TaskInstancePydantic,
ti: TaskInstance,
branch_task_ids: None | str | Iterable[str],
):
"""Facade for compatibility for call to internal API."""
Expand All @@ -167,7 +166,7 @@ def skip_all_except(
@provide_session
def _skip_all_except(
cls,
ti: TaskInstance | TaskInstancePydantic,
ti: TaskInstance,
branch_task_ids: None | str | Iterable[str],
session: Session = NEW_SESSION,
):
Expand Down
Loading