From 8932c5aa43de0336c93996e6facbd7e9292495cd Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Thu, 12 Jun 2025 11:51:07 +0200 Subject: [PATCH 01/15] Add checkpoint_events capped collection. --- .../1749720702136-checkpoint-events.ts | 49 +++++++++++++++++++ .../src/storage/MongoBucketStorage.ts | 4 ++ .../implementation/MongoBucketBatch.ts | 5 ++ .../implementation/MongoSyncBucketStorage.ts | 3 ++ .../src/storage/implementation/db.ts | 12 +++++ .../src/storage/implementation/models.ts | 4 ++ 6 files changed, 77 insertions(+) create mode 100644 modules/module-mongodb-storage/src/migrations/db/migrations/1749720702136-checkpoint-events.ts diff --git a/modules/module-mongodb-storage/src/migrations/db/migrations/1749720702136-checkpoint-events.ts b/modules/module-mongodb-storage/src/migrations/db/migrations/1749720702136-checkpoint-events.ts new file mode 100644 index 00000000..bc5e55eb --- /dev/null +++ b/modules/module-mongodb-storage/src/migrations/db/migrations/1749720702136-checkpoint-events.ts @@ -0,0 +1,49 @@ +import { migrations } from '@powersync/service-core'; +import * as storage from '../../../storage/storage-index.js'; +import { MongoStorageConfig } from '../../../types/types.js'; + +export const up: migrations.PowerSyncMigrationFunction = async (context) => { + const { + service_context: { configuration } + } = context; + const db = storage.createPowerSyncMongo(configuration.storage as MongoStorageConfig); + + try { + // We cover the case where the replication process was started before running this migration. + const existingCollections = await db.db + .listCollections({ name: 'checkpoint_events' }, { nameOnly: false }) + .toArray(); + const collection = existingCollections[0]; + if (collection != null) { + if (!collection.options?.capped) { + // Collection was auto-created but not capped, so we need to drop it + await db.db.dropCollection('checkpoint_events'); + } else { + // Collection previously created somehow - ignore + return; + } + } + + await db.db.createCollection('checkpoint_events', { + capped: true, + size: 10 * 1024, // 10 KB + max: 10 // 10 documents + }); + } finally { + await db.client.close(); + } +}; + +export const down: migrations.PowerSyncMigrationFunction = async (context) => { + const { + service_context: { configuration } + } = context; + + const db = storage.createPowerSyncMongo(configuration.storage as MongoStorageConfig); + + try { + await db.db.dropCollection('checkpoint_events'); + } finally { + await db.client.close(); + } +}; diff --git a/modules/module-mongodb-storage/src/storage/MongoBucketStorage.ts b/modules/module-mongodb-storage/src/storage/MongoBucketStorage.ts index 7d4f9217..c8562853 100644 --- a/modules/module-mongodb-storage/src/storage/MongoBucketStorage.ts +++ b/modules/module-mongodb-storage/src/storage/MongoBucketStorage.ts @@ -119,6 +119,7 @@ export class MongoBucketStorage } } ); + await this.db.notifyCheckpoint(); } else if (next == null && active?.id == sync_rules_group_id) { // Slot removed for "active" sync rules, while there is no "next" one. await this.updateSyncRules({ @@ -141,6 +142,7 @@ export class MongoBucketStorage } } ); + await this.db.notifyCheckpoint(); } else if (next != null && active?.id == sync_rules_group_id) { // Already have next sync rules, but need to stop replicating the active one. 
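The `notifyCheckpoint()` calls added in these hunks (and throughout the rest of the series) all feed a listen/notify pattern built on the new `checkpoint_events` capped collection created by the migration above: writers insert an empty document, and readers wake up via a tailable cursor (added in patch 02 below). The following is a minimal sketch of that pattern, assuming the capped collection already exists; the database name, function wrapper and control flow are illustrative, not the service's actual code — only the collection name, `forceServerObjectId` and the cursor options come from this patch series.

```ts
import { MongoClient } from 'mongodb';

// Sketch only: checkpoint_events, forceServerObjectId and the cursor options
// come from this patch series; everything else is illustrative.
async function checkpointEventsSketch(client: MongoClient, signal: AbortSignal) {
  const events = client.db('powersync').collection('checkpoint_events');

  // "Notify": after a checkpoint or sync rules status update, insert an
  // empty document into the capped collection (see notifyCheckpoint() below).
  await events.insertOne({}, { forceServerObjectId: true });

  // "Listen": a tailable awaitData cursor blocks (up to maxAwaitTimeMS) until
  // a new document is appended, similar to Postgres LISTEN/NOTIFY.
  const cursor = events.find({}, { tailable: true, awaitData: true, maxAwaitTimeMS: 10_000 });
  signal.addEventListener('abort', () => void cursor.close().catch(() => {}));
  while (!signal.aborted) {
    const doc = await cursor.tryNext(); // null on timeout, a document on notify
    if (cursor.closed) break;
    if (doc != null) {
      // Something changed - re-query sync_rules for the latest checkpoint.
    }
  }
  await cursor.close();
}
```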
@@ -155,6 +157,7 @@ export class MongoBucketStorage } } ); + await this.db.notifyCheckpoint(); } } @@ -216,6 +219,7 @@ export class MongoBucketStorage last_keepalive_ts: null }; await this.db.sync_rules.insertOne(doc); + await this.db.notifyCheckpoint(); rules = new MongoPersistedSyncRulesContent(this.db, doc); if (options.lock) { const lock = await rules.lock(); diff --git a/modules/module-mongodb-storage/src/storage/implementation/MongoBucketBatch.ts b/modules/module-mongodb-storage/src/storage/implementation/MongoBucketBatch.ts index b72f9761..58bcd4c4 100644 --- a/modules/module-mongodb-storage/src/storage/implementation/MongoBucketBatch.ts +++ b/modules/module-mongodb-storage/src/storage/implementation/MongoBucketBatch.ts @@ -602,6 +602,8 @@ export class MongoBucketBatch { session } ); }); + // Must be _after_ the transaction. + await this.db.notifyCheckpoint(); } async [Symbol.asyncDispose]() { @@ -648,6 +650,7 @@ export class MongoBucketBatch }, { session: this.session } ); + await this.db.notifyCheckpoint(); // Cannot create a checkpoint yet - return false return false; @@ -681,6 +684,7 @@ export class MongoBucketBatch }, { session: this.session } ); + await this.db.notifyCheckpoint(); this.persisted_op = null; this.last_checkpoint_lsn = lsn; return true; @@ -717,6 +721,7 @@ export class MongoBucketBatch }, { session: this.session } ); + await this.db.notifyCheckpoint(); this.last_checkpoint_lsn = lsn; return true; diff --git a/modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts b/modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts index 354b4aab..0a7184ce 100644 --- a/modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts +++ b/modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts @@ -510,6 +510,7 @@ export class MongoSyncBucketStorage } } ); + await this.db.notifyCheckpoint(); } async getStatus(): Promise { @@ -640,6 +641,7 @@ export class MongoSyncBucketStorage }, { session } ); + await this.db.notifyCheckpoint(); } }); }); @@ -657,6 +659,7 @@ export class MongoSyncBucketStorage } } ); + await this.db.notifyCheckpoint(); } async compact(options?: storage.CompactOptions) { diff --git a/modules/module-mongodb-storage/src/storage/implementation/db.ts b/modules/module-mongodb-storage/src/storage/implementation/db.ts index 00ccd544..af44adf7 100644 --- a/modules/module-mongodb-storage/src/storage/implementation/db.ts +++ b/modules/module-mongodb-storage/src/storage/implementation/db.ts @@ -7,6 +7,7 @@ import { BucketDataDocument, BucketParameterDocument, BucketStateDocument, + CheckpointEventDocument, CurrentDataDocument, CustomWriteCheckpointDocument, IdSequenceDocument, @@ -35,6 +36,7 @@ export class PowerSyncMongo { readonly instance: mongo.Collection; readonly locks: mongo.Collection; readonly bucket_state: mongo.Collection; + readonly checkpoint_events: mongo.Collection; readonly client: mongo.MongoClient; readonly db: mongo.Db; @@ -58,6 +60,7 @@ export class PowerSyncMongo { this.instance = db.collection('instance'); this.locks = this.db.collection('locks'); this.bucket_state = this.db.collection('bucket_state'); + this.checkpoint_events = this.db.collection('checkpoint_events'); } /** @@ -85,6 +88,15 @@ export class PowerSyncMongo { async drop() { await this.db.dropDatabase(); } + + /** + * Call this after every checkpoint or sync rules status update. Rather call too often than too rarely. 
+ * + * This is used in a similar way to the Postgres NOTIFY functionality. + */ + async notifyCheckpoint() { + await this.checkpoint_events.insertOne({} as any, { forceServerObjectId: true }); + } } export function createPowerSyncMongo(config: MongoStorageConfig, options?: lib_mongo.MongoConnectionOptions) { diff --git a/modules/module-mongodb-storage/src/storage/implementation/models.ts b/modules/module-mongodb-storage/src/storage/implementation/models.ts index de36aaba..1335bc2e 100644 --- a/modules/module-mongodb-storage/src/storage/implementation/models.ts +++ b/modules/module-mongodb-storage/src/storage/implementation/models.ts @@ -159,6 +159,10 @@ export interface SyncRuleDocument { content: string; } +export interface CheckpointEventDocument { + _id: bson.ObjectId; +} + export type SyncRuleCheckpointState = Pick< SyncRuleDocument, 'last_checkpoint' | 'last_checkpoint_lsn' | '_id' | 'state' From 12da26967aedaa7e94f9775ccdc586b6e9a3ccf3 Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Thu, 12 Jun 2025 12:18:56 +0200 Subject: [PATCH 02/15] Use a tailable cursor instead of change stream for checkpoint events. --- .../implementation/MongoBucketBatch.ts | 3 +- .../implementation/MongoSyncBucketStorage.ts | 161 ++++++------------ 2 files changed, 50 insertions(+), 114 deletions(-) diff --git a/modules/module-mongodb-storage/src/storage/implementation/MongoBucketBatch.ts b/modules/module-mongodb-storage/src/storage/implementation/MongoBucketBatch.ts index 58bcd4c4..c8d41cb9 100644 --- a/modules/module-mongodb-storage/src/storage/implementation/MongoBucketBatch.ts +++ b/modules/module-mongodb-storage/src/storage/implementation/MongoBucketBatch.ts @@ -601,9 +601,8 @@ export class MongoBucketBatch }, { session } ); + // We don't notify checkpoint here - we don't make any checkpoint updates directly }); - // Must be _after_ the transaction. - await this.db.notifyCheckpoint(); } async [Symbol.asyncDispose]() { diff --git a/modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts b/modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts index 0a7184ce..f165cfbe 100644 --- a/modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts +++ b/modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts @@ -677,21 +677,27 @@ export class MongoSyncBucketStorage * Instance-wide watch on the latest available checkpoint (op_id + lsn). */ private async *watchActiveCheckpoint(signal: AbortSignal): AsyncIterable { - // Use this form instead of (doc: SyncRuleCheckpointState | null = null), - // otherwise we get weird "doc: never" issues. - let doc = null as SyncRuleCheckpointState | null; - let clusterTime = null as mongo.Timestamp | null; - const syncRulesId = this.group_id; + const stream = this.checkpointChangesStream(signal); - await this.db.client.withSession(async (session) => { - doc = await this.db.sync_rules.findOne( + if (signal.aborted) { + return; + } + + // We only watch changes to the active sync rules. + // If it changes to inactive, we abort and restart with the new sync rules. 
+ let lastOp: storage.ReplicationCheckpoint | null = null; + + for await (const _ of stream) { + if (signal.aborted) { + break; + } + + const doc = await this.db.sync_rules.findOne( { - _id: syncRulesId, + _id: this.group_id, state: { $in: [storage.SyncRuleState.ACTIVE, storage.SyncRuleState.ERRORED] } }, { - session, - sort: { _id: -1 }, limit: 1, projection: { _id: 1, @@ -701,70 +707,17 @@ export class MongoSyncBucketStorage } } ); - const time = session.clusterTime?.clusterTime ?? null; - clusterTime = time; - }); - if (clusterTime == null) { - throw new ServiceError(ErrorCode.PSYNC_S2401, 'Could not get clusterTime'); - } - if (signal.aborted) { - return; - } - - if (doc == null) { - // Sync rules not present or not active. - // Abort the connections - clients will have to retry later. - throw new ServiceError(ErrorCode.PSYNC_S2302, 'No active sync rules available'); - } - - yield this.makeActiveCheckpoint(doc); - - // We only watch changes to the active sync rules. - // If it changes to inactive, we abort and restart with the new sync rules. - - const pipeline = this.getChangeStreamPipeline(); - - const stream = this.db.sync_rules.watch(pipeline, { - // Start at the cluster time where we got the initial doc, to make sure - // we don't skip any updates. - // This may result in the first operation being a duplicate, but we filter - // it out anyway. - startAtOperationTime: clusterTime - }); - - signal.addEventListener( - 'abort', - () => { - stream.close(); - }, - { once: true } - ); - - let lastOp: storage.ReplicationCheckpoint | null = null; - let lastDoc: SyncRuleCheckpointState | null = doc; - - for await (const update of stream.stream()) { - if (signal.aborted) { - break; - } - if (update.operationType != 'insert' && update.operationType != 'update' && update.operationType != 'replace') { - continue; - } - - const doc = await this.getOperationDoc(lastDoc, update as lib_mongo.mongo.ChangeStreamDocument); if (doc == null) { - // Irrelevant update - continue; - } - if (doc.state != storage.SyncRuleState.ACTIVE && doc.state != storage.SyncRuleState.ERRORED) { + // Sync rules not present or not active. + // Abort the connections - clients will have to retry later. + throw new ServiceError(ErrorCode.PSYNC_S2302, 'No active sync rules available'); + } else if (doc.state != storage.SyncRuleState.ACTIVE && doc.state != storage.SyncRuleState.ERRORED) { // Sync rules have changed - abort and restart. // We do a soft close of the stream here - no error break; } - lastDoc = doc; - const op = this.makeActiveCheckpoint(doc); // Check for LSN / checkpoint changes - ignore other metadata changes if (lastOp == null || op.lsn != lastOp.lsn || op.checkpoint != lastOp.checkpoint) { @@ -864,56 +817,40 @@ export class MongoSyncBucketStorage } } - private async getOperationDoc( - lastDoc: SyncRuleCheckpointState, - update: lib_mongo.mongo.ChangeStreamDocument - ): Promise { - if (update.operationType == 'insert' || update.operationType == 'replace') { - return update.fullDocument; - } else if (update.operationType == 'update') { - const updatedFields = update.updateDescription.updatedFields ?? {}; - if (lastDoc._id != update.documentKey._id) { - throw new ServiceAssertionError(`Sync rules id mismatch: ${lastDoc._id} != ${update.documentKey._id}`); - } - - const mergedDoc: SyncRuleCheckpointState = { - _id: lastDoc._id, - last_checkpoint: updatedFields.last_checkpoint ?? lastDoc.last_checkpoint, - last_checkpoint_lsn: updatedFields.last_checkpoint_lsn ?? 
lastDoc.last_checkpoint_lsn, - state: updatedFields.state ?? lastDoc.state - }; - - return mergedDoc; - } else { - // Unknown event type - return null; + private async *checkpointChangesStream(signal: AbortSignal): AsyncGenerator { + if (signal.aborted) { + return; } - } + const cursor = this.db.checkpoint_events.find({}, { tailable: true, awaitData: true, maxAwaitTimeMS: 10_000 }); + signal.addEventListener('abort', () => { + cursor.close().catch(() => {}); + }); - private getChangeStreamPipeline() { - const syncRulesId = this.group_id; - const pipeline: mongo.Document[] = [ - { - $match: { - 'documentKey._id': syncRulesId, - operationType: { $in: ['insert', 'update', 'replace'] } + // Yield once on start, regardless of whether there are documents in the cursor. + // This is to ensure that the first iteration of the generator yields immediately. + yield; + + try { + while (!signal.aborted) { + const doc = await cursor.tryNext(); + if (cursor.closed) { + return; } - }, - { - $project: { - operationType: 1, - 'documentKey._id': 1, - 'updateDescription.updatedFields.state': 1, - 'updateDescription.updatedFields.last_checkpoint': 1, - 'updateDescription.updatedFields.last_checkpoint_lsn': 1, - 'fullDocument._id': 1, - 'fullDocument.state': 1, - 'fullDocument.last_checkpoint': 1, - 'fullDocument.last_checkpoint_lsn': 1 + // Skip buffered documents, if any. We don't care about the contents, + // we only want to know when new documents are inserted. + cursor.readBufferedDocuments(); + if (doc != null) { + yield; } } - ]; - return pipeline; + } catch (e) { + if (signal.aborted) { + return; + } + throw e; + } finally { + await cursor.close(); + } } private async getDataBucketChanges( From 9b7d02d3b3802c94e9da1620643eb8f813c78a71 Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Thu, 12 Jun 2025 12:30:09 +0200 Subject: [PATCH 03/15] Handle CappedPositionLost. --- .../1749720702136-checkpoint-events.ts | 8 +++++-- .../implementation/MongoSyncBucketStorage.ts | 23 +++++++++++++++++-- 2 files changed, 27 insertions(+), 4 deletions(-) diff --git a/modules/module-mongodb-storage/src/migrations/db/migrations/1749720702136-checkpoint-events.ts b/modules/module-mongodb-storage/src/migrations/db/migrations/1749720702136-checkpoint-events.ts index bc5e55eb..4a320363 100644 --- a/modules/module-mongodb-storage/src/migrations/db/migrations/1749720702136-checkpoint-events.ts +++ b/modules/module-mongodb-storage/src/migrations/db/migrations/1749720702136-checkpoint-events.ts @@ -26,8 +26,12 @@ export const up: migrations.PowerSyncMigrationFunction = async (context) => { await db.db.createCollection('checkpoint_events', { capped: true, - size: 10 * 1024, // 10 KB - max: 10 // 10 documents + // We want a small size, since opening a tailable cursor scans this entire collection. + // On the other hand, if we fill this up faster than a process can read it, it will + // invalidate the cursor. We do handle cursor invalidation events, but don't want + // that to happen too often. 
+ size: 50 * 1024, // size in bytes + max: 50 // max number of documents }); } finally { await db.client.close(); diff --git a/modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts b/modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts index f165cfbe..08c9a37c 100644 --- a/modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts +++ b/modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts @@ -34,6 +34,7 @@ import { BucketDataDocument, BucketDataKey, BucketStateDocument, + CheckpointEventDocument, SourceKey, SourceTableDocument, SyncRuleCheckpointState, @@ -821,7 +822,16 @@ export class MongoSyncBucketStorage if (signal.aborted) { return; } - const cursor = this.db.checkpoint_events.find({}, { tailable: true, awaitData: true, maxAwaitTimeMS: 10_000 }); + + const query = () => { + return this.db.checkpoint_events.find( + {}, + { tailable: true, awaitData: true, maxAwaitTimeMS: 10_000, batchSize: 1000 } + ); + }; + + let cursor = query(); + signal.addEventListener('abort', () => { cursor.close().catch(() => {}); }); @@ -832,7 +842,16 @@ export class MongoSyncBucketStorage try { while (!signal.aborted) { - const doc = await cursor.tryNext(); + const doc = await cursor.tryNext().catch((e) => { + if (lib_mongo.isMongoServerError(e) && e.codeName === 'CappedPositionLost') { + // Cursor position lost, potentially due to a high rate of notifications + cursor = query(); + // Treat as an event found, before querying the new cursor again + return {}; + } else { + return Promise.reject(e); + } + }); if (cursor.closed) { return; } From 99de7250e6d884e6b04a310a3cb4daadb0f78d18 Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Thu, 12 Jun 2025 12:39:06 +0200 Subject: [PATCH 04/15] Create checkpoint_events in tests. --- .../1749720702136-checkpoint-events.ts | 25 +--------------- .../implementation/MongoSyncBucketStorage.ts | 6 ++++ .../MongoTestStorageFactoryGenerator.ts | 3 ++ .../src/storage/implementation/db.ts | 30 +++++++++++++++++++ 4 files changed, 40 insertions(+), 24 deletions(-) diff --git a/modules/module-mongodb-storage/src/migrations/db/migrations/1749720702136-checkpoint-events.ts b/modules/module-mongodb-storage/src/migrations/db/migrations/1749720702136-checkpoint-events.ts index 4a320363..f2cf4a23 100644 --- a/modules/module-mongodb-storage/src/migrations/db/migrations/1749720702136-checkpoint-events.ts +++ b/modules/module-mongodb-storage/src/migrations/db/migrations/1749720702136-checkpoint-events.ts @@ -9,30 +9,7 @@ export const up: migrations.PowerSyncMigrationFunction = async (context) => { const db = storage.createPowerSyncMongo(configuration.storage as MongoStorageConfig); try { - // We cover the case where the replication process was started before running this migration. - const existingCollections = await db.db - .listCollections({ name: 'checkpoint_events' }, { nameOnly: false }) - .toArray(); - const collection = existingCollections[0]; - if (collection != null) { - if (!collection.options?.capped) { - // Collection was auto-created but not capped, so we need to drop it - await db.db.dropCollection('checkpoint_events'); - } else { - // Collection previously created somehow - ignore - return; - } - } - - await db.db.createCollection('checkpoint_events', { - capped: true, - // We want a small size, since opening a tailable cursor scans this entire collection. 
- // On the other hand, if we fill this up faster than a process can read it, it will - // invalidate the cursor. We do handle cursor invalidation events, but don't want - // that to happen too often. - size: 50 * 1024, // size in bytes - max: 50 // max number of documents - }); + await db.createCheckpointEventsCollection(); } finally { await db.client.close(); } diff --git a/modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts b/modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts index 08c9a37c..c59531a2 100644 --- a/modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts +++ b/modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts @@ -818,6 +818,12 @@ export class MongoSyncBucketStorage } } + /** + * This watches the checkpoint_events capped collection for new documents inserted, + * and yields whenever one or more documents are inserted. + * + * The actual checkpoint must be queried on the sync_rules collection after this. + */ private async *checkpointChangesStream(signal: AbortSignal): AsyncGenerator { if (signal.aborted) { return; diff --git a/modules/module-mongodb-storage/src/storage/implementation/MongoTestStorageFactoryGenerator.ts b/modules/module-mongodb-storage/src/storage/implementation/MongoTestStorageFactoryGenerator.ts index 2fa11dfc..a7457b43 100644 --- a/modules/module-mongodb-storage/src/storage/implementation/MongoTestStorageFactoryGenerator.ts +++ b/modules/module-mongodb-storage/src/storage/implementation/MongoTestStorageFactoryGenerator.ts @@ -16,6 +16,9 @@ export const MongoTestStorageFactoryGenerator = (factoryOptions: MongoTestStorag await db.db.createCollection('bucket_parameters'); } + // Full migrations are not currently run for tests, so we manually create this + await db.createCheckpointEventsCollection(); + if (!options?.doNotClear) { await db.clear(); } diff --git a/modules/module-mongodb-storage/src/storage/implementation/db.ts b/modules/module-mongodb-storage/src/storage/implementation/db.ts index af44adf7..694da1b7 100644 --- a/modules/module-mongodb-storage/src/storage/implementation/db.ts +++ b/modules/module-mongodb-storage/src/storage/implementation/db.ts @@ -97,6 +97,36 @@ export class PowerSyncMongo { async notifyCheckpoint() { await this.checkpoint_events.insertOne({} as any, { forceServerObjectId: true }); } + + /** + * Only use in migrations and tests. + */ + async createCheckpointEventsCollection() { + // We cover the case where the replication process was started before running this migration. + const existingCollections = await this.db + .listCollections({ name: 'checkpoint_events' }, { nameOnly: false }) + .toArray(); + const collection = existingCollections[0]; + if (collection != null) { + if (!collection.options?.capped) { + // Collection was auto-created but not capped, so we need to drop it + await this.db.dropCollection('checkpoint_events'); + } else { + // Collection previously created somehow - ignore + return; + } + } + + await this.db.createCollection('checkpoint_events', { + capped: true, + // We want a small size, since opening a tailable cursor scans this entire collection. + // On the other hand, if we fill this up faster than a process can read it, it will + // invalidate the cursor. We do handle cursor invalidation events, but don't want + // that to happen too often. 
+ size: 50 * 1024, // size in bytes + max: 50 // max number of documents + }); + } } export function createPowerSyncMongo(config: MongoStorageConfig, options?: lib_mongo.MongoConnectionOptions) { From ea02d8f173dfe33bdf529b885247ff8f815c3ddb Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Thu, 12 Jun 2025 14:59:18 +0200 Subject: [PATCH 05/15] Incrementally lookup changed write checkpoints. --- .../1749720702136-checkpoint-events.ts | 10 + .../implementation/MongoBucketBatch.ts | 17 ++ .../implementation/MongoSyncBucketStorage.ts | 177 +++++++++++------- .../implementation/MongoWriteCheckpointAPI.ts | 3 +- .../src/storage/implementation/models.ts | 8 +- .../src/storage/SyncRulesBucketStorage.ts | 4 +- .../src/streams/BroadcastIterable.ts | 2 +- 7 files changed, 145 insertions(+), 76 deletions(-) diff --git a/modules/module-mongodb-storage/src/migrations/db/migrations/1749720702136-checkpoint-events.ts b/modules/module-mongodb-storage/src/migrations/db/migrations/1749720702136-checkpoint-events.ts index f2cf4a23..208d4a6d 100644 --- a/modules/module-mongodb-storage/src/migrations/db/migrations/1749720702136-checkpoint-events.ts +++ b/modules/module-mongodb-storage/src/migrations/db/migrations/1749720702136-checkpoint-events.ts @@ -10,6 +10,13 @@ export const up: migrations.PowerSyncMigrationFunction = async (context) => { try { await db.createCheckpointEventsCollection(); + + await db.write_checkpoints.createIndex( + { + processed_at_lsn: 1 + }, + { name: 'processed_at_lsn' } + ); } finally { await db.client.close(); } @@ -23,6 +30,9 @@ export const down: migrations.PowerSyncMigrationFunction = async (context) => { const db = storage.createPowerSyncMongo(configuration.storage as MongoStorageConfig); try { + if (await db.write_checkpoints.indexExists('processed_at_lsn')) { + await db.write_checkpoints.dropIndex('processed_at_lsn'); + } await db.db.dropCollection('checkpoint_events'); } finally { await db.client.close(); diff --git a/modules/module-mongodb-storage/src/storage/implementation/MongoBucketBatch.ts b/modules/module-mongodb-storage/src/storage/implementation/MongoBucketBatch.ts index c8d41cb9..dc14c9c8 100644 --- a/modules/module-mongodb-storage/src/storage/implementation/MongoBucketBatch.ts +++ b/modules/module-mongodb-storage/src/storage/implementation/MongoBucketBatch.ts @@ -674,6 +674,23 @@ export class MongoBucketBatch update.last_checkpoint = this.persisted_op; } + // Mark relevant write checkpoints as "processed". + // This makes it easier to identify write checkpoints that are "valid" in order. + await this.db.write_checkpoints.updateMany( + { + processed_at_lsn: null, + 'lsns.1': { $lte: lsn } + }, + { + $set: { + processed_at_lsn: lsn + } + }, + { + session: this.session + } + ); + await this.db.sync_rules.updateOne( { _id: this.group_id diff --git a/modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts b/modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts index c59531a2..bd8156e7 100644 --- a/modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts +++ b/modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts @@ -737,61 +737,32 @@ export class MongoSyncBucketStorage * User-specific watch on the latest checkpoint and/or write checkpoint. 
*/ async *watchCheckpointChanges(options: WatchWriteCheckpointOptions): AsyncIterable { - const { signal } = options; - let lastCheckpoint: utils.InternalOpId | null = null; - let lastWriteCheckpoint: bigint | null = null; - let lastWriteCheckpointDoc: WriteCheckpointResult | null = null; - let nextWriteCheckpoint: bigint | null = null; - let lastCheckpointEvent: ReplicationCheckpoint | null = null; - let receivedWriteCheckpoint = false; - - const writeCheckpointIter = this.writeCheckpointAPI.watchUserWriteCheckpoint({ - user_id: options.user_id, - signal, - sync_rules_id: this.group_id - }); - const iter = mergeAsyncIterables( - [this.sharedIter, writeCheckpointIter], - signal - ); + let lastCheckpoint: ReplicationCheckpoint | null = null; - for await (const event of iter) { - if ('checkpoint' in event) { - lastCheckpointEvent = event; - } else { - lastWriteCheckpointDoc = event; - receivedWriteCheckpoint = true; - } - - if (lastCheckpointEvent == null || !receivedWriteCheckpoint) { - // We need to wait until we received at least on checkpoint, and one write checkpoint. - continue; - } + const iter = this.sharedIter[Symbol.asyncIterator](options.signal); + let writeCheckpoint: bigint | null = null; + for await (const nextCheckpoint of iter) { // lsn changes are not important by itself. // What is important is: // 1. checkpoint (op_id) changes. // 2. write checkpoint changes for the specific user - const lsn = lastCheckpointEvent?.lsn; + if (nextCheckpoint.lsn != null) { + writeCheckpoint ??= await this.writeCheckpointAPI.lastWriteCheckpoint({ + sync_rules_id: this.group_id, + user_id: options.user_id, + heads: { + '1': nextCheckpoint.lsn + } + }); + } if ( - lastWriteCheckpointDoc != null && - (lastWriteCheckpointDoc.lsn == null || (lsn != null && lsn >= lastWriteCheckpointDoc.lsn)) + lastCheckpoint != null && + lastCheckpoint.checkpoint == nextCheckpoint.checkpoint && + lastCheckpoint.lsn == nextCheckpoint.lsn ) { - const writeCheckpoint = lastWriteCheckpointDoc.id; - if (nextWriteCheckpoint == null || (writeCheckpoint != null && writeCheckpoint > nextWriteCheckpoint)) { - nextWriteCheckpoint = writeCheckpoint; - } - // We used the doc - clear it - lastWriteCheckpointDoc = null; - } - - const { checkpoint } = lastCheckpointEvent; - - const currentWriteCheckpoint = nextWriteCheckpoint; - - if (currentWriteCheckpoint == lastWriteCheckpoint && checkpoint == lastCheckpoint) { // No change - wait for next one // In some cases, many LSNs may be produced in a short time. // Add a delay to throttle the write checkpoint lookup a bit. @@ -799,22 +770,45 @@ export class MongoSyncBucketStorage continue; } - const updates: CheckpointChanges = - lastCheckpoint == null - ? CHECKPOINT_INVALIDATE_ALL - : await this.getCheckpointChanges({ - lastCheckpoint: lastCheckpoint, - nextCheckpoint: checkpoint - }); - - lastWriteCheckpoint = currentWriteCheckpoint; - lastCheckpoint = checkpoint; - - yield { - base: lastCheckpointEvent, - writeCheckpoint: currentWriteCheckpoint, - update: updates - }; + if (lastCheckpoint == null) { + yield { + base: nextCheckpoint, + writeCheckpoint, + update: CHECKPOINT_INVALIDATE_ALL + }; + } else { + const updates = await this.getCheckpointChanges({ + lastCheckpoint, + nextCheckpoint + }); + + let updatedWriteCheckpoint = updates.updatedWriteCheckpoints.get(options.user_id) ?? 
null; + if (updates.invalidateWriteCheckpoints) { + updatedWriteCheckpoint ??= await this.writeCheckpointAPI.lastWriteCheckpoint({ + sync_rules_id: this.group_id, + user_id: options.user_id, + heads: { + '1': nextCheckpoint.lsn! + } + }); + } + if (updatedWriteCheckpoint != null && (writeCheckpoint == null || updatedWriteCheckpoint > writeCheckpoint)) { + writeCheckpoint = updatedWriteCheckpoint; + } + + yield { + base: nextCheckpoint, + writeCheckpoint, + update: { + updatedDataBuckets: updates.updatedDataBuckets, + invalidateDataBuckets: updates.invalidateDataBuckets, + updatedParameterLookups: updates.updatedParameterLookups, + invalidateParameterBuckets: updates.invalidateParameterBuckets + } + }; + } + + lastCheckpoint = nextCheckpoint; } } @@ -887,7 +881,7 @@ export class MongoSyncBucketStorage { // We have an index on (_id.g, last_op). '_id.g': this.group_id, - last_op: { $gt: BigInt(options.lastCheckpoint) } + last_op: { $gt: options.lastCheckpoint.checkpoint } }, { projection: { @@ -916,7 +910,7 @@ export class MongoSyncBucketStorage const parameterUpdates = await this.db.bucket_parameters .find( { - _id: { $gt: BigInt(options.lastCheckpoint), $lte: BigInt(options.nextCheckpoint) }, + _id: { $gt: options.lastCheckpoint.checkpoint, $lte: options.nextCheckpoint.checkpoint }, 'key.g': this.group_id }, { @@ -939,12 +933,45 @@ export class MongoSyncBucketStorage }; } + private async getWriteCheckpointChanges(options: GetCheckpointChangesOptions) { + const limit = 1000; + const changes = await this.db.write_checkpoints + .find( + { + processed_at_lsn: { $gt: options.lastCheckpoint.lsn, $lte: options.nextCheckpoint.lsn } + }, + { + limit: limit + 1, + batchSize: limit + 1, + singleBatch: true + } + ) + .toArray(); + const invalidate = changes.length > limit; + + const updatedWriteCheckpoints = new Map(); + if (!invalidate) { + for (let c of changes) { + updatedWriteCheckpoints.set(c.user_id, c.client_id); + } + } + + return { + invalidateWriteCheckpoints: invalidate, + updatedWriteCheckpoints + }; + } + // If we processed all connections together for each checkpoint, we could do a single lookup for all connections. // In practice, specific connections may fall behind. So instead, we just cache the results of each specific lookup. // TODO (later): // We can optimize this by implementing it like ChecksumCache: We can use partial cache results to do // more efficient lookups in some cases. - private checkpointChangesCache = new LRUCache({ + private checkpointChangesCache = new LRUCache< + string, + InternalCheckpointChanges, + { options: GetCheckpointChangesOptions } + >({ // Limit to 50 cache entries, or 10MB, whichever comes first. // Some rough calculations: // If we process 10 checkpoints per second, and a connection may be 2 seconds behind, we could have @@ -952,31 +979,39 @@ export class MongoSyncBucketStorage // That is a worst-case scenario, so we don't actually store that many. In real life, the cache keys // would likely be clustered around a few values, rather than spread over all 400 potential values. 
max: 50, - maxSize: 10 * 1024 * 1024, - sizeCalculation: (value: CheckpointChanges) => { + maxSize: 12 * 1024 * 1024, + sizeCalculation: (value: InternalCheckpointChanges) => { // Estimate of memory usage const paramSize = [...value.updatedParameterLookups].reduce((a, b) => a + b.length, 0); const bucketSize = [...value.updatedDataBuckets].reduce((a, b) => a + b.length, 0); - return 100 + paramSize + bucketSize; + const writeCheckpointSize = value.updatedWriteCheckpoints.size * 30; // estiamte for user_id + bigint + return 100 + paramSize + bucketSize + writeCheckpointSize; }, fetchMethod: async (_key, _staleValue, options) => { return this.getCheckpointChangesInternal(options.context.options); } }); - async getCheckpointChanges(options: GetCheckpointChangesOptions): Promise { - const key = `${options.lastCheckpoint}_${options.nextCheckpoint}`; + async getCheckpointChanges(options: GetCheckpointChangesOptions): Promise { + const key = `${options.lastCheckpoint.checkpoint}_${options.lastCheckpoint.lsn}__${options.nextCheckpoint.checkpoint}_${options.nextCheckpoint.lsn}`; const result = await this.checkpointChangesCache.fetch(key, { context: { options } }); return result!; } - private async getCheckpointChangesInternal(options: GetCheckpointChangesOptions): Promise { + private async getCheckpointChangesInternal(options: GetCheckpointChangesOptions): Promise { const dataUpdates = await this.getDataBucketChanges(options); const parameterUpdates = await this.getParameterBucketChanges(options); + const writeCheckpointUpdates = await this.getWriteCheckpointChanges(options); return { ...dataUpdates, - ...parameterUpdates + ...parameterUpdates, + ...writeCheckpointUpdates }; } } + +interface InternalCheckpointChanges extends CheckpointChanges { + updatedWriteCheckpoints: Map; + invalidateWriteCheckpoints: boolean; +} diff --git a/modules/module-mongodb-storage/src/storage/implementation/MongoWriteCheckpointAPI.ts b/modules/module-mongodb-storage/src/storage/implementation/MongoWriteCheckpointAPI.ts index 785898cf..2cff6e35 100644 --- a/modules/module-mongodb-storage/src/storage/implementation/MongoWriteCheckpointAPI.ts +++ b/modules/module-mongodb-storage/src/storage/implementation/MongoWriteCheckpointAPI.ts @@ -53,7 +53,8 @@ export class MongoWriteCheckpointAPI implements storage.WriteCheckpointAPI { }, { $set: { - lsns + lsns, + processed_at_lsn: null }, $inc: { client_id: 1n diff --git a/modules/module-mongodb-storage/src/storage/implementation/models.ts b/modules/module-mongodb-storage/src/storage/implementation/models.ts index 1335bc2e..45173027 100644 --- a/modules/module-mongodb-storage/src/storage/implementation/models.ts +++ b/modules/module-mongodb-storage/src/storage/implementation/models.ts @@ -1,4 +1,4 @@ -import { storage } from '@powersync/service-core'; +import { InternalOpId, storage } from '@powersync/service-core'; import { SqliteJsonValue } from '@powersync/service-sync-rules'; import * as bson from 'bson'; @@ -180,6 +180,12 @@ export interface WriteCheckpointDocument { user_id: string; lsns: Record; client_id: bigint; + /** + * This is set to the checkpoint lsn when the checkpoint lsn >= this lsn. + * This is used to make it easier to determine what write checkpoints have been processed + * between two checkpoints. 
+ */ + processed_at_lsn: string | null; } export interface InstanceDocument { diff --git a/packages/service-core/src/storage/SyncRulesBucketStorage.ts b/packages/service-core/src/storage/SyncRulesBucketStorage.ts index 1d85a3a0..25f0d33a 100644 --- a/packages/service-core/src/storage/SyncRulesBucketStorage.ts +++ b/packages/service-core/src/storage/SyncRulesBucketStorage.ts @@ -256,8 +256,8 @@ export interface StorageCheckpointUpdate extends WriteCheckpoint { } export interface GetCheckpointChangesOptions { - lastCheckpoint: util.InternalOpId; - nextCheckpoint: util.InternalOpId; + lastCheckpoint: ReplicationCheckpoint; + nextCheckpoint: ReplicationCheckpoint; } export interface CheckpointChanges { diff --git a/packages/service-core/src/streams/BroadcastIterable.ts b/packages/service-core/src/streams/BroadcastIterable.ts index 2922c709..b48e1b8b 100644 --- a/packages/service-core/src/streams/BroadcastIterable.ts +++ b/packages/service-core/src/streams/BroadcastIterable.ts @@ -97,7 +97,7 @@ export class BroadcastIterable implements AsyncIterable { } } - async *[Symbol.asyncIterator](signal?: AbortSignal): AsyncIterator { + async *[Symbol.asyncIterator](signal?: AbortSignal): AsyncIterableIterator { const sink = new LastValueSink(this.last); this.addSink(sink); try { From 58c7b61e777d474afc4da5646228a166631fbcb3 Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Thu, 12 Jun 2025 15:01:50 +0200 Subject: [PATCH 06/15] Remove watch on managed write checkpoints. --- .../implementation/MongoWriteCheckpointAPI.ts | 130 +----------------- .../checkpoints/PostgresWriteCheckpointAPI.ts | 7 - .../src/storage/WriteCheckpointAPI.ts | 8 -- 3 files changed, 7 insertions(+), 138 deletions(-) diff --git a/modules/module-mongodb-storage/src/storage/implementation/MongoWriteCheckpointAPI.ts b/modules/module-mongodb-storage/src/storage/implementation/MongoWriteCheckpointAPI.ts index 2cff6e35..96dbb92f 100644 --- a/modules/module-mongodb-storage/src/storage/implementation/MongoWriteCheckpointAPI.ts +++ b/modules/module-mongodb-storage/src/storage/implementation/MongoWriteCheckpointAPI.ts @@ -1,12 +1,6 @@ import { mongo } from '@powersync/lib-service-mongodb'; import * as framework from '@powersync/lib-services-framework'; -import { - Demultiplexer, - DemultiplexerValue, - storage, - WatchUserWriteCheckpointOptions, - WriteCheckpointResult -} from '@powersync/service-core'; +import { Demultiplexer, DemultiplexerValue, storage, WriteCheckpointResult } from '@powersync/service-core'; import { PowerSyncMongo } from './db.js'; import { CustomWriteCheckpointDocument, WriteCheckpointDocument } from './models.js'; @@ -82,122 +76,6 @@ export class MongoWriteCheckpointAPI implements storage.WriteCheckpointAPI { } } - watchUserWriteCheckpoint(options: WatchUserWriteCheckpointOptions): AsyncIterable { - switch (this.writeCheckpointMode) { - case storage.WriteCheckpointMode.CUSTOM: - return this.watchCustomWriteCheckpoint(options); - case storage.WriteCheckpointMode.MANAGED: - return this.watchManagedWriteCheckpoint(options); - default: - throw new Error('Invalid write checkpoint mode'); - } - } - - private sharedManagedIter = new Demultiplexer((signal) => { - const clusterTimePromise = this.getClusterTime(); - - return { - iterator: this.watchAllManagedWriteCheckpoints(clusterTimePromise, signal), - getFirstValue: async (user_id: string) => { - // Potential race conditions we cater for: - - // Case 1: changestream is behind. - // We get a doc now, then the same or older doc again later. - // No problem! 
- - // Case 2: Query is behind. I.e. doc has been created, and emitted on the changestream, but the query doesn't see it yet. - // Not possible luckily, but can we make sure? - - // Case 3: changestream delays openeing. A doc is created after our query here, but before the changestream is opened. - // Awaiting clusterTimePromise should be sufficient here, but as a sanity check we also confirm that our query - // timestamp is > the startClusterTime. - - const changeStreamStart = await clusterTimePromise; - - let doc = null as WriteCheckpointDocument | null; - let clusterTime = null as mongo.Timestamp | null; - - await this.db.client.withSession(async (session) => { - doc = await this.db.write_checkpoints.findOne( - { - user_id: user_id - }, - { - session - } - ); - const time = session.clusterTime?.clusterTime ?? null; - clusterTime = time; - }); - if (clusterTime == null) { - throw new framework.ServiceAssertionError('Could not get clusterTime for write checkpoint'); - } - - if (clusterTime.lessThan(changeStreamStart)) { - throw new framework.ServiceAssertionError( - 'clusterTime for write checkpoint is older than changestream start' - ); - } - - if (doc == null) { - return { - id: null, - lsn: null - }; - } - - return { - id: doc.client_id, - lsn: doc.lsns['1'] - }; - } - }; - }); - - private async *watchAllManagedWriteCheckpoints( - clusterTimePromise: Promise, - signal: AbortSignal - ): AsyncGenerator> { - const clusterTime = await clusterTimePromise; - - const stream = this.db.write_checkpoints.watch( - [{ $match: { operationType: { $in: ['insert', 'update', 'replace'] } } }], - { - fullDocument: 'updateLookup', - startAtOperationTime: clusterTime - } - ); - - signal.onabort = () => { - stream.close(); - }; - - if (signal.aborted) { - stream.close(); - return; - } - - for await (let event of stream) { - if (!('fullDocument' in event) || event.fullDocument == null) { - continue; - } - - const user_id = event.fullDocument.user_id; - yield { - key: user_id, - value: { - id: event.fullDocument.client_id, - lsn: event.fullDocument.lsns['1'] - } - }; - } - } - - watchManagedWriteCheckpoint(options: WatchUserWriteCheckpointOptions): AsyncIterable { - const stream = this.sharedManagedIter.subscribe(options.user_id, options.signal); - return this.orderedStream(stream); - } - private sharedCustomIter = new Demultiplexer((signal) => { const clusterTimePromise = this.getClusterTime(); @@ -380,3 +258,9 @@ export async function batchCreateCustomWriteCheckpoints( {} ); } + +interface WatchUserWriteCheckpointOptions { + user_id: string; + sync_rules_id: number; + signal: AbortSignal; +} diff --git a/modules/module-postgres-storage/src/storage/checkpoints/PostgresWriteCheckpointAPI.ts b/modules/module-postgres-storage/src/storage/checkpoints/PostgresWriteCheckpointAPI.ts index 913894f2..831bdfd5 100644 --- a/modules/module-postgres-storage/src/storage/checkpoints/PostgresWriteCheckpointAPI.ts +++ b/modules/module-postgres-storage/src/storage/checkpoints/PostgresWriteCheckpointAPI.ts @@ -58,13 +58,6 @@ export class PostgresWriteCheckpointAPI implements storage.WriteCheckpointAPI { return row!.write_checkpoint; } - watchUserWriteCheckpoint( - options: storage.WatchUserWriteCheckpointOptions - ): AsyncIterable { - // Not used for Postgres currently - throw new Error('Method not implemented.'); - } - async lastWriteCheckpoint(filters: storage.LastWriteCheckpointFilters): Promise { switch (this.writeCheckpointMode) { case storage.WriteCheckpointMode.CUSTOM: diff --git 
a/packages/service-core/src/storage/WriteCheckpointAPI.ts b/packages/service-core/src/storage/WriteCheckpointAPI.ts index b9d14ce9..2a684158 100644 --- a/packages/service-core/src/storage/WriteCheckpointAPI.ts +++ b/packages/service-core/src/storage/WriteCheckpointAPI.ts @@ -75,12 +75,6 @@ export type ManagedWriteCheckpointOptions = ManagedWriteCheckpointFilters; export type SyncStorageLastWriteCheckpointFilters = BaseWriteCheckpointIdentifier | ManagedWriteCheckpointFilters; export type LastWriteCheckpointFilters = CustomWriteCheckpointFilters | ManagedWriteCheckpointFilters; -export interface WatchUserWriteCheckpointOptions { - user_id: string; - sync_rules_id: number; - signal: AbortSignal; -} - export interface BaseWriteCheckpointAPI { readonly writeCheckpointMode: WriteCheckpointMode; setWriteCheckpointMode(mode: WriteCheckpointMode): void; @@ -104,8 +98,6 @@ export interface SyncStorageWriteCheckpointAPI extends BaseWriteCheckpointAPI { export interface WriteCheckpointAPI extends BaseWriteCheckpointAPI { batchCreateCustomWriteCheckpoints(checkpoints: CustomWriteCheckpointOptions[]): Promise; lastWriteCheckpoint(filters: LastWriteCheckpointFilters): Promise; - - watchUserWriteCheckpoint(options: WatchUserWriteCheckpointOptions): AsyncIterable; } export const DEFAULT_WRITE_CHECKPOINT_MODE = WriteCheckpointMode.MANAGED; From dbd1517571e0a7edd3e8aa3a534d6cfcf7eae9c9 Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Thu, 12 Jun 2025 16:26:20 +0200 Subject: [PATCH 07/15] Refactor custom write checkpoint storage in MongoDB. --- .../1749720702136-checkpoint-events.ts | 10 +++++++ .../implementation/MongoBucketBatch.ts | 7 +++-- .../implementation/MongoSyncBucketStorage.ts | 6 ---- .../implementation/MongoWriteCheckpointAPI.ts | 27 ++++++++++-------- .../src/storage/implementation/models.ts | 7 +++++ .../src/storage/PostgresSyncRulesStorage.ts | 6 ---- .../checkpoints/PostgresWriteCheckpointAPI.ts | 7 +++-- .../src/tests/register-data-storage-tests.ts | 28 ++++++------------- .../src/storage/WriteCheckpointAPI.ts | 4 +-- 9 files changed, 54 insertions(+), 48 deletions(-) diff --git a/modules/module-mongodb-storage/src/migrations/db/migrations/1749720702136-checkpoint-events.ts b/modules/module-mongodb-storage/src/migrations/db/migrations/1749720702136-checkpoint-events.ts index 208d4a6d..d2b400ab 100644 --- a/modules/module-mongodb-storage/src/migrations/db/migrations/1749720702136-checkpoint-events.ts +++ b/modules/module-mongodb-storage/src/migrations/db/migrations/1749720702136-checkpoint-events.ts @@ -17,6 +17,13 @@ export const up: migrations.PowerSyncMigrationFunction = async (context) => { }, { name: 'processed_at_lsn' } ); + + await db.custom_write_checkpoints.createIndex( + { + op_id: 1 + }, + { name: 'op_id' } + ); } finally { await db.client.close(); } @@ -33,6 +40,9 @@ export const down: migrations.PowerSyncMigrationFunction = async (context) => { if (await db.write_checkpoints.indexExists('processed_at_lsn')) { await db.write_checkpoints.dropIndex('processed_at_lsn'); } + if (await db.custom_write_checkpoints.indexExists('op_id')) { + await db.custom_write_checkpoints.dropIndex('op_id'); + } await db.db.dropCollection('checkpoint_events'); } finally { await db.client.close(); diff --git a/modules/module-mongodb-storage/src/storage/implementation/MongoBucketBatch.ts b/modules/module-mongodb-storage/src/storage/implementation/MongoBucketBatch.ts index dc14c9c8..fb22428f 100644 --- a/modules/module-mongodb-storage/src/storage/implementation/MongoBucketBatch.ts +++ 
b/modules/module-mongodb-storage/src/storage/implementation/MongoBucketBatch.ts @@ -122,8 +122,6 @@ export class MongoBucketBatch result = r; } } - await batchCreateCustomWriteCheckpoints(this.db, this.write_checkpoint_batch); - this.write_checkpoint_batch = []; return result; } @@ -139,6 +137,11 @@ export class MongoBucketBatch await this.withReplicationTransaction(`Flushing ${batch.length} ops`, async (session, opSeq) => { resumeBatch = await this.replicateBatch(session, batch, opSeq, options); + if (this.write_checkpoint_batch.length > 0) { + await batchCreateCustomWriteCheckpoints(this.db, session, this.write_checkpoint_batch, opSeq.next()); + this.write_checkpoint_batch = []; + } + last_op = opSeq.last(); }); diff --git a/modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts b/modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts index bd8156e7..6fee1281 100644 --- a/modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts +++ b/modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts @@ -83,12 +83,6 @@ export class MongoSyncBucketStorage this.writeCheckpointAPI.setWriteCheckpointMode(mode); } - batchCreateCustomWriteCheckpoints(checkpoints: storage.BatchedCustomWriteCheckpointOptions[]): Promise { - return this.writeCheckpointAPI.batchCreateCustomWriteCheckpoints( - checkpoints.map((checkpoint) => ({ ...checkpoint, sync_rules_id: this.group_id })) - ); - } - createManagedWriteCheckpoint(checkpoint: storage.ManagedWriteCheckpointOptions): Promise { return this.writeCheckpointAPI.createManagedWriteCheckpoint(checkpoint); } diff --git a/modules/module-mongodb-storage/src/storage/implementation/MongoWriteCheckpointAPI.ts b/modules/module-mongodb-storage/src/storage/implementation/MongoWriteCheckpointAPI.ts index 96dbb92f..94cdd5d3 100644 --- a/modules/module-mongodb-storage/src/storage/implementation/MongoWriteCheckpointAPI.ts +++ b/modules/module-mongodb-storage/src/storage/implementation/MongoWriteCheckpointAPI.ts @@ -1,6 +1,12 @@ import { mongo } from '@powersync/lib-service-mongodb'; import * as framework from '@powersync/lib-services-framework'; -import { Demultiplexer, DemultiplexerValue, storage, WriteCheckpointResult } from '@powersync/service-core'; +import { + Demultiplexer, + DemultiplexerValue, + InternalOpId, + storage, + WriteCheckpointResult +} from '@powersync/service-core'; import { PowerSyncMongo } from './db.js'; import { CustomWriteCheckpointDocument, WriteCheckpointDocument } from './models.js'; @@ -29,13 +35,9 @@ export class MongoWriteCheckpointAPI implements storage.WriteCheckpointAPI { this._mode = mode; } - async batchCreateCustomWriteCheckpoints(checkpoints: storage.CustomWriteCheckpointOptions[]): Promise { - return batchCreateCustomWriteCheckpoints(this.db, checkpoints); - } - async createManagedWriteCheckpoint(checkpoint: storage.ManagedWriteCheckpointOptions): Promise { if (this.writeCheckpointMode !== storage.WriteCheckpointMode.MANAGED) { - throw new framework.errors.ValidationError( + throw new framework.ServiceAssertionError( `Attempting to create a managed Write Checkpoint when the current Write Checkpoint mode is set to "${this.writeCheckpointMode}"` ); } @@ -63,12 +65,12 @@ export class MongoWriteCheckpointAPI implements storage.WriteCheckpointAPI { switch (this.writeCheckpointMode) { case storage.WriteCheckpointMode.CUSTOM: if (false == 'sync_rules_id' in filters) { - throw new framework.errors.ValidationError(`Sync rules ID is required 
for custom Write Checkpoint filtering`); + throw new framework.ServiceAssertionError(`Sync rules ID is required for custom Write Checkpoint filtering`); } return this.lastCustomWriteCheckpoint(filters); case storage.WriteCheckpointMode.MANAGED: if (false == 'heads' in filters) { - throw new framework.errors.ValidationError( + throw new framework.ServiceAssertionError( `Replication HEAD is required for managed Write Checkpoint filtering` ); } @@ -236,7 +238,9 @@ export class MongoWriteCheckpointAPI implements storage.WriteCheckpointAPI { export async function batchCreateCustomWriteCheckpoints( db: PowerSyncMongo, - checkpoints: storage.CustomWriteCheckpointOptions[] + session: mongo.ClientSession, + checkpoints: storage.CustomWriteCheckpointOptions[], + opId: InternalOpId ): Promise { if (checkpoints.length == 0) { return; @@ -249,13 +253,14 @@ export async function batchCreateCustomWriteCheckpoints( update: { $set: { checkpoint: checkpointOptions.checkpoint, - sync_rules_id: checkpointOptions.sync_rules_id + sync_rules_id: checkpointOptions.sync_rules_id, + op_id: opId } }, upsert: true } })), - {} + { session } ); } diff --git a/modules/module-mongodb-storage/src/storage/implementation/models.ts b/modules/module-mongodb-storage/src/storage/implementation/models.ts index 45173027..50c29d32 100644 --- a/modules/module-mongodb-storage/src/storage/implementation/models.ts +++ b/modules/module-mongodb-storage/src/storage/implementation/models.ts @@ -173,6 +173,13 @@ export interface CustomWriteCheckpointDocument { user_id: string; checkpoint: bigint; sync_rules_id: number; + /** + * Unlike managed write checkpoints, custom write checkpoints are flushed together with + * normal ops. This means we can assign an op_id for ordering / correlating with read checkpoints. + * + * This is not unique - multiple write checkpoints can have the same op_id. 
+ */ + op_id?: InternalOpId; } export interface WriteCheckpointDocument { diff --git a/modules/module-postgres-storage/src/storage/PostgresSyncRulesStorage.ts b/modules/module-postgres-storage/src/storage/PostgresSyncRulesStorage.ts index e785549e..d8732759 100644 --- a/modules/module-postgres-storage/src/storage/PostgresSyncRulesStorage.ts +++ b/modules/module-postgres-storage/src/storage/PostgresSyncRulesStorage.ts @@ -107,12 +107,6 @@ export class PostgresSyncRulesStorage return new PostgresCompactor(this.db, this.group_id, options).compact(); } - batchCreateCustomWriteCheckpoints(checkpoints: storage.BatchedCustomWriteCheckpointOptions[]): Promise { - return this.writeCheckpointAPI.batchCreateCustomWriteCheckpoints( - checkpoints.map((c) => ({ ...c, sync_rules_id: this.group_id })) - ); - } - lastWriteCheckpoint(filters: storage.SyncStorageLastWriteCheckpointFilters): Promise { return this.writeCheckpointAPI.lastWriteCheckpoint({ ...filters, diff --git a/modules/module-postgres-storage/src/storage/checkpoints/PostgresWriteCheckpointAPI.ts b/modules/module-postgres-storage/src/storage/checkpoints/PostgresWriteCheckpointAPI.ts index 831bdfd5..562120a6 100644 --- a/modules/module-postgres-storage/src/storage/checkpoints/PostgresWriteCheckpointAPI.ts +++ b/modules/module-postgres-storage/src/storage/checkpoints/PostgresWriteCheckpointAPI.ts @@ -1,6 +1,6 @@ import * as lib_postgres from '@powersync/lib-service-postgres'; import * as framework from '@powersync/lib-services-framework'; -import { storage, sync } from '@powersync/service-core'; +import { InternalOpId, storage, sync } from '@powersync/service-core'; import { JSONBig, JsonContainer } from '@powersync/service-jsonbig'; import { models } from '../../types/types.js'; @@ -26,7 +26,10 @@ export class PostgresWriteCheckpointAPI implements storage.WriteCheckpointAPI { this._mode = mode; } - async batchCreateCustomWriteCheckpoints(checkpoints: storage.CustomWriteCheckpointOptions[]): Promise { + async batchCreateCustomWriteCheckpoints( + checkpoints: storage.CustomWriteCheckpointOptions[], + op_id: InternalOpId + ): Promise { return batchCreateCustomWriteCheckpoints(this.db, checkpoints); } diff --git a/packages/service-core-tests/src/tests/register-data-storage-tests.ts b/packages/service-core-tests/src/tests/register-data-storage-tests.ts index 850f37ed..bdb31bc1 100644 --- a/packages/service-core-tests/src/tests/register-data-storage-tests.ts +++ b/packages/service-core-tests/src/tests/register-data-storage-tests.ts @@ -1769,14 +1769,11 @@ bucket_definitions: .watchCheckpointChanges({ user_id: 'user1', signal: abortController.signal }) [Symbol.asyncIterator](); - await bucketStorage.batchCreateCustomWriteCheckpoints([ - { + await bucketStorage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => { + await batch.addCustomWriteCheckpoint({ checkpoint: 5n, user_id: 'user1' - } - ]); - - await bucketStorage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => { + }); await batch.keepalive('5/0'); }); @@ -1829,16 +1826,11 @@ bucket_definitions: } }); - await bucketStorage.batchCreateCustomWriteCheckpoints([ - { + await bucketStorage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => { + batch.addCustomWriteCheckpoint({ checkpoint: 6n, user_id: 'user1' - } - ]); - // We have to trigger a new keepalive after the checkpoint, at least to cover postgres storage. - // This is what is effetively triggered with RouteAPI.createReplicationHead(). - // MongoDB storage doesn't explicitly need this anymore. 
- await bucketStorage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => { + }); await batch.keepalive('6/0'); }); @@ -1855,13 +1847,11 @@ bucket_definitions: } }); - await bucketStorage.batchCreateCustomWriteCheckpoints([ - { + await bucketStorage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => { + batch.addCustomWriteCheckpoint({ checkpoint: 7n, user_id: 'user1' - } - ]); - await bucketStorage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => { + }); await batch.keepalive('7/0'); }); diff --git a/packages/service-core/src/storage/WriteCheckpointAPI.ts b/packages/service-core/src/storage/WriteCheckpointAPI.ts index 2a684158..81084eed 100644 --- a/packages/service-core/src/storage/WriteCheckpointAPI.ts +++ b/packages/service-core/src/storage/WriteCheckpointAPI.ts @@ -1,3 +1,5 @@ +import { InternalOpId } from '../util/utils.js'; + export enum WriteCheckpointMode { /** * Raw mappings of `user_id` to `write_checkpoint`s should @@ -87,7 +89,6 @@ export interface BaseWriteCheckpointAPI { * sync rules id. */ export interface SyncStorageWriteCheckpointAPI extends BaseWriteCheckpointAPI { - batchCreateCustomWriteCheckpoints(checkpoints: BatchedCustomWriteCheckpointOptions[]): Promise; lastWriteCheckpoint(filters: SyncStorageLastWriteCheckpointFilters): Promise; } @@ -96,7 +97,6 @@ export interface SyncStorageWriteCheckpointAPI extends BaseWriteCheckpointAPI { * sync rules identifiers for custom write checkpoints. */ export interface WriteCheckpointAPI extends BaseWriteCheckpointAPI { - batchCreateCustomWriteCheckpoints(checkpoints: CustomWriteCheckpointOptions[]): Promise; lastWriteCheckpoint(filters: LastWriteCheckpointFilters): Promise; } From 916b65359d4caf1a9b7bde1761b03e8b2db6b84f Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Thu, 12 Jun 2025 16:39:18 +0200 Subject: [PATCH 08/15] Support custom write checkpoints. 
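
Taken together with the previous commit, callers are now expected to record custom write checkpoints on the replication batch, so they are flushed with the same ops and receive an op_id that can be correlated with read checkpoints. A condensed sketch of the call shape, taken from the updated tests in the previous commit (values are the test's, not new API surface):

```ts
await bucketStorage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => {
  // Buffered on the batch and persisted in the same flush as its ops,
  // so the write checkpoint is assigned an op_id.
  batch.addCustomWriteCheckpoint({ checkpoint: 6n, user_id: 'user1' });
  await batch.keepalive('6/0');
});
```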
--- .../implementation/MongoSyncBucketStorage.ts | 33 +--- .../implementation/MongoWriteCheckpointAPI.ts | 180 +++++++----------- .../checkpoints/PostgresWriteCheckpointAPI.ts | 3 +- .../src/tests/register-data-storage-tests.ts | 7 +- 4 files changed, 72 insertions(+), 151 deletions(-) diff --git a/modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts b/modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts index 6fee1281..a4a491b5 100644 --- a/modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts +++ b/modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts @@ -57,7 +57,7 @@ export class MongoSyncBucketStorage }); private parsedSyncRulesCache: { parsed: SqlSyncRules; options: storage.ParseSyncRulesOptions } | undefined; - private writeCheckpointAPI: storage.WriteCheckpointAPI; + private writeCheckpointAPI: MongoWriteCheckpointAPI; constructor( public readonly factory: MongoBucketStorage, @@ -927,35 +927,6 @@ export class MongoSyncBucketStorage }; } - private async getWriteCheckpointChanges(options: GetCheckpointChangesOptions) { - const limit = 1000; - const changes = await this.db.write_checkpoints - .find( - { - processed_at_lsn: { $gt: options.lastCheckpoint.lsn, $lte: options.nextCheckpoint.lsn } - }, - { - limit: limit + 1, - batchSize: limit + 1, - singleBatch: true - } - ) - .toArray(); - const invalidate = changes.length > limit; - - const updatedWriteCheckpoints = new Map(); - if (!invalidate) { - for (let c of changes) { - updatedWriteCheckpoints.set(c.user_id, c.client_id); - } - } - - return { - invalidateWriteCheckpoints: invalidate, - updatedWriteCheckpoints - }; - } - // If we processed all connections together for each checkpoint, we could do a single lookup for all connections. // In practice, specific connections may fall behind. So instead, we just cache the results of each specific lookup. 
// TODO (later): @@ -995,7 +966,7 @@ export class MongoSyncBucketStorage private async getCheckpointChangesInternal(options: GetCheckpointChangesOptions): Promise { const dataUpdates = await this.getDataBucketChanges(options); const parameterUpdates = await this.getParameterBucketChanges(options); - const writeCheckpointUpdates = await this.getWriteCheckpointChanges(options); + const writeCheckpointUpdates = await this.writeCheckpointAPI.getWriteCheckpointChanges(options); return { ...dataUpdates, diff --git a/modules/module-mongodb-storage/src/storage/implementation/MongoWriteCheckpointAPI.ts b/modules/module-mongodb-storage/src/storage/implementation/MongoWriteCheckpointAPI.ts index 94cdd5d3..183f4810 100644 --- a/modules/module-mongodb-storage/src/storage/implementation/MongoWriteCheckpointAPI.ts +++ b/modules/module-mongodb-storage/src/storage/implementation/MongoWriteCheckpointAPI.ts @@ -1,14 +1,7 @@ import { mongo } from '@powersync/lib-service-mongodb'; import * as framework from '@powersync/lib-services-framework'; -import { - Demultiplexer, - DemultiplexerValue, - InternalOpId, - storage, - WriteCheckpointResult -} from '@powersync/service-core'; +import { GetCheckpointChangesOptions, InternalOpId, storage } from '@powersync/service-core'; import { PowerSyncMongo } from './db.js'; -import { CustomWriteCheckpointDocument, WriteCheckpointDocument } from './models.js'; export type MongoCheckpointAPIOptions = { db: PowerSyncMongo; @@ -78,113 +71,13 @@ export class MongoWriteCheckpointAPI implements storage.WriteCheckpointAPI { } } - private sharedCustomIter = new Demultiplexer((signal) => { - const clusterTimePromise = this.getClusterTime(); - - return { - iterator: this.watchAllCustomWriteCheckpoints(clusterTimePromise, signal), - getFirstValue: async (user_id: string) => { - // We cater for the same potential race conditions as for managed write checkpoints. - - const changeStreamStart = await clusterTimePromise; - - let doc = null as CustomWriteCheckpointDocument | null; - let clusterTime = null as mongo.Timestamp | null; - - await this.db.client.withSession(async (session) => { - doc = await this.db.custom_write_checkpoints.findOne( - { - user_id: user_id, - sync_rules_id: this.sync_rules_id - }, - { - session - } - ); - const time = session.clusterTime?.clusterTime ?? 
null; - clusterTime = time; - }); - if (clusterTime == null) { - throw new framework.ServiceAssertionError('Could not get clusterTime for write checkpoint'); - } - - if (clusterTime.lessThan(changeStreamStart)) { - throw new framework.ServiceAssertionError( - 'clusterTime for write checkpoint is older than changestream start' - ); - } - - if (doc == null) { - // No write checkpoint, but we still need to return a result - return { - id: null, - lsn: null - }; - } - - return { - id: doc.checkpoint, - // custom write checkpoints are not tied to a LSN - lsn: null - }; - } - }; - }); - - private async *watchAllCustomWriteCheckpoints( - clusterTimePromise: Promise, - signal: AbortSignal - ): AsyncGenerator> { - const clusterTime = await clusterTimePromise; - - const stream = this.db.custom_write_checkpoints.watch( - [ - { - $match: { - 'fullDocument.sync_rules_id': this.sync_rules_id, - operationType: { $in: ['insert', 'update', 'replace'] } - } - } - ], - { - fullDocument: 'updateLookup', - startAtOperationTime: clusterTime - } - ); - - signal.onabort = () => { - stream.close(); - }; - - if (signal.aborted) { - stream.close(); - return; - } - - for await (let event of stream) { - if (!('fullDocument' in event) || event.fullDocument == null) { - continue; - } - - const user_id = event.fullDocument.user_id; - yield { - key: user_id, - value: { - id: event.fullDocument.checkpoint, - // Custom write checkpoints are not tied to a specific LSN - lsn: null - } - }; - } - } - - watchCustomWriteCheckpoint(options: WatchUserWriteCheckpointOptions): AsyncIterable { - if (options.sync_rules_id != this.sync_rules_id) { - throw new framework.ServiceAssertionError('sync_rules_id does not match'); + async getWriteCheckpointChanges(options: GetCheckpointChangesOptions) { + switch (this.writeCheckpointMode) { + case storage.WriteCheckpointMode.CUSTOM: + return this.getCustomWriteCheckpointChanges(options); + case storage.WriteCheckpointMode.MANAGED: + return this.getManagedWriteCheckpointChanges(options); } - - const stream = this.sharedCustomIter.subscribe(options.user_id, options.signal); - return this.orderedStream(stream); } protected async lastCustomWriteCheckpoint(filters: storage.CustomWriteCheckpointFilters) { @@ -211,6 +104,65 @@ export class MongoWriteCheckpointAPI implements storage.WriteCheckpointAPI { return lastWriteCheckpoint?.client_id ?? 
null; } + private async getManagedWriteCheckpointChanges(options: GetCheckpointChangesOptions) { + const limit = 1000; + const changes = await this.db.write_checkpoints + .find( + { + processed_at_lsn: { $gt: options.lastCheckpoint.lsn, $lte: options.nextCheckpoint.lsn } + }, + { + limit: limit + 1, + batchSize: limit + 1, + singleBatch: true + } + ) + .toArray(); + const invalidate = changes.length > limit; + + const updatedWriteCheckpoints = new Map(); + if (!invalidate) { + for (let c of changes) { + updatedWriteCheckpoints.set(c.user_id, c.client_id); + } + } + + return { + invalidateWriteCheckpoints: invalidate, + updatedWriteCheckpoints + }; + } + + private async getCustomWriteCheckpointChanges(options: GetCheckpointChangesOptions) { + const limit = 1000; + const changes = await this.db.custom_write_checkpoints + .find( + { + op_id: { $gt: options.lastCheckpoint.checkpoint, $lte: options.nextCheckpoint.checkpoint } + }, + { + limit: limit + 1, + batchSize: limit + 1, + singleBatch: true + } + ) + .toArray(); + console.log('getCustomWriteCheckpointChanges', options, changes); + const invalidate = changes.length > limit; + + const updatedWriteCheckpoints = new Map(); + if (!invalidate) { + for (let c of changes) { + updatedWriteCheckpoints.set(c.user_id, c.checkpoint); + } + } + + return { + invalidateWriteCheckpoints: invalidate, + updatedWriteCheckpoints + }; + } + private async getClusterTime(): Promise { const hello = await this.db.db.command({ hello: 1 }); // Note: This is not valid on sharded clusters. diff --git a/modules/module-postgres-storage/src/storage/checkpoints/PostgresWriteCheckpointAPI.ts b/modules/module-postgres-storage/src/storage/checkpoints/PostgresWriteCheckpointAPI.ts index 562120a6..aba98b9a 100644 --- a/modules/module-postgres-storage/src/storage/checkpoints/PostgresWriteCheckpointAPI.ts +++ b/modules/module-postgres-storage/src/storage/checkpoints/PostgresWriteCheckpointAPI.ts @@ -1,7 +1,6 @@ import * as lib_postgres from '@powersync/lib-service-postgres'; import * as framework from '@powersync/lib-services-framework'; -import { InternalOpId, storage, sync } from '@powersync/service-core'; -import { JSONBig, JsonContainer } from '@powersync/service-jsonbig'; +import { InternalOpId, storage } from '@powersync/service-core'; import { models } from '../../types/types.js'; export type PostgresCheckpointAPIOptions = { diff --git a/packages/service-core-tests/src/tests/register-data-storage-tests.ts b/packages/service-core-tests/src/tests/register-data-storage-tests.ts index bdb31bc1..e8ff90fd 100644 --- a/packages/service-core-tests/src/tests/register-data-storage-tests.ts +++ b/packages/service-core-tests/src/tests/register-data-storage-tests.ts @@ -1774,6 +1774,7 @@ bucket_definitions: checkpoint: 5n, user_id: 'user1' }); + await batch.flush(); await batch.keepalive('5/0'); }); @@ -1782,7 +1783,6 @@ bucket_definitions: done: false, value: { base: { - checkpoint: 0n, lsn: '5/0' }, writeCheckpoint: 5n @@ -1819,7 +1819,6 @@ bucket_definitions: done: false, value: { base: { - checkpoint: 0n, lsn: '5/0' }, writeCheckpoint: null @@ -1831,6 +1830,7 @@ bucket_definitions: checkpoint: 6n, user_id: 'user1' }); + await batch.flush(); await batch.keepalive('6/0'); }); @@ -1839,7 +1839,6 @@ bucket_definitions: done: false, value: { base: { - checkpoint: 0n // can be 5/0 or 6/0 - actual value not relevant for custom write checkpoints // lsn: '6/0' }, @@ -1852,6 +1851,7 @@ bucket_definitions: checkpoint: 7n, user_id: 'user1' }); + await batch.flush(); await 
batch.keepalive('7/0'); }); @@ -1860,7 +1860,6 @@ bucket_definitions: done: false, value: { base: { - checkpoint: 0n // can be 5/0, 6/0 or 7/0 - actual value not relevant for custom write checkpoints // lsn: '7/0' }, From bb720c2ccb6a6028a2afbbf185b9d27f29f9f1b1 Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Thu, 12 Jun 2025 16:54:30 +0200 Subject: [PATCH 09/15] Remove Demultiplexer. --- .../service-core/src/streams/Demultiplexer.ts | 165 -------------- .../service-core/src/streams/streams-index.ts | 1 - .../test/src/demultiplexer.test.ts | 205 ------------------ 3 files changed, 371 deletions(-) delete mode 100644 packages/service-core/src/streams/Demultiplexer.ts delete mode 100644 packages/service-core/test/src/demultiplexer.test.ts diff --git a/packages/service-core/src/streams/Demultiplexer.ts b/packages/service-core/src/streams/Demultiplexer.ts deleted file mode 100644 index 86ebcc40..00000000 --- a/packages/service-core/src/streams/Demultiplexer.ts +++ /dev/null @@ -1,165 +0,0 @@ -import { AbortError } from 'ix/aborterror.js'; -import { wrapWithAbort } from 'ix/asynciterable/operators/withabort.js'; -import { LastValueSink } from './LastValueSink.js'; - -export interface DemultiplexerValue { - /** - * The key used for demultiplexing, for example the user id. - */ - key: string; - /** - * The stream value. - */ - value: T; -} - -export interface DemultiplexerSource { - /** - * The async iterator providing a stream of values. - */ - iterator: AsyncIterable>; - - /** - * Fetches the first value for a given key. - * - * This is used to get an initial value for each subscription. - */ - getFirstValue(key: string): Promise; -} - -export type DemultiplexerSourceFactory = (signal: AbortSignal) => DemultiplexerSource; - -/** - * Takes a multiplexed stream (e.g. a changestream covering many individual users), - * and allows subscribing to individual streams. - * - * The source subscription is lazy: - * 1. We only start subscribing when there is a downstream subscriber. - * 2. When all downstream subscriptions have ended, we end the source subscription. - * - * For each subscriber, if backpressure builds up, we only keep the _last_ value. 
- */ -export class Demultiplexer { - private subscribers: Map>> | undefined = undefined; - private abortController: AbortController | undefined = undefined; - private currentSource: DemultiplexerSource | undefined = undefined; - - constructor(private source: DemultiplexerSourceFactory) {} - - private start(filter: string, sink: LastValueSink) { - const abortController = new AbortController(); - const listeners = new Map(); - listeners.set(filter, new Set([sink])); - - this.abortController = abortController; - this.subscribers = listeners; - - const source = this.source(abortController.signal); - this.currentSource = source; - this.loop(source, abortController, listeners); - return source; - } - - private async loop( - source: DemultiplexerSource, - abortController: AbortController, - sinks: Map>> - ) { - try { - for await (let doc of source.iterator) { - if (abortController.signal.aborted || sinks.size == 0) { - throw new AbortError(); - } - const key = doc.key; - const keySinks = sinks.get(key); - if (keySinks == null) { - continue; - } - - for (let sink of keySinks) { - sink.write(doc.value); - } - } - - // End of stream - for (let keySinks of sinks.values()) { - for (let sink of keySinks) { - sink.end(); - } - } - } catch (e) { - // Just in case the error is not from the source - abortController.abort(); - - for (let keySinks of sinks.values()) { - for (let sink of keySinks) { - sink.error(e); - } - } - } finally { - // Clear state, so that a new subscription may be started - if (this.subscribers === sinks) { - this.subscribers = undefined; - this.abortController = undefined; - this.currentSource = undefined; - } - } - } - - private removeSink(key: string, sink: LastValueSink) { - const existing = this.subscribers?.get(key); - if (existing == null) { - return; - } - existing.delete(sink); - if (existing.size == 0) { - this.subscribers!.delete(key); - } - - if (this.subscribers?.size == 0) { - // This is not immediate - there may be a delay until it is fully stopped, - // depending on the underlying source. - this.abortController?.abort(); - this.subscribers = undefined; - this.abortController = undefined; - this.currentSource = undefined; - } - } - - private addSink(key: string, sink: LastValueSink) { - if (this.currentSource == null) { - return this.start(key, sink); - } else { - const existing = this.subscribers!.get(key); - if (existing != null) { - existing.add(sink); - } else { - this.subscribers!.set(key, new Set([sink])); - } - return this.currentSource; - } - } - - /** - * Subscribe to a specific stream. - * - * @param key The key used for demultiplexing, e.g. user id. - * @param signal - */ - async *subscribe(key: string, signal: AbortSignal): AsyncIterable { - const sink = new LastValueSink(undefined); - // Important that we register the sink before calling getFirstValue(). 
- const source = this.addSink(key, sink); - try { - const firstValue = await source.getFirstValue(key); - yield firstValue; - yield* sink.withSignal(signal); - } finally { - this.removeSink(key, sink); - } - } - - get active() { - return this.subscribers != null; - } -} diff --git a/packages/service-core/src/streams/streams-index.ts b/packages/service-core/src/streams/streams-index.ts index 0d8eafa4..c25f752e 100644 --- a/packages/service-core/src/streams/streams-index.ts +++ b/packages/service-core/src/streams/streams-index.ts @@ -1,4 +1,3 @@ export * from './merge.js'; -export * from './Demultiplexer.js'; export * from './LastValueSink.js'; export * from './BroadcastIterable.js'; diff --git a/packages/service-core/test/src/demultiplexer.test.ts b/packages/service-core/test/src/demultiplexer.test.ts deleted file mode 100644 index 69ba9c87..00000000 --- a/packages/service-core/test/src/demultiplexer.test.ts +++ /dev/null @@ -1,205 +0,0 @@ -// Vitest Unit Tests -import { Demultiplexer, DemultiplexerSource, DemultiplexerSourceFactory, DemultiplexerValue } from '@/index.js'; -import { delayEach } from 'ix/asynciterable/operators/delayeach.js'; -import { take } from 'ix/asynciterable/operators/take.js'; -import { toArray } from 'ix/asynciterable/toarray.js'; -import * as timers from 'node:timers/promises'; -import { describe, expect, it } from 'vitest'; - -describe('Demultiplexer', () => { - it('should start subscription lazily and provide first value', async () => { - const mockSource: DemultiplexerSourceFactory = (signal: AbortSignal) => { - const iterator = (async function* (): AsyncIterable> {})(); - return { - iterator, - getFirstValue: async (key: string) => `first-${key}` - }; - }; - - const demux = new Demultiplexer(mockSource); - const signal = new AbortController().signal; - - const iter = demux.subscribe('user1', signal)[Symbol.asyncIterator](); - const result = await iter.next(); - expect(result.value).toBe('first-user1'); - }); - - it('should handle multiple subscribers to the same key', async () => { - const iter = (async function* () { - yield { key: 'user1', value: 'value1' }; - yield { key: 'user1', value: 'value2' }; - })(); - const source: DemultiplexerSource = { - iterator: iter, - getFirstValue: async (key: string) => `first-${key}` - }; - - const demux = new Demultiplexer(() => source); - const signal = new AbortController().signal; - - const iter1 = demux.subscribe('user1', signal)[Symbol.asyncIterator](); - const iter2 = demux.subscribe('user1', signal)[Symbol.asyncIterator](); - - // Due to only keeping the last value, some values are skipped - expect(await iter1.next()).toEqual({ value: 'first-user1', done: false }); - expect(await iter1.next()).toEqual({ value: 'value1', done: false }); - expect(await iter1.next()).toEqual({ value: undefined, done: true }); - - expect(await iter2.next()).toEqual({ value: 'first-user1', done: false }); - expect(await iter2.next()).toEqual({ value: undefined, done: true }); - }); - - it('should handle multiple subscribers to the same key (2)', async () => { - const p1 = Promise.withResolvers(); - const p2 = Promise.withResolvers(); - const p3 = Promise.withResolvers(); - - const iter = (async function* () { - await p1.promise; - yield { key: 'user1', value: 'value1' }; - await p2.promise; - yield { key: 'user1', value: 'value2' }; - await p3.promise; - })(); - - const source: DemultiplexerSource = { - iterator: iter, - getFirstValue: async (key: string) => `first-${key}` - }; - - const demux = new Demultiplexer(() => source); - const 
signal = new AbortController().signal; - - const iter1 = demux.subscribe('user1', signal)[Symbol.asyncIterator](); - const iter2 = demux.subscribe('user1', signal)[Symbol.asyncIterator](); - - // Due to only keeping the last value, some values are skilled - expect(await iter1.next()).toEqual({ value: 'first-user1', done: false }); - expect(await iter2.next()).toEqual({ value: 'first-user1', done: false }); - p1.resolve(); - - expect(await iter1.next()).toEqual({ value: 'value1', done: false }); - expect(await iter2.next()).toEqual({ value: 'value1', done: false }); - p2.resolve(); - - expect(await iter1.next()).toEqual({ value: 'value2', done: false }); - p3.resolve(); - - expect(await iter1.next()).toEqual({ value: undefined, done: true }); - expect(await iter2.next()).toEqual({ value: undefined, done: true }); - }); - - it('should handle multiple subscribers to different keys', async () => { - const p1 = Promise.withResolvers(); - const p2 = Promise.withResolvers(); - const p3 = Promise.withResolvers(); - - const iter = (async function* () { - await p1.promise; - yield { key: 'user1', value: 'value1' }; - await p2.promise; - yield { key: 'user2', value: 'value2' }; - await p3.promise; - })(); - - const source: DemultiplexerSource = { - iterator: iter, - getFirstValue: async (key: string) => `first-${key}` - }; - - const demux = new Demultiplexer(() => source); - const signal = new AbortController().signal; - - const iter1 = demux.subscribe('user1', signal)[Symbol.asyncIterator](); - const iter2 = demux.subscribe('user2', signal)[Symbol.asyncIterator](); - - // Due to only keeping the last value, some values are skilled - expect(await iter1.next()).toEqual({ value: 'first-user1', done: false }); - expect(await iter2.next()).toEqual({ value: 'first-user2', done: false }); - p1.resolve(); - - expect(await iter1.next()).toEqual({ value: 'value1', done: false }); - p2.resolve(); - - expect(await iter2.next()).toEqual({ value: 'value2', done: false }); - p3.resolve(); - - expect(await iter1.next()).toEqual({ value: undefined, done: true }); - expect(await iter2.next()).toEqual({ value: undefined, done: true }); - }); - - it('should abort', async () => { - const iter = (async function* () { - yield { key: 'user1', value: 'value1' }; - yield { key: 'user1', value: 'value2' }; - })(); - - const source: DemultiplexerSource = { - iterator: iter, - getFirstValue: async (key: string) => `first-${key}` - }; - - const demux = new Demultiplexer(() => source); - const controller = new AbortController(); - - const iter1 = demux.subscribe('user1', controller.signal)[Symbol.asyncIterator](); - - expect(await iter1.next()).toEqual({ value: 'first-user1', done: false }); - controller.abort(); - - await expect(iter1.next()).rejects.toThrow('The operation has been aborted'); - }); - - it('should handle errors on multiple subscribers', async () => { - let sourceIndex = 0; - const sourceFn = async function* (signal: AbortSignal): AsyncIterable> { - // Test value out by 1000 means it may have used the wrong iteration of the source - const base = (sourceIndex += 1000); - const abortedPromise = new Promise((resolve) => { - signal.addEventListener('abort', resolve, { once: true }); - }); - for (let i = 0; !signal.aborted; i++) { - if (base + i == 1005) { - throw new Error('simulated failure'); - } - yield { key: 'u1', value: base + i }; - await Promise.race([abortedPromise, timers.setTimeout(1)]); - } - // Test value out by 100 means this wasn't reached - sourceIndex += 100; - }; - - const sourceFactory: 
DemultiplexerSourceFactory = (signal) => { - const source: DemultiplexerSource = { - iterator: sourceFn(signal), - getFirstValue: async (key: string) => -1 - }; - return source; - }; - const demux = new Demultiplexer(sourceFactory); - - const controller = new AbortController(); - - const delayed1 = delayEach(9)(demux.subscribe('u1', controller.signal)); - const delayed2 = delayEach(10)(demux.subscribe('u1', controller.signal)); - expect(demux.active).toBe(false); - const results1Promise = toArray(take(5)(delayed1)) as Promise; - const results2Promise = toArray(take(5)(delayed2)) as Promise; - - const [r1, r2] = await Promise.allSettled([results1Promise, results2Promise]); - - expect(r1).toEqual({ status: 'rejected', reason: new Error('simulated failure') }); - expect(r2).toEqual({ status: 'rejected', reason: new Error('simulated failure') }); - - expect(demux.active).toBe(false); - - // This starts a new source - const delayed3 = delayEach(10)(demux.subscribe('u1', controller.signal)); - const results3 = await toArray(take(6)(delayed3)); - expect(results3.length).toEqual(6); - expect(results3[0]).toEqual(-1); // Initial value - // There should be approximately 10ms between each value, but we allow for some slack - expect(results3[5]).toBeGreaterThan(2005); - expect(results3[5]).toBeLessThan(2200); - }); -}); From c97cefd8b173774bb907336f2f73ae61ea180503 Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Thu, 12 Jun 2025 16:56:41 +0200 Subject: [PATCH 10/15] Further cleanup. --- .../implementation/MongoSyncBucketStorage.ts | 8 ++--- .../implementation/MongoWriteCheckpointAPI.ts | 31 ------------------- .../src/storage/WriteCheckpointAPI.ts | 22 ------------- 3 files changed, 2 insertions(+), 59 deletions(-) diff --git a/modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts b/modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts index a4a491b5..86b3bcb5 100644 --- a/modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts +++ b/modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts @@ -15,13 +15,11 @@ import { GetCheckpointChangesOptions, InternalOpId, internalToExternalOpId, - mergeAsyncIterables, ProtocolOpId, ReplicationCheckpoint, storage, utils, - WatchWriteCheckpointOptions, - WriteCheckpointResult + WatchWriteCheckpointOptions } from '@powersync/service-core'; import { JSONBig } from '@powersync/service-jsonbig'; import { ParameterLookup, SqliteJsonRow, SqlSyncRules } from '@powersync/service-sync-rules'; @@ -34,11 +32,9 @@ import { BucketDataDocument, BucketDataKey, BucketStateDocument, - CheckpointEventDocument, SourceKey, SourceTableDocument, - SyncRuleCheckpointState, - SyncRuleDocument + SyncRuleCheckpointState } from './models.js'; import { MongoBucketBatch } from './MongoBucketBatch.js'; import { MongoCompactor } from './MongoCompactor.js'; diff --git a/modules/module-mongodb-storage/src/storage/implementation/MongoWriteCheckpointAPI.ts b/modules/module-mongodb-storage/src/storage/implementation/MongoWriteCheckpointAPI.ts index 183f4810..4431cd99 100644 --- a/modules/module-mongodb-storage/src/storage/implementation/MongoWriteCheckpointAPI.ts +++ b/modules/module-mongodb-storage/src/storage/implementation/MongoWriteCheckpointAPI.ts @@ -147,7 +147,6 @@ export class MongoWriteCheckpointAPI implements storage.WriteCheckpointAPI { } ) .toArray(); - console.log('getCustomWriteCheckpointChanges', options, changes); const invalidate = changes.length > limit; const 
updatedWriteCheckpoints = new Map(); @@ -162,30 +161,6 @@ export class MongoWriteCheckpointAPI implements storage.WriteCheckpointAPI { updatedWriteCheckpoints }; } - - private async getClusterTime(): Promise { - const hello = await this.db.db.command({ hello: 1 }); - // Note: This is not valid on sharded clusters. - const startClusterTime = hello.lastWrite?.majorityOpTime?.ts as mongo.Timestamp; - return startClusterTime; - } - - /** - * Makes a write checkpoint stream an orderered one - any out-of-order events are discarded. - */ - private async *orderedStream(stream: AsyncIterable) { - let lastId = -1n; - - for await (let event of stream) { - // Guard against out-of-order events - if (lastId == -1n || (event.id != null && event.id > lastId)) { - yield event; - if (event.id != null) { - lastId = event.id; - } - } - } - } } export async function batchCreateCustomWriteCheckpoints( @@ -215,9 +190,3 @@ export async function batchCreateCustomWriteCheckpoints( { session } ); } - -interface WatchUserWriteCheckpointOptions { - user_id: string; - sync_rules_id: number; - signal: AbortSignal; -} diff --git a/packages/service-core/src/storage/WriteCheckpointAPI.ts b/packages/service-core/src/storage/WriteCheckpointAPI.ts index 81084eed..6aa61368 100644 --- a/packages/service-core/src/storage/WriteCheckpointAPI.ts +++ b/packages/service-core/src/storage/WriteCheckpointAPI.ts @@ -1,5 +1,3 @@ -import { InternalOpId } from '../util/utils.js'; - export enum WriteCheckpointMode { /** * Raw mappings of `user_id` to `write_checkpoint`s should @@ -52,26 +50,6 @@ export interface ManagedWriteCheckpointFilters extends BaseWriteCheckpointIdenti heads: Record; } -export interface WriteCheckpointResult { - /** - * Write checkpoint id (also referred to as client_id). - * - * If null, there is no write checkpoint for the client. - */ - id: bigint | null; - - /** - * LSN for the checkpoint. - * - * This will change when we support multiple connections. - * - * For managed write checkpoints, this LSN must be exceeded by the checkpoint / replication head to be valid. - * - * For custom write checkpoints, this will be null, and the write checkpoint is valid for all LSNs. - */ - lsn: string | null; -} - export type ManagedWriteCheckpointOptions = ManagedWriteCheckpointFilters; export type SyncStorageLastWriteCheckpointFilters = BaseWriteCheckpointIdentifier | ManagedWriteCheckpointFilters; From 7c961bd133e31d514ec96d7cdcbefab7eebae05c Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Tue, 17 Jun 2025 13:05:20 +0200 Subject: [PATCH 11/15] Process write checkpoints on keepalive. 
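
This marks managed write checkpoints as processed during keepalive, once
replication has advanced past the head LSN they recorded;
getManagedWriteCheckpointChanges() then returns any checkpoint whose
processed_at_lsn falls inside the checkpoint range. A rough sketch of the
keepalive-side update added below; markWriteCheckpointsProcessed is a
hypothetical helper (not part of this patch), and the 'lsns.1' key is
assumed to identify the single supported replication connection:

    import { mongo } from '@powersync/lib-service-mongodb';

    // Sketch only: mark managed write checkpoints as processed at a keepalive.
    async function markWriteCheckpointsProcessed(
      writeCheckpoints: mongo.Collection,
      lsn: string,
      session?: mongo.ClientSession
    ) {
      // A checkpoint becomes valid once the replication head it recorded
      // ('lsns.1') has been reached; store the keepalive LSN so that
      // checkpoint-change lookups can pick it up.
      await writeCheckpoints.updateMany(
        { processed_at_lsn: null, 'lsns.1': { $lte: lsn } },
        { $set: { processed_at_lsn: lsn } },
        { session }
      );
    }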
--- .../storage/implementation/MongoBucketBatch.ts | 15 +++++++++++++++ .../module-postgres/test/src/checkpoints.test.ts | 1 - 2 files changed, 15 insertions(+), 1 deletion(-) diff --git a/modules/module-mongodb-storage/src/storage/implementation/MongoBucketBatch.ts b/modules/module-mongodb-storage/src/storage/implementation/MongoBucketBatch.ts index fb22428f..91012327 100644 --- a/modules/module-mongodb-storage/src/storage/implementation/MongoBucketBatch.ts +++ b/modules/module-mongodb-storage/src/storage/implementation/MongoBucketBatch.ts @@ -726,6 +726,21 @@ export class MongoBucketBatch return await this.commit(lsn); } + await this.db.write_checkpoints.updateMany( + { + processed_at_lsn: null, + 'lsns.1': { $lte: lsn } + }, + { + $set: { + processed_at_lsn: lsn + } + }, + { + session: this.session + } + ); + await this.db.sync_rules.updateOne( { _id: this.group_id diff --git a/modules/module-postgres/test/src/checkpoints.test.ts b/modules/module-postgres/test/src/checkpoints.test.ts index 7ad247ea..a0446ca1 100644 --- a/modules/module-postgres/test/src/checkpoints.test.ts +++ b/modules/module-postgres/test/src/checkpoints.test.ts @@ -3,7 +3,6 @@ import { checkpointUserId, createWriteCheckpoint } from '@powersync/service-core import { describe, test } from 'vitest'; import { INITIALIZED_MONGO_STORAGE_FACTORY } from './util.js'; import { WalStreamTestContext } from './wal_stream_utils.js'; -import { env } from './env.js'; import timers from 'node:timers/promises'; From a1e213deb426f92f97385aacc73c70bd7e260710 Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Tue, 17 Jun 2025 13:09:29 +0200 Subject: [PATCH 12/15] Test write checkpoints on both postgres and mongodb storage. --- modules/module-postgres/test/src/checkpoints.test.ts | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/modules/module-postgres/test/src/checkpoints.test.ts b/modules/module-postgres/test/src/checkpoints.test.ts index a0446ca1..fbae9f5e 100644 --- a/modules/module-postgres/test/src/checkpoints.test.ts +++ b/modules/module-postgres/test/src/checkpoints.test.ts @@ -1,7 +1,7 @@ import { PostgresRouteAPIAdapter } from '@module/api/PostgresRouteAPIAdapter.js'; -import { checkpointUserId, createWriteCheckpoint } from '@powersync/service-core'; +import { checkpointUserId, createWriteCheckpoint, TestStorageFactory } from '@powersync/service-core'; import { describe, test } from 'vitest'; -import { INITIALIZED_MONGO_STORAGE_FACTORY } from './util.js'; +import { describeWithStorage } from './util.js'; import { WalStreamTestContext } from './wal_stream_utils.js'; import timers from 'node:timers/promises'; @@ -12,8 +12,11 @@ const BASIC_SYNC_RULES = `bucket_definitions: - SELECT id, description, other FROM "test_data"`; describe('checkpoint tests', () => { + describeWithStorage({}, checkpointTests); +}); + +const checkpointTests = (factory: TestStorageFactory) => { test('write checkpoints', { timeout: 50_000 }, async () => { - const factory = INITIALIZED_MONGO_STORAGE_FACTORY; await using context = await WalStreamTestContext.open(factory); await context.updateSyncRules(BASIC_SYNC_RULES); @@ -78,4 +81,4 @@ describe('checkpoint tests', () => { controller.abort(); } }); -}); +}; From a36440eaa0ed8d073e1f2e0ae4d8daf0232feba4 Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Tue, 17 Jun 2025 13:13:45 +0200 Subject: [PATCH 13/15] Increase delay in test. 
--- packages/service-core-tests/src/tests/register-sync-tests.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/service-core-tests/src/tests/register-sync-tests.ts b/packages/service-core-tests/src/tests/register-sync-tests.ts index 6bf9a2f5..504df19f 100644 --- a/packages/service-core-tests/src/tests/register-sync-tests.ts +++ b/packages/service-core-tests/src/tests/register-sync-tests.ts @@ -523,7 +523,7 @@ bucket_definitions: if (sentRows >= 1000 && sentRows <= 2001) { // pause for a bit to give the stream time to process interruptions. // This covers the data batch above and the next one. - await timers.setTimeout(50); + await timers.setTimeout(150); } } } From 9cbe9aa27e5e06e890e11bc4d1400e19e86d8b5a Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Tue, 17 Jun 2025 13:16:45 +0200 Subject: [PATCH 14/15] Changeset. --- .changeset/rich-donuts-cheat.md | 9 +++++++++ 1 file changed, 9 insertions(+) create mode 100644 .changeset/rich-donuts-cheat.md diff --git a/.changeset/rich-donuts-cheat.md b/.changeset/rich-donuts-cheat.md new file mode 100644 index 00000000..a73cd059 --- /dev/null +++ b/.changeset/rich-donuts-cheat.md @@ -0,0 +1,9 @@ +--- +'@powersync/service-module-postgres-storage': minor +'@powersync/service-module-mongodb-storage': minor +'@powersync/service-core-tests': minor +'@powersync/service-module-postgres': minor +'@powersync/service-core': minor +--- + +[MongoDB Storage] Remove change streams on bucket storage database due to performance overhead. From d5abe45ad18f9868c5d9ec78c388311ea20998a7 Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Tue, 17 Jun 2025 13:29:10 +0200 Subject: [PATCH 15/15] Another attempt at fixing the test. --- .../src/tests/register-sync-tests.ts | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/packages/service-core-tests/src/tests/register-sync-tests.ts b/packages/service-core-tests/src/tests/register-sync-tests.ts index 504df19f..99e98951 100644 --- a/packages/service-core-tests/src/tests/register-sync-tests.ts +++ b/packages/service-core-tests/src/tests/register-sync-tests.ts @@ -519,12 +519,12 @@ bucket_definitions: await batch.commit('0/2'); }); + } - if (sentRows >= 1000 && sentRows <= 2001) { - // pause for a bit to give the stream time to process interruptions. - // This covers the data batch above and the next one. - await timers.setTimeout(150); - } + if (sentRows >= 1000 && sentRows <= 2001) { + // pause for a bit to give the stream time to process interruptions. + // This covers the data batch above and the next one. + await timers.setTimeout(50); } } if ('checkpoint_complete' in next) {