Implement fully parallel upload processing #658

Draft · wants to merge 3 commits into base: main
103 changes: 103 additions & 0 deletions helpers/parallel.py
@@ -0,0 +1,103 @@
import dataclasses
from typing import Self

from rollouts import (
FULLY_PARALLEL_UPLOAD_PROCESSING_BY_REPO,
PARALLEL_UPLOAD_PROCESSING_BY_REPO,
)


@dataclasses.dataclass
class ParallelProcessing:
    """
    This encapsulates Parallel Upload Processing logic

    Upload Processing can run in essentially 4 modes:
    - Completely serial processing
    - Serial processing, but running "experiment" code (`is_experiment_serial`):
Contributor:

I'm not super clear about what the difference is between is_experiment_serial and is_experiment_parallel.

Contributor (author):

When doing the "experiment", the processor and finisher tasks run twice for the same upload.
There is also an example flow showing those two runs, and which flags they use.

How can I make this clearer?

Contributor:

Hmm, I think my main confusion is around their dependency (i.e. you can't have both of them be true at the same time) and about how serial is "experimental" (because it's the default option right now). Looking at the code, I guess the "default" serial path is separate from the experiment, so there is (in some paths) a distinction on "experiment" serial and "default" serial.

To address the issue of both variables being dependent, I'm wondering if it'll be clearer if we used an enum called "ExperimentMode" or something. This way, they cannot be true at the same time, and we can just use the same variable to distinguish these cases.
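A minimal sketch of that enum idea (hypothetical, not part of this diff; the enum name, members, and helper functions are illustrative only):

import enum

class ExperimentMode(enum.Enum):
    NONE = "none"                      # plain serial processing, no experiment
    EXPERIMENT_SERIAL = "serial"       # serial leg of the experiment
    EXPERIMENT_PARALLEL = "parallel"   # parallel leg of the experiment
    FULLY_PARALLEL = "fully_parallel"  # target state: fully parallel processing

# The two booleans could then be derived instead of stored, so they can never both be true:
def is_experiment_serial(mode: ExperimentMode) -> bool:
    return mode is ExperimentMode.EXPERIMENT_SERIAL

def is_experiment_parallel(mode: ExperimentMode) -> bool:
    return mode is ExperimentMode.EXPERIMENT_PARALLEL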

      - In this mode, each `UploadProcessor` task saves a copy of the raw upload,
        as well as a copy of the final report (`is_final`) for later verification.
    - Parallel processing, but running "experiment" code (`is_experiment_parallel`):
      - In this mode, another parallel set of `UploadProcessor` tasks runs *after*
        the main set of tasks.
      - These tasks use the copied-over raw uploads that were prepared by
        the `is_experiment_serial` tasks to do their processing.
      - These tasks do not persist any of their results in the database;
        instead, the final `UploadFinisher` task launches the `ParallelVerification` task.
    - Fully parallel processing (`is_fully_parallel`):
      - In this mode, the final `UploadFinisher` task is responsible for merging
        the final report and persisting it.

    An example Task chain might look like this, in "experiment" mode:
    - Upload
    - UploadProcessor (`is_experiment_serial`)
    - UploadProcessor (`is_experiment_serial`)
    - UploadProcessor (`is_experiment_serial`, `is_final`)
    - UploadFinisher
    - UploadProcessor (`is_experiment_parallel`)
    - UploadProcessor (`is_experiment_parallel`)
    - UploadProcessor (`is_experiment_parallel`)
    - UploadFinisher (`is_experiment_parallel`)
    - ParallelVerification

    The `is_fully_parallel` mode looks like this:
    - Upload
    - UploadProcessor (`is_fully_parallel`)
    - UploadProcessor (`is_fully_parallel`)
    - UploadProcessor (`is_fully_parallel`)
    - UploadFinisher (`is_fully_parallel`)
    """

    run_experiment: bool = False
    run_fully_parallel: bool = False

    is_fully_parallel: bool = False
    is_experiment_parallel: bool = False
    is_experiment_serial: bool = False
    is_final: bool = False
    parallel_idx: int | None = None

    @classmethod
    def initial(cls, repoid: int) -> Self:
        run_fully_parallel = FULLY_PARALLEL_UPLOAD_PROCESSING_BY_REPO.check_value(
            identifier=repoid, default=False
        )
        run_experiment = (
            False
            if run_fully_parallel
            else PARALLEL_UPLOAD_PROCESSING_BY_REPO.check_value(
                identifier=repoid, default=False
            )
        )
Comment on lines +62 to +71
Contributor:

i don't really grok what these mean individually or what it means if they overlap

FULLY_PARALLEL_UPLOAD_PROCESSING appears to be controlling the actual launch, so i assume you'll leave that at 0 at this stage. and then PARALLEL_UPLOAD_PROCESSING appears to control the experiment for anyone not in FULLY_PARALLEL..., and is_experiment_serial and is_experiment_parallel are based on that value + whether the currently-running task was scheduled to be serial or scheduled to be parallel?

Feature supports non-bool values + more than 2 values if that helps. you could have a single Feature with variants fully_serial, experiment, and fully_parallel if that would simplify anything

Contributor (author):

that would make a lot of sense, yes. thanks for the suggestion.
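For illustration only, a rough sketch of what `initial()` could become with a single multi-variant feature (the feature name, the variant strings, and the assumption that `check_value` can return them are mine, not something this PR defines):

from helpers.parallel import ParallelProcessing
from rollouts import Feature  # assuming Feature is importable here; it may live in a shared package

# hypothetical: one rollout feature with string variants instead of two boolean features
UPLOAD_PROCESSING_MODE_BY_REPO = Feature("upload_processing_mode")

def resolve_processing_mode(repoid: int) -> ParallelProcessing:
    # assumed to return one of "fully_serial", "experiment", "fully_parallel"
    mode = UPLOAD_PROCESSING_MODE_BY_REPO.check_value(
        identifier=repoid, default="fully_serial"
    )
    return ParallelProcessing(
        run_fully_parallel=(mode == "fully_parallel"),
        run_experiment=(mode == "experiment"),
        is_fully_parallel=(mode == "fully_parallel"),
    )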


        return cls(
            run_fully_parallel=run_fully_parallel,
            run_experiment=run_experiment,
            is_fully_parallel=run_fully_parallel,
        )

    @classmethod
    def from_task_args(
        cls,
        repoid: int,
        in_parallel: bool = False,
        fully_parallel: bool = False,
        is_final: bool = False,
        parallel_idx: int | None = None,
        **kwargs,
    ) -> Self:
        slf = cls.initial(repoid)

        if fully_parallel:
            slf.is_fully_parallel = True
            return slf

        slf.is_experiment_parallel = slf.run_experiment and in_parallel
        slf.is_experiment_serial = slf.run_experiment and not in_parallel
        slf.is_final = is_final
        slf.parallel_idx = parallel_idx
        return slf

    @property
    def is_parallel(self) -> bool:
        return self.is_experiment_parallel or self.is_fully_parallel
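For reference, a hypothetical illustration (not part of the diff) of how task kwargs map onto the modes described in the docstring, assuming the rollout flags resolve to "experiment" mode for this repo:

from helpers.parallel import ParallelProcessing

# serial leg of the experiment; the last processor in the chain is marked final
serial_leg = ParallelProcessing.from_task_args(repoid=1234, in_parallel=False, is_final=True)
# serial_leg.is_experiment_serial == True, serial_leg.is_final == True

# parallel leg that runs afterwards, one task per upload with its own index
parallel_leg = ParallelProcessing.from_task_args(repoid=1234, in_parallel=True, parallel_idx=0)
# parallel_leg.is_experiment_parallel == True, parallel_leg.is_parallel == True

# once a repo is in the fully parallel rollout, `fully_parallel=True` short-circuits everything else
fully = ParallelProcessing.from_task_args(repoid=1234, fully_parallel=True)
# fully.is_fully_parallel == True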
6 changes: 2 additions & 4 deletions helpers/parallel_upload_processing.py
@@ -43,16 +43,14 @@ def _adjust_sessions(
def get_parallel_session_ids(
sessions, argument_list, db_session, report_service, commit_yaml
):
num_sessions = len(argument_list)

mock_sessions = copy.deepcopy(sessions) # the sessions already in the report
get_parallel_session_ids = []

# iterate over all uploads, get the next session id, and adjust sessions (remove CFF logic)
for i in range(num_sessions):
for arguments in argument_list:
next_session_id = next_session_number(mock_sessions)

upload_pk = argument_list[i]["upload_pk"]
upload_pk = arguments["upload_pk"]
upload = db_session.query(Upload).filter_by(id_=upload_pk).first()
to_merge_session = report_service.build_session(upload)
flags = upload.flag_names
1 change: 1 addition & 0 deletions rollouts/__init__.py
@@ -11,6 +11,7 @@
)

PARALLEL_UPLOAD_PROCESSING_BY_REPO = Feature("parallel_upload_processing")
FULLY_PARALLEL_UPLOAD_PROCESSING_BY_REPO = Feature("fully_parallel_upload_processing")

CARRYFORWARD_BASE_SEARCH_RANGE_BY_OWNER = Feature("carryforward_base_search_range")

69 changes: 29 additions & 40 deletions services/report/__init__.py
@@ -43,11 +43,9 @@
RepositoryWithoutValidBotError,
)
from helpers.labels import get_labels_per_session
from helpers.parallel import ParallelProcessing
from helpers.telemetry import MetricContext
from rollouts import (
CARRYFORWARD_BASE_SEARCH_RANGE_BY_OWNER,
PARALLEL_UPLOAD_PROCESSING_BY_REPO,
)
from rollouts import CARRYFORWARD_BASE_SEARCH_RANGE_BY_OWNER
from services.archive import ArchiveService
from services.redis import (
PARALLEL_UPLOAD_PROCESSING_SESSION_COUNTER_TTL,
@@ -61,9 +59,7 @@
RAW_UPLOAD_RAW_REPORT_COUNT,
RAW_UPLOAD_SIZE,
)
from services.report.raw_upload_processor import (
process_raw_upload,
)
from services.report.raw_upload_processor import process_raw_upload
from services.repository import get_repo_provider_service
from services.yaml.reader import get_paths_from_flags, read_yaml_field

@@ -207,7 +203,9 @@

@sentry_sdk.trace
def initialize_and_save_report(
self, commit: Commit, report_code: str = None
self,
commit: Commit,
report_code: str = None,
) -> CommitReport:
"""
Initializes the commit report
@@ -287,26 +285,28 @@
# This means there is a report to carryforward
self.save_full_report(commit, report, report_code)

parallel_processing = ParallelProcessing.initial(
commit.repository.repoid
)
# Behind parallel processing flag, save the CFF report to GCS so the parallel variant of
# finisher can build off of it later. Makes the assumption that the CFFs occupy the first
# j to i session ids where i is the max id of the CFFs and j is some integer less than i.
Comment on lines 291 to 293
Contributor:

this assumption isn't always true: next_session_number() starts at len(sessions) and then returns the first unused number

for example:

  • Commit A has CFF sessions 0, 1, 2 and gets a new session
    • We have 3 sessions so next_session_id() starts at 3
    • Session ID 3 is free so we use that for the new session
    • New session 3 invalidates 2 so we remove it
  • Commit B has CFF sessions 0, 1, 3 and gets a new session
    • We have 3 sessions so next_session_id() starts at 3
    • Session ID 3 is taken so we use 4 for the new session
    • New session 4 invalidates 3 so we remove it
  • Commit C has CFF sessions 0, 1, 4 and gets a new session
    • We have 3 sessions so next_session_id() starts at 3
    • Session ID 3 has become free again so we use that for the new session

the assumed behavior here would use ID=5 for the new session in Commit C but next_session_id() will actually use 3. this logic will still return an unused ID, but it may differ from the serial ID which makes comparison difficult

the ID scheme described in https://l.codecov.dev/LyFm6s (open on VPN bc private link shortener) would assign IDs consistently for both serial and parallel processing. we could ship that as its own change and not have to worry about session IDs at all anymore
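A small sketch of the behavior described above (my own reconstruction, assuming next_session_number() probes upward from len(sessions)):

def next_session_number(sessions: dict[int, object]) -> int:
    # assumed behavior: start at len(sessions) and return the first unused ID
    candidate = len(sessions)
    while candidate in sessions:
        candidate += 1
    return candidate

# Commit C from the example: CFF sessions {0, 1, 4} -> len() is 3,
# so ID 3 has become free again and is reused (not 5).
assert next_session_number({0: "cff", 1: "cff", 4: "cff"}) == 3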

Contributor:

considering the logic as-is, i don't remember whether/why we have to do this redis key update here in the first place. tasks/upload.py will set it if it isn't set here. do you know?

Contributor (author):

I don’t really know TBH, this is quite some obscure code/logic that hasn’t yet revealed its secrets to me.

With the session id allocations being such a big problem, why not just use the unique database ID of the upload? that should ideally solve all these problems as the DB guarantees uniqueness here.

Is there any reason not to do that?

I also believe this ID allocation is one of the reasons the "time under lock" is longer than it necessarily needs to be?

Contributor:

hmm. i'm not aware of a reason we couldn't use the database PK. that might be a great idea. in every place i can think of, these session IDs are just dict keys so they don't really need to be clustered/sequential, and i don't think we have to keep them stable across reports

a little code dive:

  • initialize_and_save_report() is called in tasks/upload.py and tasks/preprocess_upload.py. carryforward inheritance happens in here (more specifically, in here)
    • create_new_report_for_commit() generates the carryforward report. carried-forward sessions reuse their IDs from the parent report, this code does not know anything about the DB objects
    • save_full_report() creates DB records for each of the sessions in the new report. we create new DB records for carried-forward sessions, we don't try to point back at the existing one
  • report.add_session() is called in process_raw_upload(). we may not have the actual DB object here, not sure

in save_full_report() you could probably override the session ID for each session after you do the DB insert but before you serialize as chunks/report json, and then the process_raw_upload() bits are just plumbing + updating the shared API. this approach would make it harder to turn the Upload inserts in save_full_report() into a batch insert, but it should be possible. if nothing else you can get a stable order for the inserted IDs with raw SQL and then sort/update in python :P

with inserted as (
    insert into uploads (...)
    values (...) (...) (...)
    returning *
)
select id
from inserted
order by timestamp desc;
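Sketching the save_full_report() idea above (hypothetical; attribute names like `report.sessions` and `Upload.id_` are assumptions based on how they are used elsewhere in this diff, not a definitive implementation):

def override_session_ids_with_db_pks(report, session_id_to_upload):
    # session_id_to_upload: old session id -> freshly inserted Upload row
    renumbered = {}
    for old_id, session in report.sessions.items():
        new_id = session_id_to_upload[old_id].id_  # DB primary key guarantees uniqueness
        session.id = new_id
        renumbered[new_id] = session
    report.sessions = renumbered  # re-key before serializing chunks / report JSON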

if PARALLEL_UPLOAD_PROCESSING_BY_REPO.check_value(
identifier=commit.repository.repoid
):
if parallel_processing.run_experiment:
self.save_parallel_report_to_archive(commit, report, report_code)
highest_session_id = max(
report.sessions.keys()
) # the largest id among the CFFs
get_redis_connection().incrby(
name=get_parallel_upload_processing_session_counter_redis_key(
# the largest id among the CFFs:
highest_session_id = max(report.sessions.keys())
redis = get_redis_connection()
redis_key = (

get_parallel_upload_processing_session_counter_redis_key(
commit.repository.repoid, commit.commitid
),
)
)
redis.incrby(

name=redis_key,
amount=highest_session_id + 1,
)
get_redis_connection().expire(
name=get_parallel_upload_processing_session_counter_redis_key(
commit.repository.repoid, commit.commitid
),
redis.expire(

name=redis_key,
time=PARALLEL_UPLOAD_PROCESSING_SESSION_COUNTER_TTL,
)

Expand Down Expand Up @@ -840,7 +840,7 @@

@sentry_sdk.trace
def parse_raw_report_from_storage(
self, repo: Repository, upload: Upload, is_parallel=False
self, repo: Repository, upload: Upload, parallel_processing: ParallelProcessing
) -> ParsedRawReport:
"""Pulls the raw uploaded report from storage and parses it so it's
easier to access different parts of the raw upload.
Expand All @@ -851,23 +851,19 @@
archive_service = self.get_archive_service(repo)
archive_url = upload.storage_path

# TODO: For the parallel experiment, can remove once finished
log.info(
"Parsing the raw report from storage",
extra=dict(
commit=upload.report.commit_id,
repoid=repo.repoid,
archive_url=archive_url,
is_parallel=is_parallel,
),
)

# For the parallel upload verification experiment, we need to make a copy of the raw uploaded reports
# so that the parallel pipeline can use those to parse. The serial pipeline rewrites the raw uploaded
# reports to a human readable version that doesn't include file fixes, so that's why copying is necessary.
if PARALLEL_UPLOAD_PROCESSING_BY_REPO.check_value(
identifier=repo.repoid, default=False
):
if parallel_processing.run_experiment:
parallel_url = archive_url.removesuffix(".txt") + "_PARALLEL.txt"
log.info(
"In the parallel experiment for parsing raw report in storage",
@@ -878,7 +874,7 @@
archive_url=archive_url,
),
)
if not is_parallel:
if parallel_processing.is_experiment_serial:
archive_file = archive_service.read_file(archive_url)
archive_service.write_file(parallel_url, archive_file)
log.info(
@@ -929,7 +925,7 @@
report: Report,
raw_report_info: RawReportInfo,
upload: Upload,
parallel_idx=None,
parallel_processing: ParallelProcessing,
) -> ProcessingResult:
"""
Processes an upload on top of an existing report `master` and returns
Expand Down Expand Up @@ -965,7 +961,7 @@

try:
raw_report = self.parse_raw_report_from_storage(
commit.repository, upload, is_parallel=parallel_idx is not None
commit.repository, upload, parallel_processing
)
raw_report_info.raw_report = raw_report
except FileNotInStorageError:
@@ -977,7 +973,7 @@
reportid=reportid,
commit_yaml=self.current_yaml.to_dict(),
archive_url=archive_url,
in_parallel=parallel_idx is not None,
parallel_processing=parallel_processing,
),
)
result.error = ProcessingError(
@@ -997,12 +993,12 @@
flags,
session,
upload=upload,
parallel_idx=parallel_idx,
parallel_idx=parallel_processing.parallel_idx,
)
result.report = process_result.report
log.info(
"Successfully processed report"
+ (" (in parallel)" if parallel_idx is not None else ""),
+ (" (in parallel)" if parallel_processing.is_parallel else ""),
extra=dict(
session=session.id,
ci=f"{session.provider}:{session.build}:{session.job}",
@@ -1049,13 +1045,6 @@
db_session = upload_obj.get_db_session()
session = processing_result.session
if processing_result.error is None:
# this should be enabled for the actual rollout of parallel upload processing.
# if PARALLEL_UPLOAD_PROCESSING_BY_REPO.check_value(
# "this should be the repo id"
# ):
# upload_obj.state_id = UploadState.PARALLEL_PROCESSED.db_id
# upload_obj.state = "parallel_processed"
# else:
Comment on lines -1052 to -1058
Contributor:

if you haven't found it, this enum value is what this commented out block is about

a state of PROCESSED implies the upload's data will be found if you call get_existing_report_for_commit(). a state of PARALLEL_PROCESSED indicates UploadProcessorTask has finished but UploadFinisherTask has not gotten to it yet. don't remember if the distinction mattered

fully forgot about this bit

upload_obj.state_id = UploadState.PROCESSED.db_id
upload_obj.state = "processed"
upload_obj.order_number = session.id
21 changes: 19 additions & 2 deletions tasks/tests/integration/test_upload_e2e.py
@@ -13,7 +13,10 @@
from database.models.core import Commit, CompareCommit, Repository
from database.tests.factories import CommitFactory, RepositoryFactory
from database.tests.factories.core import PullFactory
from rollouts import PARALLEL_UPLOAD_PROCESSING_BY_REPO
from rollouts import (
FULLY_PARALLEL_UPLOAD_PROCESSING_BY_REPO,
PARALLEL_UPLOAD_PROCESSING_BY_REPO,
)
from services.archive import ArchiveService
from services.redis import get_redis_connection
from services.report import ReportService
@@ -114,9 +117,18 @@ def setup_mock_get_compare(

@pytest.mark.integration
@pytest.mark.django_db()
@pytest.mark.parametrize("do_parallel_processing", [False, True])
@pytest.mark.parametrize(
"do_fully_parallel_processing,do_parallel_processing",
[
(False, False),
(False, True),
(True, True),
],
ids=["fully synchronous", "parallel experiment", "fully parallel"],
)
def test_full_upload(
dbsession: Session,
do_fully_parallel_processing: bool,
do_parallel_processing: bool,
mocker,
mock_repo_provider,
@@ -146,6 +158,11 @@ def test_full_upload(
}
)
# use parallel processing:
mocker.patch.object(
FULLY_PARALLEL_UPLOAD_PROCESSING_BY_REPO,
"check_value",
return_value=do_fully_parallel_processing,
)
mocker.patch.object(
PARALLEL_UPLOAD_PROCESSING_BY_REPO,
"check_value",