Skip to content

Sync progress #555

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 10 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/sharp-singers-search.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@powersync/common': minor
---

Report progress information about downloaded rows.
6 changes: 1 addition & 5 deletions packages/common/src/client/AbstractPowerSyncDatabase.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import {
type PowerSyncConnectionOptions,
type RequiredAdditionalConnectionOptions
} from './sync/stream/AbstractStreamingSyncImplementation.js';
import { FULL_SYNC_PRIORITY } from '../db/crud/SyncProgress.js';

export interface DisconnectAndClearOptions {
/** When set to false, data in local-only tables is preserved. */
Expand Down Expand Up @@ -146,11 +147,6 @@ export const isPowerSyncDatabaseOptionsWithSettings = (test: any): test is Power
return typeof test == 'object' && isSQLOpenOptions(test.database);
};

/**
* The priority used by the core extension to indicate that a full sync was completed.
*/
const FULL_SYNC_PRIORITY = 2147483647;

export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDBListener> {
/**
* Transactions should be queued in the DBAdapter, but we also want to prevent
Expand Down
2 changes: 1 addition & 1 deletion packages/common/src/client/SQLOpenFactory.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ export interface SQLOpenOptions {
dbFilename: string;
/**
* Directory where the database file is located.
*
*
* When set, the directory must exist when the database is opened, it will
* not be created automatically.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,14 @@ export interface SyncLocalDatabaseResult {
checkpointFailures?: string[];
}

export type BucketOperationProgress = Record<
string,
{
atLast: number;
sinceLast: number;
}
>;

export interface BucketChecksum {
bucket: string;
priority?: number;
Expand Down Expand Up @@ -65,6 +73,7 @@ export interface BucketStorageAdapter extends BaseObserver<BucketStorageListener
startSession(): void;

getBucketStates(): Promise<BucketState[]>;
getBucketOperationProgress(): Promise<BucketOperationProgress>;

syncLocalDatabase(
checkpoint: Checkpoint,
Expand Down
24 changes: 23 additions & 1 deletion packages/common/src/client/sync/bucket/SqliteBucketStorage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import { BaseObserver } from '../../../utils/BaseObserver.js';
import { MAX_OP_ID } from '../../constants.js';
import {
BucketChecksum,
BucketOperationProgress,
BucketState,
BucketStorageAdapter,
BucketStorageListener,
Expand Down Expand Up @@ -91,6 +92,13 @@ export class SqliteBucketStorage extends BaseObserver<BucketStorageListener> imp
return result;
}

async getBucketOperationProgress(): Promise<BucketOperationProgress> {
const rows = await this.db.getAll<{ name: string; count_at_last: number; count_since_last: number }>(
'SELECT name, count_at_last, count_since_last FROM ps_buckets'
);
return Object.fromEntries(rows.map((r) => [r.name, { atLast: r.count_at_last, sinceLast: r.count_since_last }]));
}

async saveSyncData(batch: SyncDataBatch) {
await this.writeTransaction(async (tx) => {
let count = 0;
Expand Down Expand Up @@ -199,7 +207,21 @@ export class SqliteBucketStorage extends BaseObserver<BucketStorageListener> imp
'sync_local',
arg
]);
return result == 1;
if (result == 1) {
if (priority == null) {
const bucketToCount = Object.fromEntries(checkpoint.buckets.map((b) => [b.bucket, b.count]));
// The two parameters could be replaced with one, but: https://github.com/powersync-ja/better-sqlite3/pull/6
const jsonBucketCount = JSON.stringify(bucketToCount);
await tx.execute(
'UPDATE ps_buckets SET count_since_last = 0, count_at_last = ?->name WHERE ?->name IS NOT NULL',
[jsonBucketCount, jsonBucketCount]
);
}

return true;
} else {
return false;
}
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import {
isStreamingSyncData
} from './streaming-sync-types.js';
import { DataStream } from 'src/utils/DataStream.js';
import { InternalProgressInformation } from 'src/db/crud/SyncProgress.js';

export enum LockType {
CRUD = 'crud',
Expand Down Expand Up @@ -164,21 +165,23 @@ export abstract class AbstractStreamingSyncImplementation
private pendingCrudUpload?: Promise<void>;

syncStatus: SyncStatus;
private syncStatusOptions: SyncStatusOptions;
triggerCrudUpload: () => void;

constructor(options: AbstractStreamingSyncImplementationOptions) {
super();
this.options = { ...DEFAULT_STREAMING_SYNC_OPTIONS, ...options };

this.syncStatus = new SyncStatus({
this.syncStatusOptions = {
connected: false,
connecting: false,
lastSyncedAt: undefined,
dataFlow: {
uploading: false,
downloading: false
}
});
};
this.syncStatus = new SyncStatus(this.syncStatusOptions);
this.abortController = null;

this.triggerCrudUpload = throttleLeadingTrailing(() => {
Expand Down Expand Up @@ -424,7 +427,8 @@ The next upload iteration will be delayed.`);
connected: false,
connecting: false,
dataFlow: {
downloading: false
downloading: false,
downloadProgress: null
}
});
});
Expand Down Expand Up @@ -580,6 +584,7 @@ The next upload iteration will be delayed.`);
bucketMap = newBuckets;
await this.options.adapter.removeBuckets([...bucketsToDelete]);
await this.options.adapter.setTargetCheckpoint(targetCheckpoint);
await this.updateSyncStatusForStartingCheckpoint(targetCheckpoint);
} else if (isStreamingSyncCheckpointComplete(line)) {
const result = await this.applyCheckpoint(targetCheckpoint!, signal);
if (result.endIteration) {
Expand Down Expand Up @@ -640,6 +645,7 @@ The next upload iteration will be delayed.`);
write_checkpoint: diff.write_checkpoint
};
targetCheckpoint = newCheckpoint;
await this.updateSyncStatusForStartingCheckpoint(targetCheckpoint);

bucketMap = new Map();
newBuckets.forEach((checksum, name) =>
Expand All @@ -657,9 +663,23 @@ The next upload iteration will be delayed.`);
await this.options.adapter.setTargetCheckpoint(targetCheckpoint);
} else if (isStreamingSyncData(line)) {
const { data } = line;
const previousProgress = this.syncStatusOptions.dataFlow?.downloadProgress;
let updatedProgress: InternalProgressInformation | null = null;
if (previousProgress) {
updatedProgress = { ...previousProgress };
const progressForBucket = updatedProgress[data.bucket];
if (progressForBucket) {
updatedProgress[data.bucket] = {
...progressForBucket,
sinceLast: progressForBucket.sinceLast + data.data.length
};
}
}

this.updateSyncStatus({
dataFlow: {
downloading: true
downloading: true,
downloadProgress: updatedProgress
}
});
await this.options.adapter.saveSyncData({ buckets: [SyncDataBucket.fromRow(data)] });
Expand Down Expand Up @@ -705,6 +725,30 @@ The next upload iteration will be delayed.`);
});
}

private async updateSyncStatusForStartingCheckpoint(checkpoint: Checkpoint) {
const localProgress = await this.options.adapter.getBucketOperationProgress();
const progress: InternalProgressInformation = {};

for (const bucket of checkpoint.buckets) {
const savedProgress = localProgress[bucket.bucket];
progress[bucket.bucket] = {
// The fallback priority doesn't matter here, but 3 is the one newer versions of the sync service
// will use by default.
priority: bucket.priority ?? 3,
atLast: savedProgress?.atLast ?? 0,
sinceLast: savedProgress?.sinceLast ?? 0,
targetCount: bucket.count ?? 0
};
}

this.updateSyncStatus({
dataFlow: {
downloading: true,
downloadProgress: progress
}
});
}

private async applyCheckpoint(checkpoint: Checkpoint, abort: AbortSignal) {
let result = await this.options.adapter.syncLocalDatabase(checkpoint);
const pending = this.pendingCrudUpload;
Expand Down Expand Up @@ -739,6 +783,7 @@ The next upload iteration will be delayed.`);
lastSyncedAt: new Date(),
dataFlow: {
downloading: false,
downloadProgress: null,
downloadError: undefined
}
});
Expand All @@ -763,6 +808,7 @@ The next upload iteration will be delayed.`);
});

if (!this.syncStatus.isEqual(updatedStatus)) {
Object.assign(this.syncStatusOptions, options);
this.syncStatus = updatedStatus;
// Only trigger this is there was a change
this.iterateListeners((cb) => cb.statusChanged?.(updatedStatus));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ export interface StreamingSyncCheckpointDiff {
last_op_id: OpId;
updated_buckets: BucketChecksum[];
removed_buckets: string[];
write_checkpoint: string;
write_checkpoint?: string;
};
}

Expand Down
104 changes: 104 additions & 0 deletions packages/common/src/db/crud/SyncProgress.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
import type { SyncStatus } from './SyncStatus.js';

// (bucket, progress) pairs
/** @internal */
export type InternalProgressInformation = Record<
string,
{
priority: number; // Priority of the associated buckets
atLast: number; // Total ops at last completed sync, or 0
sinceLast: number; // Total ops _since_ the last completed sync.
targetCount: number; // Total opcount for next checkpoint as indicated by service.
}
>;

/**
* @internal The priority used by the core extension to indicate that a full sync was completed.
*/
export const FULL_SYNC_PRIORITY = 2147483647;

/**
* Information about a progressing download made by the PowerSync SDK.
*
* To obtain these values, use {@link SyncProgress}, available through
* {@link SyncStatus#downloadProgress}.
*/
export interface ProgressWithOperations {
/**
* The total amount of operations to download for the current sync iteration
* to complete.
*/
totalOperations: number;
/**
* The amount of operations that have already been downloaded.
*/
downloadedOperations: number;

/**
* Relative progress, as {@link downloadedOperations} of {@link totalOperations}.
*
* This will be a number between `0.0` and `1.0`.
*/
downloadedFraction: number;
}

/**
* Provides realtime progress on how PowerSync is downloading rows.
*
* The progress until the next complete sync is available through the fields on {@link ProgressWithOperations},
* which this class implements.
* Additionally, the {@link SyncProgress.untilPriority} method can be used to otbain progress towards
* a specific priority (instead of the progress for the entire download).
*
* The reported progress always reflects the status towards th end of a sync iteration (after
* which a consistent snapshot of all buckets is available locally).
*
* In rare cases (in particular, when a [compacting](https://docs.powersync.com/usage/lifecycle-maintenance/compacting-buckets)
* operation takes place between syncs), it's possible for the returned numbers to be slightly
* inaccurate. For this reason, {@link SyncProgress} should be seen as an approximation of progress.
* The information returned is good enough to build progress bars, but not exact enough to track
* individual download counts.
*
* Also note that data is downloaded in bulk, which means that individual counters are unlikely
* to be updated one-by-one.
*/
export class SyncProgress implements ProgressWithOperations {

totalOperations: number;
downloadedOperations: number;
downloadedFraction: number;

constructor(protected internal: InternalProgressInformation) {
const untilCompletion = this.untilPriority(FULL_SYNC_PRIORITY);

this.totalOperations = untilCompletion.totalOperations;
this.downloadedOperations = untilCompletion.downloadedOperations;
this.downloadedFraction = untilCompletion.downloadedFraction;
}

/**
* Returns download progress towards all data up until the specified priority being received.
*
* The returned {@link ProgressWithOperations} tracks the target amount of operations that need
* to be downloaded in total and how many of them have already been received.
*/
untilPriority(priority: number): ProgressWithOperations {
let total = 0;
let downloaded = 0;

for (const progress of Object.values(this.internal)) {
// Include higher-priority buckets, which are represented by lower numbers.
if (progress.priority <= priority) {
downloaded += progress.sinceLast;
total += progress.targetCount - progress.atLast;
}
}

let progress = total == 0 ? 0.0 : downloaded / total;
return {
totalOperations: total,
downloadedOperations: downloaded,
downloadedFraction: progress
};
}
}
23 changes: 23 additions & 0 deletions packages/common/src/db/crud/SyncStatus.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import { InternalProgressInformation, SyncProgress } from './SyncProgress.js';

export type SyncDataFlowStatus = Partial<{
downloading: boolean;
uploading: boolean;
Expand All @@ -12,6 +14,12 @@ export type SyncDataFlowStatus = Partial<{
* Cleared on the next successful upload.
*/
uploadError?: Error;
/**
* Internal information about how far we are downloading operations in buckets.
*
* Please use the {@link SyncStatus#downloadProgress} property to track sync progress.
*/
downloadProgress: InternalProgressInformation | null;
}>;

export interface SyncPriorityStatus {
Expand Down Expand Up @@ -104,6 +112,21 @@ export class SyncStatus {
return (this.options.priorityStatusEntries ?? []).slice().sort(SyncStatus.comparePriorities);
}

/**
* A realtime progress report on how many operations have been downloaded and
* how many are necessary in total to complete the next sync iteration.
*
* This field is only set when {@link SyncDataFlowStatus#downloading} is also true.
*/
get downloadProgress(): SyncProgress | null {
const internalProgress = this.options.dataFlow?.downloadProgress;
if (internalProgress == null) {
return null;
}

return new SyncProgress(internalProgress);
}

/**
* Reports the sync status (a pair of {@link SyncStatus#hasSynced} and {@link SyncStatus#lastSyncedAt} fields)
* for a specific bucket priority level.
Expand Down
1 change: 1 addition & 0 deletions packages/common/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ export * from './client/sync/stream/AbstractStreamingSyncImplementation.js';
export * from './client/sync/stream/streaming-sync-types.js';
export { MAX_OP_ID } from './client/constants.js';

export { ProgressWithOperations, SyncProgress } from './db/crud/SyncProgress.js';
export * from './db/crud/SyncStatus.js';
export * from './db/crud/UploadQueueStatus.js';
export * from './db/schema/Schema.js';
Expand Down
Loading
Loading