Implement fully parallel upload processing #658

Draft
Swatinem wants to merge 3 commits into main from swatinem/fully-parallel

Conversation


@Swatinem Swatinem commented Aug 29, 2024

This adds another feature/rollout flag which prefers the parallel upload processing pipeline instead of only running it as an experiment.

Upload Processing can run in essentially 4 modes:

  • Completely serial processing
  • Serial processing, but running "experiment" code (`is_experiment_serial`):
    • In this mode, each `UploadProcessor` task saves a copy of the raw upload,
      as well as a copy of the final report (`is_final`), for later verification.
  • Parallel processing, but running "experiment" code (`is_experiment_parallel`):
    • In this mode, another parallel set of `UploadProcessor` tasks runs after
      the main set of tasks.
    • These tasks use the copied-over raw uploads prepared by the
      `is_experiment_serial` tasks for their processing.
    • These tasks do not persist any of their results in the database;
      instead, the final `UploadFinisher` task launches the `ParallelVerification` task.
  • Fully parallel processing (`is_fully_parallel`):
    • In this mode, the final `UploadFinisher` task is responsible for merging
      the final report and persisting it.

An example Task chain might look like this, in "experiment" mode:

  • Upload
    • UploadProcessor (is_experiment_serial)
      • UploadProcessor (is_experiment_serial)
        • UploadProcessor (is_experiment_serial, is_final)
          • UploadFinisher
            • UploadProcessor (is_experiment_parallel)
            • UploadProcessor (is_experiment_parallel)
            • UploadProcessor (is_experiment_parallel)
              • UploadFinisher (is_experiment_parallel)
                • ParallelVerification

The `is_fully_parallel` mode looks like this (a rough Celery sketch of this layout follows the diagram):

  • Upload
    • UploadProcessor (is_fully_parallel)
    • UploadProcessor (is_fully_parallel)
    • UploadProcessor (is_fully_parallel)
      • UploadFinisher (is_fully_parallel)

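For illustration only, a minimal sketch of how the fully parallel layout above could be scheduled as a chord of processor tasks feeding a single finisher. This assumes Celery primitives; the task names and signatures here are hypothetical, not the actual worker code:

```python
from celery import Celery, chord

app = Celery("worker")

@app.task
def upload_processor_task(repoid, commitid, upload_id):
    ...  # process a single raw upload and return its partial result

@app.task
def upload_finisher_task(results, repoid, commitid):
    ...  # merge the partial results into the final report and persist it

def schedule_fully_parallel(upload_ids, repoid, commitid):
    # One processor per upload; all of them run in parallel.
    processors = [
        upload_processor_task.s(repoid, commitid, upload_id) for upload_id in upload_ids
    ]
    # The chord body runs once every processor has finished; in fully parallel
    # mode it is responsible for merging and persisting the final report.
    return chord(processors)(upload_finisher_task.s(repoid, commitid))
```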
fixes codecov/engineering-team#2450

@Swatinem Swatinem self-assigned this Aug 29, 2024

codecov-notifications bot commented Aug 29, 2024

Codecov Report

Attention: Patch coverage is 93.96552% with 7 lines in your changes missing coverage. Please review.

✅ All tests successful. No failed tests found.

| Files | Patch % | Lines |
|---|---|---|
| services/report/__init__.py | 58.33% | 5 Missing ⚠️ |
| helpers/parallel.py | 93.54% | 2 Missing ⚠️ |

Impacted file tree graph

@@            Coverage Diff             @@
##             main     #658      +/-   ##
==========================================
- Coverage   98.09%   98.08%   -0.01%     
==========================================
  Files         434      435       +1     
  Lines       36548    36590      +42     
==========================================
+ Hits        35852    35891      +39     
- Misses        696      699       +3     
| Flag | Coverage Δ |
|---|---|
| integration | 98.08% <93.96%> (-0.01%) ⬇️ |
| latest-uploader-overall | 98.08% <93.96%> (-0.01%) ⬇️ |
| unit | 98.08% <93.96%> (-0.01%) ⬇️ |

Flags with carried forward coverage won't be shown. Click here to find out more.

| Components | Coverage Δ |
|---|---|
| NonTestCode | 96.00% <93.57%> (-0.01%) ⬇️ |
| OutsideTasks | 98.08% <84.78%> (-0.02%) ⬇️ |

| Files | Coverage Δ |
|---|---|
| helpers/parallel_upload_processing.py | 85.45% <100.00%> (-0.26%) ⬇️ |
| rollouts/__init__.py | 100.00% <100.00%> (ø) |
| tasks/tests/integration/test_upload_e2e.py | 100.00% <100.00%> (ø) |
| tasks/tests/unit/test_upload_processing_task.py | 100.00% <100.00%> (ø) |
| tasks/tests/unit/test_upload_task.py | 99.54% <100.00%> (-0.01%) ⬇️ |
| tasks/upload.py | 96.44% <100.00%> (+0.10%) ⬆️ |
| tasks/upload_finisher.py | 97.22% <100.00%> (+0.06%) ⬆️ |
| tasks/upload_processor.py | 100.00% <100.00%> (+0.53%) ⬆️ |
| helpers/parallel.py | 93.54% <93.54%> (ø) |
| services/report/__init__.py | 97.01% <58.33%> (-0.41%) ⬇️ |


codecov bot commented Aug 29, 2024

Codecov Report

Attention: Patch coverage is 93.96552% with 7 lines in your changes missing coverage. Please review.

Project coverage is 98.13%. Comparing base (ab427f8) to head (62139b2).

Changes have been made to critical files, which contain lines commonly executed in production. Learn more

| Files with missing lines | Patch % | Lines |
|---|---|---|
| services/report/__init__.py (Critical) | 58.33% | 5 Missing ⚠️ |
| helpers/parallel.py | 93.54% | 2 Missing ⚠️ |
Additional details and impacted files

Impacted file tree graph

@@            Coverage Diff             @@
##             main     #658      +/-   ##
==========================================
- Coverage   98.13%   98.13%   -0.01%     
==========================================
  Files         475      476       +1     
  Lines       37903    37945      +42     
==========================================
+ Hits        37198    37237      +39     
- Misses        705      708       +3     
| Flag | Coverage Δ |
|---|---|
| integration | 98.08% <93.96%> (-0.01%) ⬇️ |
| latest-uploader-overall | 98.08% <93.96%> (-0.01%) ⬇️ |
| unit | 98.08% <93.96%> (-0.01%) ⬇️ |

Flags with carried forward coverage won't be shown. Click here to find out more.

| Components | Coverage Δ |
|---|---|
| NonTestCode | 96.10% <93.57%> (-0.01%) ⬇️ |
| OutsideTasks | 98.08% <84.78%> (-0.02%) ⬇️ |

| Files with missing lines | Coverage Δ |
|---|---|
| helpers/parallel_upload_processing.py | 85.45% <100.00%> (-0.26%) ⬇️ |
| rollouts/__init__.py | 100.00% <100.00%> (ø) |
| tasks/tests/integration/test_upload_e2e.py | 100.00% <100.00%> (ø) |
| tasks/tests/unit/test_upload_processing_task.py | 100.00% <100.00%> (ø) |
| tasks/tests/unit/test_upload_task.py | 99.54% <100.00%> (-0.01%) ⬇️ |
| tasks/upload.py (Critical) | 97.74% <100.00%> (+0.06%) ⬆️ |
| tasks/upload_finisher.py | 97.23% <100.00%> (+0.06%) ⬆️ |
| tasks/upload_processor.py | 100.00% <100.00%> (+0.53%) ⬆️ |
| helpers/parallel.py | 93.54% <93.54%> (ø) |
| services/report/__init__.py (Critical) | 97.02% <58.33%> (-0.41%) ⬇️ |
Related Entrypoints
run/app.tasks.upload.Upload
run/app.tasks.upload.UploadFinisher
run/app.tasks.upload.UploadProcessor
run/app.tasks.upload.PreProcessUpload

@Swatinem Swatinem force-pushed the swatinem/fully-parallel branch 3 times, most recently from 7635506 to b9f675a Compare September 10, 2024 11:10
@Swatinem Swatinem marked this pull request as ready for review September 10, 2024 11:10
@Swatinem Swatinem requested a review from a team September 10, 2024 11:10

@michelletran-codecov michelletran-codecov left a comment


Just a few comments

tasks/upload_processor.py (resolved, outdated)
tasks/upload_processor.py (resolved)
tasks/upload_processor.py (resolved, outdated)
helpers/parallel.py (resolved, outdated)
 )

-if not do_parallel_processing:
+if not parallel_processing.run_experiment:
Contributor

Is this correct? Should this actually be `if not parallel_processing.run_fully_parallel`?

Contributor Author

In the `run_fully_parallel` case, we already early-returned just the parallel tasks. So down here, we either run the experiment or run fully serially.
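Roughly, the control flow being described here, as a hedged sketch (the helper and variable names are assumed, not taken from this PR):

```python
def pick_task_chain(parallel_processing, serial_tasks, parallel_tasks):
    # Fully parallel: only the parallel pipeline is scheduled
    # (the early return mentioned above).
    if parallel_processing.run_fully_parallel:
        return parallel_tasks

    # Not running the experiment either: plain, fully serial processing.
    if not parallel_processing.run_experiment:
        return serial_tasks

    # Experiment mode: the serial pipeline runs as usual, and the parallel
    # copy is scheduled afterwards purely for verification.
    return serial_tasks + parallel_tasks
```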


Upload Processing can run in essentially 4 modes:
- Completely serial processing
- Serial processing, but running "experiment" code (`is_experiment_serial`):
Contributor

I'm not super clear about what the difference is between `is_experiment_serial` and `is_experiment_parallel`.

Contributor Author

When doing the "experiment", the processor and finisher tasks run twice for the same upload.
There is also an example flow showing those two runs, and which flags they use.

How can I make this clearer?

Contributor

Hmm, I think my main confusion is around their dependency (i.e. you can't have both of them be true at the same time) and about how serial is "experimental" (because it's the default option right now). Looking at the code, I guess the "default" serial path is separate from the experiment, so there is (in some paths) a distinction on "experiment" serial and "default" serial.

To address the issue of both variables being dependent, I'm wondering if it'll be clearer if we used an enum called "ExperimentMode" or something. This way, they cannot be true at the same time, and we can just use the same variable to distinguish these cases.
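A minimal sketch of that suggestion (the enum and variant names here are hypothetical, not from this PR):

```python
from enum import Enum, auto

class ExperimentMode(Enum):
    NONE = auto()                 # completely serial, no experiment
    EXPERIMENT_SERIAL = auto()    # serial run that also stages data for verification
    EXPERIMENT_PARALLEL = auto()  # parallel re-run used only for verification
    FULLY_PARALLEL = auto()       # the parallel pipeline is the real pipeline

# The two mutually exclusive booleans become simple comparisons, e.g.:
# is_experiment_serial   ->  mode is ExperimentMode.EXPERIMENT_SERIAL
# is_experiment_parallel ->  mode is ExperimentMode.EXPERIMENT_PARALLEL
```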


@matt-codecov matt-codecov left a comment


i think this is somewhat cleaner than the existing harness but to be honest i still feel a little uncomfortable with all the if/else branches peppered around to copy something here and skip writing something there. it feels too easy to accidentally break real processing or leave side-effects that real users will be able to see

the approach i imagine would be simpler would be a separate task that either runs nightly and chooses a batch of N commits, or is scheduled as a followup after X% of finisher tasks. this task would fetch completed report JSONs and use the sessions list from them to reconstruct UploadTask arguments but with dummy commits/repos owned by Codecov plugged in. one dummy repo would be overridden into the expt and the other overridden out of it. we run the identical task arguments for each repo and compare the results

with that approach, any and all copying/staging we need to do for verification can happen in one place, and there's little to no risk of our test procedure accidentally breaking things for production users or accidentally leaving side-effects that they can see. there's nothing to clean up when transitioning from validation to running the actual experiment, it's just a Feature with a test and control group. it doesn't faithfully reproduce carryforward inheritance, but CFF is all settled before anything changes for parallel processing anyway. i think the main downside is having to suppress GitHub API errors because our dummy repos probably won't have unique authentic commits/PRs for each batch of tasks we want to test

out of steam for the day but will see your thoughts tomorrow

Comment on lines +62 to +71
run_fully_parallel = FULLY_PARALLEL_UPLOAD_PROCESSING_BY_REPO.check_value(
    identifier=repoid, default=False
)
run_experiment = (
    False
    if run_fully_parallel
    else PARALLEL_UPLOAD_PROCESSING_BY_REPO.check_value(
        identifier=repoid, default=False
    )
)
Contributor

i don't really grok what these mean individually or what it means if they overlap

FULLY_PARALLEL_UPLOAD_PROCESSING appears to be controlling the actual launch, so i assume you'll leave that at 0 at this stage. and then PARALLEL_UPLOAD_PROCESSING appears to control the experiment for anyone not in FULLY_PARALLEL..., and is_experiment_serial and is_experiment_parallel are based on that value + whether the currently-running task was scheduled to be serial or scheduled to be parallel?

Feature supports non-bool values + more than 2 values if that helps. you could have a single Feature with variants fully_serial, experiment, and fully_parallel if that would simplify anything
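A hedged sketch of that idea (the variant strings and the exact shape of the `check_value` call here are assumptions):

```python
# A single Feature whose value is a variant string rather than two
# overlapping boolean flags.
variant = PARALLEL_UPLOAD_PROCESSING_BY_REPO.check_value(
    identifier=repoid, default="fully_serial"
)
run_fully_parallel = variant == "fully_parallel"
run_experiment = variant == "experiment"
```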

Contributor Author

that would make a lot of sense, yes. thanks for the suggestion.

Comment on lines 291 to 293
# Behind parallel processing flag, save the CFF report to GCS so the parallel variant of
# finisher can build off of it later. Makes the assumption that the CFFs occupy the first
# j to i session ids where i is the max id of the CFFs and j is some integer less than i.
Contributor

this assumption isn't always true: next_session_number() starts at len(sessions) and then returns the first unused number

for example:

  • Commit A has CFF sessions 0, 1, 2 and gets a new session
    • We have 3 sessions so next_session_id() starts at 3
    • Session ID 3 is free so we use that for the new session
    • New session 3 invalidates 2 so we remove it
  • Commit B has CFF sessions 0, 1, 3 and gets a new session
    • We have 3 sessions so next_session_id() starts at 3
    • Session ID 3 is taken so we use 4 for the new session
    • New session 4 invalidates 3 so we remove it
  • Commit C has CFF sessions 0, 1, 4 and gets a new session
    • We have 3 sessions so next_session_id() starts at 3
    • Session ID 3 has become free again so we use that for the new session

the assumed behavior here would use ID=5 for the new session in Commit C but next_session_id() will actually use 3. this logic will still return an unused ID, but it may differ from the serial ID which makes comparison difficult
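A small sketch of the allocation behavior described above (assuming `next_session_number()` simply scans upward from `len(sessions)` for the first free ID):

```python
def next_session_number(sessions: dict[int, object]) -> int:
    # Start at len(sessions) and return the first unused session ID.
    session_id = len(sessions)
    while session_id in sessions:
        session_id += 1
    return session_id

# Commit C from the example: sessions 0, 1 and 4 exist, len() is 3,
# and 3 has become free again, so the new session gets ID 3 rather than 5.
assert next_session_number({0: None, 1: None, 4: None}) == 3
```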

the ID scheme described in https://l.codecov.dev/LyFm6s (open on VPN bc private link shortener) would assign IDs consistently for both serial and parallel processing. we could ship that as its own change and not have to worry about session IDs at all anymore

Contributor

considering the logic as-is, i don't remember whether/why we have to do this redis key update here in the first place. tasks/upload.py will set it if it isn't set here. do you know?

Contributor Author

I don’t really know TBH, this is quite some obscure code/logic that hasn’t yet revealed its secrets to me.

With the session id allocations being such a big problem, why not just use the unique database ID of the upload? that should ideally solve all these problems as the DB guarantees uniqueness here.

Is there any reason not to do that?

I also believe this ID allocation is one of the reasons the "time under lock" is longer than it necessarily needs to be?

Contributor

hmm. i'm not aware of a reason we couldn't use the database PK. that might be a great idea. in every place i can think of, these session IDs are just dict keys so they don't really need to be clustered/sequential, and i don't think we have to keep them stable across reports

a little code dive:

  • initialize_and_save_report() is called in tasks/upload.py and tasks/preprocess_upload.py. carryforward inheritance happens in here (more specifically, in here)
    • create_new_report_for_commit() generates the carryforward report. carried-forward sessions reuse their IDs from the parent report, this code does not know anything about the DB objects
    • save_full_report() creates DB records for each of the sessions in the new report. we create new DB records for carried-forward sessions, we don't try to point back at the existing one
  • report.add_session() is called in process_raw_upload(). we may not have the actual DB object here, not sure

in save_full_report() you could probably override the session ID for each session after you do the DB insert but before you serialize as chunks/report json, and then the process_raw_upload() bits are just plumbing + updating the shared API. this approach would make it harder to turn the Upload inserts in save_full_report() into a batch insert, but it should be possible. if nothing else you can get a stable order for the inserted IDs with raw SQL and then sort/update in python :P

with inserted as (
    insert into uploads (...)
    values (...), (...), (...)
    returning *
)
select id
from inserted
order by timestamp desc;
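Purely as an illustration of the idea above (every name in this sketch is an assumption, not the worker's actual API), overriding the in-memory session IDs with the database primary keys after the insert but before serializing could look roughly like this:

```python
from dataclasses import dataclass, field

@dataclass
class Session:
    id: int

@dataclass
class Report:
    sessions: dict[int, Session] = field(default_factory=dict)

def rekey_sessions_by_db_id(report: Report, inserted_upload_ids: list[int]) -> None:
    # Pair each in-memory session with the DB primary key of the Upload row
    # that was inserted for it, then re-key the sessions dict accordingly.
    rekeyed = {}
    for (_, session), db_id in zip(sorted(report.sessions.items()), inserted_upload_ids):
        session.id = db_id  # the DB guarantees uniqueness
        rekeyed[db_id] = session
    report.sessions = rekeyed
```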

Comment on lines -1052 to -1058
# this should be enabled for the actual rollout of parallel upload processing.
# if PARALLEL_UPLOAD_PROCESSING_BY_REPO.check_value(
#     "this should be the repo id"
# ):
#     upload_obj.state_id = UploadState.PARALLEL_PROCESSED.db_id
#     upload_obj.state = "parallel_processed"
# else:
Contributor

if you haven't found it, this enum value is what this commented out block is about

a state of PROCESSED implies the upload's data will be found if you call get_existing_report_for_commit(). a state of PARALLEL_PROCESSED indicates UploadProcessorTask has finished but UploadFinisherTask has not gotten to it yet. don't remember if the distinction mattered

fully forgot about this bit

@Swatinem
Contributor Author

> to be honest i still feel a little uncomfortable with all the if/else branches peppered around to copy something here and skip writing something there.

I totally agree with this. I’m tempted to just create a new task for parallel processing which removes all the code related to handling multiple uploads in one chunk, and I have ideas for further simplification ahead of time.

@Swatinem Swatinem marked this pull request as draft September 12, 2024 08:32
@matt-codecov
Contributor

i think some of the brittleness is inherent to the "kick off parallel tasks but copy all the inputs and then skip saving the outputs" approach to verification, but i'd be happy to be proved wrong haha. my suggested alternative requires us to handle any GH request failure non-fatally which may be easier said than done

there's a lot in upload_processor.py that could be reused, but you're right that we'll have to clean up the multi-upload batch stuff sooner or later and it's easier to reason about the parallel implementation if we do it sooner

i should have said this in my initial comment but: i can't see any specific problems in the PR apart from the edge case with IDs which only matters for comparison with serial results, and that was already there. i think this is all logically correct, and less fragile than it was before. i am excited to see this PR and for this project to get some momentum
