diff --git a/.changeset/fresh-laws-join.md b/.changeset/fresh-laws-join.md new file mode 100644 index 00000000..618d7b41 --- /dev/null +++ b/.changeset/fresh-laws-join.md @@ -0,0 +1,8 @@ +--- +'@powersync/react-native': minor +'@powersync/common': minor +'@powersync/node': minor +'@powersync/web': minor +--- + +Propagate logger from PowerSyncDatabase to streaming sync and remote implementations, and tweak some log messages. diff --git a/packages/common/src/client/AbstractPowerSyncDatabase.ts b/packages/common/src/client/AbstractPowerSyncDatabase.ts index 80b8b0cc..90bd4601 100644 --- a/packages/common/src/client/AbstractPowerSyncDatabase.ts +++ b/packages/common/src/client/AbstractPowerSyncDatabase.ts @@ -127,7 +127,6 @@ export const DEFAULT_WATCH_THROTTLE_MS = 30; export const DEFAULT_POWERSYNC_DB_OPTIONS = { retryDelayMs: 5000, - logger: Logger.get('PowerSyncDatabase'), crudUploadThrottleMs: DEFAULT_CRUD_UPLOAD_THROTTLE_MS }; @@ -176,6 +175,8 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver cb.schemaChanged?.(schema)); } - get logger() { - return this.options.logger!; - } - /** * Wait for initialization to complete. * While initializing is automatic, this helps to catch and report initialization errors. @@ -876,7 +875,7 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver this.options.logger?.error(e) } = handler ?? {}; + const { onResult, onError = (e: Error) => this.logger.error(e) } = handler ?? {}; if (!onResult) { throw new Error('onResult is required'); } @@ -1031,7 +1030,7 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver void { - const { onChange, onError = (e: Error) => this.options.logger?.error(e) } = handler ?? {}; + const { onChange, onError = (e: Error) => this.logger.error(e) } = handler ?? {}; if (!onChange) { throw new Error('onChange is required'); } diff --git a/packages/common/src/client/sync/bucket/SqliteBucketStorage.ts b/packages/common/src/client/sync/bucket/SqliteBucketStorage.ts index 8cf5a86d..4615db6e 100644 --- a/packages/common/src/client/sync/bucket/SqliteBucketStorage.ts +++ b/packages/common/src/client/sync/bucket/SqliteBucketStorage.ts @@ -92,11 +92,11 @@ export class SqliteBucketStorage extends BaseObserver imp async saveSyncData(batch: SyncDataBatch, fixedKeyFormat: boolean = false) { await this.writeTransaction(async (tx) => { for (const b of batch.buckets) { - const result = await tx.execute('INSERT INTO powersync_operations(op, data) VALUES(?, ?)', [ + await tx.execute('INSERT INTO powersync_operations(op, data) VALUES(?, ?)', [ 'save', JSON.stringify({ buckets: [b.toJSON(fixedKeyFormat)] }) ]); - this.logger.debug('saveSyncData', JSON.stringify(result)); + this.logger.debug(`Saved batch of data for bucket: ${b.bucket}, operations: ${b.data.length}`); } }); } @@ -115,7 +115,7 @@ export class SqliteBucketStorage extends BaseObserver imp await tx.execute('INSERT INTO powersync_operations(op, data) VALUES(?, ?)', ['delete_bucket', bucket]); }); - this.logger.debug('done deleting bucket'); + this.logger.debug(`Done deleting bucket ${bucket}`); } async hasCompletedSync() { @@ -139,6 +139,11 @@ export class SqliteBucketStorage extends BaseObserver imp } return { ready: false, checkpointValid: false, checkpointFailures: r.checkpointFailures }; } + if (priority == null) { + this.logger.debug(`Validated checksums checkpoint ${checkpoint.last_op_id}`); + } else { + this.logger.debug(`Validated checksums for partial checkpoint ${checkpoint.last_op_id}, priority ${priority}`); + } let buckets = checkpoint.buckets; if (priority !== undefined) { @@ -158,7 +163,6 @@ export class SqliteBucketStorage extends BaseObserver imp const valid = await this.updateObjectsFromBuckets(checkpoint, priority); if (!valid) { - this.logger.debug('Not at a consistent checkpoint - cannot update local db'); return { ready: false, checkpointValid: true }; } @@ -221,7 +225,6 @@ export class SqliteBucketStorage extends BaseObserver imp ]); const resultItem = rs.rows?.item(0); - this.logger.debug('validateChecksums priority, checkpoint, result item', priority, checkpoint, resultItem); if (!resultItem) { return { checkpointValid: false, @@ -262,34 +265,32 @@ export class SqliteBucketStorage extends BaseObserver imp const opId = await cb(); - this.logger.debug(`[updateLocalTarget] Updating target to checkpoint ${opId}`); - return this.writeTransaction(async (tx) => { const anyData = await tx.execute('SELECT 1 FROM ps_crud LIMIT 1'); if (anyData.rows?.length) { // if isNotEmpty - this.logger.debug('updateLocalTarget', 'ps crud is not empty'); + this.logger.debug(`New data uploaded since write checkpoint ${opId} - need new write checkpoint`); return false; } const rs = await tx.execute("SELECT seq FROM sqlite_sequence WHERE name = 'ps_crud'"); if (!rs.rows?.length) { // assert isNotEmpty - throw new Error('SQlite Sequence should not be empty'); + throw new Error('SQLite Sequence should not be empty'); } const seqAfter: number = rs.rows?.item(0)['seq']; - this.logger.debug('seqAfter', JSON.stringify(rs.rows?.item(0))); if (seqAfter != seqBefore) { - this.logger.debug('seqAfter != seqBefore', seqAfter, seqBefore); + this.logger.debug( + `New data uploaded since write checpoint ${opId} - need new write checkpoint (sequence updated)` + ); + // New crud data may have been uploaded since we got the checkpoint. Abort. return false; } - const response = await tx.execute("UPDATE ps_buckets SET target_op = CAST(? as INTEGER) WHERE name='$local'", [ - opId - ]); - this.logger.debug(['[updateLocalTarget] Response from updating target_op ', JSON.stringify(response)]); + this.logger.debug(`Updating target write checkpoint to ${opId}`); + await tx.execute("UPDATE ps_buckets SET target_op = CAST(? as INTEGER) WHERE name='$local'", [opId]); return true; }); } diff --git a/packages/common/src/client/sync/stream/AbstractStreamingSyncImplementation.ts b/packages/common/src/client/sync/stream/AbstractStreamingSyncImplementation.ts index e8e7f069..e3fea79b 100644 --- a/packages/common/src/client/sync/stream/AbstractStreamingSyncImplementation.ts +++ b/packages/common/src/client/sync/stream/AbstractStreamingSyncImplementation.ts @@ -205,7 +205,6 @@ export const DEFAULT_RETRY_DELAY_MS = 5000; export const DEFAULT_STREAMING_SYNC_OPTIONS = { retryDelayMs: DEFAULT_RETRY_DELAY_MS, - logger: Logger.get('PowerSyncStream'), crudUploadThrottleMs: DEFAULT_CRUD_UPLOAD_THROTTLE_MS }; @@ -234,6 +233,7 @@ export abstract class AbstractStreamingSyncImplementation protected abortController: AbortController | null; protected crudUpdateListener?: () => void; protected streamingSyncPromise?: Promise; + protected logger: ILogger; private isUploadingCrud: boolean = false; private notifyCompletedUploads?: () => void; @@ -244,6 +244,7 @@ export abstract class AbstractStreamingSyncImplementation constructor(options: AbstractStreamingSyncImplementationOptions) { super(); this.options = { ...DEFAULT_STREAMING_SYNC_OPTIONS, ...options }; + this.logger = options.logger ?? Logger.get('PowerSyncStream'); this.syncStatus = new SyncStatus({ connected: false, @@ -318,10 +319,6 @@ export abstract class AbstractStreamingSyncImplementation return this.syncStatus.connected; } - protected get logger() { - return this.options.logger!; - } - async dispose() { this.crudUpdateListener?.(); this.crudUpdateListener = undefined; @@ -337,7 +334,9 @@ export abstract class AbstractStreamingSyncImplementation const clientId = await this.options.adapter.getClientId(); let path = `/write-checkpoint2.json?client_id=${clientId}`; const response = await this.options.remote.get(path); - return response['data']['write_checkpoint'] as string; + const checkpoint = response['data']['write_checkpoint'] as string; + this.logger.debug(`Created write checkpoint: ${checkpoint}`); + return checkpoint; } protected async _uploadAllCrud(): Promise { @@ -379,7 +378,11 @@ The next upload iteration will be delayed.`); }); } else { // Uploading is completed - await this.options.adapter.updateLocalTarget(() => this.getWriteCheckpoint()); + const neededUpdate = await this.options.adapter.updateLocalTarget(() => this.getWriteCheckpoint()); + if (neededUpdate == false && checkedCrudItem != null) { + // Only log this if there was something to upload + this.logger.debug('Upload complete, no write checkpoint needed.'); + } break; } } catch (ex) { @@ -1105,20 +1108,20 @@ The next upload iteration will be delayed.`); let result = await this.options.adapter.syncLocalDatabase(checkpoint); if (!result.checkpointValid) { - this.logger.debug('Checksum mismatch in checkpoint, will reconnect'); + this.logger.debug(`Checksum mismatch in checkpoint ${checkpoint.last_op_id}, will reconnect`); // This means checksums failed. Start again with a new checkpoint. // TODO: better back-off await new Promise((resolve) => setTimeout(resolve, 50)); return { applied: false, endIteration: true }; } else if (!result.ready) { this.logger.debug( - 'Could not apply checkpoint due to local data. We will retry applying the checkpoint after that upload is completed.' + `Could not apply checkpoint ${checkpoint.last_op_id} due to local data. We will retry applying the checkpoint after that upload is completed.` ); return { applied: false, endIteration: false }; } - this.logger.debug('validated checkpoint', checkpoint); + this.logger.debug(`Applied checkpoint ${checkpoint.last_op_id}`, checkpoint); this.updateSyncStatus({ connected: true, lastSyncedAt: new Date(), diff --git a/packages/node/src/db/PowerSyncDatabase.ts b/packages/node/src/db/PowerSyncDatabase.ts index da132594..4ac7969b 100644 --- a/packages/node/src/db/PowerSyncDatabase.ts +++ b/packages/node/src/db/PowerSyncDatabase.ts @@ -64,7 +64,7 @@ export class PowerSyncDatabase extends AbstractPowerSyncDatabase { } protected generateBucketStorageAdapter(): BucketStorageAdapter { - return new SqliteBucketStorage(this.database); + return new SqliteBucketStorage(this.database, this.logger); } connect( @@ -78,7 +78,7 @@ export class PowerSyncDatabase extends AbstractPowerSyncDatabase { connector: PowerSyncBackendConnector, options: NodeAdditionalConnectionOptions ): AbstractStreamingSyncImplementation { - const logger = this.options.logger; + const logger = this.logger; const remote = new NodeRemote(connector, logger, { dispatcher: options.dispatcher, ...(this.options as NodePowerSyncDatabaseOptions).remoteOptions diff --git a/packages/react-native/src/db/PowerSyncDatabase.ts b/packages/react-native/src/db/PowerSyncDatabase.ts index 128c9901..ddb51369 100644 --- a/packages/react-native/src/db/PowerSyncDatabase.ts +++ b/packages/react-native/src/db/PowerSyncDatabase.ts @@ -39,14 +39,14 @@ export class PowerSyncDatabase extends AbstractPowerSyncDatabase { } protected generateBucketStorageAdapter(): BucketStorageAdapter { - return new ReactNativeBucketStorageAdapter(this.database); + return new ReactNativeBucketStorageAdapter(this.database, this.logger); } protected generateSyncStreamImplementation( connector: PowerSyncBackendConnector, options: RequiredAdditionalConnectionOptions ): AbstractStreamingSyncImplementation { - const remote = new ReactNativeRemote(connector); + const remote = new ReactNativeRemote(connector, this.logger); return new ReactNativeStreamingSyncImplementation({ adapter: this.bucketStorageAdapter, @@ -57,7 +57,8 @@ export class PowerSyncDatabase extends AbstractPowerSyncDatabase { }, retryDelayMs: options.retryDelayMs, crudUploadThrottleMs: options.crudUploadThrottleMs, - identifier: this.database.name + identifier: this.database.name, + logger: this.logger }); } } diff --git a/packages/web/src/db/PowerSyncDatabase.ts b/packages/web/src/db/PowerSyncDatabase.ts index 891a033d..f5534680 100644 --- a/packages/web/src/db/PowerSyncDatabase.ts +++ b/packages/web/src/db/PowerSyncDatabase.ts @@ -186,7 +186,7 @@ export class PowerSyncDatabase extends AbstractPowerSyncDatabase { connector: PowerSyncBackendConnector, options: RequiredAdditionalConnectionOptions ): StreamingSyncImplementation { - const remote = new WebRemote(connector); + const remote = new WebRemote(connector, this.logger); const syncOptions: WebStreamingSyncImplementationOptions = { ...(this.options as {}), retryDelayMs: options.retryDelayMs, @@ -198,7 +198,8 @@ export class PowerSyncDatabase extends AbstractPowerSyncDatabase { await this.waitForReady(); await connector.uploadData(this); }, - identifier: this.database.name + identifier: this.database.name, + logger: this.logger }; switch (true) {