diff --git a/airflow/sensors/base.py b/airflow/sensors/base.py
index d363c8cd23780..39172ce64afd7 100644
--- a/airflow/sensors/base.py
+++ b/airflow/sensors/base.py
@@ -46,11 +46,9 @@
 from airflow.models.taskreschedule import TaskReschedule
 from airflow.ti_deps.deps.ready_to_reschedule import ReadyToRescheduleDep
 from airflow.utils import timezone
-from airflow.utils.session import NEW_SESSION, create_session, provide_session
+from airflow.utils.session import create_session
 
 if TYPE_CHECKING:
-    from sqlalchemy.orm.session import Session
-
     from airflow.typing_compat import Self
     from airflow.utils.context import Context
 
@@ -84,30 +82,6 @@ def __bool__(self) -> bool:
         return self.is_done
 
 
-@provide_session
-def _orig_start_date(
-    dag_id: str, task_id: str, run_id: str, map_index: int, try_number: int, session: Session = NEW_SESSION
-):
-    """
-    Get the original start_date for a rescheduled task.
-
-    :meta private:
-    """
-    return session.scalar(
-        select(TaskReschedule)
-        .where(
-            TaskReschedule.dag_id == dag_id,
-            TaskReschedule.task_id == task_id,
-            TaskReschedule.run_id == run_id,
-            TaskReschedule.map_index == map_index,
-            TaskReschedule.try_number == try_number,
-        )
-        .order_by(TaskReschedule.id.asc())
-        .with_only_columns(TaskReschedule.start_date)
-        .limit(1)
-    )
-
-
 class BaseSensorOperator(BaseOperator, SkipMixin):
     """
     Sensor operators are derived from this class and inherit these attributes.
@@ -246,8 +220,12 @@ def execute(self, context: Context) -> Any:
             ti = context["ti"]
             max_tries: int = ti.max_tries or 0
             retries: int = self.retries or 0
+
             # If reschedule, use the start date of the first try (first try can be either the very
-            # first execution of the task, or the first execution after the task was cleared.)
+            # first execution of the task, or the first execution after the task was cleared).
+            # If the first try's record was not saved because an exception occurred and the
+            # transaction was rolled back, fall back to the next available attempt to avoid
+            # endless rescheduling.
             first_try_number = max_tries - retries + 1
             with create_session() as session:
                 start_date = session.scalar(
@@ -257,7 +235,7 @@ def execute(self, context: Context) -> Any:
                         TaskReschedule.task_id == ti.task_id,
                         TaskReschedule.run_id == ti.run_id,
                         TaskReschedule.map_index == ti.map_index,
-                        TaskReschedule.try_number == first_try_number,
+                        TaskReschedule.try_number >= first_try_number,
                     )
                     .order_by(TaskReschedule.id.asc())
                     .with_only_columns(TaskReschedule.start_date)
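The behavioural effect of switching from "==" to ">=" can be illustrated outside Airflow. The sketch below is illustration only, not part of the patch: the helper name pick_run_start and the dict-of-start-dates layout are made up, and ordering by try_number stands in for the real query's ordering by TaskReschedule.id. It picks the run start date the same way the patched query does, i.e. the earliest recorded reschedule at or after the first try, so a missing first-try record no longer restarts the timeout window on every poke.

from __future__ import annotations

from datetime import datetime


def pick_run_start(reschedules: dict[int, datetime], first_try_number: int) -> datetime | None:
    # Earliest recorded reschedule whose try_number is at or after the first try.
    candidates = sorted(
        (try_no, start) for try_no, start in reschedules.items() if try_no >= first_try_number
    )
    return candidates[0][1] if candidates else None


# Try 1 raised before its TaskReschedule row was committed, so only try 2 left a record.
records = {2: datetime(2025, 1, 1, 0, 0, 8)}
assert pick_run_start(records, first_try_number=1) == datetime(2025, 1, 1, 0, 0, 8)
# With the old "==" comparison the lookup would find nothing, the timeout clock would
# restart on every poke, and the sensor would reschedule forever (issue #45050).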
diff --git a/tests/sensors/test_base.py b/tests/sensors/test_base.py
index 9bb4f5b9934d2..348062394fb2f 100644
--- a/tests/sensors/test_base.py
+++ b/tests/sensors/test_base.py
@@ -83,7 +83,7 @@ def wrapper(ti):
 
 
 class DummySensor(BaseSensorOperator):
-    def __init__(self, return_value=False, **kwargs):
+    def __init__(self, return_value: bool | None = False, **kwargs):
         super().__init__(**kwargs)
         self.return_value = return_value
 
@@ -422,7 +422,7 @@ def _get_tis():
         assert task_reschedules[0].try_number == 1
         assert dummy_ti.state == State.NONE
 
-        # second poke timesout and task instance is failed
+        # second poke times out and task instance is failed
         time_machine.coordinates.shift(sensor.poke_interval)
         with pytest.raises(AirflowSensorTimeout):
             self._run(sensor)
@@ -878,6 +878,287 @@ def _increment_try_number():
         assert sensor_ti.max_tries == 4
         assert sensor_ti.state == State.FAILED
 
+    def test_reschedule_set_retries_no_errors_timeout_fail(
+        self, make_sensor, time_machine, session, task_reschedules_for_ti
+    ):
+        """
+        Mode "reschedule", retries set, no errors occur, and the timeout is reached.
+        Retries and timeout configurations interact correctly.
+
+        Given a sensor configured: poke_interval=5 timeout=10 retries=2 retry_delay=timedelta(seconds=3)
+        Since no errors occur, no retries should be executed.
+        This is how it is expected to behave:
+        1. 00:00 Returns False: try_number=1 max_tries=2 state=UP_FOR_RESCHEDULE
+        2. 00:05 Returns False: try_number=1 max_tries=2 state=UP_FOR_RESCHEDULE
+        3. 00:10 Returns False: try_number=1 max_tries=2 state=UP_FOR_RESCHEDULE
+        4. 00:15 Raises AirflowSensorTimeout: try_number=1 max_tries=2 state=FAILED
+        """
+        sensor, dr = make_sensor(
+            return_value=None,
+            poke_interval=5,
+            timeout=10,
+            retries=2,
+            retry_delay=timedelta(seconds=3),
+            mode="reschedule",
+            silent_fail=False,
+        )
+
+        def _get_sensor_ti():
+            return next(x for x in dr.get_task_instances(session=session) if x.task_id == SENSOR_OP)
+
+        sensor.poke = Mock(side_effect=[False, False, False, False])
+
+        # Scheduler does this before the first run
+        sensor_ti = _get_sensor_ti()
+        sensor_ti.state = State.SCHEDULED
+        sensor_ti.try_number += 1
+        session.commit()
+
+        # 1st poke
+        date1 = timezone.utcnow()
+        time_machine.move_to(date1, tick=False)
+        self._run(sensor)
+        sensor_ti = _get_sensor_ti()
+        assert sensor_ti.try_number == 1
+        assert sensor_ti.max_tries == 2
+        assert sensor_ti.state == State.UP_FOR_RESCHEDULE
+        task_reschedules = task_reschedules_for_ti(sensor_ti)
+        assert len(task_reschedules) == 1
+
+        # 2nd poke
+        time_machine.coordinates.shift(sensor.poke_interval)
+        self._run(sensor)
+        sensor_ti = _get_sensor_ti()
+        assert sensor_ti.try_number == 1
+        assert sensor_ti.max_tries == 2
+        assert sensor_ti.state == State.UP_FOR_RESCHEDULE
+        task_reschedules = task_reschedules_for_ti(sensor_ti)
+        assert len(task_reschedules) == 2
+
+        # 3rd poke
+        time_machine.coordinates.shift(sensor.poke_interval)
+        self._run(sensor)
+        sensor_ti = _get_sensor_ti()
+        assert sensor_ti.try_number == 1
+        assert sensor_ti.max_tries == 2
+        assert sensor_ti.state == State.UP_FOR_RESCHEDULE
+        task_reschedules = task_reschedules_for_ti(sensor_ti)
+        assert len(task_reschedules) == 3
+
+        # 4th poke causes timeout
+        time_machine.coordinates.shift(sensor.poke_interval)
+
+        with pytest.raises(AirflowSensorTimeout):
+            self._run(sensor)
+
+        sensor_ti = _get_sensor_ti()
+        assert sensor_ti.try_number == 1
+        assert sensor_ti.max_tries == 2
+        assert sensor_ti.state == State.FAILED
+
+        # No reschedule attempt is saved for the poke that fails with a timeout
+        task_reschedules = task_reschedules_for_ti(sensor_ti)
+        assert len(task_reschedules) == 3
+
+    def test_reschedule_set_retries_1st_poke_runtime_error_timeout_fail(
+        self, make_sensor, time_machine, session, task_reschedules_for_ti
+    ):
+        """
+        Mode "reschedule", retries set, the first poke raises RuntimeError, and the timeout is reached.
+        A RuntimeError on the first iteration means no reschedule attempt is recorded in the DB.
+        In that case the sensor should not end up in an endless loop and should calculate the running
+        timeout from:
+        1. The current system date on the second poke
+        2. The second attempt on subsequent pokes
+        See issue #45050 https://github.com/apache/airflow/issues/45050
+        Retries and timeout configurations interact correctly.
+
+        Given a sensor configured: poke_interval=5 timeout=10 retries=2 retry_delay=timedelta(seconds=7)
+        The try number is incremented on error, but the run timeout should not be extended.
+        This is how it is expected to behave:
+        1. 00:00 Raises RuntimeError: try_number=1 max_tries=2 state=UP_FOR_RETRY
+        2. 00:08 Returns False: try_number=2 max_tries=2 state=UP_FOR_RESCHEDULE
+           This is the first saved reschedule attempt, and it is used to calculate the run duration
+        3. 00:13 Returns False: try_number=2 max_tries=2 state=UP_FOR_RESCHEDULE
+        4. 00:18 Returns False: try_number=2 max_tries=2 state=UP_FOR_RESCHEDULE
+        5. 00:23 Raises AirflowSensorTimeout: try_number=2 max_tries=2 state=FAILED
+        """
+        sensor, dr = make_sensor(
+            return_value=None,
+            poke_interval=5,
+            timeout=10,
+            retries=2,
+            retry_delay=timedelta(seconds=7),
+            mode="reschedule",
+            silent_fail=False,
+        )
+
+        def _get_sensor_ti():
+            return next(x for x in dr.get_task_instances(session=session) if x.task_id == SENSOR_OP)
+
+        sensor.poke = Mock(side_effect=[RuntimeError, False, False, False, False])
+
+        # Scheduler does this before the first run
+        sensor_ti = _get_sensor_ti()
+        sensor_ti.state = State.SCHEDULED
+        sensor_ti.try_number += 1
+        session.commit()
+
+        # 1st poke
+        date1 = timezone.utcnow()
+        time_machine.move_to(date1, tick=False)
+
+        with pytest.raises(RuntimeError):
+            self._run(sensor)
+
+        sensor_ti = _get_sensor_ti()
+        assert sensor_ti.try_number == 1
+        assert sensor_ti.max_tries == 2
+        assert sensor_ti.state == State.UP_FOR_RETRY
+
+        # No reschedule attempt is saved on the RuntimeError
+        task_reschedules = task_reschedules_for_ti(sensor_ti)
+        assert len(task_reschedules) == 0
+
+        # Scheduler does this before retry
+        sensor_ti = _get_sensor_ti()
+        sensor_ti.try_number += 1
+        session.commit()
+
+        # 2nd poke
+        time_machine.coordinates.shift(sensor.retry_delay + timedelta(seconds=1))
+        self._run(sensor)
+        sensor_ti = _get_sensor_ti()
+        assert sensor_ti.try_number == 2
+        assert sensor_ti.max_tries == 2
+        assert sensor_ti.state == State.UP_FOR_RESCHEDULE
+        task_reschedules = task_reschedules_for_ti(sensor_ti)
+        assert len(task_reschedules) == 1
+
+        # 3rd poke
+        time_machine.coordinates.shift(sensor.poke_interval)
+        self._run(sensor)
+        sensor_ti = _get_sensor_ti()
+        assert sensor_ti.try_number == 2
+        assert sensor_ti.max_tries == 2
+        assert sensor_ti.state == State.UP_FOR_RESCHEDULE
+        task_reschedules = task_reschedules_for_ti(sensor_ti)
+        assert len(task_reschedules) == 2
+
+        # 4th poke
+        time_machine.coordinates.shift(sensor.poke_interval)
+        self._run(sensor)
+        sensor_ti = _get_sensor_ti()
+        assert sensor_ti.try_number == 2
+        assert sensor_ti.max_tries == 2
+        assert sensor_ti.state == State.UP_FOR_RESCHEDULE
+        task_reschedules = task_reschedules_for_ti(sensor_ti)
+        assert len(task_reschedules) == 3
+
+        # 5th poke causes timeout
+        time_machine.coordinates.shift(sensor.poke_interval)
+
+        with pytest.raises(AirflowSensorTimeout):
+            self._run(sensor)
+
+        sensor_ti = _get_sensor_ti()
+        assert sensor_ti.try_number == 2
+        assert sensor_ti.max_tries == 2
+        assert sensor_ti.state == State.FAILED
+
+        # No reschedule attempt is saved for the poke that fails with a timeout
+        task_reschedules = task_reschedules_for_ti(sensor_ti)
+        assert len(task_reschedules) == 3
+
+    def test_reschedule_set_retries_2nd_poke_runtime_error_timeout_fail(
+        self, make_sensor, time_machine, session, task_reschedules_for_ti
+    ):
+        """
+        Mode "reschedule", retries set, the second poke raises RuntimeError, and the timeout is reached.
+        A RuntimeError on the second iteration means no reschedule is recorded in the DB
+        for the second attempt.
+        In that case the running timeout calculation should not be affected, because the sensor
+        should use the first attempt as the start of execution.
+        Retries and timeout configurations interact correctly.
+
+        Given a sensor configured: poke_interval=5 timeout=10 retries=2 retry_delay=timedelta(seconds=7)
+        The try number is incremented on error, but the run timeout should not be extended.
+        This is how it is expected to behave:
+        00:00 Returns False: try_number=1 max_tries=2 state=UP_FOR_RESCHEDULE
+        00:05 Raises RuntimeError: try_number=1 max_tries=2 state=UP_FOR_RETRY
+        00:13 Raises AirflowSensorTimeout: try_number=2 max_tries=2 state=FAILED
+        """
+        sensor, dr = make_sensor(
+            return_value=None,
+            poke_interval=5,
+            timeout=10,
+            retries=2,
+            retry_delay=timedelta(seconds=7),
+            mode="reschedule",
+            silent_fail=False,
+        )
+
+        def _get_sensor_ti():
+            return next(x for x in dr.get_task_instances(session=session) if x.task_id == SENSOR_OP)
+
+        sensor.poke = Mock(side_effect=[False, RuntimeError, False])
+
+        # Scheduler does this before the first run
+        sensor_ti = _get_sensor_ti()
+        sensor_ti.state = State.SCHEDULED
+        sensor_ti.try_number += 1
+        session.commit()
+
+        # 1st poke
+        date1 = timezone.utcnow()
+        time_machine.move_to(date1, tick=False)
+        self._run(sensor)
+        sensor_ti = _get_sensor_ti()
+        assert sensor_ti.try_number == 1
+        assert sensor_ti.max_tries == 2
+        assert sensor_ti.state == State.UP_FOR_RESCHEDULE
+        task_reschedules = task_reschedules_for_ti(sensor_ti)
+        assert len(task_reschedules) == 1
+
+        # 2nd poke
+        time_machine.coordinates.shift(sensor.poke_interval)
+
+        with pytest.raises(RuntimeError):
+            self._run(sensor)
+
+        sensor_ti = _get_sensor_ti()
+        assert sensor_ti.try_number == 1
+        assert sensor_ti.max_tries == 2
+        assert sensor_ti.state == State.UP_FOR_RETRY
+
+        # No reschedule attempt is saved on the RuntimeError
+        task_reschedules = task_reschedules_for_ti(sensor_ti)
+        assert len(task_reschedules) == 1
+
+        # Scheduler does this before retry
+        sensor_ti = _get_sensor_ti()
+        sensor_ti.try_number += 1
+        session.commit()
+
+        # 3rd poke causes timeout
+        time_machine.coordinates.shift(sensor.retry_delay + timedelta(seconds=1))
+
+        with pytest.raises(AirflowSensorTimeout):
+            self._run(sensor)
+
+        sensor_ti = _get_sensor_ti()
+        assert sensor_ti.try_number == 2
+        assert sensor_ti.max_tries == 2
+        assert sensor_ti.state == State.FAILED
+
+        # No reschedule attempt is saved for the poke that fails with a timeout
+        task_reschedules = task_reschedules_for_ti(sensor_ti)
+        assert len(task_reschedules) == 0
+
     def test_reschedule_and_retry_timeout_and_silent_fail(self, make_sensor, time_machine, session):
         """
         Test mode="reschedule", silent_fail=True then retries and timeout configurations interact correctly.