diff --git a/.changeset/rich-donuts-cheat.md b/.changeset/rich-donuts-cheat.md new file mode 100644 index 00000000..a73cd059 --- /dev/null +++ b/.changeset/rich-donuts-cheat.md @@ -0,0 +1,9 @@ +--- +'@powersync/service-module-postgres-storage': minor +'@powersync/service-module-mongodb-storage': minor +'@powersync/service-core-tests': minor +'@powersync/service-module-postgres': minor +'@powersync/service-core': minor +--- + +[MongoDB Storage] Remove change streams on bucket storage database due to performance overhead. diff --git a/modules/module-mongodb-storage/src/migrations/db/migrations/1749720702136-checkpoint-events.ts b/modules/module-mongodb-storage/src/migrations/db/migrations/1749720702136-checkpoint-events.ts new file mode 100644 index 00000000..d2b400ab --- /dev/null +++ b/modules/module-mongodb-storage/src/migrations/db/migrations/1749720702136-checkpoint-events.ts @@ -0,0 +1,50 @@ +import { migrations } from '@powersync/service-core'; +import * as storage from '../../../storage/storage-index.js'; +import { MongoStorageConfig } from '../../../types/types.js'; + +export const up: migrations.PowerSyncMigrationFunction = async (context) => { + const { + service_context: { configuration } + } = context; + const db = storage.createPowerSyncMongo(configuration.storage as MongoStorageConfig); + + try { + await db.createCheckpointEventsCollection(); + + await db.write_checkpoints.createIndex( + { + processed_at_lsn: 1 + }, + { name: 'processed_at_lsn' } + ); + + await db.custom_write_checkpoints.createIndex( + { + op_id: 1 + }, + { name: 'op_id' } + ); + } finally { + await db.client.close(); + } +}; + +export const down: migrations.PowerSyncMigrationFunction = async (context) => { + const { + service_context: { configuration } + } = context; + + const db = storage.createPowerSyncMongo(configuration.storage as MongoStorageConfig); + + try { + if (await db.write_checkpoints.indexExists('processed_at_lsn')) { + await db.write_checkpoints.dropIndex('processed_at_lsn'); + } + if (await db.custom_write_checkpoints.indexExists('op_id')) { + await db.custom_write_checkpoints.dropIndex('op_id'); + } + await db.db.dropCollection('checkpoint_events'); + } finally { + await db.client.close(); + } +}; diff --git a/modules/module-mongodb-storage/src/storage/MongoBucketStorage.ts b/modules/module-mongodb-storage/src/storage/MongoBucketStorage.ts index ed924b38..82bbdf0b 100644 --- a/modules/module-mongodb-storage/src/storage/MongoBucketStorage.ts +++ b/modules/module-mongodb-storage/src/storage/MongoBucketStorage.ts @@ -119,6 +119,7 @@ export class MongoBucketStorage } } ); + await this.db.notifyCheckpoint(); } else if (next == null && active?.id == sync_rules_group_id) { // Slot removed for "active" sync rules, while there is no "next" one. await this.updateSyncRules({ @@ -141,6 +142,7 @@ export class MongoBucketStorage } } ); + await this.db.notifyCheckpoint(); } else if (next != null && active?.id == sync_rules_group_id) { // Already have next sync rules, but need to stop replicating the active one. 
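Editorial note on the mechanism introduced above: the capped `checkpoint_events` collection created by this migration, together with the `notifyCheckpoint()` calls added to `MongoBucketStorage`, replaces change streams with a Postgres-NOTIFY-style signal. The sketch below is illustrative only and not part of the patch; it assumes a plain Node.js MongoDB driver client and a throwaway database name, and shows the capped-collection plus tailable-cursor pattern the rest of this diff builds on.

```ts
import { MongoClient } from 'mongodb';

// Minimal sketch, assuming `uri` points at a disposable test database.
async function cappedNotifyDemo(uri: string) {
  const client = new MongoClient(uri);
  await client.connect();
  const db = client.db('powersync_demo');

  // Tailable cursors require a capped collection (the migration above creates the real one).
  await db.createCollection('checkpoint_events', { capped: true, size: 50 * 1024, max: 50 });
  const events = db.collection('checkpoint_events');

  // Writer side: an empty insert is the entire "notification" - the document body is irrelevant.
  await events.insertOne({}, { forceServerObjectId: true });

  // Reader side: a tailable, awaitData cursor blocks (up to maxAwaitTimeMS) waiting for new inserts.
  const cursor = events.find({}, { tailable: true, awaitData: true, maxAwaitTimeMS: 10_000 });
  while (await cursor.tryNext()) {
    // An event arrived: re-query sync_rules / write_checkpoints for the actual checkpoint state.
    // tryNext() resolves to null once maxAwaitTimeMS passes with no new documents, ending the sketch.
  }

  await cursor.close();
  await client.close();
}
```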
@@ -155,6 +157,7 @@ export class MongoBucketStorage } } ); + await this.db.notifyCheckpoint(); } } @@ -217,6 +220,7 @@ export class MongoBucketStorage last_keepalive_ts: null }; await this.db.sync_rules.insertOne(doc); + await this.db.notifyCheckpoint(); rules = new MongoPersistedSyncRulesContent(this.db, doc); if (options.lock) { const lock = await rules.lock(); diff --git a/modules/module-mongodb-storage/src/storage/implementation/MongoBucketBatch.ts b/modules/module-mongodb-storage/src/storage/implementation/MongoBucketBatch.ts index 38520670..24fb9aca 100644 --- a/modules/module-mongodb-storage/src/storage/implementation/MongoBucketBatch.ts +++ b/modules/module-mongodb-storage/src/storage/implementation/MongoBucketBatch.ts @@ -139,8 +139,6 @@ export class MongoBucketBatch result = r; } } - await batchCreateCustomWriteCheckpoints(this.db, this.write_checkpoint_batch); - this.write_checkpoint_batch = []; return result; } @@ -156,6 +154,11 @@ export class MongoBucketBatch await this.withReplicationTransaction(`Flushing ${batch.length} ops`, async (session, opSeq) => { resumeBatch = await this.replicateBatch(session, batch, opSeq, options); + if (this.write_checkpoint_batch.length > 0) { + await batchCreateCustomWriteCheckpoints(this.db, session, this.write_checkpoint_batch, opSeq.next()); + this.write_checkpoint_batch = []; + } + last_op = opSeq.last(); }); @@ -627,6 +630,7 @@ export class MongoBucketBatch }, { session } ); + // We don't notify checkpoint here - we don't make any checkpoint updates directly }); } @@ -674,6 +678,7 @@ export class MongoBucketBatch }, { session: this.session } ); + await this.db.notifyCheckpoint(); // Cannot create a checkpoint yet - return false return false; @@ -698,6 +703,23 @@ export class MongoBucketBatch update.last_checkpoint = this.persisted_op; } + // Mark relevant write checkpoints as "processed". + // This makes it easier to identify write checkpoints that are "valid" in order. 
+ await this.db.write_checkpoints.updateMany( + { + processed_at_lsn: null, + 'lsns.1': { $lte: lsn } + }, + { + $set: { + processed_at_lsn: lsn + } + }, + { + session: this.session + } + ); + await this.db.sync_rules.updateOne( { _id: this.group_id @@ -708,6 +730,7 @@ export class MongoBucketBatch }, { session: this.session } ); + await this.db.notifyCheckpoint(); this.persisted_op = null; this.last_checkpoint_lsn = lsn; return true; @@ -730,6 +753,21 @@ export class MongoBucketBatch return await this.commit(lsn); } + await this.db.write_checkpoints.updateMany( + { + processed_at_lsn: null, + 'lsns.1': { $lte: lsn } + }, + { + $set: { + processed_at_lsn: lsn + } + }, + { + session: this.session + } + ); + await this.db.sync_rules.updateOne( { _id: this.group_id @@ -745,6 +783,7 @@ export class MongoBucketBatch }, { session: this.session } ); + await this.db.notifyCheckpoint(); this.last_checkpoint_lsn = lsn; return true; diff --git a/modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts b/modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts index e46c2bf1..a22bb371 100644 --- a/modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts +++ b/modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts @@ -15,13 +15,11 @@ import { GetCheckpointChangesOptions, InternalOpId, internalToExternalOpId, - mergeAsyncIterables, ProtocolOpId, ReplicationCheckpoint, storage, utils, - WatchWriteCheckpointOptions, - WriteCheckpointResult + WatchWriteCheckpointOptions } from '@powersync/service-core'; import { JSONBig } from '@powersync/service-jsonbig'; import { ParameterLookup, SqliteJsonRow, SqlSyncRules } from '@powersync/service-sync-rules'; @@ -36,8 +34,7 @@ import { BucketStateDocument, SourceKey, SourceTableDocument, - SyncRuleCheckpointState, - SyncRuleDocument + SyncRuleCheckpointState } from './models.js'; import { MongoBucketBatch } from './MongoBucketBatch.js'; import { MongoCompactor } from './MongoCompactor.js'; @@ -56,7 +53,7 @@ export class MongoSyncBucketStorage }); private parsedSyncRulesCache: { parsed: SqlSyncRules; options: storage.ParseSyncRulesOptions } | undefined; - private writeCheckpointAPI: storage.WriteCheckpointAPI; + private writeCheckpointAPI: MongoWriteCheckpointAPI; constructor( public readonly factory: MongoBucketStorage, @@ -82,12 +79,6 @@ export class MongoSyncBucketStorage this.writeCheckpointAPI.setWriteCheckpointMode(mode); } - batchCreateCustomWriteCheckpoints(checkpoints: storage.BatchedCustomWriteCheckpointOptions[]): Promise { - return this.writeCheckpointAPI.batchCreateCustomWriteCheckpoints( - checkpoints.map((checkpoint) => ({ ...checkpoint, sync_rules_id: this.group_id })) - ); - } - createManagedWriteCheckpoint(checkpoint: storage.ManagedWriteCheckpointOptions): Promise { return this.writeCheckpointAPI.createManagedWriteCheckpoint(checkpoint); } @@ -527,6 +518,7 @@ export class MongoSyncBucketStorage } } ); + await this.db.notifyCheckpoint(); } async getStatus(): Promise { @@ -662,6 +654,7 @@ export class MongoSyncBucketStorage }, { session } ); + await this.db.notifyCheckpoint(); } }); }); @@ -679,6 +672,7 @@ export class MongoSyncBucketStorage } } ); + await this.db.notifyCheckpoint(); } async compact(options?: storage.CompactOptions) { @@ -696,21 +690,27 @@ export class MongoSyncBucketStorage * Instance-wide watch on the latest available checkpoint (op_id + lsn). 
*/ private async *watchActiveCheckpoint(signal: AbortSignal): AsyncIterable { - // Use this form instead of (doc: SyncRuleCheckpointState | null = null), - // otherwise we get weird "doc: never" issues. - let doc = null as SyncRuleCheckpointState | null; - let clusterTime = null as mongo.Timestamp | null; - const syncRulesId = this.group_id; + const stream = this.checkpointChangesStream(signal); - await this.db.client.withSession(async (session) => { - doc = await this.db.sync_rules.findOne( + if (signal.aborted) { + return; + } + + // We only watch changes to the active sync rules. + // If it changes to inactive, we abort and restart with the new sync rules. + let lastOp: storage.ReplicationCheckpoint | null = null; + + for await (const _ of stream) { + if (signal.aborted) { + break; + } + + const doc = await this.db.sync_rules.findOne( { - _id: syncRulesId, + _id: this.group_id, state: { $in: [storage.SyncRuleState.ACTIVE, storage.SyncRuleState.ERRORED] } }, { - session, - sort: { _id: -1 }, limit: 1, projection: { _id: 1, @@ -720,70 +720,17 @@ export class MongoSyncBucketStorage } } ); - const time = session.clusterTime?.clusterTime ?? null; - clusterTime = time; - }); - if (clusterTime == null) { - throw new ServiceError(ErrorCode.PSYNC_S2401, 'Could not get clusterTime'); - } - - if (signal.aborted) { - return; - } - - if (doc == null) { - // Sync rules not present or not active. - // Abort the connections - clients will have to retry later. - throw new ServiceError(ErrorCode.PSYNC_S2302, 'No active sync rules available'); - } - yield this.makeActiveCheckpoint(doc); - - // We only watch changes to the active sync rules. - // If it changes to inactive, we abort and restart with the new sync rules. - - const pipeline = this.getChangeStreamPipeline(); - - const stream = this.db.sync_rules.watch(pipeline, { - // Start at the cluster time where we got the initial doc, to make sure - // we don't skip any updates. - // This may result in the first operation being a duplicate, but we filter - // it out anyway. - startAtOperationTime: clusterTime - }); - - signal.addEventListener( - 'abort', - () => { - stream.close(); - }, - { once: true } - ); - - let lastOp: storage.ReplicationCheckpoint | null = null; - let lastDoc: SyncRuleCheckpointState | null = doc; - - for await (const update of stream.stream()) { - if (signal.aborted) { - break; - } - if (update.operationType != 'insert' && update.operationType != 'update' && update.operationType != 'replace') { - continue; - } - - const doc = await this.getOperationDoc(lastDoc, update as lib_mongo.mongo.ChangeStreamDocument); if (doc == null) { - // Irrelevant update - continue; - } - if (doc.state != storage.SyncRuleState.ACTIVE && doc.state != storage.SyncRuleState.ERRORED) { + // Sync rules not present or not active. + // Abort the connections - clients will have to retry later. + throw new ServiceError(ErrorCode.PSYNC_S2302, 'No active sync rules available'); + } else if (doc.state != storage.SyncRuleState.ACTIVE && doc.state != storage.SyncRuleState.ERRORED) { // Sync rules have changed - abort and restart. // We do a soft close of the stream here - no error break; } - lastDoc = doc; - const op = this.makeActiveCheckpoint(doc); // Check for LSN / checkpoint changes - ignore other metadata changes if (lastOp == null || op.lsn != lastOp.lsn || op.checkpoint != lastOp.checkpoint) { @@ -802,61 +749,32 @@ export class MongoSyncBucketStorage * User-specific watch on the latest checkpoint and/or write checkpoint. 
*/ async *watchCheckpointChanges(options: WatchWriteCheckpointOptions): AsyncIterable { - const { signal } = options; - let lastCheckpoint: utils.InternalOpId | null = null; - let lastWriteCheckpoint: bigint | null = null; - let lastWriteCheckpointDoc: WriteCheckpointResult | null = null; - let nextWriteCheckpoint: bigint | null = null; - let lastCheckpointEvent: ReplicationCheckpoint | null = null; - let receivedWriteCheckpoint = false; - - const writeCheckpointIter = this.writeCheckpointAPI.watchUserWriteCheckpoint({ - user_id: options.user_id, - signal, - sync_rules_id: this.group_id - }); - const iter = mergeAsyncIterables( - [this.sharedIter, writeCheckpointIter], - signal - ); + let lastCheckpoint: ReplicationCheckpoint | null = null; - for await (const event of iter) { - if ('checkpoint' in event) { - lastCheckpointEvent = event; - } else { - lastWriteCheckpointDoc = event; - receivedWriteCheckpoint = true; - } - - if (lastCheckpointEvent == null || !receivedWriteCheckpoint) { - // We need to wait until we received at least on checkpoint, and one write checkpoint. - continue; - } + const iter = this.sharedIter[Symbol.asyncIterator](options.signal); + let writeCheckpoint: bigint | null = null; + for await (const nextCheckpoint of iter) { // lsn changes are not important by itself. // What is important is: // 1. checkpoint (op_id) changes. // 2. write checkpoint changes for the specific user - const lsn = lastCheckpointEvent?.lsn; + if (nextCheckpoint.lsn != null) { + writeCheckpoint ??= await this.writeCheckpointAPI.lastWriteCheckpoint({ + sync_rules_id: this.group_id, + user_id: options.user_id, + heads: { + '1': nextCheckpoint.lsn + } + }); + } if ( - lastWriteCheckpointDoc != null && - (lastWriteCheckpointDoc.lsn == null || (lsn != null && lsn >= lastWriteCheckpointDoc.lsn)) + lastCheckpoint != null && + lastCheckpoint.checkpoint == nextCheckpoint.checkpoint && + lastCheckpoint.lsn == nextCheckpoint.lsn ) { - const writeCheckpoint = lastWriteCheckpointDoc.id; - if (nextWriteCheckpoint == null || (writeCheckpoint != null && writeCheckpoint > nextWriteCheckpoint)) { - nextWriteCheckpoint = writeCheckpoint; - } - // We used the doc - clear it - lastWriteCheckpointDoc = null; - } - - const { checkpoint } = lastCheckpointEvent; - - const currentWriteCheckpoint = nextWriteCheckpoint; - - if (currentWriteCheckpoint == lastWriteCheckpoint && checkpoint == lastCheckpoint) { // No change - wait for next one // In some cases, many LSNs may be produced in a short time. // Add a delay to throttle the write checkpoint lookup a bit. @@ -864,75 +782,106 @@ export class MongoSyncBucketStorage continue; } - const updates: CheckpointChanges = - lastCheckpoint == null - ? CHECKPOINT_INVALIDATE_ALL - : await this.getCheckpointChanges({ - lastCheckpoint: lastCheckpoint, - nextCheckpoint: checkpoint - }); - - lastWriteCheckpoint = currentWriteCheckpoint; - lastCheckpoint = checkpoint; - - yield { - base: lastCheckpointEvent, - writeCheckpoint: currentWriteCheckpoint, - update: updates - }; - } - } + if (lastCheckpoint == null) { + yield { + base: nextCheckpoint, + writeCheckpoint, + update: CHECKPOINT_INVALIDATE_ALL + }; + } else { + const updates = await this.getCheckpointChanges({ + lastCheckpoint, + nextCheckpoint + }); + + let updatedWriteCheckpoint = updates.updatedWriteCheckpoints.get(options.user_id) ?? 
null; + if (updates.invalidateWriteCheckpoints) { + updatedWriteCheckpoint ??= await this.writeCheckpointAPI.lastWriteCheckpoint({ + sync_rules_id: this.group_id, + user_id: options.user_id, + heads: { + '1': nextCheckpoint.lsn! + } + }); + } + if (updatedWriteCheckpoint != null && (writeCheckpoint == null || updatedWriteCheckpoint > writeCheckpoint)) { + writeCheckpoint = updatedWriteCheckpoint; + } - private async getOperationDoc( - lastDoc: SyncRuleCheckpointState, - update: lib_mongo.mongo.ChangeStreamDocument - ): Promise { - if (update.operationType == 'insert' || update.operationType == 'replace') { - return update.fullDocument; - } else if (update.operationType == 'update') { - const updatedFields = update.updateDescription.updatedFields ?? {}; - if (lastDoc._id != update.documentKey._id) { - throw new ServiceAssertionError(`Sync rules id mismatch: ${lastDoc._id} != ${update.documentKey._id}`); + yield { + base: nextCheckpoint, + writeCheckpoint, + update: { + updatedDataBuckets: updates.updatedDataBuckets, + invalidateDataBuckets: updates.invalidateDataBuckets, + updatedParameterLookups: updates.updatedParameterLookups, + invalidateParameterBuckets: updates.invalidateParameterBuckets + } + }; } - const mergedDoc: SyncRuleCheckpointState = { - _id: lastDoc._id, - last_checkpoint: updatedFields.last_checkpoint ?? lastDoc.last_checkpoint, - last_checkpoint_lsn: updatedFields.last_checkpoint_lsn ?? lastDoc.last_checkpoint_lsn, - state: updatedFields.state ?? lastDoc.state - }; - - return mergedDoc; - } else { - // Unknown event type - return null; + lastCheckpoint = nextCheckpoint; } } - private getChangeStreamPipeline() { - const syncRulesId = this.group_id; - const pipeline: mongo.Document[] = [ - { - $match: { - 'documentKey._id': syncRulesId, - operationType: { $in: ['insert', 'update', 'replace'] } + /** + * This watches the checkpoint_events capped collection for new documents inserted, + * and yields whenever one or more documents are inserted. + * + * The actual checkpoint must be queried on the sync_rules collection after this. + */ + private async *checkpointChangesStream(signal: AbortSignal): AsyncGenerator { + if (signal.aborted) { + return; + } + + const query = () => { + return this.db.checkpoint_events.find( + {}, + { tailable: true, awaitData: true, maxAwaitTimeMS: 10_000, batchSize: 1000 } + ); + }; + + let cursor = query(); + + signal.addEventListener('abort', () => { + cursor.close().catch(() => {}); + }); + + // Yield once on start, regardless of whether there are documents in the cursor. + // This is to ensure that the first iteration of the generator yields immediately. + yield; + + try { + while (!signal.aborted) { + const doc = await cursor.tryNext().catch((e) => { + if (lib_mongo.isMongoServerError(e) && e.codeName === 'CappedPositionLost') { + // Cursor position lost, potentially due to a high rate of notifications + cursor = query(); + // Treat as an event found, before querying the new cursor again + return {}; + } else { + return Promise.reject(e); + } + }); + if (cursor.closed) { + return; } - }, - { - $project: { - operationType: 1, - 'documentKey._id': 1, - 'updateDescription.updatedFields.state': 1, - 'updateDescription.updatedFields.last_checkpoint': 1, - 'updateDescription.updatedFields.last_checkpoint_lsn': 1, - 'fullDocument._id': 1, - 'fullDocument.state': 1, - 'fullDocument.last_checkpoint': 1, - 'fullDocument.last_checkpoint_lsn': 1 + // Skip buffered documents, if any. 
We don't care about the contents, + // we only want to know when new documents are inserted. + cursor.readBufferedDocuments(); + if (doc != null) { + yield; + } } - ]; - return pipeline; + } catch (e) { + if (signal.aborted) { + return; + } + throw e; + } finally { + await cursor.close(); + } } private async getDataBucketChanges( @@ -944,7 +893,7 @@ export class MongoSyncBucketStorage { // We have an index on (_id.g, last_op). '_id.g': this.group_id, - last_op: { $gt: BigInt(options.lastCheckpoint) } + last_op: { $gt: options.lastCheckpoint.checkpoint } }, { projection: { @@ -973,7 +922,7 @@ export class MongoSyncBucketStorage const parameterUpdates = await this.db.bucket_parameters .find( { - _id: { $gt: BigInt(options.lastCheckpoint), $lte: BigInt(options.nextCheckpoint) }, + _id: { $gt: options.lastCheckpoint.checkpoint, $lte: options.nextCheckpoint.checkpoint }, 'key.g': this.group_id }, { @@ -1001,7 +950,11 @@ export class MongoSyncBucketStorage // TODO (later): // We can optimize this by implementing it like ChecksumCache: We can use partial cache results to do // more efficient lookups in some cases. - private checkpointChangesCache = new LRUCache({ + private checkpointChangesCache = new LRUCache< + string, + InternalCheckpointChanges, + { options: GetCheckpointChangesOptions } + >({ // Limit to 50 cache entries, or 10MB, whichever comes first. // Some rough calculations: // If we process 10 checkpoints per second, and a connection may be 2 seconds behind, we could have // That is a worst-case scenario, so we don't actually store that many. In real life, the cache keys // would likely be clustered around a few values, rather than spread over all 400 potential values. max: 50, - maxSize: 10 * 1024 * 1024, - sizeCalculation: (value: CheckpointChanges) => { + maxSize: 12 * 1024 * 1024, + sizeCalculation: (value: InternalCheckpointChanges) => { // Estimate of memory usage const paramSize = [...value.updatedParameterLookups].reduce((a, b) => a + b.length, 0); const bucketSize = [...value.updatedDataBuckets].reduce((a, b) => a + b.length, 0); - return 100 + paramSize + bucketSize; + const writeCheckpointSize = value.updatedWriteCheckpoints.size * 30; // estimate for user_id + bigint + return 100 + paramSize + bucketSize + writeCheckpointSize; }, fetchMethod: async (_key, _staleValue, options) => { return this.getCheckpointChangesInternal(options.context.options); } }); - async getCheckpointChanges(options: GetCheckpointChangesOptions): Promise { - const key = `${options.lastCheckpoint}_${options.nextCheckpoint}`; + async getCheckpointChanges(options: GetCheckpointChangesOptions): Promise { + const key = `${options.lastCheckpoint.checkpoint}_${options.lastCheckpoint.lsn}__${options.nextCheckpoint.checkpoint}_${options.nextCheckpoint.lsn}`; const result = await this.checkpointChangesCache.fetch(key, { context: { options } }); return result!; } - private async getCheckpointChangesInternal(options: GetCheckpointChangesOptions): Promise { + private async getCheckpointChangesInternal(options: GetCheckpointChangesOptions): Promise { const dataUpdates = await this.getDataBucketChanges(options); const parameterUpdates = await this.getParameterBucketChanges(options); + const writeCheckpointUpdates = await this.writeCheckpointAPI.getWriteCheckpointChanges(options); return { ...dataUpdates, - ...parameterUpdates + ...parameterUpdates, + ...writeCheckpointUpdates }; } } + +interface InternalCheckpointChanges extends CheckpointChanges { + 
updatedWriteCheckpoints: Map; + invalidateWriteCheckpoints: boolean; +} diff --git a/modules/module-mongodb-storage/src/storage/implementation/MongoTestStorageFactoryGenerator.ts b/modules/module-mongodb-storage/src/storage/implementation/MongoTestStorageFactoryGenerator.ts index 2fa11dfc..a7457b43 100644 --- a/modules/module-mongodb-storage/src/storage/implementation/MongoTestStorageFactoryGenerator.ts +++ b/modules/module-mongodb-storage/src/storage/implementation/MongoTestStorageFactoryGenerator.ts @@ -16,6 +16,9 @@ export const MongoTestStorageFactoryGenerator = (factoryOptions: MongoTestStorag await db.db.createCollection('bucket_parameters'); } + // Full migrations are not currently run for tests, so we manually create this + await db.createCheckpointEventsCollection(); + if (!options?.doNotClear) { await db.clear(); } diff --git a/modules/module-mongodb-storage/src/storage/implementation/MongoWriteCheckpointAPI.ts b/modules/module-mongodb-storage/src/storage/implementation/MongoWriteCheckpointAPI.ts index 785898cf..4431cd99 100644 --- a/modules/module-mongodb-storage/src/storage/implementation/MongoWriteCheckpointAPI.ts +++ b/modules/module-mongodb-storage/src/storage/implementation/MongoWriteCheckpointAPI.ts @@ -1,14 +1,7 @@ import { mongo } from '@powersync/lib-service-mongodb'; import * as framework from '@powersync/lib-services-framework'; -import { - Demultiplexer, - DemultiplexerValue, - storage, - WatchUserWriteCheckpointOptions, - WriteCheckpointResult -} from '@powersync/service-core'; +import { GetCheckpointChangesOptions, InternalOpId, storage } from '@powersync/service-core'; import { PowerSyncMongo } from './db.js'; -import { CustomWriteCheckpointDocument, WriteCheckpointDocument } from './models.js'; export type MongoCheckpointAPIOptions = { db: PowerSyncMongo; @@ -35,13 +28,9 @@ export class MongoWriteCheckpointAPI implements storage.WriteCheckpointAPI { this._mode = mode; } - async batchCreateCustomWriteCheckpoints(checkpoints: storage.CustomWriteCheckpointOptions[]): Promise { - return batchCreateCustomWriteCheckpoints(this.db, checkpoints); - } - async createManagedWriteCheckpoint(checkpoint: storage.ManagedWriteCheckpointOptions): Promise { if (this.writeCheckpointMode !== storage.WriteCheckpointMode.MANAGED) { - throw new framework.errors.ValidationError( + throw new framework.ServiceAssertionError( `Attempting to create a managed Write Checkpoint when the current Write Checkpoint mode is set to "${this.writeCheckpointMode}"` ); } @@ -53,7 +42,8 @@ export class MongoWriteCheckpointAPI implements storage.WriteCheckpointAPI { }, { $set: { - lsns + lsns, + processed_at_lsn: null }, $inc: { client_id: 1n @@ -68,12 +58,12 @@ export class MongoWriteCheckpointAPI implements storage.WriteCheckpointAPI { switch (this.writeCheckpointMode) { case storage.WriteCheckpointMode.CUSTOM: if (false == 'sync_rules_id' in filters) { - throw new framework.errors.ValidationError(`Sync rules ID is required for custom Write Checkpoint filtering`); + throw new framework.ServiceAssertionError(`Sync rules ID is required for custom Write Checkpoint filtering`); } return this.lastCustomWriteCheckpoint(filters); case storage.WriteCheckpointMode.MANAGED: if (false == 'heads' in filters) { - throw new framework.errors.ValidationError( + throw new framework.ServiceAssertionError( `Replication HEAD is required for managed Write Checkpoint filtering` ); } @@ -81,231 +71,15 @@ export class MongoWriteCheckpointAPI implements storage.WriteCheckpointAPI { } } - watchUserWriteCheckpoint(options: 
WatchUserWriteCheckpointOptions): AsyncIterable { + async getWriteCheckpointChanges(options: GetCheckpointChangesOptions) { switch (this.writeCheckpointMode) { case storage.WriteCheckpointMode.CUSTOM: - return this.watchCustomWriteCheckpoint(options); + return this.getCustomWriteCheckpointChanges(options); case storage.WriteCheckpointMode.MANAGED: - return this.watchManagedWriteCheckpoint(options); - default: - throw new Error('Invalid write checkpoint mode'); - } - } - - private sharedManagedIter = new Demultiplexer((signal) => { - const clusterTimePromise = this.getClusterTime(); - - return { - iterator: this.watchAllManagedWriteCheckpoints(clusterTimePromise, signal), - getFirstValue: async (user_id: string) => { - // Potential race conditions we cater for: - - // Case 1: changestream is behind. - // We get a doc now, then the same or older doc again later. - // No problem! - - // Case 2: Query is behind. I.e. doc has been created, and emitted on the changestream, but the query doesn't see it yet. - // Not possible luckily, but can we make sure? - - // Case 3: changestream delays openeing. A doc is created after our query here, but before the changestream is opened. - // Awaiting clusterTimePromise should be sufficient here, but as a sanity check we also confirm that our query - // timestamp is > the startClusterTime. - - const changeStreamStart = await clusterTimePromise; - - let doc = null as WriteCheckpointDocument | null; - let clusterTime = null as mongo.Timestamp | null; - - await this.db.client.withSession(async (session) => { - doc = await this.db.write_checkpoints.findOne( - { - user_id: user_id - }, - { - session - } - ); - const time = session.clusterTime?.clusterTime ?? null; - clusterTime = time; - }); - if (clusterTime == null) { - throw new framework.ServiceAssertionError('Could not get clusterTime for write checkpoint'); - } - - if (clusterTime.lessThan(changeStreamStart)) { - throw new framework.ServiceAssertionError( - 'clusterTime for write checkpoint is older than changestream start' - ); - } - - if (doc == null) { - return { - id: null, - lsn: null - }; - } - - return { - id: doc.client_id, - lsn: doc.lsns['1'] - }; - } - }; - }); - - private async *watchAllManagedWriteCheckpoints( - clusterTimePromise: Promise, - signal: AbortSignal - ): AsyncGenerator> { - const clusterTime = await clusterTimePromise; - - const stream = this.db.write_checkpoints.watch( - [{ $match: { operationType: { $in: ['insert', 'update', 'replace'] } } }], - { - fullDocument: 'updateLookup', - startAtOperationTime: clusterTime - } - ); - - signal.onabort = () => { - stream.close(); - }; - - if (signal.aborted) { - stream.close(); - return; - } - - for await (let event of stream) { - if (!('fullDocument' in event) || event.fullDocument == null) { - continue; - } - - const user_id = event.fullDocument.user_id; - yield { - key: user_id, - value: { - id: event.fullDocument.client_id, - lsn: event.fullDocument.lsns['1'] - } - }; + return this.getManagedWriteCheckpointChanges(options); } } - watchManagedWriteCheckpoint(options: WatchUserWriteCheckpointOptions): AsyncIterable { - const stream = this.sharedManagedIter.subscribe(options.user_id, options.signal); - return this.orderedStream(stream); - } - - private sharedCustomIter = new Demultiplexer((signal) => { - const clusterTimePromise = this.getClusterTime(); - - return { - iterator: this.watchAllCustomWriteCheckpoints(clusterTimePromise, signal), - getFirstValue: async (user_id: string) => { - // We cater for the same potential race conditions 
as for managed write checkpoints. - - const changeStreamStart = await clusterTimePromise; - - let doc = null as CustomWriteCheckpointDocument | null; - let clusterTime = null as mongo.Timestamp | null; - - await this.db.client.withSession(async (session) => { - doc = await this.db.custom_write_checkpoints.findOne( - { - user_id: user_id, - sync_rules_id: this.sync_rules_id - }, - { - session - } - ); - const time = session.clusterTime?.clusterTime ?? null; - clusterTime = time; - }); - if (clusterTime == null) { - throw new framework.ServiceAssertionError('Could not get clusterTime for write checkpoint'); - } - - if (clusterTime.lessThan(changeStreamStart)) { - throw new framework.ServiceAssertionError( - 'clusterTime for write checkpoint is older than changestream start' - ); - } - - if (doc == null) { - // No write checkpoint, but we still need to return a result - return { - id: null, - lsn: null - }; - } - - return { - id: doc.checkpoint, - // custom write checkpoints are not tied to a LSN - lsn: null - }; - } - }; - }); - - private async *watchAllCustomWriteCheckpoints( - clusterTimePromise: Promise, - signal: AbortSignal - ): AsyncGenerator> { - const clusterTime = await clusterTimePromise; - - const stream = this.db.custom_write_checkpoints.watch( - [ - { - $match: { - 'fullDocument.sync_rules_id': this.sync_rules_id, - operationType: { $in: ['insert', 'update', 'replace'] } - } - } - ], - { - fullDocument: 'updateLookup', - startAtOperationTime: clusterTime - } - ); - - signal.onabort = () => { - stream.close(); - }; - - if (signal.aborted) { - stream.close(); - return; - } - - for await (let event of stream) { - if (!('fullDocument' in event) || event.fullDocument == null) { - continue; - } - - const user_id = event.fullDocument.user_id; - yield { - key: user_id, - value: { - id: event.fullDocument.checkpoint, - // Custom write checkpoints are not tied to a specific LSN - lsn: null - } - }; - } - } - - watchCustomWriteCheckpoint(options: WatchUserWriteCheckpointOptions): AsyncIterable { - if (options.sync_rules_id != this.sync_rules_id) { - throw new framework.ServiceAssertionError('sync_rules_id does not match'); - } - - const stream = this.sharedCustomIter.subscribe(options.user_id, options.signal); - return this.orderedStream(stream); - } - protected async lastCustomWriteCheckpoint(filters: storage.CustomWriteCheckpointFilters) { const { user_id, sync_rules_id } = filters; const lastWriteCheckpoint = await this.db.custom_write_checkpoints.findOne({ @@ -330,34 +104,70 @@ export class MongoWriteCheckpointAPI implements storage.WriteCheckpointAPI { return lastWriteCheckpoint?.client_id ?? null; } - private async getClusterTime(): Promise { - const hello = await this.db.db.command({ hello: 1 }); - // Note: This is not valid on sharded clusters. 
- const startClusterTime = hello.lastWrite?.majorityOpTime?.ts as mongo.Timestamp; - return startClusterTime; - } + private async getManagedWriteCheckpointChanges(options: GetCheckpointChangesOptions) { + const limit = 1000; + const changes = await this.db.write_checkpoints + .find( + { + processed_at_lsn: { $gt: options.lastCheckpoint.lsn, $lte: options.nextCheckpoint.lsn } + }, + { + limit: limit + 1, + batchSize: limit + 1, + singleBatch: true + } + ) + .toArray(); + const invalidate = changes.length > limit; + + const updatedWriteCheckpoints = new Map(); + if (!invalidate) { + for (let c of changes) { + updatedWriteCheckpoints.set(c.user_id, c.client_id); + } + } - /** - * Makes a write checkpoint stream an orderered one - any out-of-order events are discarded. - */ - private async *orderedStream(stream: AsyncIterable) { - let lastId = -1n; + return { + invalidateWriteCheckpoints: invalidate, + updatedWriteCheckpoints + }; + } - for await (let event of stream) { - // Guard against out-of-order events - if (lastId == -1n || (event.id != null && event.id > lastId)) { - yield event; - if (event.id != null) { - lastId = event.id; + private async getCustomWriteCheckpointChanges(options: GetCheckpointChangesOptions) { + const limit = 1000; + const changes = await this.db.custom_write_checkpoints + .find( + { + op_id: { $gt: options.lastCheckpoint.checkpoint, $lte: options.nextCheckpoint.checkpoint } + }, + { + limit: limit + 1, + batchSize: limit + 1, + singleBatch: true } + ) + .toArray(); + const invalidate = changes.length > limit; + + const updatedWriteCheckpoints = new Map(); + if (!invalidate) { + for (let c of changes) { + updatedWriteCheckpoints.set(c.user_id, c.checkpoint); } } + + return { + invalidateWriteCheckpoints: invalidate, + updatedWriteCheckpoints + }; } } export async function batchCreateCustomWriteCheckpoints( db: PowerSyncMongo, - checkpoints: storage.CustomWriteCheckpointOptions[] + session: mongo.ClientSession, + checkpoints: storage.CustomWriteCheckpointOptions[], + opId: InternalOpId ): Promise { if (checkpoints.length == 0) { return; @@ -370,12 +180,13 @@ export async function batchCreateCustomWriteCheckpoints( update: { $set: { checkpoint: checkpointOptions.checkpoint, - sync_rules_id: checkpointOptions.sync_rules_id + sync_rules_id: checkpointOptions.sync_rules_id, + op_id: opId } }, upsert: true } })), - {} + { session } ); } diff --git a/modules/module-mongodb-storage/src/storage/implementation/db.ts b/modules/module-mongodb-storage/src/storage/implementation/db.ts index 00ccd544..694da1b7 100644 --- a/modules/module-mongodb-storage/src/storage/implementation/db.ts +++ b/modules/module-mongodb-storage/src/storage/implementation/db.ts @@ -7,6 +7,7 @@ import { BucketDataDocument, BucketParameterDocument, BucketStateDocument, + CheckpointEventDocument, CurrentDataDocument, CustomWriteCheckpointDocument, IdSequenceDocument, @@ -35,6 +36,7 @@ export class PowerSyncMongo { readonly instance: mongo.Collection; readonly locks: mongo.Collection; readonly bucket_state: mongo.Collection; + readonly checkpoint_events: mongo.Collection; readonly client: mongo.MongoClient; readonly db: mongo.Db; @@ -58,6 +60,7 @@ export class PowerSyncMongo { this.instance = db.collection('instance'); this.locks = this.db.collection('locks'); this.bucket_state = this.db.collection('bucket_state'); + this.checkpoint_events = this.db.collection('checkpoint_events'); } /** @@ -85,6 +88,45 @@ export class PowerSyncMongo { async drop() { await this.db.dropDatabase(); } + + /** + * Call this after 
every checkpoint or sync rules status update. Rather call too often than too rarely. + * + * This is used in a similar way to the Postgres NOTIFY functionality. + */ + async notifyCheckpoint() { + await this.checkpoint_events.insertOne({} as any, { forceServerObjectId: true }); + } + + /** + * Only use in migrations and tests. + */ + async createCheckpointEventsCollection() { + // We cover the case where the replication process was started before running this migration. + const existingCollections = await this.db + .listCollections({ name: 'checkpoint_events' }, { nameOnly: false }) + .toArray(); + const collection = existingCollections[0]; + if (collection != null) { + if (!collection.options?.capped) { + // Collection was auto-created but not capped, so we need to drop it + await this.db.dropCollection('checkpoint_events'); + } else { + // Collection previously created somehow - ignore + return; + } + } + + await this.db.createCollection('checkpoint_events', { + capped: true, + // We want a small size, since opening a tailable cursor scans this entire collection. + // On the other hand, if we fill this up faster than a process can read it, it will + // invalidate the cursor. We do handle cursor invalidation events, but don't want + // that to happen too often. + size: 50 * 1024, // size in bytes + max: 50 // max number of documents + }); + } } export function createPowerSyncMongo(config: MongoStorageConfig, options?: lib_mongo.MongoConnectionOptions) { diff --git a/modules/module-mongodb-storage/src/storage/implementation/models.ts b/modules/module-mongodb-storage/src/storage/implementation/models.ts index 00d95236..e3e03ca5 100644 --- a/modules/module-mongodb-storage/src/storage/implementation/models.ts +++ b/modules/module-mongodb-storage/src/storage/implementation/models.ts @@ -1,4 +1,4 @@ -import { storage, TableSnapshotStatus } from '@powersync/service-core'; +import { InternalOpId, storage } from '@powersync/service-core'; import { SqliteJsonValue } from '@powersync/service-sync-rules'; import * as bson from 'bson'; @@ -173,6 +173,10 @@ export interface SyncRuleDocument { content: string; } +export interface CheckpointEventDocument { + _id: bson.ObjectId; +} + export type SyncRuleCheckpointState = Pick< SyncRuleDocument, 'last_checkpoint' | 'last_checkpoint_lsn' | '_id' | 'state' @@ -183,6 +187,13 @@ export interface CustomWriteCheckpointDocument { user_id: string; checkpoint: bigint; sync_rules_id: number; + /** + * Unlike managed write checkpoints, custom write checkpoints are flushed together with + * normal ops. This means we can assign an op_id for ordering / correlating with read checkpoints. + * + * This is not unique - multiple write checkpoints can have the same op_id. + */ + op_id?: InternalOpId; } export interface WriteCheckpointDocument { @@ -190,6 +201,12 @@ export interface WriteCheckpointDocument { user_id: string; lsns: Record; client_id: bigint; + /** + * This is set to the checkpoint lsn when the checkpoint lsn >= this lsn. + * This is used to make it easier to determine what write checkpoints have been processed + * between two checkpoints. 
+ */ + processed_at_lsn: string | null; } export interface InstanceDocument { diff --git a/modules/module-postgres-storage/src/storage/PostgresSyncRulesStorage.ts b/modules/module-postgres-storage/src/storage/PostgresSyncRulesStorage.ts index 4fa48eaf..7ab896ca 100644 --- a/modules/module-postgres-storage/src/storage/PostgresSyncRulesStorage.ts +++ b/modules/module-postgres-storage/src/storage/PostgresSyncRulesStorage.ts @@ -107,12 +107,6 @@ export class PostgresSyncRulesStorage return new PostgresCompactor(this.db, this.group_id, options).compact(); } - batchCreateCustomWriteCheckpoints(checkpoints: storage.BatchedCustomWriteCheckpointOptions[]): Promise { - return this.writeCheckpointAPI.batchCreateCustomWriteCheckpoints( - checkpoints.map((c) => ({ ...c, sync_rules_id: this.group_id })) - ); - } - lastWriteCheckpoint(filters: storage.SyncStorageLastWriteCheckpointFilters): Promise { return this.writeCheckpointAPI.lastWriteCheckpoint({ ...filters, diff --git a/modules/module-postgres-storage/src/storage/checkpoints/PostgresWriteCheckpointAPI.ts b/modules/module-postgres-storage/src/storage/checkpoints/PostgresWriteCheckpointAPI.ts index 913894f2..aba98b9a 100644 --- a/modules/module-postgres-storage/src/storage/checkpoints/PostgresWriteCheckpointAPI.ts +++ b/modules/module-postgres-storage/src/storage/checkpoints/PostgresWriteCheckpointAPI.ts @@ -1,7 +1,6 @@ import * as lib_postgres from '@powersync/lib-service-postgres'; import * as framework from '@powersync/lib-services-framework'; -import { storage, sync } from '@powersync/service-core'; -import { JSONBig, JsonContainer } from '@powersync/service-jsonbig'; +import { InternalOpId, storage } from '@powersync/service-core'; import { models } from '../../types/types.js'; export type PostgresCheckpointAPIOptions = { @@ -26,7 +25,10 @@ export class PostgresWriteCheckpointAPI implements storage.WriteCheckpointAPI { this._mode = mode; } - async batchCreateCustomWriteCheckpoints(checkpoints: storage.CustomWriteCheckpointOptions[]): Promise { + async batchCreateCustomWriteCheckpoints( + checkpoints: storage.CustomWriteCheckpointOptions[], + op_id: InternalOpId + ): Promise { return batchCreateCustomWriteCheckpoints(this.db, checkpoints); } @@ -58,13 +60,6 @@ export class PostgresWriteCheckpointAPI implements storage.WriteCheckpointAPI { return row!.write_checkpoint; } - watchUserWriteCheckpoint( - options: storage.WatchUserWriteCheckpointOptions - ): AsyncIterable { - // Not used for Postgres currently - throw new Error('Method not implemented.'); - } - async lastWriteCheckpoint(filters: storage.LastWriteCheckpointFilters): Promise { switch (this.writeCheckpointMode) { case storage.WriteCheckpointMode.CUSTOM: diff --git a/modules/module-postgres/test/src/checkpoints.test.ts b/modules/module-postgres/test/src/checkpoints.test.ts index a0446ca1..fbae9f5e 100644 --- a/modules/module-postgres/test/src/checkpoints.test.ts +++ b/modules/module-postgres/test/src/checkpoints.test.ts @@ -1,7 +1,7 @@ import { PostgresRouteAPIAdapter } from '@module/api/PostgresRouteAPIAdapter.js'; -import { checkpointUserId, createWriteCheckpoint } from '@powersync/service-core'; +import { checkpointUserId, createWriteCheckpoint, TestStorageFactory } from '@powersync/service-core'; import { describe, test } from 'vitest'; -import { INITIALIZED_MONGO_STORAGE_FACTORY } from './util.js'; +import { describeWithStorage } from './util.js'; import { WalStreamTestContext } from './wal_stream_utils.js'; import timers from 'node:timers/promises'; @@ -12,8 +12,11 @@ const 
BASIC_SYNC_RULES = `bucket_definitions: - SELECT id, description, other FROM "test_data"`; describe('checkpoint tests', () => { + describeWithStorage({}, checkpointTests); +}); + +const checkpointTests = (factory: TestStorageFactory) => { test('write checkpoints', { timeout: 50_000 }, async () => { - const factory = INITIALIZED_MONGO_STORAGE_FACTORY; await using context = await WalStreamTestContext.open(factory); await context.updateSyncRules(BASIC_SYNC_RULES); @@ -78,4 +81,4 @@ describe('checkpoint tests', () => { controller.abort(); } }); -}); +}; diff --git a/packages/service-core-tests/src/tests/register-data-storage-tests.ts b/packages/service-core-tests/src/tests/register-data-storage-tests.ts index 850f37ed..e8ff90fd 100644 --- a/packages/service-core-tests/src/tests/register-data-storage-tests.ts +++ b/packages/service-core-tests/src/tests/register-data-storage-tests.ts @@ -1769,14 +1769,12 @@ bucket_definitions: .watchCheckpointChanges({ user_id: 'user1', signal: abortController.signal }) [Symbol.asyncIterator](); - await bucketStorage.batchCreateCustomWriteCheckpoints([ - { + await bucketStorage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => { + await batch.addCustomWriteCheckpoint({ checkpoint: 5n, user_id: 'user1' - } - ]); - - await bucketStorage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => { + }); + await batch.flush(); await batch.keepalive('5/0'); }); @@ -1785,7 +1783,6 @@ bucket_definitions: done: false, value: { base: { - checkpoint: 0n, lsn: '5/0' }, writeCheckpoint: 5n @@ -1822,23 +1819,18 @@ bucket_definitions: done: false, value: { base: { - checkpoint: 0n, lsn: '5/0' }, writeCheckpoint: null } }); - await bucketStorage.batchCreateCustomWriteCheckpoints([ - { + await bucketStorage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => { + batch.addCustomWriteCheckpoint({ checkpoint: 6n, user_id: 'user1' - } - ]); - // We have to trigger a new keepalive after the checkpoint, at least to cover postgres storage. - // This is what is effetively triggered with RouteAPI.createReplicationHead(). - // MongoDB storage doesn't explicitly need this anymore. - await bucketStorage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => { + }); + await batch.flush(); await batch.keepalive('6/0'); }); @@ -1847,7 +1839,6 @@ bucket_definitions: done: false, value: { base: { - checkpoint: 0n // can be 5/0 or 6/0 - actual value not relevant for custom write checkpoints // lsn: '6/0' }, @@ -1855,13 +1846,12 @@ bucket_definitions: } }); - await bucketStorage.batchCreateCustomWriteCheckpoints([ - { + await bucketStorage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => { + batch.addCustomWriteCheckpoint({ checkpoint: 7n, user_id: 'user1' - } - ]); - await bucketStorage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => { + }); + await batch.flush(); await batch.keepalive('7/0'); }); @@ -1870,7 +1860,6 @@ bucket_definitions: done: false, value: { base: { - checkpoint: 0n // can be 5/0, 6/0 or 7/0 - actual value not relevant for custom write checkpoints // lsn: '7/0' }, diff --git a/packages/service-core-tests/src/tests/register-sync-tests.ts b/packages/service-core-tests/src/tests/register-sync-tests.ts index 6bf9a2f5..99e98951 100644 --- a/packages/service-core-tests/src/tests/register-sync-tests.ts +++ b/packages/service-core-tests/src/tests/register-sync-tests.ts @@ -519,12 +519,12 @@ bucket_definitions: await batch.commit('0/2'); }); + } - if (sentRows >= 1000 && sentRows <= 2001) { - // pause for a bit to give the stream time to process interruptions. 
- // This covers the data batch above and the next one. - await timers.setTimeout(50); - } + if (sentRows >= 1000 && sentRows <= 2001) { + // pause for a bit to give the stream time to process interruptions. + // This covers the data batch above and the next one. + await timers.setTimeout(50); } } if ('checkpoint_complete' in next) { diff --git a/packages/service-core/src/storage/SyncRulesBucketStorage.ts b/packages/service-core/src/storage/SyncRulesBucketStorage.ts index c86d7106..62bd608f 100644 --- a/packages/service-core/src/storage/SyncRulesBucketStorage.ts +++ b/packages/service-core/src/storage/SyncRulesBucketStorage.ts @@ -266,8 +266,8 @@ export interface StorageCheckpointUpdate extends WriteCheckpoint { } export interface GetCheckpointChangesOptions { - lastCheckpoint: util.InternalOpId; - nextCheckpoint: util.InternalOpId; + lastCheckpoint: ReplicationCheckpoint; + nextCheckpoint: ReplicationCheckpoint; } export interface CheckpointChanges { diff --git a/packages/service-core/src/storage/WriteCheckpointAPI.ts b/packages/service-core/src/storage/WriteCheckpointAPI.ts index b9d14ce9..6aa61368 100644 --- a/packages/service-core/src/storage/WriteCheckpointAPI.ts +++ b/packages/service-core/src/storage/WriteCheckpointAPI.ts @@ -50,37 +50,11 @@ export interface ManagedWriteCheckpointFilters extends BaseWriteCheckpointIdenti heads: Record; } -export interface WriteCheckpointResult { - /** - * Write checkpoint id (also referred to as client_id). - * - * If null, there is no write checkpoint for the client. - */ - id: bigint | null; - - /** - * LSN for the checkpoint. - * - * This will change when we support multiple connections. - * - * For managed write checkpoints, this LSN must be exceeded by the checkpoint / replication head to be valid. - * - * For custom write checkpoints, this will be null, and the write checkpoint is valid for all LSNs. - */ - lsn: string | null; -} - export type ManagedWriteCheckpointOptions = ManagedWriteCheckpointFilters; export type SyncStorageLastWriteCheckpointFilters = BaseWriteCheckpointIdentifier | ManagedWriteCheckpointFilters; export type LastWriteCheckpointFilters = CustomWriteCheckpointFilters | ManagedWriteCheckpointFilters; -export interface WatchUserWriteCheckpointOptions { - user_id: string; - sync_rules_id: number; - signal: AbortSignal; -} - export interface BaseWriteCheckpointAPI { readonly writeCheckpointMode: WriteCheckpointMode; setWriteCheckpointMode(mode: WriteCheckpointMode): void; @@ -93,7 +67,6 @@ export interface BaseWriteCheckpointAPI { * sync rules id. */ export interface SyncStorageWriteCheckpointAPI extends BaseWriteCheckpointAPI { - batchCreateCustomWriteCheckpoints(checkpoints: BatchedCustomWriteCheckpointOptions[]): Promise; lastWriteCheckpoint(filters: SyncStorageLastWriteCheckpointFilters): Promise; } @@ -102,10 +75,7 @@ export interface SyncStorageWriteCheckpointAPI extends BaseWriteCheckpointAPI { * sync rules identifiers for custom write checkpoints. 
*/ export interface WriteCheckpointAPI extends BaseWriteCheckpointAPI { - batchCreateCustomWriteCheckpoints(checkpoints: CustomWriteCheckpointOptions[]): Promise; lastWriteCheckpoint(filters: LastWriteCheckpointFilters): Promise; - - watchUserWriteCheckpoint(options: WatchUserWriteCheckpointOptions): AsyncIterable; } export const DEFAULT_WRITE_CHECKPOINT_MODE = WriteCheckpointMode.MANAGED; diff --git a/packages/service-core/src/streams/BroadcastIterable.ts b/packages/service-core/src/streams/BroadcastIterable.ts index 2922c709..b48e1b8b 100644 --- a/packages/service-core/src/streams/BroadcastIterable.ts +++ b/packages/service-core/src/streams/BroadcastIterable.ts @@ -97,7 +97,7 @@ export class BroadcastIterable implements AsyncIterable { } } - async *[Symbol.asyncIterator](signal?: AbortSignal): AsyncIterator { + async *[Symbol.asyncIterator](signal?: AbortSignal): AsyncIterableIterator { const sink = new LastValueSink(this.last); this.addSink(sink); try { diff --git a/packages/service-core/src/streams/Demultiplexer.ts b/packages/service-core/src/streams/Demultiplexer.ts deleted file mode 100644 index 86ebcc40..00000000 --- a/packages/service-core/src/streams/Demultiplexer.ts +++ /dev/null @@ -1,165 +0,0 @@ -import { AbortError } from 'ix/aborterror.js'; -import { wrapWithAbort } from 'ix/asynciterable/operators/withabort.js'; -import { LastValueSink } from './LastValueSink.js'; - -export interface DemultiplexerValue { - /** - * The key used for demultiplexing, for example the user id. - */ - key: string; - /** - * The stream value. - */ - value: T; -} - -export interface DemultiplexerSource { - /** - * The async iterator providing a stream of values. - */ - iterator: AsyncIterable>; - - /** - * Fetches the first value for a given key. - * - * This is used to get an initial value for each subscription. - */ - getFirstValue(key: string): Promise; -} - -export type DemultiplexerSourceFactory = (signal: AbortSignal) => DemultiplexerSource; - -/** - * Takes a multiplexed stream (e.g. a changestream covering many individual users), - * and allows subscribing to individual streams. - * - * The source subscription is lazy: - * 1. We only start subscribing when there is a downstream subscriber. - * 2. When all downstream subscriptions have ended, we end the source subscription. - * - * For each subscriber, if backpressure builds up, we only keep the _last_ value. 
- */ -export class Demultiplexer { - private subscribers: Map>> | undefined = undefined; - private abortController: AbortController | undefined = undefined; - private currentSource: DemultiplexerSource | undefined = undefined; - - constructor(private source: DemultiplexerSourceFactory) {} - - private start(filter: string, sink: LastValueSink) { - const abortController = new AbortController(); - const listeners = new Map(); - listeners.set(filter, new Set([sink])); - - this.abortController = abortController; - this.subscribers = listeners; - - const source = this.source(abortController.signal); - this.currentSource = source; - this.loop(source, abortController, listeners); - return source; - } - - private async loop( - source: DemultiplexerSource, - abortController: AbortController, - sinks: Map>> - ) { - try { - for await (let doc of source.iterator) { - if (abortController.signal.aborted || sinks.size == 0) { - throw new AbortError(); - } - const key = doc.key; - const keySinks = sinks.get(key); - if (keySinks == null) { - continue; - } - - for (let sink of keySinks) { - sink.write(doc.value); - } - } - - // End of stream - for (let keySinks of sinks.values()) { - for (let sink of keySinks) { - sink.end(); - } - } - } catch (e) { - // Just in case the error is not from the source - abortController.abort(); - - for (let keySinks of sinks.values()) { - for (let sink of keySinks) { - sink.error(e); - } - } - } finally { - // Clear state, so that a new subscription may be started - if (this.subscribers === sinks) { - this.subscribers = undefined; - this.abortController = undefined; - this.currentSource = undefined; - } - } - } - - private removeSink(key: string, sink: LastValueSink) { - const existing = this.subscribers?.get(key); - if (existing == null) { - return; - } - existing.delete(sink); - if (existing.size == 0) { - this.subscribers!.delete(key); - } - - if (this.subscribers?.size == 0) { - // This is not immediate - there may be a delay until it is fully stopped, - // depending on the underlying source. - this.abortController?.abort(); - this.subscribers = undefined; - this.abortController = undefined; - this.currentSource = undefined; - } - } - - private addSink(key: string, sink: LastValueSink) { - if (this.currentSource == null) { - return this.start(key, sink); - } else { - const existing = this.subscribers!.get(key); - if (existing != null) { - existing.add(sink); - } else { - this.subscribers!.set(key, new Set([sink])); - } - return this.currentSource; - } - } - - /** - * Subscribe to a specific stream. - * - * @param key The key used for demultiplexing, e.g. user id. - * @param signal - */ - async *subscribe(key: string, signal: AbortSignal): AsyncIterable { - const sink = new LastValueSink(undefined); - // Important that we register the sink before calling getFirstValue(). 
- const source = this.addSink(key, sink); - try { - const firstValue = await source.getFirstValue(key); - yield firstValue; - yield* sink.withSignal(signal); - } finally { - this.removeSink(key, sink); - } - } - - get active() { - return this.subscribers != null; - } -} diff --git a/packages/service-core/src/streams/streams-index.ts b/packages/service-core/src/streams/streams-index.ts index 0d8eafa4..c25f752e 100644 --- a/packages/service-core/src/streams/streams-index.ts +++ b/packages/service-core/src/streams/streams-index.ts @@ -1,4 +1,3 @@ export * from './merge.js'; -export * from './Demultiplexer.js'; export * from './LastValueSink.js'; export * from './BroadcastIterable.js'; diff --git a/packages/service-core/test/src/demultiplexer.test.ts b/packages/service-core/test/src/demultiplexer.test.ts deleted file mode 100644 index 69ba9c87..00000000 --- a/packages/service-core/test/src/demultiplexer.test.ts +++ /dev/null @@ -1,205 +0,0 @@ -// Vitest Unit Tests -import { Demultiplexer, DemultiplexerSource, DemultiplexerSourceFactory, DemultiplexerValue } from '@/index.js'; -import { delayEach } from 'ix/asynciterable/operators/delayeach.js'; -import { take } from 'ix/asynciterable/operators/take.js'; -import { toArray } from 'ix/asynciterable/toarray.js'; -import * as timers from 'node:timers/promises'; -import { describe, expect, it } from 'vitest'; - -describe('Demultiplexer', () => { - it('should start subscription lazily and provide first value', async () => { - const mockSource: DemultiplexerSourceFactory = (signal: AbortSignal) => { - const iterator = (async function* (): AsyncIterable> {})(); - return { - iterator, - getFirstValue: async (key: string) => `first-${key}` - }; - }; - - const demux = new Demultiplexer(mockSource); - const signal = new AbortController().signal; - - const iter = demux.subscribe('user1', signal)[Symbol.asyncIterator](); - const result = await iter.next(); - expect(result.value).toBe('first-user1'); - }); - - it('should handle multiple subscribers to the same key', async () => { - const iter = (async function* () { - yield { key: 'user1', value: 'value1' }; - yield { key: 'user1', value: 'value2' }; - })(); - const source: DemultiplexerSource = { - iterator: iter, - getFirstValue: async (key: string) => `first-${key}` - }; - - const demux = new Demultiplexer(() => source); - const signal = new AbortController().signal; - - const iter1 = demux.subscribe('user1', signal)[Symbol.asyncIterator](); - const iter2 = demux.subscribe('user1', signal)[Symbol.asyncIterator](); - - // Due to only keeping the last value, some values are skipped - expect(await iter1.next()).toEqual({ value: 'first-user1', done: false }); - expect(await iter1.next()).toEqual({ value: 'value1', done: false }); - expect(await iter1.next()).toEqual({ value: undefined, done: true }); - - expect(await iter2.next()).toEqual({ value: 'first-user1', done: false }); - expect(await iter2.next()).toEqual({ value: undefined, done: true }); - }); - - it('should handle multiple subscribers to the same key (2)', async () => { - const p1 = Promise.withResolvers(); - const p2 = Promise.withResolvers(); - const p3 = Promise.withResolvers(); - - const iter = (async function* () { - await p1.promise; - yield { key: 'user1', value: 'value1' }; - await p2.promise; - yield { key: 'user1', value: 'value2' }; - await p3.promise; - })(); - - const source: DemultiplexerSource = { - iterator: iter, - getFirstValue: async (key: string) => `first-${key}` - }; - - const demux = new Demultiplexer(() => source); - const 
signal = new AbortController().signal; - - const iter1 = demux.subscribe('user1', signal)[Symbol.asyncIterator](); - const iter2 = demux.subscribe('user1', signal)[Symbol.asyncIterator](); - - // Due to only keeping the last value, some values are skilled - expect(await iter1.next()).toEqual({ value: 'first-user1', done: false }); - expect(await iter2.next()).toEqual({ value: 'first-user1', done: false }); - p1.resolve(); - - expect(await iter1.next()).toEqual({ value: 'value1', done: false }); - expect(await iter2.next()).toEqual({ value: 'value1', done: false }); - p2.resolve(); - - expect(await iter1.next()).toEqual({ value: 'value2', done: false }); - p3.resolve(); - - expect(await iter1.next()).toEqual({ value: undefined, done: true }); - expect(await iter2.next()).toEqual({ value: undefined, done: true }); - }); - - it('should handle multiple subscribers to different keys', async () => { - const p1 = Promise.withResolvers(); - const p2 = Promise.withResolvers(); - const p3 = Promise.withResolvers(); - - const iter = (async function* () { - await p1.promise; - yield { key: 'user1', value: 'value1' }; - await p2.promise; - yield { key: 'user2', value: 'value2' }; - await p3.promise; - })(); - - const source: DemultiplexerSource = { - iterator: iter, - getFirstValue: async (key: string) => `first-${key}` - }; - - const demux = new Demultiplexer(() => source); - const signal = new AbortController().signal; - - const iter1 = demux.subscribe('user1', signal)[Symbol.asyncIterator](); - const iter2 = demux.subscribe('user2', signal)[Symbol.asyncIterator](); - - // Due to only keeping the last value, some values are skilled - expect(await iter1.next()).toEqual({ value: 'first-user1', done: false }); - expect(await iter2.next()).toEqual({ value: 'first-user2', done: false }); - p1.resolve(); - - expect(await iter1.next()).toEqual({ value: 'value1', done: false }); - p2.resolve(); - - expect(await iter2.next()).toEqual({ value: 'value2', done: false }); - p3.resolve(); - - expect(await iter1.next()).toEqual({ value: undefined, done: true }); - expect(await iter2.next()).toEqual({ value: undefined, done: true }); - }); - - it('should abort', async () => { - const iter = (async function* () { - yield { key: 'user1', value: 'value1' }; - yield { key: 'user1', value: 'value2' }; - })(); - - const source: DemultiplexerSource = { - iterator: iter, - getFirstValue: async (key: string) => `first-${key}` - }; - - const demux = new Demultiplexer(() => source); - const controller = new AbortController(); - - const iter1 = demux.subscribe('user1', controller.signal)[Symbol.asyncIterator](); - - expect(await iter1.next()).toEqual({ value: 'first-user1', done: false }); - controller.abort(); - - await expect(iter1.next()).rejects.toThrow('The operation has been aborted'); - }); - - it('should handle errors on multiple subscribers', async () => { - let sourceIndex = 0; - const sourceFn = async function* (signal: AbortSignal): AsyncIterable> { - // Test value out by 1000 means it may have used the wrong iteration of the source - const base = (sourceIndex += 1000); - const abortedPromise = new Promise((resolve) => { - signal.addEventListener('abort', resolve, { once: true }); - }); - for (let i = 0; !signal.aborted; i++) { - if (base + i == 1005) { - throw new Error('simulated failure'); - } - yield { key: 'u1', value: base + i }; - await Promise.race([abortedPromise, timers.setTimeout(1)]); - } - // Test value out by 100 means this wasn't reached - sourceIndex += 100; - }; - - const sourceFactory: 
DemultiplexerSourceFactory = (signal) => { - const source: DemultiplexerSource = { - iterator: sourceFn(signal), - getFirstValue: async (key: string) => -1 - }; - return source; - }; - const demux = new Demultiplexer(sourceFactory); - - const controller = new AbortController(); - - const delayed1 = delayEach(9)(demux.subscribe('u1', controller.signal)); - const delayed2 = delayEach(10)(demux.subscribe('u1', controller.signal)); - expect(demux.active).toBe(false); - const results1Promise = toArray(take(5)(delayed1)) as Promise; - const results2Promise = toArray(take(5)(delayed2)) as Promise; - - const [r1, r2] = await Promise.allSettled([results1Promise, results2Promise]); - - expect(r1).toEqual({ status: 'rejected', reason: new Error('simulated failure') }); - expect(r2).toEqual({ status: 'rejected', reason: new Error('simulated failure') }); - - expect(demux.active).toBe(false); - - // This starts a new source - const delayed3 = delayEach(10)(demux.subscribe('u1', controller.signal)); - const results3 = await toArray(take(6)(delayed3)); - expect(results3.length).toEqual(6); - expect(results3[0]).toEqual(-1); // Initial value - // There should be approximately 10ms between each value, but we allow for some slack - expect(results3[5]).toBeGreaterThan(2005); - expect(results3[5]).toBeLessThan(2200); - }); -});
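Closing editorial note (not part of the patch): with the per-user change streams and the `Demultiplexer` removed, write-checkpoint changes now arrive as a batched diff between two checkpoints: an `updatedWriteCheckpoints` map plus an `invalidateWriteCheckpoints` flag that is set when more than 1000 checkpoints changed in the window. The helper below is a hypothetical consumer-side sketch (names and simplified types are assumptions) of how such a diff can be folded into a single connection's write checkpoint, mirroring the logic in `watchCheckpointChanges`.

```ts
// Simplified stand-in for the relevant fields of InternalCheckpointChanges.
interface WriteCheckpointChanges {
  invalidateWriteCheckpoints: boolean;
  updatedWriteCheckpoints: Map<string, bigint>;
}

async function advanceWriteCheckpoint(
  current: bigint | null,
  userId: string,
  changes: WriteCheckpointChanges,
  // Fallback lookup, e.g. writeCheckpointAPI.lastWriteCheckpoint(...) for this user.
  lookup: () => Promise<bigint | null>
): Promise<bigint | null> {
  let updated = changes.updatedWriteCheckpoints.get(userId) ?? null;
  if (changes.invalidateWriteCheckpoints) {
    // Too many checkpoints changed in this window; the batched diff is incomplete, so query directly.
    updated ??= await lookup();
  }
  // Write checkpoints only move forward; ignore stale or missing values.
  if (updated != null && (current == null || updated > current)) {
    return updated;
  }
  return current;
}
```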