Implement fully parallel upload processing
This adds another feature/rollout flag which prefers the parallel upload processing pipeline outright instead of running it as an experiment.

Upload Processing can run in essentially 4 modes:
- Completely serial processing
- Serial processing, but running "experiment" code (`is_experiment_serial`):
  - In this mode, each `UploadProcessor` task saves a copy of the raw upload,
    as well as a copy of the final report (`is_final`) for later verification.
- Parallel processing, but running "experiment" code (`is_experiment_parallel`):
  - In this mode, another parallel set of `UploadProcessor` tasks runs *after*
    the main set of tasks.
  - These tasks use the copied-over raw uploads prepared by the
    `is_experiment_serial` tasks for their processing.
  - These tasks do not persist any of their results to the database;
    instead, the final `UploadFinisher` task launches the `ParallelVerification` task.
- Fully parallel processing (`is_fully_parallel`):
  - In this mode, the final `UploadFinisher` task is responsible for merging
    the final report and persisting it.

An example Task chain might look like this, in "experiment" mode:
- Upload
  - UploadProcessor (`is_experiment_serial`)
    - UploadProcessor (`is_experiment_serial`)
      - UploadProcessor (`is_experiment_serial`, `is_final`)
        - UploadFinisher
          - UploadProcessor (`is_experiment_parallel`)
          - UploadProcessor (`is_experiment_parallel`)
          - UploadProcessor (`is_experiment_parallel`)
            - UploadFinisher (`is_experiment_parallel`)
              - ParallelVerification

Once implemented, `is_fully_parallel` will look like this (a sketch of composing such a chain follows below):
- Upload
  - UploadProcessor (`is_fully_parallel`)
  - UploadProcessor (`is_fully_parallel`)
  - UploadProcessor (`is_fully_parallel`)
    - UploadFinisher (`is_fully_parallel`)
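
The fan-out/fan-in shape of the fully parallel mode maps naturally onto Celery's canvas primitives. As a rough sketch only — the worker's real tasks take more arguments (repoid, commitid, commit_yaml, and so on), so the function and its parameters below are illustrative, not the actual scheduling code:

# Illustrative sketch, not the worker's actual scheduling code: run one
# UploadProcessor per upload concurrently, then fan back in to a single
# UploadFinisher that merges and persists the final report.
from celery import chord

def schedule_fully_parallel(processor_task, finisher_task, arguments_list):
    # one UploadProcessor signature per upload; all run concurrently
    processors = [
        processor_task.s(arguments, fully_parallel=True, parallel_idx=idx)
        for idx, arguments in enumerate(arguments_list)
    ]
    # the chord body (the finisher) only runs once every processor has finished
    return chord(processors, finisher_task.s(fully_parallel=True))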
Swatinem committed Sep 10, 2024
1 parent ab427f8 commit f1ab443
Showing 10 changed files with 341 additions and 193 deletions.
100 changes: 100 additions & 0 deletions helpers/parallel.py
@@ -0,0 +1,100 @@
import dataclasses
from typing import Self

from rollouts import (
    FULLY_PARALLEL_UPLOAD_PROCESSING_BY_REPO,
    PARALLEL_UPLOAD_PROCESSING_BY_REPO,
)


@dataclasses.dataclass(frozen=True)
class ParallelProcessing:
    """
    This encapsulates Parallel Upload Processing logic.

    Upload Processing can run in essentially 4 modes:
    - Completely serial processing
    - Serial processing, but running "experiment" code (`is_experiment_serial`):
      - In this mode, each `UploadProcessor` task saves a copy of the raw upload,
        as well as a copy of the final report (`is_final`) for later verification.
    - Parallel processing, but running "experiment" code (`is_experiment_parallel`):
      - In this mode, another parallel set of `UploadProcessor` tasks runs *after*
        the main set of tasks.
      - These tasks use the copied-over raw uploads prepared by the
        `is_experiment_serial` tasks for their processing.
      - These tasks do not persist any of their results to the database;
        instead, the final `UploadFinisher` task launches the `ParallelVerification` task.
    - Fully parallel processing (`is_fully_parallel`):
      - In this mode, the final `UploadFinisher` task is responsible for merging
        the final report and persisting it.

    An example Task chain might look like this, in "experiment" mode:
    - Upload
      - UploadProcessor (`is_experiment_serial`)
        - UploadProcessor (`is_experiment_serial`)
          - UploadProcessor (`is_experiment_serial`, `is_final`)
            - UploadFinisher
              - UploadProcessor (`is_experiment_parallel`)
              - UploadProcessor (`is_experiment_parallel`)
              - UploadProcessor (`is_experiment_parallel`)
                - UploadFinisher (`is_experiment_parallel`)
                  - ParallelVerification

    The `is_fully_parallel` mode looks like this:
    - Upload
      - UploadProcessor (`is_fully_parallel`)
      - UploadProcessor (`is_fully_parallel`)
      - UploadProcessor (`is_fully_parallel`)
        - UploadFinisher (`is_fully_parallel`)
    """

    run_experiment: bool = False
    run_fully_parallel: bool = False

    is_fully_parallel: bool = False
    is_experiment_parallel: bool = False
    is_experiment_serial: bool = False
    is_final: bool = False
    parallel_idx: int | None = None

    def initial(repoid: int) -> Self:
        run_fully_parallel = FULLY_PARALLEL_UPLOAD_PROCESSING_BY_REPO.check_value(
            identifier=repoid, default=False
        )
        run_experiment = (
            False
            if run_fully_parallel
            else PARALLEL_UPLOAD_PROCESSING_BY_REPO.check_value(
                identifier=repoid, default=False
            )
        )

        return ParallelProcessing(
            run_fully_parallel=run_fully_parallel,
            run_experiment=run_experiment,
            is_fully_parallel=run_fully_parallel,
        )

    def from_task_args(
        repoid: int,
        in_parallel: bool = False,
        fully_parallel: bool = False,
        is_final: bool = False,
        parallel_idx: int | None = None,
        **kwargs,
    ) -> Self:
        slf = ParallelProcessing.initial(repoid)

        if fully_parallel:
            return dataclasses.replace(slf, is_fully_parallel=True)

        is_experiment_parallel = slf.run_experiment and in_parallel
        is_experiment_serial = slf.run_experiment and not in_parallel

        return dataclasses.replace(
            slf,
            is_experiment_parallel=is_experiment_parallel,
            is_experiment_serial=is_experiment_serial,
            is_final=is_final,
            parallel_idx=parallel_idx,
        )
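
A consumer of this class would derive the flags once from the task's raw kwargs and branch on them. A minimal usage sketch — the task body here is hypothetical, but the keyword names match `from_task_args` above:

# Hypothetical task body showing the intended use of ParallelProcessing;
# only the flag handling is real, everything else is elided.
def process_upload(repoid: int, **kwargs):
    parallel_processing = ParallelProcessing.from_task_args(repoid, **kwargs)

    if parallel_processing.is_experiment_serial:
        ...  # also save a copy of the raw upload (and, if is_final, the report)
    if parallel_processing.is_experiment_parallel:
        ...  # process the copied-over raw upload, but persist nothing
    if parallel_processing.is_fully_parallel:
        ...  # merging/persisting is deferred to the final UploadFinisher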
6 changes: 2 additions & 4 deletions helpers/parallel_upload_processing.py
@@ -43,16 +43,14 @@ def _adjust_sessions(
def get_parallel_session_ids(
    sessions, argument_list, db_session, report_service, commit_yaml
):
-    num_sessions = len(argument_list)
-
    mock_sessions = copy.deepcopy(sessions)  # the sessions already in the report
    get_parallel_session_ids = []

    # iterate over all uploads, get the next session id, and adjust sessions (remove CFF logic)
-    for i in range(num_sessions):
+    for arguments in argument_list:
        next_session_id = next_session_number(mock_sessions)

-        upload_pk = argument_list[i]["upload_pk"]
+        upload_pk = arguments["upload_pk"]
        upload = db_session.query(Upload).filter_by(id_=upload_pk).first()
        to_merge_session = report_service.build_session(upload)
        flags = upload.flag_names
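The loop above leans on the repo's own `next_session_number` helper to pre-assign session ids. Its implementation is not part of this diff; the sketch below shows the assumed semantics only:

# Assumed semantics of `next_session_number`, for illustration only: hand out
# the next session id after the highest one already present in the report.
def next_session_number(sessions: dict[int, object]) -> int:
    return max(sessions.keys(), default=-1) + 1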
1 change: 1 addition & 0 deletions rollouts/__init__.py
@@ -11,6 +11,7 @@
)

PARALLEL_UPLOAD_PROCESSING_BY_REPO = Feature("parallel_upload_processing")
+FULLY_PARALLEL_UPLOAD_PROCESSING_BY_REPO = Feature("fully_parallel_upload_processing")

CARRYFORWARD_BASE_SEARCH_RANGE_BY_OWNER = Feature("carryforward_base_search_range")

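Both flags are `Feature` instances from the shared rollouts library. The real implementation is richer (percentage-based rollouts, targeting, overrides); the simplified sketch below captures just the semantics assumed by the `check_value` calls in this commit:

# Simplified, assumed Feature semantics; not the real rollouts implementation.
class Feature:
    def __init__(self, name: str, enabled_identifiers: set[int] | None = None):
        self.name = name
        self.enabled_identifiers = enabled_identifiers or set()

    def check_value(self, identifier: int, default: bool = False) -> bool:
        # on for explicitly targeted identifiers (e.g. repo ids), else `default`
        return True if identifier in self.enabled_identifiers else default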
74 changes: 34 additions & 40 deletions services/report/__init__.py
@@ -43,11 +43,9 @@
    RepositoryWithoutValidBotError,
)
from helpers.labels import get_labels_per_session
+from helpers.parallel import ParallelProcessing
from helpers.telemetry import MetricContext
-from rollouts import (
-    CARRYFORWARD_BASE_SEARCH_RANGE_BY_OWNER,
-    PARALLEL_UPLOAD_PROCESSING_BY_REPO,
-)
+from rollouts import CARRYFORWARD_BASE_SEARCH_RANGE_BY_OWNER
from services.archive import ArchiveService
from services.redis import (
    PARALLEL_UPLOAD_PROCESSING_SESSION_COUNTER_TTL,
@@ -61,9 +59,7 @@
    RAW_UPLOAD_RAW_REPORT_COUNT,
    RAW_UPLOAD_SIZE,
)
-from services.report.raw_upload_processor import (
-    process_raw_upload,
-)
+from services.report.raw_upload_processor import process_raw_upload
from services.repository import get_repo_provider_service
from services.yaml.reader import get_paths_from_flags, read_yaml_field

@@ -207,7 +203,9 @@ def has_initialized_report(self, commit: Commit) -> bool:

    @sentry_sdk.trace
    def initialize_and_save_report(
-        self, commit: Commit, report_code: str = None
+        self,
+        commit: Commit,
+        report_code: str = None,
    ) -> CommitReport:
        """
        Initializes the commit report
@@ -287,26 +285,28 @@ def initialize_and_save_report(
            # This means there is a report to carryforward
            self.save_full_report(commit, report, report_code)

+            parallel_processing = ParallelProcessing.initial(
+                commit.repository.repoid
+            )
            # Behind parallel processing flag, save the CFF report to GCS so the parallel variant of
            # finisher can build off of it later. Makes the assumption that the CFFs occupy the first
            # j to i session ids where i is the max id of the CFFs and j is some integer less than i.
-            if PARALLEL_UPLOAD_PROCESSING_BY_REPO.check_value(
-                identifier=commit.repository.repoid
-            ):
+            if parallel_processing.run_experiment:
                self.save_parallel_report_to_archive(commit, report, report_code)
-                highest_session_id = max(
-                    report.sessions.keys()
-                )  # the largest id among the CFFs
-                get_redis_connection().incrby(
-                    name=get_parallel_upload_processing_session_counter_redis_key(
-                        commit.repository.repoid, commit.commitid
-                    ),
+                # the largest id among the CFFs:
+                highest_session_id = max(report.sessions.keys())
+                redis = get_redis_connection()
+                redis_key = (
+                    get_parallel_upload_processing_session_counter_redis_key(
+                        commit.repository.repoid, commit.commitid
+                    )
+                )
+                redis.incrby(
+                    name=redis_key,
                    amount=highest_session_id + 1,
                )
-                get_redis_connection().expire(
-                    name=get_parallel_upload_processing_session_counter_redis_key(
-                        commit.repository.repoid, commit.commitid
-                    ),
+                redis.expire(
+                    name=redis_key,
                    time=PARALLEL_UPLOAD_PROCESSING_SESSION_COUNTER_TTL,
                )

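The counter seeded above is what lets concurrently running processors hand out non-overlapping session ids: it starts just past the carried-forward sessions, and each parallel `UploadProcessor` can atomically claim the next free id. A sketch of the claiming side — this assumes the key helper lives in `services.redis` alongside the TTL constant, and the claiming function itself is hypothetical:

# Hypothetical claiming side of the session counter; INCRBY is atomic, so
# concurrent processors each receive a distinct session id.
from services.redis import (
    get_parallel_upload_processing_session_counter_redis_key,
    get_redis_connection,
)

def claim_parallel_session_id(repoid: int, commitid: str) -> int:
    redis = get_redis_connection()
    key = get_parallel_upload_processing_session_counter_redis_key(repoid, commitid)
    # the counter was pre-seeded to highest_session_id + 1, i.e. just past the
    # carried-forward (CFF) sessions, so claimed ids never collide with them
    return redis.incrby(key, 1) - 1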
@@ -840,7 +840,7 @@ def create_new_report_for_commit(self, commit: Commit) -> Report:

    @sentry_sdk.trace
    def parse_raw_report_from_storage(
-        self, repo: Repository, upload: Upload, is_parallel=False
+        self, repo: Repository, upload: Upload, parallel_processing: ParallelProcessing
    ) -> ParsedRawReport:
        """Pulls the raw uploaded report from storage and parses it so it's
        easier to access different parts of the raw upload.
@@ -851,23 +851,19 @@ def parse_raw_report_from_storage(
        archive_service = self.get_archive_service(repo)
        archive_url = upload.storage_path

        # TODO: For the parallel experiment, can remove once finished
        log.info(
            "Parsing the raw report from storage",
            extra=dict(
                commit=upload.report.commit_id,
                repoid=repo.repoid,
                archive_url=archive_url,
-                is_parallel=is_parallel,
            ),
        )

        # For the parallel upload verification experiment, we need to make a copy of the raw uploaded reports
        # so that the parallel pipeline can use those to parse. The serial pipeline rewrites the raw uploaded
        # reports to a human readable version that doesn't include file fixes, so that's why copying is necessary.
-        if PARALLEL_UPLOAD_PROCESSING_BY_REPO.check_value(
-            identifier=repo.repoid, default=False
-        ):
+        if parallel_processing.run_experiment:
            parallel_url = archive_url.removesuffix(".txt") + "_PARALLEL.txt"
            log.info(
                "In the parallel experiment for parsing raw report in storage",
@@ -878,7 +874,7 @@ def parse_raw_report_from_storage(
                    archive_url=archive_url,
                ),
            )
-            if not is_parallel:
+            if parallel_processing.is_experiment_serial:
                archive_file = archive_service.read_file(archive_url)
                archive_service.write_file(parallel_url, archive_file)
                log.info(
@@ -929,7 +925,7 @@ def build_report_from_raw_content(
        report: Report,
        raw_report_info: RawReportInfo,
        upload: Upload,
-        parallel_idx=None,
+        parallel_processing: ParallelProcessing,
    ) -> ProcessingResult:
        """
        Processes an upload on top of an existing report `master` and returns
@@ -965,7 +961,7 @@ def build_report_from_raw_content(

        try:
            raw_report = self.parse_raw_report_from_storage(
-                commit.repository, upload, is_parallel=parallel_idx is not None
+                commit.repository, upload, parallel_processing
            )
            raw_report_info.raw_report = raw_report
        except FileNotInStorageError:
@@ -977,7 +973,7 @@ def build_report_from_raw_content(
                    reportid=reportid,
                    commit_yaml=self.current_yaml.to_dict(),
                    archive_url=archive_url,
-                    in_parallel=parallel_idx is not None,
+                    parallel_processing=parallel_processing,
                ),
            )
            result.error = ProcessingError(
@@ -997,12 +993,17 @@ def build_report_from_raw_content(
                flags,
                session,
                upload=upload,
-                parallel_idx=parallel_idx,
+                parallel_idx=parallel_processing.parallel_idx,
            )
            result.report = process_result.report
            log.info(
                "Successfully processed report"
-                + (" (in parallel)" if parallel_idx is not None else ""),
+                + (
+                    " (in parallel)"
+                    if parallel_processing.is_experiment_parallel
+                    or parallel_processing.is_fully_parallel
+                    else ""
+                ),
                extra=dict(
                    session=session.id,
                    ci=f"{session.provider}:{session.build}:{session.job}",
@@ -1049,13 +1050,6 @@ def update_upload_with_processing_result(
        db_session = upload_obj.get_db_session()
        session = processing_result.session
        if processing_result.error is None:
-            # this should be enabled for the actual rollout of parallel upload processing.
-            # if PARALLEL_UPLOAD_PROCESSING_BY_REPO.check_value(
-            #     "this should be the repo id"
-            # ):
-            #     upload_obj.state_id = UploadState.PARALLEL_PROCESSED.db_id
-            #     upload_obj.state = "parallel_processed"
-            # else:
            upload_obj.state_id = UploadState.PROCESSED.db_id
            upload_obj.state = "processed"
            upload_obj.order_number = session.id
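The deleted comment block above sketched how upload state could be recorded once parallel processing actually rolls out. Rewritten against the new `ParallelProcessing` flags, that future branch might look like this — hypothetical, and deliberately not part of this commit:

# Hypothetical future shape of the state update, reconstructed from the
# deleted comment block; UploadState.PARALLEL_PROCESSED comes from there.
def set_processed_state(upload_obj, parallel_processing: ParallelProcessing):
    if parallel_processing.is_experiment_parallel:
        upload_obj.state_id = UploadState.PARALLEL_PROCESSED.db_id
        upload_obj.state = "parallel_processed"
    else:
        upload_obj.state_id = UploadState.PROCESSED.db_id
        upload_obj.state = "processed"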
21 changes: 19 additions & 2 deletions tasks/tests/integration/test_upload_e2e.py
@@ -13,7 +13,10 @@
from database.models.core import Commit, CompareCommit, Repository
from database.tests.factories import CommitFactory, RepositoryFactory
from database.tests.factories.core import PullFactory
-from rollouts import PARALLEL_UPLOAD_PROCESSING_BY_REPO
+from rollouts import (
+    FULLY_PARALLEL_UPLOAD_PROCESSING_BY_REPO,
+    PARALLEL_UPLOAD_PROCESSING_BY_REPO,
+)
from services.archive import ArchiveService
from services.redis import get_redis_connection
from services.report import ReportService
@@ -114,9 +117,18 @@ def setup_mock_get_compare(

@pytest.mark.integration
@pytest.mark.django_db()
-@pytest.mark.parametrize("do_parallel_processing", [False, True])
+@pytest.mark.parametrize(
+    "do_fully_parallel_processing,do_parallel_processing",
+    [
+        (False, False),
+        (False, True),
+        (True, True),
+    ],
+    ids=["fully synchronous", "parallel experiment", "fully parallel"],
+)
def test_full_upload(
    dbsession: Session,
+    do_fully_parallel_processing: bool,
    do_parallel_processing: bool,
    mocker,
    mock_repo_provider,
@@ -146,6 +158,11 @@ def test_full_upload(
        }
    )
    # use parallel processing:
+    mocker.patch.object(
+        FULLY_PARALLEL_UPLOAD_PROCESSING_BY_REPO,
+        "check_value",
+        return_value=do_fully_parallel_processing,
+    )
    mocker.patch.object(
        PARALLEL_UPLOAD_PROCESSING_BY_REPO,
        "check_value",
6 changes: 5 additions & 1 deletion tasks/tests/unit/test_upload_processing_task.py
@@ -17,6 +17,7 @@
    ReportExpiredException,
    RepositoryWithoutValidBotError,
)
+from helpers.parallel import ParallelProcessing
from rollouts import USE_LABEL_INDEX_IN_REPORT_PROCESSING_BY_REPO_ID
from services.archive import ArchiveService
from services.report import RawReportInfo, ReportService
@@ -346,6 +347,7 @@ def test_upload_processor_call_with_upload_obj(
            commit_yaml={"codecov": {"max_report_age": False}},
            arguments_list=redis_queue,
            report_code=None,
+            parallel_processing=ParallelProcessing(),
        )
        expected_result = {
            "processings_so_far": [
@@ -545,7 +547,7 @@ def test_upload_task_call_exception_within_individual_upload(
        assert upload.state_id == UploadState.ERROR.db_id
        assert upload.state == "error"
        assert not mocked_3.called
-        mocked_4.assert_called_with(commit.repository, upload, is_parallel=False)
+        mocked_4.assert_called_with(commit.repository, upload, mocker.ANY)
        mocked_5.assert_called()

    @pytest.mark.django_db(databases={"default"})
@@ -715,6 +717,7 @@ def test_upload_task_process_individual_report_with_notfound_report(
            report=false_report,
            raw_report_info=RawReportInfo(),
            upload=upload,
+            parallel_processing=ParallelProcessing(),
        )
        assert result.error.as_dict() == {
            "code": "file_not_in_storage",
@@ -740,6 +743,7 @@ def test_upload_task_process_individual_report_with_notfound_report_no_retries_y
            Report(),
            UploadFactory.create(),
            RawReportInfo(),
+            parallel_processing=ParallelProcessing(),
        )

    @pytest.mark.django_db(databases={"default"})