diff --git a/services/delete/__init__.py b/services/delete/__init__.py
new file mode 100644
index 000000000..e69de29bb
diff --git a/services/delete/delete.py b/services/delete/delete.py
new file mode 100644
index 000000000..6feb41eaf
--- /dev/null
+++ b/services/delete/delete.py
@@ -0,0 +1,42 @@
+import logging
+from itertools import islice
+from typing import List
+
+import sentry_sdk
+
+from database.engine import Session
+from database.models import (
+    Commit,
+    Upload,
+    UploadError,
+    UploadLevelTotals,
+    uploadflagmembership,
+)
+
+log = logging.getLogger(__name__)
+
+
+@sentry_sdk.trace
+def delete_upload_by_ids(db_session: Session, upload_ids: List[int], commit: Commit):
+    db_session.query(UploadError).filter(UploadError.upload_id.in_(upload_ids)).delete(
+        synchronize_session=False
+    )
+    db_session.query(UploadLevelTotals).filter(
+        UploadLevelTotals.upload_id.in_(upload_ids)
+    ).delete(synchronize_session=False)
+    db_session.query(uploadflagmembership).filter(
+        uploadflagmembership.c.upload_id.in_(upload_ids)
+    ).delete(synchronize_session=False)
+    db_session.query(Upload).filter(Upload.id_.in_(upload_ids)).delete(
+        synchronize_session=False
+    )
+    db_session.commit()
+    log.info(
+        "Deleted uploads",
+        extra=dict(
+            commit=commit.commitid,
+            repo=commit.repoid,
+            number_uploads=len(upload_ids),
+            upload_ids=list(islice(upload_ids, 20)),
+        ),
+    )
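[Editor's note] A minimal usage sketch for the new `delete_upload_by_ids` helper, assuming the caller already holds a `db_session` and a `commit` from the surrounding task context; the `purge_stale_uploads` wrapper below is hypothetical, not part of this diff. It illustrates the main caveat of the bulk deletes above: with `synchronize_session=False`, SQLAlchemy does not update objects already loaded into the session, so callers may want to expire them afterwards.

```python
from services.delete.delete import delete_upload_by_ids


def purge_stale_uploads(db_session, commit, stale_upload_ids):
    # Hypothetical wrapper: the helper issues bulk DELETEs for the child rows
    # (errors, totals, flag memberships) before the Upload rows themselves,
    # and commits the transaction on its own.
    if stale_upload_ids:
        delete_upload_by_ids(
            db_session=db_session, upload_ids=stale_upload_ids, commit=commit
        )
        # Bulk deletes bypass the identity map; drop any cached Upload objects.
        db_session.expire_all()
```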
diff --git a/services/report/__init__.py b/services/report/__init__.py
index 8e8bc13cb..ab3bc08a1 100644
--- a/services/report/__init__.py
+++ b/services/report/__init__.py
@@ -4,10 +4,11 @@
 import sys
 import typing
 import uuid
+from collections import defaultdict
 from dataclasses import dataclass
 from json import loads
 from time import time
-from typing import Any, Dict, Mapping, Optional, Sequence
+from typing import Any, Dict, List, Mapping, Optional, Sequence
 
 import sentry_sdk
 from celery.exceptions import SoftTimeLimitExceeded
@@ -49,6 +50,7 @@
     PARALLEL_UPLOAD_PROCESSING_BY_REPO,
 )
 from services.archive import ArchiveService
+from services.delete.delete import delete_upload_by_ids
 from services.redis import (
     PARALLEL_UPLOAD_PROCESSING_SESSION_COUNTER_TTL,
     get_parallel_upload_processing_session_counter_redis_key,
@@ -60,6 +62,8 @@
 from services.repository import get_repo_provider_service
 from services.yaml.reader import get_paths_from_flags, read_yaml_field
 
+MIN_PARENT_COMMIT_DEPTH = 1
+
 
 @dataclass
 class ProcessingError(object):
@@ -646,7 +650,8 @@ def get_appropriate_commit_to_carryforward_from(
     ) -> Optional[Commit]:
         parent_commit = commit.get_parent_commit()
         parent_commit_tracking = []
-        count = 1  # `parent_commit` is already the first parent
+        count = MIN_PARENT_COMMIT_DEPTH  # `parent_commit` is already the first parent
+
         while (
             parent_commit is not None
             and parent_commit.state not in ("complete", "skipped")
@@ -688,7 +693,7 @@ def get_appropriate_commit_to_carryforward_from(
                     parent_tracing=parent_commit_tracking,
                 ),
             )
-            return None
+            return (None, None)
         if parent_commit.state not in ("complete", "skipped"):
             log.warning(
                 "None of the parent commits were in a complete state to be used as CFing base",
@@ -700,8 +705,8 @@ def get_appropriate_commit_to_carryforward_from(
                     would_be_parent=parent_commit.commitid,
                 ),
             )
-            return None
-        return parent_commit
+            return (None, None)
+        return (parent_commit, count)
 
     async def _possibly_shift_carryforward_report(
         self, carryforward_report: Report, base_commit: Commit, head_commit: Commit
@@ -778,8 +783,10 @@ async def create_new_report_for_commit(self, commit: Commit) -> Report:
                 )
             )
 
-        parent_commit = self.get_appropriate_commit_to_carryforward_from(
-            commit, max_parenthood_deepness
+        parent_commit, parent_depth = (
+            self.get_appropriate_commit_to_carryforward_from(
+                commit, max_parenthood_deepness
+            )
         )
         if parent_commit is None:
             log.warning(
@@ -807,6 +814,14 @@ async def create_new_report_for_commit(self, commit: Commit) -> Report:
             if self.current_yaml.flag_has_carryfoward(flag_name):
                 flags_to_carryforward.append(flag_name)
         if not flags_to_carryforward:
+            log.warning(
+                "There were no flags to carry forward from the parent commit",
+                extra=dict(
+                    commit=commit.commitid,
+                    repoid=commit.repoid,
+                    parent_commit=parent_commit.commitid,
+                ),
+            )
             return Report()
         paths_to_carryforward = get_paths_from_flags(
             self.current_yaml, flags_to_carryforward
@@ -826,7 +841,10 @@ async def create_new_report_for_commit(self, commit: Commit) -> Report:
             parent_report,
             flags_to_carryforward,
             paths_to_carryforward,
-            session_extras=dict(carriedforward_from=parent_commit.commitid),
+            session_extras=dict(
+                carriedforward_from=parent_commit.commitid,
+                parent_depth=parent_depth,
+            ),
         )
         # If the parent report has labels we also need to carryforward the label index
         # Considerations:
@@ -1173,6 +1191,74 @@ def save_report(self, commit: Commit, report: Report, report_code=None):
             )
         return {"url": url}
 
+    def _determine_cffs_and_depths_in_db(
+        self, db_session, commit: Commit
+    ) -> Dict[int, List[int]]:
+        depths_with_cff_upload_ids = defaultdict(list)
+
+        existing_cff_uploads: List[Upload] = (
+            db_session.query(Upload)
+            .filter_by(
+                report_id=commit.report.id_,
+                upload_type=SessionType.carriedforward.value,
+            )
+            .all()
+        )
+
+        if existing_cff_uploads:
+            for upload in existing_cff_uploads:
+                parent_depth = upload.upload_extras.get("parent_depth")
+                if parent_depth:
+                    depths_with_cff_upload_ids[parent_depth].append(upload.id)
+
+            log.info(
+                "Existing cffs and their respective depths",
+                extra=dict(
+                    commit=commit.commitid,
+                    repo=commit.repoid,
+                    existing_uploads=existing_cff_uploads,
+                ),
+            )
+
+        return depths_with_cff_upload_ids
+
+    def _possibly_delete_existing_cffs(
+        self,
+        db_session,
+        commit: Commit,
+        carryforward_report: Report,
+    ):
+        cffs_and_depths_in_db = self._determine_cffs_and_depths_in_db(
+            db_session=db_session, commit=commit
+        )
+
+        if not cffs_and_depths_in_db:
+            return
+
+        # Use the first 'parent_depth' found, as all cffs within the same report
+        # should have been carried forward from the same parent
+        carryforward_report_depth = MIN_PARENT_COMMIT_DEPTH
+        for upload in carryforward_report.sessions.values():
+            if upload.session_extras.get("parent_depth"):
+                carryforward_report_depth = upload.session_extras.get("parent_depth")
+                break
+
+        # Delete all cff uploads that have a higher depth than the one in
+        # the latest report
+        for parent_depth, upload_ids in cffs_and_depths_in_db.items():
+            if parent_depth > carryforward_report_depth:
+                log.info(
+                    "Deleting upload from DB",
+                    extra=dict(
+                        repoid=commit.repoid,
+                        commit=commit.commitid,
+                    ),
+                )
+                # Potentially make this its own task - done inline it could make the preprocess task time out
+                delete_upload_by_ids(
+                    db_session=db_session, upload_ids=upload_ids, commit=commit
+                )
+
     def save_full_report(self, commit: Commit, report: Report, report_code=None):
         """
         Saves the report (into database and storage) AND takes care of backfilling its sessions
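[Editor's note] The pruning rule the two helpers above implement boils down to a pure function: group the existing carried-forward uploads by `parent_depth`, take the depth the fresh report carried from, and drop everything strictly deeper. A dependency-free sketch (names are illustrative, not from this diff):

```python
from collections import defaultdict
from typing import Dict, List, Optional, Tuple

MIN_PARENT_COMMIT_DEPTH = 1


def stale_cff_upload_ids(
    existing: List[Tuple[int, Optional[int]]],  # (upload_id, parent_depth) pairs
    report_depth: int,
) -> List[int]:
    """Ids of carried-forward uploads deeper than the fresh report's parent."""
    by_depth: Dict[int, List[int]] = defaultdict(list)
    for upload_id, depth in existing:
        if depth:  # uploads with no recorded depth are never deleted
            by_depth[depth].append(upload_id)
    return [
        upload_id
        for depth, ids in by_depth.items()
        if depth > report_depth
        for upload_id in ids
    ]


# The fresh report carried from the direct parent (depth 1), so uploads carried
# from a grandparent (depth 2) are pruned; the untagged upload survives.
assert stale_cff_upload_ids(
    [(10, 2), (11, 2), (12, None)], MIN_PARENT_COMMIT_DEPTH
) == [10, 11]
```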
@@ -1193,6 +1279,17 @@ def save_full_report(self, commit: Commit, report: Report, report_code=None):
             )
         res = self.save_report(commit, report, report_code)
         db_session = commit.get_db_session()
+
+        # We've seen instances where a commit report carries forward uploads
+        # from multiple parents, a side effect of choosing a different parent
+        # commit when the direct parent is still pending. This call deletes
+        # out-of-date uploads that weren't carried forward by the closest parent
+        self._possibly_delete_existing_cffs(
+            db_session=db_session,
+            commit=commit,
+            carryforward_report=report,
+        )
+
         for sess_id, session in report.sessions.items():
             upload = Upload(
                 build_code=session.build,
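[Editor's note] For review context, the shape change to `session_extras` on carried-forward sessions (serialized as the `"se"` key asserted throughout the tests below): `parent_depth` records how many generations up the carried-from commit sits.

```python
# Before this change:
session_extras = {"carriedforward_from": "abc123"}
# After: depth 1 = direct parent, 2 = grandparent, and so on.
session_extras = {"carriedforward_from": "abc123", "parent_depth": 1}
```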
"Carriedforward", "n": None, "p": None, - "se": {"carriedforward_from": grandparent_commit.commitid}, + "se": { + "carriedforward_from": grandparent_commit.commitid, + "parent_depth": 2, + }, "st": "carriedforward", "t": None, "u": None, @@ -3434,7 +3489,10 @@ async def test_create_new_report_for_commit_parent_not_ready( "N": "Carriedforward", "n": None, "p": None, - "se": {"carriedforward_from": grandparent_commit.commitid}, + "se": { + "carriedforward_from": grandparent_commit.commitid, + "parent_depth": 2, + }, "st": "carriedforward", "t": None, "u": None, @@ -3454,6 +3512,168 @@ async def test_create_new_report_for_commit_parent_not_ready( ) assert expected_results_report == readable_report["report"] + @pytest.mark.django_db(databases={"default", "timeseries"}) + def test_determine_cffs_and_depths_in_db(self, dbsession): + commit = CommitFactory.create() + dbsession.add(commit) + commit_report = CommitReport(commit_id=commit.id_) + upload_1 = UploadFactory( + report=commit_report, + order_number=0, + upload_type="carriedforward", + upload_extras={"carriedforward_from": "123test123", "parent_depth": 2}, + ) + upload_2 = UploadFactory( + report=commit_report, + order_number=0, + upload_type="carriedforward", + upload_extras={"carriedforward_from": "456test456", "parent_depth": 3}, + ) + dbsession.add_all([commit, commit_report, upload_1, upload_2]) + dbsession.flush() + report_service: ReportService = ReportService({}) + + res = report_service._determine_cffs_and_depths_in_db( + db_session=dbsession, commit=commit + ) + expected_res = defaultdict(list, {2: [upload_1.id], 3: [upload_2.id]}) + assert len(res) == 2 + assert res == expected_res + + @pytest.mark.django_db(databases={"default", "timeseries"}) + def test_possibly_delete_existing_cffs_no_preexisting(self, dbsession): + commit = CommitFactory.create() + dbsession.add(commit) + commit_report = CommitReport(commit_id=commit.id_) + dbsession.add_all([commit, commit_report]) + dbsession.flush() + report_service: ReportService = ReportService({}) + + res = report_service._possibly_delete_existing_cffs( + db_session=dbsession, commit=commit, carryforward_report=None + ) + assert res == None + + @pytest.mark.django_db(databases={"default", "timeseries"}) + def test_possibly_delete_existing_cffs_empties_db_if_preexisting_cffs_with_older_parent_commit( + self, dbsession, sample_report_with_grandparent_cffs + ): + commit = CommitFactory.create() + dbsession.add(commit) + commit_report = CommitReport(commit_id=commit.id_) + upload_1 = UploadFactory( + report=commit_report, + order_number=0, + upload_type="carriedforward", + upload_extras={ + "carriedforward_from": "grandparent_sha_123", + "parent_depth": 2, + }, + ) + upload_2 = UploadFactory( + report=commit_report, + order_number=0, + upload_type="carriedforward", + upload_extras={ + "carriedforward_from": "great_grandparent_sha_123", + "parent_depth": 3, + }, + ) + dbsession.add_all([commit, commit_report, upload_1, upload_2]) + dbsession.flush() + report_service: ReportService = ReportService({}) + + uploads = dbsession.query(Upload).filter_by(report_id=commit_report.id).all() + assert len(uploads) == 2 + report_service._possibly_delete_existing_cffs( + db_session=dbsession, + commit=commit, + carryforward_report=sample_report_with_grandparent_cffs, + ) + + uploads = dbsession.query(Upload).filter_by(report_id=commit_report.id).all() + assert len(uploads) == 0 + + @pytest.mark.django_db(databases={"default", "timeseries"}) + def 
+    @pytest.mark.django_db(databases={"default", "timeseries"})
+    def test_possibly_delete_existing_cffs_db_unchanged_if_preexisting_cffs_without_parent_commit_depth(
+        self, dbsession, sample_report_with_grandparent_cffs
+    ):
+        commit = CommitFactory.create()
+        dbsession.add(commit)
+        commit_report = CommitReport(commit_id=commit.id_)
+        upload_1 = UploadFactory(
+            report=commit_report,
+            order_number=0,
+            upload_type="carriedforward",
+            upload_extras={"carriedforward_from": "parent_sha_123"},
+        )
+        upload_2 = UploadFactory(
+            report=commit_report,
+            order_number=0,
+            upload_type="carriedforward",
+            upload_extras={"carriedforward_from": "parent_sha_123"},
+        )
+        upload_3 = UploadFactory(
+            report=commit_report,
+            order_number=0,
+            upload_type="carriedforward",
+            upload_extras={"carriedforward_from": "parent_sha_123"},
+        )
+        dbsession.add_all([commit, commit_report, upload_1, upload_2, upload_3])
+        dbsession.flush()
+        report_service: ReportService = ReportService({})
+
+        uploads = dbsession.query(Upload).filter_by(report_id=commit_report.id).all()
+        assert len(uploads) == 3
+        report_service._possibly_delete_existing_cffs(
+            db_session=dbsession,
+            commit=commit,
+            carryforward_report=sample_report_with_grandparent_cffs,
+        )
+
+        uploads = dbsession.query(Upload).filter_by(report_id=commit_report.id).all()
+        assert len(uploads) == 3
+
+    @pytest.mark.django_db(databases={"default", "timeseries"})
+    def test_possibly_delete_existing_cffs_db_unchanged_if_preexisting_cffs_with_parent_commit(
+        self, dbsession, sample_report_with_grandparent_cffs
+    ):
+        commit = CommitFactory.create()
+        dbsession.add(commit)
+        commit_report = CommitReport(commit_id=commit.id_)
+        upload_1 = UploadFactory(
+            report=commit_report,
+            order_number=0,
+            upload_type="carriedforward",
+            upload_extras={"carriedforward_from": "parent_sha_123", "parent_depth": 1},
+        )
+        upload_2 = UploadFactory(
+            report=commit_report,
+            order_number=0,
+            upload_type="carriedforward",
+            upload_extras={"carriedforward_from": "parent_sha_123", "parent_depth": 1},
+        )
+        upload_3 = UploadFactory(
+            report=commit_report,
+            order_number=0,
+            upload_type="carriedforward",
+            upload_extras={"carriedforward_from": "parent_sha_123", "parent_depth": 1},
+        )
+        dbsession.add_all([commit, commit_report, upload_1, upload_2, upload_3])
+        dbsession.flush()
+        report_service: ReportService = ReportService({})
+
+        uploads = dbsession.query(Upload).filter_by(report_id=commit_report.id).all()
+        assert len(uploads) == 3
+        report_service._possibly_delete_existing_cffs(
+            db_session=dbsession,
+            commit=commit,
+            carryforward_report=sample_report_with_grandparent_cffs,
+        )
+
+        uploads = dbsession.query(Upload).filter_by(report_id=commit_report.id).all()
+        assert len(uploads) == 3
+
     @pytest.mark.asyncio
     @pytest.mark.django_db(databases={"default", "timeseries"})
     async def test_create_new_report_for_commit_parent_not_ready_but_skipped(
@@ -3529,7 +3749,10 @@ async def test_create_new_report_for_commit_parent_not_ready_but_skipped(
                     "n": None,
                     "p": None,
                     "st": "carriedforward",
-                    "se": {"carriedforward_from": parent_commit.commitid},
+                    "se": {
+                        "carriedforward_from": parent_commit.commitid,
+                        "parent_depth": 1,
+                    },
                     "t": None,
                     "u": None,
                 },
@@ -3544,7 +3767,10 @@ async def test_create_new_report_for_commit_parent_not_ready_but_skipped(
                     "n": None,
                     "p": None,
                     "st": "carriedforward",
-                    "se": {"carriedforward_from": parent_commit.commitid},
+                    "se": {
+                        "carriedforward_from": parent_commit.commitid,
+                        "parent_depth": 1,
+                    },
                     "t": None,
                     "u": None,
                 },
@@ -3838,46 +4064,73 @@ def test_save_full_report(
             },
         ]
 
-    def test_save_report_empty_report(self, dbsession, mock_storage):
-        report = Report()
+    def test_save_full_report_with_cffs_that_should_be_deleted(
+        self,
+        dbsession,
+        mock_storage,
+        sample_report_with_grandparent_cffs,
+        mock_configuration,
+    ):
+        mock_configuration.set_params(
+            {
+                "setup": {
+                    "save_report_data_in_storage": {
+                        "only_codecov": False,
+                        "report_details_files_array": True,
+                    },
+                }
+            }
+        )
         commit = CommitFactory.create()
         dbsession.add(commit)
         dbsession.flush()
-        current_report_row = CommitReport(commit_id=commit.id_)
-        dbsession.add(current_report_row)
+        commit_report = CommitReport(commit_id=commit.id_)
+        dbsession.add(commit_report)
         dbsession.flush()
-        report_details = ReportDetails(report_id=current_report_row.id_)
-        dbsession.add(report_details)
+        report_details = ReportDetails(report_id=commit_report.id_)
+        upload_1 = UploadFactory(
+            report=commit_report,
+            order_number=0,
+            upload_type="carriedforward",
+            upload_extras={
+                "carriedforward_from": "grandparent_sha_123",
+                "parent_depth": 2,
+            },
+        )
+        upload_2 = UploadFactory(
+            report=commit_report,
+            order_number=0,
+            upload_type="carriedforward",
+            upload_extras={
+                "carriedforward_from": "great_grandparent_sha_123",
+                "parent_depth": 3,
+            },
+        )
+        upload_3 = UploadFactory(
+            report=commit_report,
+            order_number=0,
+            upload_type="carriedforward",
+            upload_extras={
+                "carriedforward_from": "great_grandparent_sha_123",
+                "parent_depth": 3,
+            },
+        )
+        dbsession.add_all([report_details, upload_1, upload_2, upload_3])
         dbsession.flush()
+        preexisting_uploads = (
+            dbsession.query(Upload).filter_by(report_id=commit_report.id).all()
+        )
+        assert len(preexisting_uploads) == 3
+
         report_service = ReportService({})
-        res = report_service.save_report(commit, report)
-        storage_hash = report_service.get_archive_service(
-            commit.repository
-        ).storage_hash
-        assert res == {
-            "url": f"v4/repos/{storage_hash}/commits/{commit.commitid}/chunks.txt"
-        }
-        assert commit.totals == {
-            "f": 0,
-            "n": 0,
-            "h": 0,
-            "m": 0,
-            "p": 0,
-            "c": 0,
-            "b": 0,
-            "d": 0,
-            "M": 0,
-            "s": 0,
-            "C": 0,
-            "N": 0,
-            "diff": None,
-        }
-        assert commit.report_json == {"files": {}, "sessions": {}}
-        assert res["url"] in mock_storage.storage["archive"]
-        assert (
-            mock_storage.storage["archive"][res["url"]].decode()
-            == "{}\n<<<<< end_of_header >>>>>\n"
+        report_service.save_full_report(commit, sample_report_with_grandparent_cffs)
+
+        uploads_after_saving = (
+            dbsession.query(Upload).filter_by(report_id=commit_report.id).all()
         )
+        assert len(uploads_after_saving) == 2
+        assert uploads_after_saving[0].name == "current_sesh_1"
+        assert uploads_after_saving[1].name == "current_sesh_2"
 
     def test_save_report(self, dbsession, mock_storage, sample_report):
         commit = CommitFactory.create()
@@ -4383,7 +4636,8 @@ async def test_initialize_and_save_report_carryforward_needed(
         assert first_upload.flags[0].flag_name == "enterprise"
         assert first_upload.totals is None
         assert first_upload.upload_extras == {
-            "carriedforward_from": parent_commit.commitid
+            "carriedforward_from": parent_commit.commitid,
+            "parent_depth": 1,
         }
         assert first_upload.upload_type == "carriedforward"
         assert second_upload.build_code is None
@@ -4403,7 +4657,8 @@ async def test_initialize_and_save_report_carryforward_needed(
         ]
         assert second_upload.totals is None
         assert second_upload.upload_extras == {
-            "carriedforward_from": parent_commit.commitid
+            "carriedforward_from": parent_commit.commitid,
+            "parent_depth": 1,
         }
         assert second_upload.upload_type == "carriedforward"
         assert r.details is not None
@@ -4466,7 +4721,8 @@ async def test_initialize_and_save_report_report_but_no_details_carryforward_nee
         assert first_upload.flags[0].flag_name == "enterprise"
         assert first_upload.totals is None
         assert first_upload.upload_extras == {
-            "carriedforward_from": parent_commit.commitid
+            "carriedforward_from": parent_commit.commitid,
+            "parent_depth": 1,
        }
         assert first_upload.upload_type == "carriedforward"
         assert second_upload.build_code is None
@@ -4486,7 +4742,8 @@ async def test_initialize_and_save_report_report_but_no_details_carryforward_nee
         ]
         assert second_upload.totals is None
         assert second_upload.upload_extras == {
-            "carriedforward_from": parent_commit.commitid
+            "carriedforward_from": parent_commit.commitid,
+            "parent_depth": 1,
         }
         assert second_upload.upload_type == "carriedforward"
         assert r.details is not None
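[Editor's note] To tie the `parent_depth` assertions above back to the walk in `get_appropriate_commit_to_carryforward_from`: the counter starts at `MIN_PARENT_COMMIT_DEPTH` because the first candidate is already one generation up. A simplified, self-contained mirror of that walk; modeling the commit graph as a list of ancestor states is an assumption for illustration only.

```python
from typing import List, Optional, Tuple

MIN_PARENT_COMMIT_DEPTH = 1


def first_usable_ancestor(
    ancestor_states: List[str], max_depth: int
) -> Tuple[Optional[int], Optional[int]]:
    # ancestor_states[0] is the direct parent's state, [1] the grandparent's, ...
    depth = MIN_PARENT_COMMIT_DEPTH  # the first candidate is already one parent up
    for index, state in enumerate(ancestor_states):
        if state in ("complete", "skipped"):
            return (index, depth)
        if depth >= max_depth:
            break
        depth += 1
    return (None, None)


# Direct parent still pending, grandparent complete -> carried from depth 2,
# matching the "parent_depth": 2 expectations in
# test_create_new_report_for_commit_parent_not_ready above.
assert first_usable_ancestor(["pending", "complete"], max_depth=10) == (1, 2)
```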