feat: add parallel component computation to timeseries #1054

Merged Jan 31, 2025 (7 commits)
212 changes: 168 additions & 44 deletions services/tests/test_timeseries.py

Large diffs are not rendered by default.

133 changes: 40 additions & 93 deletions services/timeseries.py
@@ -4,51 +4,18 @@

from shared.components import Component
from shared.reports.readonly import ReadOnlyReport
from shared.timeseries.helpers import is_timeseries_enabled
from sqlalchemy.dialects.postgresql import insert
from sqlalchemy.orm import Session

from database.models import Commit, Dataset, Measurement, MeasurementName
from database.models.core import Repository
from database.models.reports import RepositoryFlag
from helpers.timeseries import backfill_max_batch_size
from services.report import ReportService
from services.yaml import get_repo_yaml
from services.yaml import UserYaml, get_repo_yaml

log = logging.getLogger(__name__)


def save_commit_measurements(
commit: Commit, dataset_names: Iterable[str] = None
) -> None:
if not is_timeseries_enabled():
return

if dataset_names is None:
dataset_names = [
dataset.name for dataset in repository_datasets_query(commit.repository)
]
if len(dataset_names) == 0:
return

current_yaml = get_repo_yaml(commit.repository)
report_service = ReportService(current_yaml)
report = report_service.get_existing_report_for_commit(
commit, report_class=ReadOnlyReport
)

if report is None:
return

db_session = commit.get_db_session()

maybe_upsert_coverage_measurement(commit, dataset_names, db_session, report)
maybe_upsert_components_measurements(
commit, current_yaml, dataset_names, db_session, report
)
maybe_upsert_flag_measurements(commit, dataset_names, db_session, report)


def maybe_upsert_coverage_measurement(commit, dataset_names, db_session, report):
if MeasurementName.coverage.value in dataset_names:
if report.totals.coverage is not None:
@@ -105,67 +72,48 @@ def maybe_upsert_flag_measurements(commit, dataset_names, db_session, report):
upsert_measurements(db_session, measurements)


def maybe_upsert_components_measurements(
commit, current_yaml, dataset_names, db_session, report
def upsert_components_measurements(
commit: Commit,
current_yaml: UserYaml,
db_session: Session,
report: ReadOnlyReport,
):
if MeasurementName.component_coverage.value in dataset_names:
components = current_yaml.get_components()
if components:
component_measurements = dict()

for component in components:
if component.paths or component.flag_regexes:
report_and_component_matching_flags = component.get_matching_flags(
report.flags.keys()
)
filtered_report = report.filter(
flags=report_and_component_matching_flags, paths=component.paths
)
if filtered_report.totals.coverage is not None:
# This measurement key is being used to check for measurement existence and log the warning.
# TODO: see if we can remove this warning message as it's necessary to emit this warning.
# We're currently not doing anything with this information.
measurement_key = create_component_measurement_key(
commit, component
)
if (
existing_measurement := component_measurements.get(
measurement_key
)
) is not None:
log.warning(
"Duplicate measurement keys being added to measurements",
extra=dict(
repoid=commit.repoid,
commit_id=commit.id_,
commitid=commit.commitid,
measurement_key=measurement_key,
existing_value=existing_measurement.get("value"),
new_value=float(filtered_report.totals.coverage),
),
)

component_measurements[measurement_key] = (
create_measurement_dict(
MeasurementName.component_coverage.value,
commit,
measurable_id=f"{component.component_id}",
value=float(filtered_report.totals.coverage),
)
)

measurements = list(component_measurements.values())
if len(measurements) > 0:
upsert_measurements(db_session, measurements)
log.info(
"Upserted component coverage measurements",
extra=dict(
repoid=commit.repoid,
commit_id=commit.id_,
count=len(measurements),
),
component_measurements = dict()
components = current_yaml.get_components()
for component in components:
if component.paths or component.flag_regexes:
report_and_component_matching_flags = component.get_matching_flags(
list(report.flags.keys())
)
filtered_report = report.filter(
flags=report_and_component_matching_flags,
paths=component.paths,
)
if filtered_report.totals.coverage is not None:
# This measurement key is used to check whether a measurement already exists.
# TODO: see if we can remove this warning message, as it may not be necessary;
# we're currently not doing anything with this information.
measurement_key = create_component_measurement_key(commit, component)

component_measurements[measurement_key] = create_measurement_dict(
MeasurementName.component_coverage.value,
commit,
measurable_id=f"{component.component_id}",
value=float(filtered_report.totals.coverage),
)

measurements = list(component_measurements.values())
if len(measurements) > 0:
upsert_measurements(db_session, measurements)
log.info(
"Upserted component coverage measurements",
extra=dict(
repoid=commit.repoid,
commit_id=commit.id_,
count=len(measurements),
),
)


def create_measurement_dict(
name: str, commit: Commit, measurable_id: str, value: float
@@ -250,7 +198,6 @@ def repository_datasets_query(

def repository_flag_ids(repository: Repository) -> Mapping[str, int]:
db_session = repository.get_db_session()

repo_flags = (
db_session.query(RepositoryFlag).filter_by(repository=repository).yield_per(100)
)
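With the maybe_ prefix dropped, the dataset gate and the components check move to the caller. A minimal caller-side usage sketch of the refactored helper (hypothetical; commit, current_yaml, db_session, report, and dataset_names are assumed to already be in scope):

# Hypothetical caller-side sketch: the dataset/components checks now happen
# before calling upsert_components_measurements, mirroring the task further down.
from database.models import MeasurementName
from services.timeseries import upsert_components_measurements

if MeasurementName.component_coverage.value in dataset_names:
    if current_yaml.get_components():
        upsert_components_measurements(commit, current_yaml, db_session, report)
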
2 changes: 1 addition & 1 deletion services/yaml/__init__.py
@@ -30,7 +30,7 @@ def get_repo_yaml(repository: Repository):
)


async def get_current_yaml(commit: Commit, repository_service) -> dict:
async def get_current_yaml(commit: Commit, repository_service) -> UserYaml:
"""
Fetches what the current yaml is supposed to be

1 change: 1 addition & 0 deletions tasks/__init__.py
@@ -65,3 +65,4 @@
from tasks.upload import upload_task
from tasks.upload_finisher import upload_finisher_task
from tasks.upload_processor import upload_processor_task
from tasks.upsert_component import upsert_component_task
73 changes: 68 additions & 5 deletions tasks/save_commit_measurements.py
@@ -1,17 +1,80 @@
import logging
from typing import Iterable
from typing import Sequence

from celery import group
from shared.celery_config import timeseries_save_commit_measurements_task_name
from sqlalchemy.orm.session import Session
from shared.reports.readonly import ReadOnlyReport
from sqlalchemy.orm import Session

from app import celery_app
from database.models.core import Commit
from services.timeseries import save_commit_measurements
from database.models import Commit, MeasurementName
from rollouts import PARALLEL_COMPONENT_COMPARISON
from services.report import ReportService
from services.timeseries import (
maybe_upsert_coverage_measurement,
maybe_upsert_flag_measurements,
upsert_components_measurements,
)
from services.yaml import get_repo_yaml
from tasks.base import BaseCodecovTask
from tasks.upsert_component import upsert_component_task

log = logging.getLogger(__name__)


def save_commit_measurements(commit: Commit, dataset_names: Sequence[str]) -> None:
if len(dataset_names) == 0:
log.debug(
"No datasets found for commit",
extra=dict(commitid=commit.commitid, repoid=commit.repoid),
)
return

current_yaml = get_repo_yaml(commit.repository)
report_service = ReportService(current_yaml)
report = report_service.get_existing_report_for_commit(
commit, report_class=ReadOnlyReport
)

if report is None:
log.warning(
"No report found for commit",
extra=dict(commitid=commit.commitid, repoid=commit.repoid),
)
return

db_session = commit.get_db_session()

maybe_upsert_coverage_measurement(commit, dataset_names, db_session, report)
if MeasurementName.component_coverage.value in dataset_names:
components = current_yaml.get_components()
if components:
if PARALLEL_COMPONENT_COMPARISON.check_value(commit.repository.repoid):
task_signatures = []
components = current_yaml.get_components()
for component in components:
if component.paths or component.flag_regexes:
report_and_component_matching_flags = (
component.get_matching_flags(list(report.flags.keys()))
)
task_signatures.append(
upsert_component_task.s(
commit.commitid,
commit.repoid,
component.component_id,
report_and_component_matching_flags,
component.paths,
)
)
if task_signatures:
task_group = group(task_signatures)
task_group.apply_async()
else:
upsert_components_measurements(commit, current_yaml, db_session, report)
Contributor comment: I see, we changed maybe_upsert_components_measurements to this.


maybe_upsert_flag_measurements(commit, dataset_names, db_session, report)


class SaveCommitMeasurementsTask(
BaseCodecovTask, name=timeseries_save_commit_measurements_task_name
):
@@ -20,7 +83,7 @@ def run_impl(
db_session: Session,
commitid: str,
repoid: int,
dataset_names: Iterable[int] = None,
dataset_names: Sequence[str],
*args,
**kwargs,
):
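The upsert_component_task dispatched above lives in tasks/upsert_component.py, which is not rendered in this view. A hypothetical sketch of the receiving task, inferred from the call signature upsert_component_task.s(commitid, repoid, component_id, flags, paths); the task name, argument handling, and internals are assumptions, not the PR's actual implementation:

# Hypothetical sketch only; the real task is in tasks/upsert_component.py and
# is not part of this rendered diff.
import logging

from shared.reports.readonly import ReadOnlyReport

from app import celery_app
from database.models import Commit, MeasurementName
from services.report import ReportService
from services.timeseries import create_measurement_dict, upsert_measurements
from services.yaml import get_repo_yaml
from tasks.base import BaseCodecovTask

log = logging.getLogger(__name__)


class UpsertComponentTask(
    BaseCodecovTask, name="app.tasks.timeseries.upsert_component"  # name is an assumption
):
    def run_impl(self, db_session, commitid, repoid, component_id, flags, paths, **kwargs):
        commit = (
            db_session.query(Commit).filter_by(repoid=repoid, commitid=commitid).first()
        )
        report = ReportService(get_repo_yaml(commit.repository)).get_existing_report_for_commit(
            commit, report_class=ReadOnlyReport
        )
        if report is None:
            return
        # Filter the report down to this component's flags and paths, then upsert
        # a single component_coverage measurement for the commit.
        filtered = report.filter(flags=flags, paths=paths)
        if filtered.totals.coverage is not None:
            upsert_measurements(
                db_session,
                [
                    create_measurement_dict(
                        MeasurementName.component_coverage.value,
                        commit,
                        measurable_id=component_id,
                        value=float(filtered.totals.coverage),
                    )
                ],
            )


upsert_component_task = celery_app.register_task(UpsertComponentTask())
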
1 change: 0 additions & 1 deletion tasks/tests/integration/test_timeseries_backfill.py
@@ -10,7 +10,6 @@

@pytest.mark.integration
def test_backfill_dataset_run_impl(dbsession, mocker, mock_storage):
mocker.patch("services.timeseries.is_timeseries_enabled", return_value=True)
mocker.patch("tasks.timeseries_backfill.is_timeseries_enabled", return_value=True)
mocked_app = mocker.patch.object(
TimeseriesBackfillCommitsTask,
23 changes: 20 additions & 3 deletions tasks/tests/unit/test_save_commit_measurements.py
@@ -1,4 +1,5 @@
from database.tests.factories.core import CommitFactory, OwnerFactory, RepositoryFactory
from services.timeseries import MeasurementName
from tasks.save_commit_measurements import SaveCommitMeasurementsTask


@@ -21,10 +22,20 @@ def test_save_commit_measurements_success(self, dbsession, mocker):

task = SaveCommitMeasurementsTask()
assert task.run_impl(
dbsession, commitid=commit.commitid, repoid=commit.repoid
dbsession,
commitid=commit.commitid,
repoid=commit.repoid,
dataset_names=[
MeasurementName.coverage.value,
MeasurementName.flag_coverage.value,
],
) == {"successful": True}
save_commit_measurements_mock.assert_called_with(
commit=commit, dataset_names=None
commit=commit,
dataset_names=[
MeasurementName.coverage.value,
MeasurementName.flag_coverage.value,
],
)

def test_save_commit_measurements_no_commit(self, dbsession):
@@ -59,7 +70,13 @@ def test_save_commit_measurements_exception(self, mocker, dbsession):

task = SaveCommitMeasurementsTask()
assert task.run_impl(
dbsession, commitid=commit.commitid, repoid=commit.repoid
dbsession,
commitid=commit.commitid,
repoid=commit.repoid,
dataset_names=[
MeasurementName.coverage.value,
MeasurementName.flag_coverage.value,
],
) == {
"successful": False,
"error": "exception",
28 changes: 27 additions & 1 deletion tasks/tests/unit/test_upload_finisher_task.py
@@ -11,12 +11,14 @@
from database.models.reports import CommitReport
from database.tests.factories import CommitFactory, PullFactory, RepositoryFactory
from database.tests.factories.core import UploadFactory
from database.tests.factories.timeseries import DatasetFactory
from helpers.checkpoint_logger import _kwargs_key
from helpers.checkpoint_logger.flows import UploadFlow
from helpers.exceptions import RepositoryWithoutValidBotError
from helpers.log_context import LogContext, set_log_context
from services.processing.merging import get_joined_flag, update_uploads
from services.processing.types import MergeResult, ProcessingResult
from services.timeseries import MeasurementName
from tasks.upload_finisher import (
ReportService,
ShouldCallNotifyResult,
@@ -616,6 +618,8 @@ def test_upload_finisher_task_calls_save_commit_measurements_task(
):
mocker.patch("tasks.upload_finisher.load_intermediate_reports", return_value=[])
mocker.patch("tasks.upload_finisher.update_uploads")

mocker.patch("tasks.upload_finisher.is_timeseries_enabled", return_value=True)
mocked_app = mocker.patch.object(
UploadFinisherTask,
"app",
@@ -630,6 +634,24 @@
dbsession.add(commit)
dbsession.flush()

mocker.patch(
"tasks.upload_finisher.repository_datasets_query",
return_value=[
DatasetFactory.create(
repository_id=commit.repository.repoid,
name=MeasurementName.coverage.value,
),
DatasetFactory.create(
repository_id=commit.repository.repoid,
name=MeasurementName.flag_coverage.value,
),
DatasetFactory.create(
repository_id=commit.repository.repoid,
name=MeasurementName.component_coverage.value,
),
],
)

previous_results = [{"upload_id": 0, "arguments": {}, "successful": True}]
UploadFinisherTask().run_impl(
dbsession,
@@ -645,7 +667,11 @@
kwargs={
"commitid": commit.commitid,
"repoid": commit.repoid,
"dataset_names": None,
"dataset_names": [
MeasurementName.coverage.value,
MeasurementName.flag_coverage.value,
MeasurementName.component_coverage.value,
],
}
)

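The mocks above (is_timeseries_enabled and repository_datasets_query patched on tasks.upload_finisher) suggest how the finisher now builds an explicit dataset_names list before dispatching the measurements task. A hedged sketch of that call site; tasks/upload_finisher.py is not shown in this diff, so the surrounding code and variable names are assumptions inferred from the test:

# Hypothetical sketch of the dispatch in tasks/upload_finisher.py, inferred
# from the mocks and the apply_async assertion in the test above.
from shared.celery_config import timeseries_save_commit_measurements_task_name
from shared.timeseries.helpers import is_timeseries_enabled

from services.timeseries import repository_datasets_query

if is_timeseries_enabled():
    dataset_names = [
        dataset.name for dataset in repository_datasets_query(commit.repository)
    ]
    self.app.tasks[timeseries_save_commit_measurements_task_name].apply_async(
        kwargs={
            "commitid": commit.commitid,
            "repoid": commit.repoid,
            "dataset_names": dataset_names,
        }
    )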