Skip to content

Commit

Permalink
Removes AIP-44 from dag processing manager and internal_api_call (apa…
Browse files Browse the repository at this point in the history
…che#44549)

This is the last "internal_api_call" change from AIP-44 removal
and it not only removes the dag_processsing/manager calls but also
the internal_api_call itself.

Part of apache#44436
  • Loading branch information
potiuk authored and Lefteris Gilmaz committed Jan 5, 2025
1 parent f11e084 commit 0c0d305
Show file tree
Hide file tree
Showing 2 changed files with 1 addition and 48 deletions.
34 changes: 0 additions & 34 deletions airflow/api_internal/internal_api_call.py

This file was deleted.

15 changes: 1 addition & 14 deletions airflow/dag_processing/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@
from tabulate import tabulate

import airflow.models
from airflow.api_internal.internal_api_call import internal_api_call
from airflow.callbacks.callback_requests import CallbackRequest
from airflow.configuration import conf
from airflow.dag_processing.processor import DagFileProcessorProcess
Expand Down Expand Up @@ -499,7 +498,6 @@ def _scan_stale_dags(self):
self.last_deactivate_stale_dags_time = timezone.utcnow()

@classmethod
@internal_api_call
@provide_session
def deactivate_stale_dags(
cls,
Expand Down Expand Up @@ -698,23 +696,14 @@ def _run_parsing_loop(self):
poll_time = 0.0

@classmethod
@internal_api_call
@provide_session
@retry_db_transaction
def _fetch_callbacks(
cls,
max_callbacks: int,
standalone_dag_processor: bool,
dag_directory: str,
session: Session = NEW_SESSION,
) -> list[CallbackRequest]:
return cls._fetch_callbacks_with_retries(
max_callbacks, standalone_dag_processor, dag_directory, session
)

@classmethod
@retry_db_transaction
def _fetch_callbacks_with_retries(
cls, max_callbacks: int, standalone_dag_processor: bool, dag_directory: str, session: Session
) -> list[CallbackRequest]:
"""Fetch callbacks from database and add them to the internal queue for execution."""
cls.logger().debug("Fetching callbacks from the database.")
Expand Down Expand Up @@ -765,7 +754,6 @@ def _refresh_requested_filelocs(self) -> None:
self._file_path_queue.appendleft(fileloc)

@classmethod
@internal_api_call
@provide_session
def _get_priority_filelocs(cls, session: Session = NEW_SESSION):
"""Get filelocs from DB table."""
Expand Down Expand Up @@ -831,7 +819,6 @@ def _print_stat(self):
self.last_stat_print_time = time.monotonic()

@staticmethod
@internal_api_call
@provide_session
def clear_nonexistent_import_errors(
file_paths: list[str] | None, processor_subdir: str | None, session=NEW_SESSION
Expand Down

0 comments on commit 0c0d305

Please sign in to comment.