-
Notifications
You must be signed in to change notification settings - Fork 10
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.
Already on GitHub? Sign in to your account
attempt to fix cff duplicates #497
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,42 @@ | ||
import logging | ||
from itertools import islice | ||
from typing import List | ||
|
||
import sentry_sdk | ||
|
||
from database.engine import Session | ||
from database.models import ( | ||
Commit, | ||
Upload, | ||
UploadError, | ||
UploadLevelTotals, | ||
uploadflagmembership, | ||
) | ||
|
||
log = logging.getLogger(__name__) | ||
|
||
|
||
@sentry_sdk.trace
def delete_upload_by_ids(db_session: Session, upload_ids: List[int], commit: Commit):
    """Bulk-delete the given uploads and all rows that reference them.

    Child rows (errors, totals, flag memberships) are removed before the
    ``Upload`` rows themselves so no foreign-key constraint is violated.
    ``synchronize_session=False`` skips in-memory session synchronization,
    which is safe here because the session is committed immediately after.

    Args:
        db_session: active SQLAlchemy session.
        upload_ids: ids of the ``Upload`` rows to delete.
        commit: the commit the uploads belong to (used for logging only).
    """
    db_session.query(UploadError).filter(UploadError.upload_id.in_(upload_ids)).delete(
        synchronize_session=False
    )
    db_session.query(UploadLevelTotals).filter(
        UploadLevelTotals.upload_id.in_(upload_ids)
    ).delete(synchronize_session=False)
    db_session.query(uploadflagmembership).filter(
        uploadflagmembership.c.upload_id.in_(upload_ids)
    ).delete(synchronize_session=False)
    db_session.query(Upload).filter(Upload.id_.in_(upload_ids)).delete(
        synchronize_session=False
    )
    # NOTE(review): consider whether `flush()` would be more appropriate than
    # `commit()` here so the caller controls transaction boundaries — confirm
    # against how sibling tasks (sync_repos/sync_teams) handle this.
    db_session.commit()
    log.info(
        "Deleted uploads",
        extra=dict(
            commit=commit.commitid,
            repo=commit.repoid,
            number_uploads=len(upload_ids),
            # A plain slice (not islice) so the actual ids appear in the log;
            # an islice object would render as an opaque iterator repr.
            # Capped at 20 to bound log-record size.
            upload_ids=upload_ids[:20],
        ),
    )
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -4,10 +4,11 @@ | |||||
import sys | ||||||
import typing | ||||||
import uuid | ||||||
from collections import defaultdict | ||||||
from dataclasses import dataclass | ||||||
from json import loads | ||||||
from time import time | ||||||
from typing import Any, Dict, Mapping, Optional, Sequence | ||||||
from typing import Any, Dict, List, Mapping, Optional, Sequence | ||||||
|
||||||
import sentry_sdk | ||||||
from celery.exceptions import SoftTimeLimitExceeded | ||||||
|
@@ -49,6 +50,7 @@ | |||||
PARALLEL_UPLOAD_PROCESSING_BY_REPO, | ||||||
) | ||||||
from services.archive import ArchiveService | ||||||
from services.delete.delete import delete_upload_by_ids | ||||||
from services.redis import ( | ||||||
PARALLEL_UPLOAD_PROCESSING_SESSION_COUNTER_TTL, | ||||||
get_parallel_upload_processing_session_counter_redis_key, | ||||||
|
@@ -60,6 +62,8 @@ | |||||
from services.repository import get_repo_provider_service | ||||||
from services.yaml.reader import get_paths_from_flags, read_yaml_field | ||||||
|
||||||
MIN_PARENT_COMMIT_DEPTH = 1 | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Will this ever not be 1? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This value itself will always be 1, but the parent depth a commit chooses is determined by this worker/services/report/__init__.py Lines 648 to 649 in f12fe8f
|
||||||
|
||||||
|
||||||
@dataclass | ||||||
class ProcessingError(object): | ||||||
|
@@ -646,7 +650,8 @@ def get_appropriate_commit_to_carryforward_from( | |||||
) -> Optional[Commit]: | ||||||
parent_commit = commit.get_parent_commit() | ||||||
parent_commit_tracking = [] | ||||||
count = 1 # `parent_commit` is already the first parent | ||||||
count = MIN_PARENT_COMMIT_DEPTH # `parent_commit` is already the first parent | ||||||
|
||||||
while ( | ||||||
parent_commit is not None | ||||||
and parent_commit.state not in ("complete", "skipped") | ||||||
|
@@ -688,7 +693,7 @@ def get_appropriate_commit_to_carryforward_from( | |||||
parent_tracing=parent_commit_tracking, | ||||||
), | ||||||
) | ||||||
return None | ||||||
return (None, None) | ||||||
if parent_commit.state not in ("complete", "skipped"): | ||||||
log.warning( | ||||||
"None of the parent commits were in a complete state to be used as CFing base", | ||||||
|
@@ -700,8 +705,8 @@ def get_appropriate_commit_to_carryforward_from( | |||||
would_be_parent=parent_commit.commitid, | ||||||
), | ||||||
) | ||||||
return None | ||||||
return parent_commit | ||||||
return (None, None) | ||||||
return (parent_commit, count) | ||||||
|
||||||
async def _possibly_shift_carryforward_report( | ||||||
self, carryforward_report: Report, base_commit: Commit, head_commit: Commit | ||||||
|
@@ -778,8 +783,10 @@ async def create_new_report_for_commit(self, commit: Commit) -> Report: | |||||
) | ||||||
) | ||||||
|
||||||
parent_commit = self.get_appropriate_commit_to_carryforward_from( | ||||||
commit, max_parenthood_deepness | ||||||
parent_commit, parent_depth = ( | ||||||
self.get_appropriate_commit_to_carryforward_from( | ||||||
commit, max_parenthood_deepness | ||||||
) | ||||||
) | ||||||
if parent_commit is None: | ||||||
log.warning( | ||||||
|
@@ -807,6 +814,14 @@ async def create_new_report_for_commit(self, commit: Commit) -> Report: | |||||
if self.current_yaml.flag_has_carryfoward(flag_name): | ||||||
flags_to_carryforward.append(flag_name) | ||||||
if not flags_to_carryforward: | ||||||
log.warning( | ||||||
"There weren't flags to carry forward in the parent commit", | ||||||
extra=dict( | ||||||
commit=commit.commitid, | ||||||
repoid=commit.repoid, | ||||||
parent_commit=parent_commit.commitid, | ||||||
), | ||||||
) | ||||||
return Report() | ||||||
paths_to_carryforward = get_paths_from_flags( | ||||||
self.current_yaml, flags_to_carryforward | ||||||
|
@@ -826,7 +841,10 @@ async def create_new_report_for_commit(self, commit: Commit) -> Report: | |||||
parent_report, | ||||||
flags_to_carryforward, | ||||||
paths_to_carryforward, | ||||||
session_extras=dict(carriedforward_from=parent_commit.commitid), | ||||||
session_extras=dict( | ||||||
carriedforward_from=parent_commit.commitid, | ||||||
parent_depth=parent_depth, | ||||||
), | ||||||
) | ||||||
# If the parent report has labels we also need to carryforward the label index | ||||||
# Considerations: | ||||||
|
@@ -1173,6 +1191,74 @@ def save_report(self, commit: Commit, report: Report, report_code=None): | |||||
) | ||||||
return {"url": url} | ||||||
|
||||||
def _determine_cffs_and_depths_in_db(
    self, db_session, commit: Commit
) -> Dict[int, List[int]]:
    """Group the commit's carried-forward uploads in the DB by parent depth.

    Queries every upload of type ``carriedforward`` attached to the commit's
    report and buckets the upload ids by the ``parent_depth`` recorded in
    ``upload_extras``. Uploads whose extras carry no (truthy) ``parent_depth``
    are skipped.

    Returns:
        Mapping of ``parent_depth`` -> list of upload ids at that depth.
        Always a mapping (possibly empty), never ``None`` — hence the
        non-Optional return annotation.
    """
    depths_with_cff_upload_ids: Dict[int, List[int]] = defaultdict(list)

    existing_cff_uploads: Optional[List[Upload]] = (
        db_session.query(Upload)
        .filter_by(
            report_id=commit.report.id_,
            upload_type=SessionType.carriedforward.value,
        )
        .all()
    )

    if existing_cff_uploads:
        for upload in existing_cff_uploads:
            parent_depth = upload.upload_extras.get("parent_depth")
            if parent_depth:
                depths_with_cff_upload_ids[parent_depth].append(upload.id)

        log.info(
            "Existing cffs and their respective depths",
            extra=dict(
                commit=commit.commitid,
                repo=commit.repoid,
                # NOTE(review): this logs the ORM objects' reprs; logging the
                # ids (or the depth mapping) may be more useful — confirm.
                existing_uploads=existing_cff_uploads,
            ),
        )

    return depths_with_cff_upload_ids
|
||||||
def _possibly_delete_existing_cffs(
    self,
    db_session,
    commit: Commit,
    carryforward_report: Report,
):
    """Delete stale carried-forward uploads left over from a deeper parent.

    A commit can accumulate cff uploads from multiple parents (e.g. when a
    different ancestor was chosen while the direct parent was still pending).
    Any upload in the DB carried forward from a more distant parent than the
    one the latest report used is deleted.

    Args:
        db_session: active SQLAlchemy session.
        commit: the commit whose report is being saved.
        carryforward_report: the freshly built carryforward report whose
            sessions record ``parent_depth`` in ``session_extras``.
    """
    cffs_and_depths_in_db = self._determine_cffs_and_depths_in_db(
        db_session=db_session, commit=commit
    )

    if not cffs_and_depths_in_db:
        return

    # Use the SMALLEST parent_depth present in the new report's sessions.
    # Sessions carry no ordering guarantee, so taking the first depth found
    # could pick a deeper ancestor and wrongly spare stale uploads.
    session_depths = [
        upload.session_extras.get("parent_depth")
        for upload in carryforward_report.sessions.values()
        if upload.session_extras.get("parent_depth")
    ]
    carryforward_report_depth = (
        min(session_depths) if session_depths else MIN_PARENT_COMMIT_DEPTH
    )

    # Collect every upload carried forward from a more distant parent than
    # the latest report's, then delete them in one batch with a single
    # summary log line (avoids per-iteration log spam).
    upload_ids_to_delete = [
        upload_id
        for parent_depth, upload_ids in cffs_and_depths_in_db.items()
        if parent_depth > carryforward_report_depth
        for upload_id in upload_ids
    ]
    if upload_ids_to_delete:
        log.info(
            "Deleting stale carryforward uploads from DB",
            extra=dict(
                repoid=commit.repoid,
                commit=commit.commitid,
                number_uploads=len(upload_ids_to_delete),
            ),
        )
        # NOTE(review): a large batch could make the calling task slow or
        # time out; consider moving this deletion into a dedicated task.
        delete_upload_by_ids(
            db_session=db_session, upload_ids=upload_ids_to_delete, commit=commit
        )
|
||||||
def save_full_report(self, commit: Commit, report: Report, report_code=None): | ||||||
""" | ||||||
Saves the report (into database and storage) AND takes care of backfilling its sessions | ||||||
|
@@ -1193,6 +1279,17 @@ def save_full_report(self, commit: Commit, report: Report, report_code=None): | |||||
) | ||||||
res = self.save_report(commit, report, report_code) | ||||||
db_session = commit.get_db_session() | ||||||
|
||||||
# We've seen instances where a commit report carries forward | ||||||
# uploads from multiple parents, a side effect of choosing a different parent | ||||||
# commit when the direct parent is still pending. This function will delete out | ||||||
# of date uploads that weren't carried forward by the closest parent commit | ||||||
self._possibly_delete_existing_cffs( | ||||||
db_session=db_session, | ||||||
commit=commit, | ||||||
carryforward_report=report, | ||||||
) | ||||||
|
||||||
for sess_id, session in report.sessions.items(): | ||||||
upload = Upload( | ||||||
build_code=session.build, | ||||||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shouldn't we use
flush
instead ofcommit
? Like we do in the sync_repos and sync_teams tasks? I don't really know the difference, tho. Just going for the consistency