diff --git a/services/tests/test_timeseries.py b/services/tests/test_timeseries.py index 546ac7c1a..2066c5cf5 100644 --- a/services/tests/test_timeseries.py +++ b/services/tests/test_timeseries.py @@ -1,6 +1,7 @@ from datetime import datetime, timezone import pytest +from celery import group from shared.reports.readonly import ReadOnlyReport from shared.reports.resources import Report, ReportFile, ReportLine from shared.utils.sessions import Session @@ -16,8 +17,8 @@ delete_repository_measurements, repository_commits_query, repository_datasets_query, - save_commit_measurements, ) +from tasks.save_commit_measurements import save_commit_measurements @pytest.fixture @@ -101,11 +102,19 @@ def repository(dbsession): return _create_repository(dbsession) +@pytest.fixture +def dataset_names(): + return [ + MeasurementName.coverage.value, + MeasurementName.flag_coverage.value, + MeasurementName.component_coverage.value, + ] + + class TestTimeseriesService(object): def test_insert_commit_measurement( - self, dbsession, sample_report, repository, mocker + self, dbsession, sample_report, repository, dataset_names, mocker ): - mocker.patch("services.timeseries.is_timeseries_enabled", return_value=True) mocker.patch( "services.report.ReportService.get_existing_report_for_commit", return_value=ReadOnlyReport.create_from_report(sample_report), @@ -115,7 +124,7 @@ def test_insert_commit_measurement( dbsession.add(commit) dbsession.flush() - save_commit_measurements(commit) + save_commit_measurements(commit, dataset_names=dataset_names) measurement = ( dbsession.query(Measurement) @@ -139,8 +148,9 @@ def test_insert_commit_measurement( assert measurement.branch == "foo" assert measurement.value == 60.0 - def test_save_commit_measurements_no_report(self, dbsession, repository, mocker): - mocker.patch("services.timeseries.is_timeseries_enabled", return_value=True) + def test_save_commit_measurements_no_report( + self, dbsession, repository, dataset_names, mocker + ): mocker.patch( "services.report.ReportService.get_existing_report_for_commit", return_value=None, @@ -150,7 +160,7 @@ def test_save_commit_measurements_no_report(self, dbsession, repository, mocker) dbsession.add(commit) dbsession.flush() - save_commit_measurements(commit) + save_commit_measurements(commit, dataset_names=dataset_names) measurement = ( dbsession.query(Measurement) @@ -165,9 +175,8 @@ def test_save_commit_measurements_no_report(self, dbsession, repository, mocker) assert measurement is None def test_update_commit_measurement( - self, dbsession, sample_report, repository, mocker + self, dbsession, sample_report, repository, dataset_names, mocker ): - mocker.patch("services.timeseries.is_timeseries_enabled", return_value=True) mocker.patch( "services.report.ReportService.get_existing_report_for_commit", return_value=ReadOnlyReport.create_from_report(sample_report), @@ -190,7 +199,7 @@ def test_update_commit_measurement( dbsession.add(measurement) dbsession.flush() - save_commit_measurements(commit) + save_commit_measurements(commit, dataset_names=dataset_names) measurements = ( dbsession.query(Measurement) @@ -216,9 +225,8 @@ def test_update_commit_measurement( assert measurement.value == 60.0 def test_commit_measurement_insert_flags( - self, dbsession, sample_report, repository, mocker + self, dbsession, sample_report, repository, dataset_names, mocker ): - mocker.patch("services.timeseries.is_timeseries_enabled", return_value=True) mocker.patch( "services.report.ReportService.get_existing_report_for_commit", return_value=ReadOnlyReport.create_from_report(sample_report), @@ -240,7 +248,7 @@ def test_commit_measurement_insert_flags( dbsession.add(repository_flag2) dbsession.flush() - save_commit_measurements(commit) + save_commit_measurements(commit, dataset_names=dataset_names) measurement = ( dbsession.query(Measurement) @@ -289,9 +297,8 @@ def test_commit_measurement_insert_flags( assert measurement.value == 100.0 def test_commit_measurement_update_flags( - self, dbsession, sample_report, repository, mocker + self, dbsession, sample_report, repository, dataset_names, mocker ): - mocker.patch("services.timeseries.is_timeseries_enabled", return_value=True) mocker.patch( "services.report.ReportService.get_existing_report_for_commit", return_value=ReadOnlyReport.create_from_report(sample_report), @@ -339,7 +346,7 @@ def test_commit_measurement_update_flags( dbsession.add(measurement2) dbsession.flush() - save_commit_measurements(commit) + save_commit_measurements(commit, dataset_names=dataset_names) measurement = ( dbsession.query(Measurement) @@ -388,9 +395,12 @@ def test_commit_measurement_update_flags( assert measurement.value == 100.0 def test_commit_measurement_insert_components( - self, dbsession, sample_report_for_components, repository, mocker + self, dbsession, sample_report_for_components, repository, dataset_names, mocker ): - mocker.patch("services.timeseries.is_timeseries_enabled", return_value=True) + mocker.patch( + "tasks.save_commit_measurements.PARALLEL_COMPONENT_COMPARISON.check_value", + return_value=False, + ) mocker.patch( "services.report.ReportService.get_existing_report_for_commit", return_value=ReadOnlyReport.create_from_report( @@ -402,7 +412,7 @@ def test_commit_measurement_insert_components( dbsession.add(commit) dbsession.flush() - get_repo_yaml = mocker.patch("services.timeseries.get_repo_yaml") + get_repo_yaml = mocker.patch("tasks.save_commit_measurements.get_repo_yaml") yaml_dict = { "component_management": { "default_rules": { @@ -444,7 +454,7 @@ def test_commit_measurement_insert_components( } } get_repo_yaml.return_value = UserYaml(yaml_dict) - save_commit_measurements(commit) + save_commit_measurements(commit, dataset_names=dataset_names) # 1 for coverage, 3 for flags, 4 for valid components assert len(dbsession.query(Measurement).all()) == 8 @@ -572,10 +582,82 @@ def test_commit_measurement_insert_components( ) assert empty_path_measurements is None + def test_commit_measurement_update_component_parallel( + self, + sample_report_for_components, + repository, + dataset_names, + mocker, + mock_repo_provider, + ): + dbsession = repository.get_db_session() + mocker.patch.object(dbsession, "close") + mocker.patch("tasks.base.get_db_session", return_value=dbsession) + mocker.patch.object(group, "apply_async", group.apply) + + mocker.patch( + "tasks.save_commit_measurements.PARALLEL_COMPONENT_COMPARISON.check_value", + return_value=True, + ) + + mocker.patch( + "services.report.ReportService.get_existing_report_for_commit", + return_value=ReadOnlyReport.create_from_report( + sample_report_for_components + ), + ) + + commit = CommitFactory.create(branch="foo", repository=repository) + dbsession.add(commit) + dbsession.flush() + + get_repo_yaml = mocker.patch("tasks.save_commit_measurements.get_repo_yaml") + get_current_yaml = mocker.patch("tasks.upsert_component.get_repo_yaml") + yaml_dict = { + "component_management": { + "individual_components": [ + { + "component_id": "test-component-123", + "name": "test component", + "flag_regexes": ["random-flago-987"], + "paths": [r"folder/*"], + }, + ], + } + } + get_repo_yaml.return_value = UserYaml(yaml_dict) + get_current_yaml.return_value = UserYaml(yaml_dict) + + save_commit_measurements(commit, dataset_names=dataset_names) + + # Want to commit here to have the results persisted properly. + # Otherwise the results aren't going to be reflected in the select below. + # dbsession.commit() + + measurements = ( + dbsession.query(Measurement) + .filter_by(name=MeasurementName.component_coverage.value) + .all() + ) + + assert len(measurements) == 1 + dbsession.add(commit) + assert measurements[0].name == MeasurementName.component_coverage.value + assert measurements[0].owner_id == commit.repository.ownerid + assert measurements[0].repo_id == commit.repoid + assert measurements[0].measurable_id == "test-component-123" + assert measurements[0].commit_sha == commit.commitid + assert measurements[0].timestamp.replace( + tzinfo=timezone.utc + ) == commit.timestamp.replace(tzinfo=timezone.utc) + def test_commit_measurement_update_component( - self, dbsession, sample_report_for_components, repository, mocker + self, dbsession, sample_report_for_components, repository, dataset_names, mocker ): - mocker.patch("services.timeseries.is_timeseries_enabled", return_value=True) + mocker.patch( + "tasks.save_commit_measurements.PARALLEL_COMPONENT_COMPARISON.check_value", + return_value=False, + ) mocker.patch( "services.report.ReportService.get_existing_report_for_commit", return_value=ReadOnlyReport.create_from_report( @@ -587,7 +669,7 @@ def test_commit_measurement_update_component( dbsession.add(commit) dbsession.flush() - get_repo_yaml = mocker.patch("services.timeseries.get_repo_yaml") + get_repo_yaml = mocker.patch("tasks.save_commit_measurements.get_repo_yaml") yaml_dict = { "component_management": { "individual_components": [ @@ -615,7 +697,7 @@ def test_commit_measurement_update_component( dbsession.add(measurement) dbsession.flush() - save_commit_measurements(commit) + save_commit_measurements(commit, dataset_names=dataset_names) # Want to commit here to have the results persisted properly. # Otherwise the results aren't going to be reflected in the select below. @@ -644,8 +726,11 @@ def test_commit_measurement_update_component( assert measurement.branch == "foo" assert measurement.value == 50.0 - def test_commit_measurement_no_datasets(self, dbsession, mocker): - mocker.patch("services.timeseries.is_timeseries_enabled", return_value=True) + def test_commit_measurement_no_datasets(self, dbsession, dataset_names, mocker): + mocker.patch( + "tasks.save_commit_measurements.PARALLEL_COMPONENT_COMPARISON.check_value", + return_value=False, + ) repository = RepositoryFactory.create() dbsession.add(repository) @@ -655,7 +740,7 @@ def test_commit_measurement_no_datasets(self, dbsession, mocker): dbsession.add(commit) dbsession.flush() - save_commit_measurements(commit) + save_commit_measurements(commit, dataset_names=[]) assert dbsession.query(Measurement).count() == 0 @@ -711,6 +796,10 @@ def test_repository_datasets_query(self, repository): ] def test_backfill_batch_size(self, repository, mocker): + mocker.patch( + "tasks.save_commit_measurements.PARALLEL_COMPONENT_COMPARISON.check_value", + return_value=False, + ) dbsession = repository.get_db_session() coverage_dataset = ( dbsession.query(Dataset.name) @@ -785,8 +874,9 @@ def test_backfill_batch_size(self, repository, mocker): batch_size = backfill_batch_size(repository, component_coverage_dataset) assert batch_size == 100 - def test_delete_repository_data(self, dbsession, sample_report, repository, mocker): - mocker.patch("services.timeseries.is_timeseries_enabled", return_value=True) + def test_delete_repository_data( + self, dbsession, sample_report, repository, dataset_names, mocker + ): mocker.patch( "services.report.ReportService.get_existing_report_for_commit", return_value=ReadOnlyReport.create_from_report(sample_report), @@ -795,11 +885,11 @@ def test_delete_repository_data(self, dbsession, sample_report, repository, mock commit = CommitFactory.create(branch="foo", repository=repository) dbsession.add(commit) dbsession.flush() - save_commit_measurements(commit) + save_commit_measurements(commit, dataset_names=dataset_names) commit = CommitFactory.create(branch="bar", repository=repository) dbsession.add(commit) dbsession.flush() - save_commit_measurements(commit) + save_commit_measurements(commit, dataset_names=dataset_names) assert ( dbsession.query(Dataset).filter_by(repository_id=repository.repoid).count() @@ -823,9 +913,12 @@ def test_delete_repository_data(self, dbsession, sample_report, repository, mock ) def test_delete_repository_data_side_effects( - self, dbsession, sample_report, repository, mocker + self, dbsession, sample_report, repository, dataset_names, mocker ): - mocker.patch("services.timeseries.is_timeseries_enabled", return_value=True) + mocker.patch( + "tasks.save_commit_measurements.PARALLEL_COMPONENT_COMPARISON.check_value", + return_value=False, + ) mocker.patch( "services.report.ReportService.get_existing_report_for_commit", return_value=ReadOnlyReport.create_from_report(sample_report), @@ -834,22 +927,35 @@ def test_delete_repository_data_side_effects( commit = CommitFactory.create(branch="foo", repository=repository) dbsession.add(commit) dbsession.flush() - save_commit_measurements(commit) + save_commit_measurements(commit, dataset_names=dataset_names) commit = CommitFactory.create(branch="bar", repository=repository) dbsession.add(commit) dbsession.flush() - save_commit_measurements(commit) + save_commit_measurements(commit, dataset_names=dataset_names) # Another unrelated repository, make sure that this one isn't deleted as a side effect other_repository = _create_repository(dbsession) other_commit = CommitFactory.create(branch="foo", repository=other_repository) dbsession.add(other_commit) dbsession.flush() - save_commit_measurements(other_commit) + save_commit_measurements(other_commit, dataset_names=dataset_names) other_commit = CommitFactory.create(branch="bar", repository=other_repository) dbsession.add(other_commit) dbsession.flush() - save_commit_measurements(other_commit) + save_commit_measurements(other_commit, dataset_names=dataset_names) + + assert ( + dbsession.query(Dataset) + .filter_by(repository_id=other_repository.repoid) + .count() + != 0 + ) + assert ( + dbsession.query(Measurement) + .filter_by(repo_id=other_repository.repoid) + .count() + != 0 + ) delete_repository_data(repository) @@ -878,7 +984,13 @@ def test_delete_repository_data_side_effects( ) def test_delete_repository_data_measurements_only( - self, dbsession, sample_report_for_components, repository, mocker + self, + dbsession, + sample_report_for_components, + repository, + dataset_names, + mocker, + mock_repo_provider, ): def validate_invariants(repository, other_repository): assert ( @@ -901,7 +1013,15 @@ def validate_invariants(repository, other_repository): == 16 ) - mocker.patch("services.timeseries.is_timeseries_enabled", return_value=True) + mocker.patch( + "tasks.save_commit_measurements.PARALLEL_COMPONENT_COMPARISON.check_value", + return_value=True, + ) + dbsession = repository.get_db_session() + mocker.patch.object(dbsession, "close") + mocker.patch("tasks.base.get_db_session", return_value=dbsession) + mocker.patch.object(group, "apply_async", group.apply) + mocker.patch( "services.report.ReportService.get_existing_report_for_commit", return_value=ReadOnlyReport.create_from_report( @@ -909,7 +1029,8 @@ def validate_invariants(repository, other_repository): ), ) - get_repo_yaml = mocker.patch("services.timeseries.get_repo_yaml") + get_repo_yaml = mocker.patch("tasks.save_commit_measurements.get_repo_yaml") + get_current_yaml = mocker.patch("tasks.upsert_component.get_repo_yaml") yaml_dict = { "component_management": { "default_rules": { @@ -933,26 +1054,27 @@ def validate_invariants(repository, other_repository): } } get_repo_yaml.return_value = UserYaml(yaml_dict) + get_current_yaml.return_value = UserYaml(yaml_dict) commit = CommitFactory.create(branch="foo", repository=repository) dbsession.add(commit) dbsession.flush() - save_commit_measurements(commit) + save_commit_measurements(commit, dataset_names=dataset_names) commit = CommitFactory.create(branch="bar", repository=repository) dbsession.add(commit) dbsession.flush() - save_commit_measurements(commit) + save_commit_measurements(commit, dataset_names=dataset_names) # Another unrelated repository, make sure that this one isn't deleted as a side effect other_repository = _create_repository(dbsession) other_commit = CommitFactory.create(branch="foo", repository=other_repository) dbsession.add(other_commit) dbsession.flush() - save_commit_measurements(other_commit) + save_commit_measurements(other_commit, dataset_names=dataset_names) other_commit = CommitFactory.create(branch="bar", repository=other_repository) dbsession.add(other_commit) dbsession.flush() - save_commit_measurements(other_commit) + save_commit_measurements(other_commit, dataset_names=dataset_names) flag_ids = set( [ @@ -966,6 +1088,8 @@ def validate_invariants(repository, other_repository): ] ) + m = dbsession.query(Measurement).filter_by(repo_id=repository.repoid).all() + # 2x(1 coverage, 3 flag coverage, 4 component coverage) = 16 assert ( dbsession.query(Measurement).filter_by(repo_id=repository.repoid).count() diff --git a/services/timeseries.py b/services/timeseries.py index 5047ad17d..8530e4be2 100644 --- a/services/timeseries.py +++ b/services/timeseries.py @@ -4,7 +4,6 @@ from shared.components import Component from shared.reports.readonly import ReadOnlyReport -from shared.timeseries.helpers import is_timeseries_enabled from sqlalchemy.dialects.postgresql import insert from sqlalchemy.orm import Session @@ -12,43 +11,11 @@ from database.models.core import Repository from database.models.reports import RepositoryFlag from helpers.timeseries import backfill_max_batch_size -from services.report import ReportService -from services.yaml import get_repo_yaml +from services.yaml import UserYaml, get_repo_yaml log = logging.getLogger(__name__) -def save_commit_measurements( - commit: Commit, dataset_names: Iterable[str] = None -) -> None: - if not is_timeseries_enabled(): - return - - if dataset_names is None: - dataset_names = [ - dataset.name for dataset in repository_datasets_query(commit.repository) - ] - if len(dataset_names) == 0: - return - - current_yaml = get_repo_yaml(commit.repository) - report_service = ReportService(current_yaml) - report = report_service.get_existing_report_for_commit( - commit, report_class=ReadOnlyReport - ) - - if report is None: - return - - db_session = commit.get_db_session() - - maybe_upsert_coverage_measurement(commit, dataset_names, db_session, report) - maybe_upsert_components_measurements( - commit, current_yaml, dataset_names, db_session, report - ) - maybe_upsert_flag_measurements(commit, dataset_names, db_session, report) - - def maybe_upsert_coverage_measurement(commit, dataset_names, db_session, report): if MeasurementName.coverage.value in dataset_names: if report.totals.coverage is not None: @@ -105,67 +72,48 @@ def maybe_upsert_flag_measurements(commit, dataset_names, db_session, report): upsert_measurements(db_session, measurements) -def maybe_upsert_components_measurements( - commit, current_yaml, dataset_names, db_session, report +def upsert_components_measurements( + commit: Commit, + current_yaml: UserYaml, + db_session: Session, + report: ReadOnlyReport, ): - if MeasurementName.component_coverage.value in dataset_names: - components = current_yaml.get_components() - if components: - component_measurements = dict() - - for component in components: - if component.paths or component.flag_regexes: - report_and_component_matching_flags = component.get_matching_flags( - report.flags.keys() - ) - filtered_report = report.filter( - flags=report_and_component_matching_flags, paths=component.paths - ) - if filtered_report.totals.coverage is not None: - # This measurement key is being used to check for measurement existence and log the warning. - # TODO: see if we can remove this warning message as it's necessary to emit this warning. - # We're currently not doing anything with this information. - measurement_key = create_component_measurement_key( - commit, component - ) - if ( - existing_measurement := component_measurements.get( - measurement_key - ) - ) is not None: - log.warning( - "Duplicate measurement keys being added to measurements", - extra=dict( - repoid=commit.repoid, - commit_id=commit.id_, - commitid=commit.commitid, - measurement_key=measurement_key, - existing_value=existing_measurement.get("value"), - new_value=float(filtered_report.totals.coverage), - ), - ) - - component_measurements[measurement_key] = ( - create_measurement_dict( - MeasurementName.component_coverage.value, - commit, - measurable_id=f"{component.component_id}", - value=float(filtered_report.totals.coverage), - ) - ) - - measurements = list(component_measurements.values()) - if len(measurements) > 0: - upsert_measurements(db_session, measurements) - log.info( - "Upserted component coverage measurements", - extra=dict( - repoid=commit.repoid, - commit_id=commit.id_, - count=len(measurements), - ), + component_measurements = dict() + components = current_yaml.get_components() + for component in components: + if component.paths or component.flag_regexes: + report_and_component_matching_flags = component.get_matching_flags( + list(report.flags.keys()) + ) + filtered_report = report.filter( + flags=report_and_component_matching_flags, + paths=component.paths, + ) + if filtered_report.totals.coverage is not None: + # This measurement key is being used to check for measurement existence and log the warning. + # TODO: see if we can remove this warning message as it's necessary to emit this warning. + # We're currently not doing anything with this information. + measurement_key = create_component_measurement_key(commit, component) + + component_measurements[measurement_key] = create_measurement_dict( + MeasurementName.component_coverage.value, + commit, + measurable_id=f"{component.component_id}", + value=float(filtered_report.totals.coverage), ) + measurements = list(component_measurements.values()) + if len(measurements) > 0: + upsert_measurements(db_session, measurements) + log.info( + "Upserted component coverage measurements", + extra=dict( + repoid=commit.repoid, + commit_id=commit.id_, + count=len(measurements), + ), + ) + def create_measurement_dict( name: str, commit: Commit, measurable_id: str, value: float @@ -250,7 +198,6 @@ def repository_datasets_query( def repository_flag_ids(repository: Repository) -> Mapping[str, int]: db_session = repository.get_db_session() - repo_flags = ( db_session.query(RepositoryFlag).filter_by(repository=repository).yield_per(100) ) diff --git a/services/yaml/__init__.py b/services/yaml/__init__.py index 6120cb378..1da8fcae9 100644 --- a/services/yaml/__init__.py +++ b/services/yaml/__init__.py @@ -30,7 +30,7 @@ def get_repo_yaml(repository: Repository): ) -async def get_current_yaml(commit: Commit, repository_service) -> dict: +async def get_current_yaml(commit: Commit, repository_service) -> UserYaml: """ Fetches what the current yaml is supposed to be diff --git a/tasks/__init__.py b/tasks/__init__.py index 3518f6acf..c9d9d474a 100644 --- a/tasks/__init__.py +++ b/tasks/__init__.py @@ -65,3 +65,4 @@ from tasks.upload import upload_task from tasks.upload_finisher import upload_finisher_task from tasks.upload_processor import upload_processor_task +from tasks.upsert_component import upsert_component_task diff --git a/tasks/save_commit_measurements.py b/tasks/save_commit_measurements.py index 4a240b417..2cec8958c 100644 --- a/tasks/save_commit_measurements.py +++ b/tasks/save_commit_measurements.py @@ -1,17 +1,80 @@ import logging -from typing import Iterable +from typing import Sequence +from celery import group from shared.celery_config import timeseries_save_commit_measurements_task_name -from sqlalchemy.orm.session import Session +from shared.reports.readonly import ReadOnlyReport +from sqlalchemy.orm import Session from app import celery_app -from database.models.core import Commit -from services.timeseries import save_commit_measurements +from database.models import Commit, MeasurementName +from rollouts import PARALLEL_COMPONENT_COMPARISON +from services.report import ReportService +from services.timeseries import ( + maybe_upsert_coverage_measurement, + maybe_upsert_flag_measurements, + upsert_components_measurements, +) +from services.yaml import get_repo_yaml from tasks.base import BaseCodecovTask +from tasks.upsert_component import upsert_component_task log = logging.getLogger(__name__) +def save_commit_measurements(commit: Commit, dataset_names: Sequence[str]) -> None: + if len(dataset_names) == 0: + log.debug( + "No datasets found for commit", + extra=dict(commitid=commit.commitid, repoid=commit.repoid), + ) + return + + current_yaml = get_repo_yaml(commit.repository) + report_service = ReportService(current_yaml) + report = report_service.get_existing_report_for_commit( + commit, report_class=ReadOnlyReport + ) + + if report is None: + log.warning( + "No report found for commit", + extra=dict(commitid=commit.commitid, repoid=commit.repoid), + ) + return + + db_session = commit.get_db_session() + + maybe_upsert_coverage_measurement(commit, dataset_names, db_session, report) + if MeasurementName.component_coverage.value in dataset_names: + components = current_yaml.get_components() + if components: + if PARALLEL_COMPONENT_COMPARISON.check_value(commit.repository.repoid): + task_signatures = [] + components = current_yaml.get_components() + for component in components: + if component.paths or component.flag_regexes: + report_and_component_matching_flags = ( + component.get_matching_flags(list(report.flags.keys())) + ) + task_signatures.append( + upsert_component_task.s( + commit.commitid, + commit.repoid, + component.component_id, + report_and_component_matching_flags, + component.paths, + ) + ) + if task_signatures: + task_group = group(task_signatures) + task_group.apply_async() + else: + upsert_components_measurements(commit, current_yaml, db_session, report) + + maybe_upsert_flag_measurements(commit, dataset_names, db_session, report) + + class SaveCommitMeasurementsTask( BaseCodecovTask, name=timeseries_save_commit_measurements_task_name ): @@ -20,7 +83,7 @@ def run_impl( db_session: Session, commitid: str, repoid: int, - dataset_names: Iterable[int] = None, + dataset_names: Sequence[str], *args, **kwargs, ): diff --git a/tasks/tests/integration/test_timeseries_backfill.py b/tasks/tests/integration/test_timeseries_backfill.py index d33e9effd..c44530e58 100644 --- a/tasks/tests/integration/test_timeseries_backfill.py +++ b/tasks/tests/integration/test_timeseries_backfill.py @@ -10,7 +10,6 @@ @pytest.mark.integration def test_backfill_dataset_run_impl(dbsession, mocker, mock_storage): - mocker.patch("services.timeseries.is_timeseries_enabled", return_value=True) mocker.patch("tasks.timeseries_backfill.is_timeseries_enabled", return_value=True) mocked_app = mocker.patch.object( TimeseriesBackfillCommitsTask, diff --git a/tasks/tests/unit/test_save_commit_measurements.py b/tasks/tests/unit/test_save_commit_measurements.py index 7a8d97f48..8c20600e7 100644 --- a/tasks/tests/unit/test_save_commit_measurements.py +++ b/tasks/tests/unit/test_save_commit_measurements.py @@ -1,4 +1,5 @@ from database.tests.factories.core import CommitFactory, OwnerFactory, RepositoryFactory +from services.timeseries import MeasurementName from tasks.save_commit_measurements import SaveCommitMeasurementsTask @@ -21,10 +22,20 @@ def test_save_commit_measurements_success(self, dbsession, mocker): task = SaveCommitMeasurementsTask() assert task.run_impl( - dbsession, commitid=commit.commitid, repoid=commit.repoid + dbsession, + commitid=commit.commitid, + repoid=commit.repoid, + dataset_names=[ + MeasurementName.coverage.value, + MeasurementName.flag_coverage.value, + ], ) == {"successful": True} save_commit_measurements_mock.assert_called_with( - commit=commit, dataset_names=None + commit=commit, + dataset_names=[ + MeasurementName.coverage.value, + MeasurementName.flag_coverage.value, + ], ) def test_save_commit_measurements_no_commit(self, dbsession): @@ -59,7 +70,13 @@ def test_save_commit_measurements_exception(self, mocker, dbsession): task = SaveCommitMeasurementsTask() assert task.run_impl( - dbsession, commitid=commit.commitid, repoid=commit.repoid + dbsession, + commitid=commit.commitid, + repoid=commit.repoid, + dataset_names=[ + MeasurementName.coverage.value, + MeasurementName.flag_coverage.value, + ], ) == { "successful": False, "error": "exception", diff --git a/tasks/tests/unit/test_upload_finisher_task.py b/tasks/tests/unit/test_upload_finisher_task.py index e720f7193..6c11a2798 100644 --- a/tasks/tests/unit/test_upload_finisher_task.py +++ b/tasks/tests/unit/test_upload_finisher_task.py @@ -11,12 +11,14 @@ from database.models.reports import CommitReport from database.tests.factories import CommitFactory, PullFactory, RepositoryFactory from database.tests.factories.core import UploadFactory +from database.tests.factories.timeseries import DatasetFactory from helpers.checkpoint_logger import _kwargs_key from helpers.checkpoint_logger.flows import UploadFlow from helpers.exceptions import RepositoryWithoutValidBotError from helpers.log_context import LogContext, set_log_context from services.processing.merging import get_joined_flag, update_uploads from services.processing.types import MergeResult, ProcessingResult +from services.timeseries import MeasurementName from tasks.upload_finisher import ( ReportService, ShouldCallNotifyResult, @@ -616,6 +618,8 @@ def test_upload_finisher_task_calls_save_commit_measurements_task( ): mocker.patch("tasks.upload_finisher.load_intermediate_reports", return_value=[]) mocker.patch("tasks.upload_finisher.update_uploads") + + mocker.patch("tasks.upload_finisher.is_timeseries_enabled", return_value=True) mocked_app = mocker.patch.object( UploadFinisherTask, "app", @@ -630,6 +634,24 @@ def test_upload_finisher_task_calls_save_commit_measurements_task( dbsession.add(commit) dbsession.flush() + mocker.patch( + "tasks.upload_finisher.repository_datasets_query", + return_value=[ + DatasetFactory.create( + repository_id=commit.repository.repoid, + name=MeasurementName.coverage.value, + ), + DatasetFactory.create( + repository_id=commit.repository.repoid, + name=MeasurementName.flag_coverage.value, + ), + DatasetFactory.create( + repository_id=commit.repository.repoid, + name=MeasurementName.component_coverage.value, + ), + ], + ) + previous_results = [{"upload_id": 0, "arguments": {}, "successful": True}] UploadFinisherTask().run_impl( dbsession, @@ -645,7 +667,11 @@ def test_upload_finisher_task_calls_save_commit_measurements_task( kwargs={ "commitid": commit.commitid, "repoid": commit.repoid, - "dataset_names": None, + "dataset_names": [ + MeasurementName.coverage.value, + MeasurementName.flag_coverage.value, + MeasurementName.component_coverage.value, + ], } ) diff --git a/tasks/upload_finisher.py b/tasks/upload_finisher.py index 91968617b..8aad5561a 100644 --- a/tasks/upload_finisher.py +++ b/tasks/upload_finisher.py @@ -16,6 +16,7 @@ upload_finisher_task_name, ) from shared.reports.resources import Report +from shared.timeseries.helpers import is_timeseries_enabled from shared.torngit.exceptions import TorngitError from shared.yaml import UserYaml @@ -40,6 +41,7 @@ from services.redis import get_redis_connection from services.report import ReportService from services.repository import get_repo_provider_service +from services.timeseries import repository_datasets_query from services.yaml import read_yaml_field from tasks.base import BaseCodecovTask from tasks.upload_processor import MAX_RETRIES, UPLOAD_PROCESSING_LOCK_NAME @@ -152,11 +154,19 @@ def run_impl( processing_results, report_code, ) - self.app.tasks[ - timeseries_save_commit_measurements_task_name - ].apply_async( - kwargs=dict(commitid=commitid, repoid=repoid, dataset_names=None) - ) + dataset_names = [ + dataset.name for dataset in repository_datasets_query(repository) + ] + if is_timeseries_enabled(): + self.app.tasks[ + timeseries_save_commit_measurements_task_name + ].apply_async( + kwargs=dict( + commitid=commitid, + repoid=repoid, + dataset_names=dataset_names, + ) + ) # Mark the repository as updated so it will appear earlier in the list # of recently-active repositories now = datetime.now(tz=timezone.utc) diff --git a/tasks/upsert_component.py b/tasks/upsert_component.py new file mode 100644 index 000000000..a2edfd432 --- /dev/null +++ b/tasks/upsert_component.py @@ -0,0 +1,67 @@ +import logging + +from shared.reports.readonly import ReadOnlyReport +from sqlalchemy.orm import Session + +from app import celery_app +from database.models import Commit, MeasurementName +from services.report import ReportService +from services.timeseries import create_measurement_dict, upsert_measurements +from services.yaml import get_repo_yaml +from tasks.base import BaseCodecovTask + +log = logging.getLogger(__name__) + + +class UpsertComponentTask(BaseCodecovTask): + def run_impl( + self, + db_session: Session, + commitid: str, + repoid: int, + component_id: str, + flags: list[str], + paths: list[str], + *args, + **kwargs, + ): + log.info("Upserting component", extra=dict(commitid=commitid, repoid=repoid)) + + commit = ( + db_session.query(Commit) + .filter(Commit.repoid == repoid, Commit.commitid == commitid) + .first() + ) + + current_yaml = get_repo_yaml(commit.repository) + + report_service = ReportService(current_yaml) + report = report_service.get_existing_report_for_commit( + commit, report_class=ReadOnlyReport + ) + + if report is None: + log.warning( + "Upsert Component: No report found for commit", + extra=dict( + component_id=component_id, + commitid=commitid, + repoid=repoid, + ), + ) + return + + filtered_report = report.filter(flags=flags, paths=paths) + if filtered_report.totals.coverage is not None: + measurement = create_measurement_dict( + MeasurementName.component_coverage.value, + commit, + measurable_id=f"{component_id}", + value=float(filtered_report.totals.coverage), + ) + + upsert_measurements(db_session, [measurement]) + + +registered_task = celery_app.register_task(UpsertComponentTask()) +upsert_component_task = celery_app.tasks[registered_task.name]