
Commit 653d927

Remove scan_stale_dag method in DAG processor manager
This method has been replaced by the `handle_removed_files` and `deactivate_deleted_dags` methods. Its presence now causes issues, as it deactivates DAGs incorrectly. `handle_removed_files` is better suited to dag bundles, since the file's processor is also terminated. Also removes the now-unused config variable from the scheduler and from config.yml.

Closes: apache#47294
1 parent c4e96d4 commit 653d927
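For context, the removed method inferred staleness from timestamps rather than from the file actually disappearing: a DAG was deactivated once its DagModel.last_parsed_time lagged the file's last parse-finish time by more than stale_dag_threshold (see the manager.py diff below). A minimal sketch of that comparison, with simplified names and an illustrative timeline (not a drop-in piece of the manager):

    from datetime import datetime, timedelta

    def looked_stale(last_parsed_time: datetime, last_finish_time: datetime,
                     stale_dag_threshold: float = 50.0) -> bool:
        # The removed heuristic: assume the DAG is gone from the file if the file
        # finished parsing well after the DAG row was last updated.
        return last_parsed_time + timedelta(seconds=stale_dag_threshold) < last_finish_time

    # A DAG row written 60s before the parse finished already trips the default
    # 50s threshold, even if the DAG is still present in the file.
    finish = datetime(2025, 3, 1, 12, 0, 0)
    print(looked_stale(finish - timedelta(seconds=60), finish))  # True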

5 files changed: +8 -163 lines

airflow/cli/commands/remote_commands/config_command.py

+5 -1

@@ -381,6 +381,10 @@ def message(self) -> str | None:
         config=ConfigParameter("scheduler", "deactivate_stale_dags_interval"),
         renamed_to=ConfigParameter("scheduler", "parsing_cleanup_interval"),
     ),
+    ConfigChange(
+        config=ConfigParameter("scheduler", "dag_stale_not_seen_duration"),
+        was_removed=True,
+    ),
     ConfigChange(
         config=ConfigParameter("scheduler", "statsd_on"), renamed_to=ConfigParameter("metrics", "statsd_on")
     ),
@@ -442,7 +446,7 @@ def message(self) -> str | None:
     ),
     ConfigChange(
         config=ConfigParameter("scheduler", "stale_dag_threshold"),
-        renamed_to=ConfigParameter("dag_processor", "stale_dag_threshold"),
+        was_removed=True,
     ),
     ConfigChange(
         config=ConfigParameter("scheduler", "print_stats_interval"),

airflow/config_templates/config.yml

-19

@@ -2448,13 +2448,6 @@ scheduler:
       type: boolean
       example: ~
       default: "True"
-    dag_stale_not_seen_duration:
-      description: |
-        Time in seconds after which dags, which were not updated by Dag Processor are deactivated.
-      version_added: 2.4.0
-      type: integer
-      example: ~
-      default: "600"
     use_job_schedule:
       description: |
         Turn off scheduler use of cron intervals by setting this to ``False``.
@@ -2726,18 +2719,6 @@ dag_processor:
       type: integer
       example: ~
       default: "30"
-    stale_dag_threshold:
-      description: |
-        How long (in seconds) to wait after we have re-parsed a DAG file before deactivating stale
-        DAGs (DAGs which are no longer present in the expected files). The reason why we need
-        this threshold is to account for the time between when the file is parsed and when the
-        DAG is loaded. The absolute maximum that this could take is
-        ``[dag_processor] dag_file_processor_timeout``, but when you have a long timeout configured,
-        it results in a significant delay in the deactivation of stale dags.
-      version_added: ~
-      type: integer
-      example: ~
-      default: "50"
     dag_file_processor_timeout:
       description: |
         How long before timing out a DagFileProcessor, which processes a dag file
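For reference, these are the options that now have no effect if they are still set in airflow.cfg; the values shown are the defaults from the removed config.yml entries above:

    [scheduler]
    # Removed: DAGs not updated by the DAG processor were deactivated after
    # this many seconds.
    dag_stale_not_seen_duration = 600

    [dag_processor]
    # Removed: grace period (seconds) after re-parsing a file before DAGs
    # missing from it were deactivated.
    stale_dag_threshold = 50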

airflow/dag_processing/manager.py

+2 -64

@@ -33,15 +33,15 @@
 from collections import defaultdict, deque
 from collections.abc import Callable, Iterable, Iterator
 from dataclasses import dataclass, field
-from datetime import datetime, timedelta
+from datetime import datetime
 from importlib import import_module
 from operator import attrgetter, itemgetter
 from pathlib import Path
 from typing import TYPE_CHECKING, Any, NamedTuple, cast

 import attrs
 import structlog
-from sqlalchemy import select, update
+from sqlalchemy import select
 from sqlalchemy.orm import load_only
 from tabulate import tabulate
 from uuid6 import uuid7
@@ -158,17 +158,9 @@ class DagFileProcessorManager(LoggingMixin):

     _parallelism: int = attrs.field(factory=_config_int_factory("dag_processor", "parsing_processes"))

-    parsing_cleanup_interval: float = attrs.field(
-        factory=_config_int_factory("scheduler", "parsing_cleanup_interval")
-    )
     _file_process_interval: float = attrs.field(
         factory=_config_int_factory("dag_processor", "min_file_process_interval")
     )
-    stale_dag_threshold: float = attrs.field(
-        factory=_config_int_factory("dag_processor", "stale_dag_threshold")
-    )
-
-    _last_deactivate_stale_dags_time: float = attrs.field(default=0, init=False)
     print_stats_interval: float = attrs.field(
         factory=_config_int_factory("dag_processor", "print_stats_interval")
     )
@@ -251,59 +243,6 @@ def run(self):

         return self._run_parsing_loop()

-    def _scan_stale_dags(self):
-        """Scan and deactivate DAGs which are no longer present in files."""
-        now = time.monotonic()
-        elapsed_time_since_refresh = now - self._last_deactivate_stale_dags_time
-        if elapsed_time_since_refresh > self.parsing_cleanup_interval:
-            last_parsed = {
-                file_info: stat.last_finish_time
-                for file_info, stat in self._file_stats.items()
-                if stat.last_finish_time
-            }
-            self.deactivate_stale_dags(last_parsed=last_parsed)
-            self._last_deactivate_stale_dags_time = time.monotonic()
-
-    @provide_session
-    def deactivate_stale_dags(
-        self,
-        last_parsed: dict[DagFileInfo, datetime | None],
-        session: Session = NEW_SESSION,
-    ):
-        """Detect and deactivate DAGs which are no longer present in files."""
-        to_deactivate = set()
-        bundle_names = {b.name for b in self._dag_bundles}
-        query = select(
-            DagModel.dag_id,
-            DagModel.bundle_name,
-            DagModel.fileloc,
-            DagModel.last_parsed_time,
-            DagModel.relative_fileloc,
-        ).where(DagModel.is_active, DagModel.bundle_name.in_(bundle_names))
-        dags_parsed = session.execute(query)
-
-        for dag in dags_parsed:
-            # The largest valid difference between a DagFileStat's last_finished_time and a DAG's
-            # last_parsed_time is the processor_timeout. Longer than that indicates that the DAG is
-            # no longer present in the file. We have a stale_dag_threshold configured to prevent a
-            # significant delay in deactivation of stale dags when a large timeout is configured
-            file_info = DagFileInfo(rel_path=Path(dag.relative_fileloc), bundle_name=dag.bundle_name)
-            if last_finish_time := last_parsed.get(file_info, None):
-                if dag.last_parsed_time + timedelta(seconds=self.stale_dag_threshold) < last_finish_time:
-                    self.log.info("DAG %s is missing and will be deactivated.", dag.dag_id)
-                    to_deactivate.add(dag.dag_id)
-
-        if to_deactivate:
-            deactivated_dagmodel = session.execute(
-                update(DagModel)
-                .where(DagModel.dag_id.in_(to_deactivate))
-                .values(is_active=False)
-                .execution_options(synchronize_session="fetch")
-            )
-            deactivated = deactivated_dagmodel.rowcount
-            if deactivated:
-                self.log.info("Deactivated %i DAGs which are no longer present in file.", deactivated)
-
     def _run_parsing_loop(self):
         # initialize cache to mutualize calls to Variable.get in DAGs
         # needs to be done before this process is forked to create the DAG parsing processes.
@@ -342,7 +281,6 @@ def _run_parsing_loop(self):

         for callback in self._fetch_callbacks():
             self._add_callback_to_queue(callback)
-        self._scan_stale_dags()
         DagWarning.purge_inactive_dag_warnings()

         # Update number of loop iteration.
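The replacement path keys deactivation off actual file removal rather than parse-time deltas. A simplified sketch of that idea, under the assumption that the manager compares its known file set against the bundle's current listing (the real logic lives in `handle_removed_files` and `deactivate_deleted_dags`, which also terminate the removed file's processor):

    from pathlib import Path

    def find_removed_files(known: set[Path], present: set[Path]) -> set[Path]:
        # Deactivation is driven by set difference, not by timing comparisons,
        # so a slow parse or a delayed DB write can no longer be mistaken for
        # a deleted DAG file.
        return known - present

    known = {Path("dags/a.py"), Path("dags/b.py")}
    present = {Path("dags/a.py")}
    print(find_removed_files(known, present))  # {PosixPath('dags/b.py')}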

airflow/jobs/scheduler_job_runner.py

-1

@@ -179,7 +179,6 @@ def __init__(
         self._task_instance_heartbeat_timeout_secs = conf.getint(
             "scheduler", "task_instance_heartbeat_timeout"
         )
-        self._dag_stale_not_seen_duration = conf.getint("scheduler", "dag_stale_not_seen_duration")
         self._task_queued_timeout = conf.getfloat("scheduler", "task_queued_timeout")
         self._enable_tracemalloc = conf.getboolean("scheduler", "enable_tracemalloc")

tests/dag_processing/test_manager.py

+1 -78

@@ -36,7 +36,7 @@

 import pytest
 import time_machine
-from sqlalchemy import func, select
+from sqlalchemy import select
 from uuid6 import uuid7

 from airflow.callbacks.callback_requests import DagCallbackRequest
@@ -413,83 +413,6 @@ def test_file_paths_in_queue_sorted_by_priority(self):
         parsing_request_after = session2.query(DagPriorityParsingRequest).get(parsing_request.id)
         assert parsing_request_after is None

-    def test_scan_stale_dags(self, testing_dag_bundle):
-        """
-        Ensure that DAGs are marked inactive when the file is parsed but the
-        DagModel.last_parsed_time is not updated.
-        """
-        manager = DagFileProcessorManager(
-            max_runs=1,
-            processor_timeout=10 * 60,
-        )
-        bundle = MagicMock()
-        bundle.name = "testing"
-        manager._dag_bundles = [bundle]
-
-        test_dag_path = DagFileInfo(
-            bundle_name="testing",
-            rel_path=Path("test_example_bash_operator.py"),
-            bundle_path=TEST_DAGS_FOLDER,
-        )
-        dagbag = DagBag(
-            test_dag_path.absolute_path,
-            read_dags_from_db=False,
-            include_examples=False,
-            bundle_path=test_dag_path.bundle_path,
-        )
-
-        with create_session() as session:
-            # Add stale DAG to the DB
-            dag = dagbag.get_dag("test_example_bash_operator")
-            dag.last_parsed_time = timezone.utcnow()
-            DAG.bulk_write_to_db("testing", None, [dag])
-            SerializedDagModel.write_dag(dag, bundle_name="testing")
-
-            # Add DAG to the file_parsing_stats
-            stat = DagFileStat(
-                num_dags=1,
-                import_errors=0,
-                last_finish_time=timezone.utcnow() + timedelta(hours=1),
-                last_duration=1,
-                run_count=1,
-                last_num_of_db_queries=1,
-            )
-            manager._files = [test_dag_path]
-            manager._file_stats[test_dag_path] = stat
-
-            active_dag_count = (
-                session.query(func.count(DagModel.dag_id))
-                .filter(
-                    DagModel.is_active,
-                    DagModel.relative_fileloc == str(test_dag_path.rel_path),
-                    DagModel.bundle_name == test_dag_path.bundle_name,
-                )
-                .scalar()
-            )
-            assert active_dag_count == 1
-
-            manager._scan_stale_dags()
-
-            active_dag_count = (
-                session.query(func.count(DagModel.dag_id))
-                .filter(
-                    DagModel.is_active,
-                    DagModel.relative_fileloc == str(test_dag_path.rel_path),
-                    DagModel.bundle_name == test_dag_path.bundle_name,
-                )
-                .scalar()
-            )
-            assert active_dag_count == 0
-
-            serialized_dag_count = (
-                session.query(func.count(SerializedDagModel.dag_id))
-                .filter(SerializedDagModel.dag_id == dag.dag_id)
-                .scalar()
-            )
-            # Deactivating the DagModel should not delete the SerializedDagModel
-            # SerializedDagModel gives history about Dags
-            assert serialized_dag_count == 1
-
     def test_kill_timed_out_processors_kill(self):
         manager = DagFileProcessorManager(max_runs=1, processor_timeout=5)
