Skip to content

Commit

Permalink
Remove AIP-44 code from renderedtifields.py (apache#44546)
Browse files Browse the repository at this point in the history
Part of apache#44436
  • Loading branch information
potiuk authored and Lefteris Gilmaz committed Jan 5, 2025
1 parent 27a7256 commit bd80719
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 20 deletions.
18 changes: 0 additions & 18 deletions airflow/models/renderedtifields.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
from sqlalchemy.ext.associationproxy import association_proxy
from sqlalchemy.orm import relationship

from airflow.api_internal.internal_api_call import internal_api_call
from airflow.configuration import conf
from airflow.models.base import StringID, TaskInstanceDependencies
from airflow.serialization.helpers import serialize_template_field
Expand Down Expand Up @@ -154,23 +153,6 @@ def _redact(self):
for field, rendered in self.rendered_fields.items():
self.rendered_fields[field] = redact(rendered, field)

@classmethod
@internal_api_call
@provide_session
def _update_runtime_evaluated_template_fields(
cls, ti: TaskInstance, session: Session = NEW_SESSION
) -> None:
"""Update rendered task instance fields for cases where runtime evaluated, not templated."""
# Note: Need lazy import to break the partly loaded class loop
from airflow.models.taskinstance import TaskInstance

# If called via remote API the DAG needs to be re-loaded
TaskInstance.ensure_dag(ti, session=session)

rtif = RenderedTaskInstanceFields(ti)
RenderedTaskInstanceFields.write(rtif, session=session)
RenderedTaskInstanceFields.delete_old_records(ti.task_id, ti.dag_id, session=session)

@classmethod
@provide_session
def get_templated_fields(cls, ti: TaskInstance, session: Session = NEW_SESSION) -> dict | None:
Expand Down
18 changes: 16 additions & 2 deletions providers/src/airflow/providers/standard/operators/bash.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,12 @@
from airflow.models.baseoperator import BaseOperator
from airflow.providers.standard.hooks.subprocess import SubprocessHook, SubprocessResult, working_directory
from airflow.utils.operator_helpers import context_to_airflow_vars
from airflow.utils.session import NEW_SESSION, provide_session
from airflow.utils.types import ArgNotSet

if TYPE_CHECKING:
from sqlalchemy.orm import Session as SASession

from airflow.models.taskinstance import TaskInstance
from airflow.utils.context import Context

Expand Down Expand Up @@ -199,8 +202,10 @@ def subprocess_hook(self):
"""Returns hook for running the bash command."""
return SubprocessHook()

# TODO: This should be replaced with Task SDK API call
@staticmethod
def refresh_bash_command(ti: TaskInstance) -> None:
@provide_session
def refresh_bash_command(ti: TaskInstance, session: SASession = NEW_SESSION) -> None:
"""
Rewrite the underlying rendered bash_command value for a task instance in the metadatabase.
Expand All @@ -212,7 +217,16 @@ def refresh_bash_command(ti: TaskInstance) -> None:
"""
from airflow.models.renderedtifields import RenderedTaskInstanceFields

RenderedTaskInstanceFields._update_runtime_evaluated_template_fields(ti)
"""Update rendered task instance fields for cases where runtime evaluated, not templated."""
# Note: Need lazy import to break the partly loaded class loop
from airflow.models.taskinstance import TaskInstance

# If called via remote API the DAG needs to be re-loaded
TaskInstance.ensure_dag(ti, session=session)

rtif = RenderedTaskInstanceFields(ti)
RenderedTaskInstanceFields.write(rtif, session=session)
RenderedTaskInstanceFields.delete_old_records(ti.task_id, ti.dag_id, session=session)

def get_env(self, context) -> dict:
"""Build the set of environment variables to be exposed for the bash command."""
Expand Down

0 comments on commit bd80719

Please sign in to comment.