-
Notifications
You must be signed in to change notification settings - Fork 14.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add dry run for backfill #45062
base: main
Are you sure you want to change the base?
Add dry run for backfill #45062
Conversation
a7a1efd
to
d288934
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As discussed with daniel, maybe a separate endpoint makes more sense to avoid mixed returned type BackfillResponse | BackfillDryRunResponse
on the same endpoint. That's hard to handle for clients.
Created a separate endpoint for dry run. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks, looking nice.
A few improvement suggestions.
Note: As PR #45312 has been merged, the code formatting rules have changed for new UI. Please rebase and re-run pre-commit checks to ensure that formatting in folder airflow/ui is adjusted. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Overall looking good.
A few suggestions.
I would wait for @dstandish approval before merging that, just to be sure that the backfill logic is correct. (It looks good to me)
).all() | ||
} | ||
|
||
print(existing_dag_runs) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
to remove print
statements
from airflow.utils.state import DagRunState | ||
from airflow.utils.types import DagRunTriggeredByType, DagRunType | ||
|
||
if TYPE_CHECKING: | ||
from datetime import datetime | ||
|
||
from typing_extensions import Literal |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Import Literal from typing
session=session, | ||
backfill_sort_ordinal = 0 | ||
logical_dates = [] | ||
dagrun_infos = list(dagrun_info_list) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we need to slightly modify _get_info_list
to not have to explicitely cast to dict. Also we can add types. ( -> list[DagRunInfo])
.join( | ||
dag_run_ranked, | ||
(DagRun.logical_date == dag_run_ranked.c.logical_date) | ||
& ( | ||
(DagRun.start_date == dag_run_ranked.c.start_date) | ||
| ((DagRun.start_date.is_(None)) & (dag_run_ranked.c.start_date.is_(None))) | ||
) | ||
& (DagRun.dag_id == dag_run_ranked.c.dag_id), | ||
) | ||
.where(dag_run_ranked.c.row_number == 1) | ||
).all() | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I assume all that is necessary because of the removal of the unique constraint on the logical_date
?
What are the update on that ?
cc: @dstandish
dag_run_ranked, | ||
(DagRun.logical_date == dag_run_ranked.c.logical_date) | ||
& ( | ||
(DagRun.start_date == dag_run_ranked.c.start_date) | ||
| ((DagRun.start_date.is_(None)) & (dag_run_ranked.c.start_date.is_(None))) | ||
) | ||
& (DagRun.dag_id == dag_run_ranked.c.dag_id), | ||
) | ||
.where(dag_run_ranked.c.row_number == 1) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sqlalchemy has and
and or
function and this is what I usually see in the codebase. I don't use binary &
and |
I'm not sure it always behaves as we expect.
if non_create_reason: | ||
if not dry_run: | ||
nested.rollback() | ||
session.add( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think that a usefull comment was removed:
# rolling back here restores to start of this nested tran
# which releases the lock on the latest dag run, since we
# are not creating a new one
|
||
|
||
class DryRunBackfillResponse(BaseModel): | ||
"""Data model for run information during a backfill operation.""" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit:
"""Data model for run information during a backfill operation.""" | |
"""Backfill serializer for responses in dry-run mode. """ |
|
||
|
||
class DryRunBackfillCollectionResponse(BaseModel): | ||
"""Serializer for responses in dry-run mode for backfill operations.""" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
"""Serializer for responses in dry-run mode for backfill operations.""" | |
"""Backfill collection serializer for responses in dry-run mode.""" |
) | ||
return br | ||
|
||
backfill_response = _create_backfill_dag_run( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
backfill_response
that variable name is not great. This makes me think of an http
response.
maybe logical_dates
is more natural ?
reprocess_behavior=body.reprocess_behavior, | ||
dry_run=True, | ||
) | ||
backfills = [DryRunBackfillResponse(logical_date=logical_date) for logical_date in backfills_dry_run] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
backfills = [DryRunBackfillResponse(logical_date=logical_date) for logical_date in backfills_dry_run] | |
backfills = [DryRunBackfillResponse(logical_date=d) for d in backfills_dry_run] |
@@ -158,72 +151,125 @@ def validate_sort_ordinal(self, key, val): | |||
def _create_backfill_dag_run( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you not make changes to this function but just keep it simple and essentially do the following:
Do just like done for CLI, i.e. in _do_dry_run
, but add the extra step of checking whether it would actually create the run? I.e. extra logic check for existence?
We can use the same function for both cli and api.
essentially, we just need to return the list "these rows would be created". I don't think we need to modify the path where we actually create the runs at this time. Let me know what you think of that.
Closes #44395
Response: