
feat: add parallel component computation to timeseries #1054

Merged: 7 commits merged into main, Jan 31, 2025

Conversation

joseph-sentry
Contributor

Similar to what I just did with the compute component task: when it comes time to upsert measurements in the save_commit_measurements task, I added a feature flag, and behind that flag I altered the behaviour so that instead of upserting the measurements for each component sequentially, it queues up a task for each component so they can be upserted in parallel.

I had to reorganize the code to avoid some circular imports, hence the save_commit_measurements function being moved to tasks/save_commit_measurements.py.
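The fan-out described above can be sketched in a self-contained way. The real change uses Celery tasks and Codecov's feature-flag service; in this illustration the flag is a plain boolean and the "queue" is a list, and all function names are stand-ins rather than the actual implementation:

```python
queued = []  # stand-in for the Celery broker


def upsert_component_task(commit_id: int, component_id: str) -> None:
    """Stand-in for the per-component task added in this PR."""
    queued.append((commit_id, component_id))


def upsert_components_measurements(commit_id: int, component_ids: list[str]) -> None:
    """Old behaviour: upsert each component's measurements sequentially, in-process."""
    for cid in component_ids:
        ...  # upsert measurements for `cid` here


def save_component_measurements(
    commit_id: int, component_ids: list[str], parallel: bool
) -> None:
    if parallel:
        # New behaviour behind the flag: queue one task per component
        # so the upserts can run on parallel workers.
        for cid in component_ids:
            upsert_component_task(commit_id, cid)
    else:
        upsert_components_measurements(commit_id, component_ids)


save_component_measurements(1, ["ui", "api", "worker"], parallel=True)
# `queued` now holds one dispatch per component
```

In the real code the per-component dispatches would go through something like Celery's `group(...).apply_async()` rather than direct calls.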

@joseph-sentry joseph-sentry requested a review from a team January 30, 2025 22:24
@codecov-staging

codecov-staging bot commented Jan 30, 2025

Codecov Report

Attention: Patch coverage is 98.74214% with 2 lines in your changes missing coverage. Please review.

✅ All tests successful. No failed tests found.

Files with missing lines | Patch % | Lines
tasks/upsert_component.py | 92.30% | 2 Missing ⚠️



codecov bot commented Jan 30, 2025

Codecov Report

Attention: Patch coverage is 98.74214% with 2 lines in your changes missing coverage. Please review.

Project coverage is 97.53%. Comparing base (6302730) to head (a24a0ee).
Report is 4 commits behind head on main.

✅ All tests successful. No failed tests found.

Files with missing lines | Patch % | Lines
tasks/upsert_component.py | 92.30% | 2 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main    #1054      +/-   ##
==========================================
+ Coverage   97.52%   97.53%   +0.01%     
==========================================
  Files         462      463       +1     
  Lines       37902    37986      +84     
==========================================
+ Hits        36963    37049      +86     
+ Misses        939      937       -2     
Flag | Coverage | Δ
integration | 42.88% <27.67%> | (-0.01%) ⬇️
unit | 90.20% <98.74%> | (+0.02%) ⬆️

Flags with carried forward coverage won't be shown. Click here to find out more.

☔ View full report in Codecov by Sentry.

⚠️ Impact Analysis from Codecov is deprecated and will be sunset on Jan 31 2025. See more

@codecov-qa

codecov-qa bot commented Jan 30, 2025

❌ 1 Tests Failed:

Tests completed | Failed | Passed | Skipped
1795 | 1 | 1794 | 9
View the top 1 failed tests by shortest run time
services/tests/test_timeseries.py::TestTimeseriesService::test_delete_repository_data_side_effects
Stack Traces | 0.078s run time
self = <worker.services.tests.test_timeseries.TestTimeseriesService object at 0x7f5190bfe9c0>
dbsession = <sqlalchemy.orm.session.Session object at 0x7f5189e31b80>
sample_report = <Report files=2>, repository = Repo<699>
mocker = <pytest_mock.plugin.MockFixture object at 0x7f518add7b90>

    def test_delete_repository_data_side_effects(
        self, dbsession, sample_report, repository, mocker
    ):
        mocker.patch(
            "tasks.save_commit_measurements.PARALLEL_COMPONENT_COMPARISON.check_value",
            return_value=False,
        )
        mocker.patch(
            "services.report.ReportService.get_existing_report_for_commit",
            return_value=ReadOnlyReport.create_from_report(sample_report),
        )
    
        commit = CommitFactory.create(branch="foo", repository=repository)
        dbsession.add(commit)
        dbsession.flush()
        save_commit_measurements(commit)
        commit = CommitFactory.create(branch="bar", repository=repository)
        dbsession.add(commit)
        dbsession.flush()
        save_commit_measurements(commit)
    
        # Another unrelated repository, make sure that this one isn't deleted as a side effect
        other_repository = _create_repository(dbsession)
        other_commit = CommitFactory.create(branch="foo", repository=other_repository)
        dbsession.add(other_commit)
        dbsession.flush()
        save_commit_measurements(other_commit)
        other_commit = CommitFactory.create(branch="bar", repository=other_repository)
        dbsession.add(other_commit)
        dbsession.flush()
        save_commit_measurements(other_commit)
    
        assert (
            dbsession.query(Dataset)
            .filter_by(repository_id=other_repository.repoid)
            .count()
            != 0
        )
>       assert (
            dbsession.query(Measurement)
            .filter_by(repo_id=other_repository.repoid)
            .count()
            != 0
        )
E       assert 0 != 0
E        +  where 0 = <bound method Query.count of <sqlalchemy.orm.query.Query object at 0x7f518add6390>>()
E        +    where <bound method Query.count of <sqlalchemy.orm.query.Query object at 0x7f518add6390>> = <sqlalchemy.orm.query.Query object at 0x7f518add6390>.count
E        +      where <sqlalchemy.orm.query.Query object at 0x7f518add6390> = <bound method Query.filter_by of <sqlalchemy.orm.query.Query object at 0x7f518add6f90>>(repo_id=700)
E        +        where <bound method Query.filter_by of <sqlalchemy.orm.query.Query object at 0x7f518add6f90>> = <sqlalchemy.orm.query.Query object at 0x7f518add6f90>.filter_by
E        +          where <sqlalchemy.orm.query.Query object at 0x7f518add6f90> = <bound method Session.query of <sqlalchemy.orm.session.Session object at 0x7f5189e31b80>>(Measurement)
E        +            where <bound method Session.query of <sqlalchemy.orm.session.Session object at 0x7f5189e31b80>> = <sqlalchemy.orm.session.Session object at 0x7f5189e31b80>.query
E        +        and   700 = Repo<700>.repoid

services/tests/test_timeseries.py:973: AssertionError


codecov-public-qa bot commented Jan 30, 2025

❌ 1 Tests Failed: identical report to the codecov-qa comment above (1795 tests completed, 1 failed, 1794 passed, 9 skipped; failing test: services/tests/test_timeseries.py::TestTimeseriesService::test_delete_repository_data_side_effects).


github-actions bot commented Jan 30, 2025

✅ All tests successful. No failed tests were found.


@joseph-sentry joseph-sentry force-pushed the joseph/parallel-timeseries branch 3 times, most recently from 9856c08 to 3ac01a8 Compare January 31, 2025 00:20
@joseph-sentry joseph-sentry force-pushed the joseph/parallel-timeseries branch from 3ac01a8 to e61e74d Compare January 31, 2025 00:21
    )
    g.apply_async()
else:
    upsert_components_measurements(commit, current_yaml, db_session, report)
Contributor

I see, we changed the maybe_upsert_component_measurements to this

@@ -105,67 +72,66 @@ def maybe_upsert_flag_measurements(commit, dataset_names, db_session, report):
upsert_measurements(db_session, measurements)


def maybe_upsert_components_measurements(
commit, current_yaml, dataset_names, db_session, report
def find_duplicate_component_ids(components: list[Component], commit: Commit):
Contributor

So we moved this outside the maybe fn and that's what we extracted, now forming the upsert_components_measurements fn

Comment on lines 119 to 125
if component.paths or component.flag_regexes:
report_and_component_matching_flags = component.get_matching_flags(
list(report.flags.keys())
)
filtered_report = report.filter(
flags=report_and_component_matching_flags, paths=component.paths
)
Contributor

I believe the primary slowness comes from the report.filter call, which is the thing we would like to parallelize.

All the rest should be trivial and fast, and I would imagine that task spawning, fetching the report, and fetching the yaml add quite a bit of overhead.

How about this: do the deduplication, resolution, and filtering of these components in the main task. As the main task is fetching the report anyway, you can end up with a list of (component_id, flags, paths) tuples already filtered down to only the relevant ones.

Then, once you have that filtered and resolved list, you either iterate over it or spawn individual tasks.
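A rough, self-contained sketch of that suggestion: deduplicate and resolve flags once in the main task, which already holds the report, and only then fan out. The dict shape, function name, and regex-based flag matching below are illustrative assumptions, not the PR's actual code:

```python
import re


def resolve_components(components: list[dict], report_flags: list[str]) -> list[tuple]:
    """Run in the main task: deduplicate component ids and resolve each
    component's matching flags and paths up front, so child tasks (or a
    plain loop) receive already-filtered (component_id, flags, paths)."""
    seen: set[str] = set()
    resolved: list[tuple] = []
    for c in components:
        cid = c["component_id"]
        if cid in seen:
            continue  # drop duplicate ids once here, not in every child task
        seen.add(cid)
        flags = [
            f
            for f in report_flags
            if any(re.fullmatch(rx, f) for rx in c.get("flag_regexes", []))
        ]
        resolved.append((cid, flags, c.get("paths", [])))
    return resolved
```

Each resolved tuple can then be iterated over in-process or handed to an individual task, so the expensive report.filter call runs with everything already resolved.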

- [save_commit_measurements] move work of checking timeseries_enabled
  to the caller
- [save_commit_measurements] move work of getting the dataset names to
  the caller
- [upsert_component, save_commit_measurements] move work of finding
  relevant flags and paths to save_commit_measurements, since work does
  not need to be duplicated in parallel tasks
- [upsert_component, timeseries] remove find_duplicate_component_ids
- update tests
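The net shape of the refactor listed above, sketched with hypothetical signatures (the real argument names and return values may differ): the checks and lookups move out of save_commit_measurements and into its caller, so the task itself only acts on what it is given.

```python
def save_commit_measurements(commit: str, dataset_names: list[str]) -> dict:
    # Per the commits above, this no longer checks timeseries_enabled or
    # looks up dataset names itself; the caller resolves both.
    return {"commit": commit, "datasets": dataset_names}


def caller(commit: str, timeseries_enabled: bool, enabled_dataset_names: list[str]):
    # Checks moved to the caller so parallel child tasks don't repeat them.
    if not timeseries_enabled:
        return None
    return save_commit_measurements(commit, enabled_dataset_names)
```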
@joseph-sentry joseph-sentry added this pull request to the merge queue Jan 31, 2025
Merged via the queue into main with commit 1b11603 Jan 31, 2025
26 of 27 checks passed
@joseph-sentry joseph-sentry deleted the joseph/parallel-timeseries branch January 31, 2025 18:57

sentry-io bot commented Jan 31, 2025

Suspect Issues

This pull request was deployed and Sentry observed the following issues:

  • ‼️ TypeError: object of type 'NoneType' has no len() app.tasks.timeseries.save_commit_measurements View Issue
  • ‼️ OperationalError: (psycopg2.OperationalError) connection to server at "prod-timescaledb.codecov.dev" (10.25.32.2), ... app.tasks.timeseries.save_commit_measurements View Issue
  • ‼️ OperationalError: (psycopg2.OperationalError) server closed the connection unexpectedly app.tasks.timeseries.save_commit_measurements View Issue
  • ‼️ OperationalError: (psycopg2.OperationalError) server closed the connection unexpectedly app.tasks.timeseries.save_commit_measurements View Issue
  • ‼️ OperationalError: (psycopg2.OperationalError) server closed the connection unexpectedly app.tasks.timeseries.save_commit_measurements View Issue


4 participants