Scheduler crashes on expanding a task with a literal list #47735

@tirkarthi

Apache Airflow version

main (development)

If "Other Airflow 2 version" selected, which one?

No response

What happened?

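Running the attached reproducer DAG, which expands a task over a literal list, crashes the scheduler with the stack trace below. The trace ends in an AttributeError because SchedulerDictOfListsExpandInput has no resolve method. It looks like SchedulerListOfDictsExpandInput in a similar file also needs to implement the resolve method.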

[2025-03-13T20:56:12.085+0530] {cli_action_loggers.py:97} DEBUG - Calling callbacks: []
Traceback (most recent call last):
  File "/home/karthikeyan/stuff/python/airflow/.venv/bin/airflow", line 10, in <module>
    sys.exit(main())
             ^^^^^^
[2025-03-13 20:56:12 +0530] [12759] [INFO] Worker exiting (pid: 12759)
[2025-03-13 20:56:12 +0530] [12760] [INFO] Worker exiting (pid: 12760)
  File "/home/karthikeyan/stuff/python/airflow/airflow/__main__.py", line 58, in main
    args.func(args)
  File "/home/karthikeyan/stuff/python/airflow/airflow/cli/cli_config.py", line 49, in command
    return func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^
  File "/home/karthikeyan/stuff/python/airflow/airflow/utils/cli.py", line 111, in wrapper
    return f(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^
  File "/home/karthikeyan/stuff/python/airflow/airflow/utils/providers_configuration_loader.py", line 55, in wrapped_function
    return func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^
  File "/home/karthikeyan/stuff/python/airflow/airflow/cli/commands/local_commands/scheduler_command.py", line 52, in scheduler
    run_command_with_daemon_option(
  File "/home/karthikeyan/stuff/python/airflow/airflow/cli/commands/local_commands/daemon_utils.py", line 86, in run_command_with_daemon_option
    callback()
  File "/home/karthikeyan/stuff/python/airflow/airflow/cli/commands/local_commands/scheduler_command.py", line 55, in <lambda>
    callback=lambda: _run_scheduler_job(args),
                     ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/karthikeyan/stuff/python/airflow/airflow/cli/commands/local_commands/scheduler_command.py", line 43, in _run_scheduler_job
    run_job(job=job_runner.job, execute_callable=job_runner._execute)
  File "/home/karthikeyan/stuff/python/airflow/airflow/utils/session.py", line 101, in wrapper
    return func(*args, session=session, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/karthikeyan/stuff/python/airflow/airflow/jobs/job.py", line 342, in run_job
    return execute_job(job, execute_callable=execute_callable)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/karthikeyan/stuff/python/airflow/airflow/jobs/job.py", line 371, in execute_job
    ret = execute_callable()
          ^^^^^^^^^^^^^^^^^^
  File "/home/karthikeyan/stuff/python/airflow/airflow/jobs/scheduler_job_runner.py", line 937, in _execute
    self._run_scheduler_loop()
  File "/home/karthikeyan/stuff/python/airflow/airflow/jobs/scheduler_job_runner.py", line 1063, in _run_scheduler_loop
    num_queued_tis = self._do_scheduling(session)
                     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/karthikeyan/stuff/python/airflow/airflow/jobs/scheduler_job_runner.py", line 1163, in _do_scheduling
    callback_tuples = self._schedule_all_dag_runs(guard, dag_runs, session)
                      ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/karthikeyan/stuff/python/airflow/airflow/utils/retries.py", line 93, in wrapped_function
    for attempt in run_with_db_retries(max_retries=retries, logger=logger, **retry_kwargs):
  File "/home/karthikeyan/stuff/python/airflow/.venv/lib/python3.11/site-packages/tenacity/__init__.py", line 443, in __iter__
    do = self.iter(retry_state=retry_state)
         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/karthikeyan/stuff/python/airflow/.venv/lib/python3.11/site-packages/tenacity/__init__.py", line 376, in iter
    result = action(retry_state)
             ^^^^^^^^^^^^^^^^^^^
  File "/home/karthikeyan/stuff/python/airflow/.venv/lib/python3.11/site-packages/tenacity/__init__.py", line 398, in <lambda>
    self._add_action_func(lambda rs: rs.outcome.result())
                                     ^^^^^^^^^^^^^^^^^^^
  File "/usr/lib/python3.11/concurrent/futures/_base.py", line 449, in result
    return self.__get_result()
           ^^^^^^^^^^^^^^^^^^^
  File "/usr/lib/python3.11/concurrent/futures/_base.py", line 401, in __get_result
    raise self._exception
  File "/home/karthikeyan/stuff/python/airflow/airflow/utils/retries.py", line 102, in wrapped_function
    return func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^
  File "/home/karthikeyan/stuff/python/airflow/airflow/jobs/scheduler_job_runner.py", line 1569, in _schedule_all_dag_runs
    callback_tuples = [(run, self._schedule_dag_run(run, session=session)) for run in dag_runs]
                      ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/karthikeyan/stuff/python/airflow/airflow/jobs/scheduler_job_runner.py", line 1569, in <listcomp>
    callback_tuples = [(run, self._schedule_dag_run(run, session=session)) for run in dag_runs]
                             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/karthikeyan/stuff/python/airflow/airflow/jobs/scheduler_job_runner.py", line 1683, in _schedule_dag_run
    dag_run.schedule_tis(schedulable_tis, session, max_tis_per_query=self.job.max_tis_per_query)
  File "/home/karthikeyan/stuff/python/airflow/airflow/utils/session.py", line 98, in wrapper
    return func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^
  File "/home/karthikeyan/stuff/python/airflow/airflow/models/dagrun.py", line 1711, in schedule_tis
    start_from_trigger = ti.task.expand_start_from_trigger(context=context, session=session)
                         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/karthikeyan/stuff/python/airflow/airflow/models/mappedoperator.py", line 64, in expand_start_from_trigger
    mapped_kwargs, _ = self._expand_mapped_kwargs(context)
                       ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/karthikeyan/stuff/python/airflow/.venv/lib/python3.11/site-packages/airflow/sdk/definitions/mappedoperator.py", line 691, in _expand_mapped_kwargs
    return self._get_specified_expand_input().resolve(context)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
AttributeError: 'SchedulerDictOfListsExpandInput' object has no attribute 'resolve'
[2025-03-13 20:56:12 +0530] [12758] [INFO] Shutting down: Master
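
The final frame of the trace comes down to calling `resolve` on an object that does not define it. Below is a minimal, self-contained sketch of that failure mode and of one possible shape for a fix. It is not Airflow's code: `FakeSchedulerDictOfListsExpandInput`, `FakeResolvableExpandInput`, and `expand_mapped_kwargs` are hypothetical stand-ins, and the `resolve` signature, the `map_index` context key, and the empty set returned as the second tuple element are assumptions for illustration only. The only detail taken from the trace is that the caller unpacks a two-element result.

```python
from __future__ import annotations

from typing import Any


class FakeSchedulerDictOfListsExpandInput:
    """Hypothetical stand-in: holds the literal lists but defines no resolve()."""

    def __init__(self, value: dict[str, list[Any]]) -> None:
        self.value = value


class FakeResolvableExpandInput(FakeSchedulerDictOfListsExpandInput):
    """Hypothetical variant that adds resolve() with an assumed signature."""

    def resolve(self, context: dict[str, Any]) -> tuple[dict[str, Any], set[int]]:
        # Assumption: the context exposes the mapped task instance's index.
        map_index = context["map_index"]
        # Simplification: only a single expanded key is handled here.
        resolved = {key: values[map_index] for key, values in self.value.items()}
        # The second element is a placeholder so the caller can unpack a pair.
        return resolved, set()


def expand_mapped_kwargs(expand_input: Any, context: dict[str, Any]):
    # Same shape as the failing call in the trace: expand_input.resolve(context)
    return expand_input.resolve(context)


# Illustrative values only.
literal = {"target_time": ["2026-11-22 00:00:03+00:00", "2026-11-22 00:00:10+00:00"]}

try:
    expand_mapped_kwargs(FakeSchedulerDictOfListsExpandInput(literal), {"map_index": 0})
    error_message = None
except AttributeError as exc:
    # Same kind of error as the last line of the trace.
    error_message = str(exc)

mapped_kwargs, _ = expand_mapped_kwargs(FakeResolvableExpandInput(literal), {"map_index": 1})
print(error_message)
print(mapped_kwargs)
```

The stand-in without `resolve` fails the same way the scheduler does, while the variant with `resolve` returns the keyword arguments for one mapped instance.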

What you think should happen instead?

No response

How to reproduce

  1. Run the DAG below while the scheduler is running in another tab.
from __future__ import annotations

from datetime import datetime, timedelta

from airflow import DAG
from airflow.sensors.date_time import DateTimeSensorAsync
from airflow.utils import timezone

with DAG(
    dag_id="file_trigger_expand_crash_1",
    start_date=datetime(2021, 1, 1),
    catchup=False,
    schedule=None,
) as dag:
    instant = timezone.datetime(2026, 11, 22)
    task = DateTimeSensorAsync.partial(task_id="async", poke_interval=3).expand(
        target_time=[str(instant + timedelta(seconds=3)), str(instant + timedelta(seconds=10))]
    )

    task
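
For context, expanding over a literal list of two values is expected to produce two mapped task instances, one per element. The sketch below shows that per-instance mapping in plain Python without importing Airflow. It assumes that `timezone.datetime` in the reproducer yields a UTC-aware datetime, and `per_instance_kwargs` is a hypothetical name used only for illustration.

```python
from datetime import datetime, timedelta, timezone

# Assumption: stands in for airflow.utils.timezone.datetime(2026, 11, 22) in UTC.
instant = datetime(2026, 11, 22, tzinfo=timezone.utc)

# The same literal list passed to .expand(target_time=[...]) in the reproducer.
target_times = [
    str(instant + timedelta(seconds=3)),
    str(instant + timedelta(seconds=10)),
]

# One kwargs dict per mapped task instance; the list position plays the role of map_index.
per_instance_kwargs = [{"target_time": value} for value in target_times]

for map_index, kwargs in enumerate(per_instance_kwargs):
    print(map_index, kwargs)
```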

Operating System

Ubuntu 20.04

Versions of Apache Airflow Providers

No response

Deployment

Virtualenv installation

Deployment details

No response

Anything else?

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!
