[Postgres + MongoDB] Resumable replication #163
base: main
```typescript
useBigInt64: true,
// We cannot use promoteBuffers: true, since that also converts UUID to Buffer.
// Instead, we need to handle bson.Binary when reading data.
promoteBuffers: false
```
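With `promoteBuffers: false`, UUID columns arrive as `bson.Binary` values (subtype 4) rather than plain `Buffer`s, so readers must convert them explicitly. A minimal sketch of that conversion, using an illustrative `BinaryLike` shape rather than the real `bson.Binary` class:

```typescript
// Minimal shape of bson.Binary for illustration; the real class has more members.
interface BinaryLike {
  sub_type: number;
  buffer: Uint8Array;
}

// BSON binary subtype 4 denotes a UUID.
const SUBTYPE_UUID = 4;

// Convert a binary UUID value to its canonical hyphenated string form.
function binaryToUuid(value: BinaryLike): string {
  if (value.sub_type !== SUBTYPE_UUID) {
    throw new Error(`not a UUID binary: subtype ${value.sub_type}`);
  }
  const hex = Array.from(value.buffer, (b) => b.toString(16).padStart(2, '0')).join('');
  return `${hex.slice(0, 8)}-${hex.slice(8, 12)}-${hex.slice(12, 16)}-${hex.slice(16, 20)}-${hex.slice(20)}`;
}
```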
This does not actually change anything - just a note not to attempt changing it in the future.
Pull Request Overview
This PR extends initial snapshot replication to be resumable for both Postgres (chunking by primary key) and MongoDB (chunking by `_id`), improves replication logging, and adds the necessary storage schema migrations and tests for resume tokens and chunked snapshots.
- Added custom logger prefixes and more consistent error messages in MySQL and MongoDB replication jobs.
- Introduced `ChunkedSnapshotQuery` and storage metadata (`snapshot_status`/`snapshot_lsn`) for resumable snapshots.
- Refactored tests to use a unified `describeWithStorage` helper, and added end-to-end slow tests for resuming and chunked snapshot behavior.
Reviewed Changes
Copilot reviewed 57 out of 57 changed files in this pull request and generated no comments.
File | Description
---|---
`modules/module-mongodb/test/src/util.ts` | `describeWithStorage` helper introduced (calls undefined factory)
`modules/module-mongodb/test/src/change_stream_utils.ts` | `getBucketData` now loops chunks but drops limit support
`modules/module-mongodb/src/replication/ChangeStreamReplicationJob.ts` | Typo in comment
`modules/module-mongodb/test/src/resume_token.test.ts` | Invalid assertion usage (`.true` instead of `.toBe(true)`)
Comments suppressed due to low confidence (4)

modules/module-mongodb/test/src/util.ts:28
- The helper calls `INITIALIZED_MONGO_STORAGE_FACTORY` for Mongo storage, but that identifier isn't defined in this file. Import or define the Mongo storage factory (e.g. `INITIALIZED_MONGO_STORAGE_FACTORY`) or rename the variable to the correct factory.

```typescript
fn(INITIALIZED_MONGO_STORAGE_FACTORY);
```

modules/module-mongodb/test/src/change_stream_utils.ts:161
- `getBucketData` no longer passes through the `options.limit` or `options.chunkLimitBytes` parameters when calling `getBucketDataBatch`. Consider forwarding those options or documenting why they're intentionally omitted.

```typescript
const batch = this.storage!.getBucketDataBatch(checkpoint, map);
```

modules/module-mongodb/src/replication/ChangeStreamReplicationJob.ts:18
- Typo in comment: replace `us` with `use`.

```typescript
// We us a custom formatter to process the prefix
```

modules/module-mongodb/test/src/resume_token.test.ts:48
- Invalid Vitest assertion: use `.toBe(true)` or `.toBeTruthy()` instead of `.true`.

```typescript
).true;
```
Problem Statement
When updating sync rules, we perform initial replication from the source database with the new sync rules. This involves doing an initial snapshot, then streaming incremental changes.
For large source databases, initial replication can take a long time. Historically we did not implement resuming for the initial snapshot queries - only for the streaming replication afterwards. So if the replication process/container crashed or restarted during the snapshot, we had to restart it from scratch.
We have already implemented partial mitigations for Postgres source databases.
The fix
This implements the same mitigations for MongoDB source databases, and additionally chunks initial snapshot queries using the primary key, allowing us to fully resume initial replication where we left off. This is basically repeated queries of the form `SELECT * FROM <table> WHERE id > :lastId LIMIT 10000`, instead of a single `SELECT * FROM <table>` query.

For Postgres, this is limited to tables with a single primary-key column of specific types (text/varchar/uuid/int2/int4/int8). We can add more supported types over time, and potentially support compound primary keys, if we do proper testing. However, these should already cover a large percentage of replicated tables. Tables not covered by this restart replication from scratch when interrupted.
For MongoDB, we support any collection.
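The chunking idea above can be sketched as two small helpers - one building the next Postgres chunk query keyed on the primary key, one building the equivalent MongoDB `_id` filter. This is an illustrative sketch, not the actual `ChunkedSnapshotQuery` implementation; the table name, `id` column, and helper names are assumptions:

```typescript
// Build the next Postgres chunk query. A null lastId means we are starting
// the snapshot from the beginning of the table.
function nextPostgresChunk(table: string, lastId: string | null, limit = 10_000) {
  return lastId == null
    ? { sql: `SELECT * FROM ${table} ORDER BY id LIMIT ${limit}`, params: [] as unknown[] }
    : { sql: `SELECT * FROM ${table} WHERE id > $1 ORDER BY id LIMIT ${limit}`, params: [lastId] };
}

// The MongoDB equivalent: a filter on _id, paged in _id order.
function nextMongoChunkFilter(lastId: unknown): Record<string, unknown> {
  return lastId === undefined ? {} : { _id: { $gt: lastId } };
}
```

Each chunk records the last primary key seen; on restart, replication resumes from that key instead of re-reading the whole table.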
Edge cases
Since the snapshot query is no longer performed at a single point in time, there are some edge cases to consider when rows are added, updated or removed while snapshotting. Most of these are handled with proper consistency simply by resuming streaming replication after the snapshot. However, Postgres has one particular case we need to be careful with:
This means the row is not covered by the snapshot query. It is still covered by streaming replication, but the change event may exclude TOAST values.
To handle this edge case, we detect rows with missing TOAST columns, and re-replicate those.
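The detection step can be sketched as follows. This is a hypothetical illustration, assuming the logical decoding layer marks unchanged TOAST values with an explicit sentinel; the `UNCHANGED_TOAST` symbol and function names are not from the actual implementation:

```typescript
// Sentinel for a column the change event omitted because its TOAST value
// was unchanged (and therefore not included by logical decoding).
const UNCHANGED_TOAST = Symbol('unchanged-toast');

type DecodedRow = Record<string, unknown>;

// A row whose change event is missing any TOAST column cannot be stored
// as-is; it must be re-replicated with a direct snapshot query.
function needsReReplication(row: DecodedRow): boolean {
  return Object.values(row).some((value) => value === UNCHANGED_TOAST);
}
```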
Limitations
Some notable limitations currently:
- Progress inside a table is not implemented for Postgres storage yet.
- Resuming MongoDB snapshots requires storing the snapshot LSN, which isn't implemented for Postgres storage yet.

Other changes
Logging
This improves replication logging a bit, with custom logger prefixes and more consistent messages in the replication jobs.
In the future we can use this to expose snapshot progress via the diagnostics API, but for now the logs can be used to track progress.
MongoDB snapshot resume token
Previously, we stored the `clusterTime` when starting a MongoDB snapshot, then resumed streaming from that point afterwards. Now, this is replaced by an actual resume token. This allows for better detection of replication issues after the initial snapshot, such as the oplog window being too small, or switching source databases.

Migrations
For Postgres storage, this has a new migration for the additional snapshot metadata. No migration is needed for MongoDB storage.
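The resume logic described under "MongoDB snapshot resume token" can be sketched as a small option-selection step: prefer a persisted resume token, fall back to the legacy operation time, else start fresh. The `ResumeState` shape and function name are illustrative, not the actual implementation; `resumeAfter` and `startAtOperationTime` are real MongoDB change stream options:

```typescript
interface ResumeState {
  // Opaque change stream resume token persisted after the snapshot, if any.
  resumeToken?: unknown;
  // clusterTime recorded by older versions, kept for backwards compatibility.
  clusterTime?: unknown;
}

function changeStreamOptions(state: ResumeState): Record<string, unknown> {
  if (state.resumeToken != null) {
    // resumeAfter fails fast if the token has fallen out of the oplog window,
    // surfacing the problem instead of silently missing events.
    return { resumeAfter: state.resumeToken };
  }
  if (state.clusterTime != null) {
    return { startAtOperationTime: state.clusterTime };
  }
  return {};
}
```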