Skip to content

Avoid change streams on the storage database #276

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 16 commits into from
Jun 17, 2025
Merged

Conversation

rkistner
Copy link
Contributor

@rkistner rkistner commented Jun 12, 2025

Background

The MongoDB storage adapter relied on change streams to detect changes to:

  1. Read checkpoints.
  2. Write checkpoints (implemented in Optimize write checkpoints lookups #230).

The idea is that each API process is "notified" of changes, so that:

  1. There is low overhead when the instance is idle.
  2. Latency is short.

The issue

The issue is that change streams can have high overhead on a cluster. A change stream is effectively reading all changes in the oplog, then filtering it to the watched collection/pipeline.

This is fine when you only have a small number of open change streams, or low volumes of writes. However, we have cases where there are 100+ change streams open at a time. And even though those collections are not modified often, when you also have a 20k/s write rate (could happen when reprocessing sync rules), you suddenly end up with 100k document scans/s, even though very few documents are returned.

I cannot find any good documentation on this - this performance impact is not mentioned in the docs. But this script demonstrates the issue quite clearly: https://gist.github.com/rkistner/0d898880b0a0a48d1557f64e01992795

I also suspect that MongoDB has an optimization for this issue in Flex clusters, but that code is private unfortunately.

The fix

The fix is to not use watch/change streams in the storage adapter. The actual implementation is different for read and write checkpoints.

Read checkpoints

Diff for this part

We implement this similar to the NOTIFY functionality we use for Postgres storage:

  1. Each time a read checkpoint is committed, we write an empty document to a new checkpoint_events capped collection.
  2. The API processes watch this collection for changes, by using a tailable cursor.
  3. When a change is seen, it fetches the latest state from the sync_rules collection.

An alternative would be to just use polling on sync_rules. However, method has lower latency, and reduced overhead when the instance is mostly idle.

Tailable cursors are an under-documented feature, but it does appear to work well for this case. It gives functionality similar to change streams, with better efficiency, at the cost of requiring explicit writes to the collection.

Write checkpoints

Diff for this part

For write checkpoints, we now use the same mechanism as for bucket_data and parameter_data: On each new read checkpoint, we read all the write checkpoints created after the previous read checkpoint.

What makes this a larger change is that:

  1. We did not previously record sufficient info to look up the write checkpoints between two read checkpoints.
  2. Managed write checkpoints are persisted in a completely different way from custom write checkpoints, so this requires separate implementations for each.
    a. Custom write checkpoints are now persisted in the same batch/transaction as other data, and gets a matching op_id.
    b. Manged write checkpoints gets a processed_at_lsn field, populated when a read checkpoints are committed. We may change this to also use an op_id in the future, but that would complicate the current implementation a bit.

This reverts big parts of #230, but does not go back to the old approach. This actually results in less code and a simpler architecture overall.

@rkistner rkistner requested a review from stevensJourney June 12, 2025 15:02
Copy link

changeset-bot bot commented Jun 12, 2025

🦋 Changeset detected

Latest commit: d5abe45

The changes in this PR will be included in the next version bump.

This PR includes changesets to release 11 packages
Name Type
@powersync/service-module-postgres-storage Minor
@powersync/service-module-mongodb-storage Minor
@powersync/service-core-tests Minor
@powersync/service-module-postgres Minor
@powersync/service-core Minor
@powersync/service-schema Minor
@powersync/service-module-mongodb Patch
@powersync/service-module-mysql Patch
@powersync/service-image Minor
@powersync/service-module-core Patch
test-client Patch

Not sure what this means? Click here to learn what changesets are.

Click here if you're a maintainer who wants to add another changeset to this PR

@rkistner rkistner requested a review from Copilot June 17, 2025 11:18
Copy link
Contributor

@Copilot Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull Request Overview

The primary purpose of this PR is to remove change streams from the storage adapter and adopt a more efficient, capped-collection/tailable-cursor–based mechanism for checkpoint notifications. Key changes include the complete removal of Demultiplexer and its tests, refactoring of the write checkpoint APIs (in both Postgres and MongoDB), and updates in test and migration files to support the new notification mechanism.

Reviewed Changes

Copilot reviewed 20 out of 20 changed files in this pull request and generated no comments.

Show a summary per file
File Description
packages/service-core/test/src/demultiplexer.test.ts Removed Demultiplexer tests as the functionality has been deprecated.
packages/service-core/src/streams/streams-index.ts Removed export of Demultiplexer to reflect its removal.
packages/service-core/src/storage/WriteCheckpointAPI.ts Dropped unused interfaces/methods relating to change stream-based checkpoint watching.
modules/module-mongodb-storage/src/storage/implementation/MongoWriteCheckpointAPI.ts Refactored methods to replace watch methods with a new checkpoint change API and updated error types.
modules/module-mongodb-storage/src/storage/implementation/db.ts Added methods for checkpoint notifications and creation of a capped collection for events.
Other files Various test and migration files updated to use the new batching and notification mechanisms.
Comments suppressed due to low confidence (2)

modules/module-mongodb-storage/src/storage/implementation/MongoWriteCheckpointAPI.ts:60

  • Consider using a more explicit boolean check for the existence of 'sync_rules_id'. For example, replace the condition with 'if (!('sync_rules_id' in filters))' for improved clarity.
if (false == 'sync_rules_id' in filters) {

modules/module-mongodb-storage/src/storage/implementation/MongoWriteCheckpointAPI.ts:65

  • Consider replacing 'if (false == 'heads' in filters)' with 'if (!('heads' in filters))' to clearly express the intended check.
if (false == 'heads' in filters) {

@rkistner rkistner marked this pull request as ready for review June 17, 2025 11:42
Copy link
Collaborator

@stevensJourney stevensJourney left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The logic here looks good to me.

@rkistner rkistner merged commit d235f7b into main Jun 17, 2025
21 checks passed
@rkistner rkistner deleted the write-checkpoint-polling branch June 17, 2025 14:02
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants