Skip to content

Improve logging #659

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 10 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions .changeset/fresh-laws-join.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
---
'@powersync/react-native': minor
'@powersync/common': minor
'@powersync/node': minor
'@powersync/web': minor
---

Propagate logger from PowerSyncDatabase to streaming sync implementation.
15 changes: 7 additions & 8 deletions packages/common/src/client/AbstractPowerSyncDatabase.ts
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,6 @@ export const DEFAULT_WATCH_THROTTLE_MS = 30;

export const DEFAULT_POWERSYNC_DB_OPTIONS = {
retryDelayMs: 5000,
logger: Logger.get('PowerSyncDatabase'),
crudUploadThrottleMs: DEFAULT_CRUD_UPLOAD_THROTTLE_MS
};

Expand Down Expand Up @@ -175,6 +174,8 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB

protected runExclusiveMutex: Mutex;

logger: ILogger;

constructor(options: PowerSyncDatabaseOptionsWithDBAdapter);
constructor(options: PowerSyncDatabaseOptionsWithOpenFactory);
constructor(options: PowerSyncDatabaseOptionsWithSettings);
Expand All @@ -198,6 +199,8 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
throw new Error('The provided `database` option is invalid.');
}

this.logger = options.logger ?? Logger.get(`PowerSyncDatabase[${this._database.name}]`);

this.bucketStorageAdapter = this.generateBucketStorageAdapter();
this.closed = false;
this.currentStatus = new SyncStatus({});
Expand Down Expand Up @@ -418,7 +421,7 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
try {
schema.validate();
} catch (ex) {
this.options.logger?.warn('Schema validation failed. Unexpected behaviour could occur', ex);
this.logger.warn('Schema validation failed. Unexpected behaviour could occur', ex);
}
this._schema = schema;

Expand All @@ -427,10 +430,6 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
this.iterateListeners(async (cb) => cb.schemaChanged?.(schema));
}

get logger() {
return this.options.logger!;
}

/**
* Wait for initialization to complete.
* While initializing is automatic, this helps to catch and report initialization errors.
Expand Down Expand Up @@ -872,7 +871,7 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
* @param options Options for configuring watch behavior
*/
watchWithCallback(sql: string, parameters?: any[], handler?: WatchHandler, options?: SQLWatchOptions): void {
const { onResult, onError = (e: Error) => this.options.logger?.error(e) } = handler ?? {};
const { onResult, onError = (e: Error) => this.logger.error(e) } = handler ?? {};
if (!onResult) {
throw new Error('onResult is required');
}
Expand Down Expand Up @@ -1027,7 +1026,7 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
* @returns A dispose function to stop watching for changes
*/
onChangeWithCallback(handler?: WatchOnChangeHandler, options?: SQLWatchOptions): () => void {
const { onChange, onError = (e: Error) => this.options.logger?.error(e) } = handler ?? {};
const { onChange, onError = (e: Error) => this.logger.error(e) } = handler ?? {};
if (!onChange) {
throw new Error('onChange is required');
}
Expand Down
31 changes: 16 additions & 15 deletions packages/common/src/client/sync/bucket/SqliteBucketStorage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -92,11 +92,11 @@ export class SqliteBucketStorage extends BaseObserver<BucketStorageListener> imp
async saveSyncData(batch: SyncDataBatch, fixedKeyFormat: boolean = false) {
await this.writeTransaction(async (tx) => {
for (const b of batch.buckets) {
const result = await tx.execute('INSERT INTO powersync_operations(op, data) VALUES(?, ?)', [
await tx.execute('INSERT INTO powersync_operations(op, data) VALUES(?, ?)', [
'save',
JSON.stringify({ buckets: [b.toJSON(fixedKeyFormat)] })
]);
this.logger.debug('saveSyncData', JSON.stringify(result));
this.logger.debug(`Saved batch of data for bucket: ${b.bucket}, operations: ${b.data.length}`);
}
});
}
Expand All @@ -115,7 +115,7 @@ export class SqliteBucketStorage extends BaseObserver<BucketStorageListener> imp
await tx.execute('INSERT INTO powersync_operations(op, data) VALUES(?, ?)', ['delete_bucket', bucket]);
});

this.logger.debug('done deleting bucket');
this.logger.debug(`Done deleting bucket ${bucket}`);
}

async hasCompletedSync() {
Expand All @@ -139,6 +139,11 @@ export class SqliteBucketStorage extends BaseObserver<BucketStorageListener> imp
}
return { ready: false, checkpointValid: false, checkpointFailures: r.checkpointFailures };
}
if (priority == null) {
this.logger.debug(`Validated checksums checkpoint ${checkpoint.last_op_id}`);
} else {
this.logger.debug(`Validated checksums for partial checkpoint ${checkpoint.last_op_id}, priority ${priority}`);
}

let buckets = checkpoint.buckets;
if (priority !== undefined) {
Expand All @@ -158,7 +163,6 @@ export class SqliteBucketStorage extends BaseObserver<BucketStorageListener> imp

const valid = await this.updateObjectsFromBuckets(checkpoint, priority);
if (!valid) {
this.logger.debug('Not at a consistent checkpoint - cannot update local db');
return { ready: false, checkpointValid: true };
}

Expand Down Expand Up @@ -221,7 +225,6 @@ export class SqliteBucketStorage extends BaseObserver<BucketStorageListener> imp
]);

const resultItem = rs.rows?.item(0);
this.logger.debug('validateChecksums priority, checkpoint, result item', priority, checkpoint, resultItem);
if (!resultItem) {
return {
checkpointValid: false,
Expand Down Expand Up @@ -262,34 +265,32 @@ export class SqliteBucketStorage extends BaseObserver<BucketStorageListener> imp

const opId = await cb();

this.logger.debug(`[updateLocalTarget] Updating target to checkpoint ${opId}`);

return this.writeTransaction(async (tx) => {
const anyData = await tx.execute('SELECT 1 FROM ps_crud LIMIT 1');
if (anyData.rows?.length) {
// if isNotEmpty
this.logger.debug('updateLocalTarget', 'ps crud is not empty');
this.logger.debug(`New data uploaded since write checkpoint ${opId} - need new write checkpoint`);
return false;
}

const rs = await tx.execute("SELECT seq FROM sqlite_sequence WHERE name = 'ps_crud'");
if (!rs.rows?.length) {
// assert isNotEmpty
throw new Error('SQlite Sequence should not be empty');
throw new Error('SQLite Sequence should not be empty');
}

const seqAfter: number = rs.rows?.item(0)['seq'];
this.logger.debug('seqAfter', JSON.stringify(rs.rows?.item(0)));
if (seqAfter != seqBefore) {
this.logger.debug('seqAfter != seqBefore', seqAfter, seqBefore);
this.logger.debug(
`New data uploaded since write checpoint ${opId} - need new write checkpoint (sequence updated)`
);

// New crud data may have been uploaded since we got the checkpoint. Abort.
return false;
}

const response = await tx.execute("UPDATE ps_buckets SET target_op = CAST(? as INTEGER) WHERE name='$local'", [
opId
]);
this.logger.debug(['[updateLocalTarget] Response from updating target_op ', JSON.stringify(response)]);
this.logger.debug(`Updating target write checkpoint to ${opId}`);
await tx.execute("UPDATE ps_buckets SET target_op = CAST(? as INTEGER) WHERE name='$local'", [opId]);
return true;
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -329,7 +329,9 @@ export abstract class AbstractStreamingSyncImplementation
const clientId = await this.options.adapter.getClientId();
let path = `/write-checkpoint2.json?client_id=${clientId}`;
const response = await this.options.remote.get(path);
return response['data']['write_checkpoint'] as string;
const checkpoint = response['data']['write_checkpoint'] as string;
this.logger.debug(`Created write checkpoint: ${checkpoint}`);
return checkpoint;
}

protected async _uploadAllCrud(): Promise<void> {
Expand Down Expand Up @@ -371,7 +373,11 @@ The next upload iteration will be delayed.`);
});
} else {
// Uploading is completed
await this.options.adapter.updateLocalTarget(() => this.getWriteCheckpoint());
const neededUpdate = await this.options.adapter.updateLocalTarget(() => this.getWriteCheckpoint());
if (neededUpdate == false && checkedCrudItem != null) {
// Only log this if there was something to upload
this.logger.debug('Upload complete, no write checkpoint needed.');
}
break;
}
} catch (ex) {
Expand Down Expand Up @@ -1083,20 +1089,20 @@ The next upload iteration will be delayed.`);
let result = await this.options.adapter.syncLocalDatabase(checkpoint);

if (!result.checkpointValid) {
this.logger.debug('Checksum mismatch in checkpoint, will reconnect');
this.logger.debug(`Checksum mismatch in checkpoint ${checkpoint.last_op_id}, will reconnect`);
// This means checksums failed. Start again with a new checkpoint.
// TODO: better back-off
await new Promise((resolve) => setTimeout(resolve, 50));
return { applied: false, endIteration: true };
} else if (!result.ready) {
this.logger.debug(
'Could not apply checkpoint due to local data. We will retry applying the checkpoint after that upload is completed.'
`Could not apply checkpoint ${checkpoint.last_op_id} due to local data. We will retry applying the checkpoint after that upload is completed.`
);

return { applied: false, endIteration: false };
}

this.logger.debug('validated checkpoint', checkpoint);
this.logger.debug(`Applied checkpoint ${checkpoint.last_op_id}`, checkpoint);
this.updateSyncStatus({
connected: true,
lastSyncedAt: new Date(),
Expand Down
5 changes: 3 additions & 2 deletions packages/node/src/db/PowerSyncDatabase.ts
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ export class PowerSyncDatabase extends AbstractPowerSyncDatabase {
}

protected generateBucketStorageAdapter(): BucketStorageAdapter {
return new SqliteBucketStorage(this.database);
return new SqliteBucketStorage(this.database, this.logger);
}

connect(
Expand Down Expand Up @@ -92,7 +92,8 @@ export class PowerSyncDatabase extends AbstractPowerSyncDatabase {
},
retryDelayMs: this.options.retryDelayMs,
crudUploadThrottleMs: this.options.crudUploadThrottleMs,
identifier: this.database.name
identifier: this.database.name,
logger: this.logger
});
}
}
7 changes: 4 additions & 3 deletions packages/react-native/src/db/PowerSyncDatabase.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,14 +39,14 @@ export class PowerSyncDatabase extends AbstractPowerSyncDatabase {
}

protected generateBucketStorageAdapter(): BucketStorageAdapter {
return new ReactNativeBucketStorageAdapter(this.database);
return new ReactNativeBucketStorageAdapter(this.database, this.logger);
}

protected generateSyncStreamImplementation(
connector: PowerSyncBackendConnector,
options: RequiredAdditionalConnectionOptions
): AbstractStreamingSyncImplementation {
const remote = new ReactNativeRemote(connector);
const remote = new ReactNativeRemote(connector, this.logger);

return new ReactNativeStreamingSyncImplementation({
adapter: this.bucketStorageAdapter,
Expand All @@ -57,7 +57,8 @@ export class PowerSyncDatabase extends AbstractPowerSyncDatabase {
},
retryDelayMs: options.retryDelayMs,
crudUploadThrottleMs: options.crudUploadThrottleMs,
identifier: this.database.name
identifier: this.database.name,
logger: this.logger
});
}
}
5 changes: 3 additions & 2 deletions packages/web/src/db/PowerSyncDatabase.ts
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ export class PowerSyncDatabase extends AbstractPowerSyncDatabase {
connector: PowerSyncBackendConnector,
options: RequiredAdditionalConnectionOptions
): StreamingSyncImplementation {
const remote = new WebRemote(connector);
const remote = new WebRemote(connector, this.logger);
const syncOptions: WebStreamingSyncImplementationOptions = {
...(this.options as {}),
retryDelayMs: options.retryDelayMs,
Expand All @@ -198,7 +198,8 @@ export class PowerSyncDatabase extends AbstractPowerSyncDatabase {
await this.waitForReady();
await connector.uploadData(this);
},
identifier: this.database.name
identifier: this.database.name,
logger: this.logger
};

switch (true) {
Expand Down