Skip to content

Commit f07274f

Browse files
authored
Merge pull request #616 from powersync-ja/fix-progress-around-compaction
Fix reported progress around compaction
2 parents 658e439 + b046ebe commit f07274f

File tree

3 files changed

+167
-56
lines changed

3 files changed

+167
-56
lines changed

.changeset/poor-donuts-call.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
'@powersync/common': patch
3+
---
4+
5+
Fix reported progress around compactions / defrags on the sync service.

packages/common/src/client/sync/stream/AbstractStreamingSyncImplementation.ts

Lines changed: 25 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -666,7 +666,10 @@ The next upload iteration will be delayed.`);
666666
if (progressForBucket) {
667667
updatedProgress[data.bucket] = {
668668
...progressForBucket,
669-
sinceLast: progressForBucket.sinceLast + data.data.length
669+
sinceLast: Math.min(
670+
progressForBucket.sinceLast + data.data.length,
671+
progressForBucket.targetCount - progressForBucket.atLast
672+
)
670673
};
671674
}
672675
}
@@ -730,17 +733,36 @@ The next upload iteration will be delayed.`);
730733
private async updateSyncStatusForStartingCheckpoint(checkpoint: Checkpoint) {
731734
const localProgress = await this.options.adapter.getBucketOperationProgress();
732735
const progress: InternalProgressInformation = {};
736+
let invalidated = false;
733737

734738
for (const bucket of checkpoint.buckets) {
735739
const savedProgress = localProgress[bucket.bucket];
740+
const atLast = savedProgress?.atLast ?? 0;
741+
const sinceLast = savedProgress?.sinceLast ?? 0;
742+
736743
progress[bucket.bucket] = {
737744
// The fallback priority doesn't matter here, but 3 is the one newer versions of the sync service
738745
// will use by default.
739746
priority: bucket.priority ?? 3,
740-
atLast: savedProgress?.atLast ?? 0,
741-
sinceLast: savedProgress?.sinceLast ?? 0,
747+
atLast: atLast,
748+
sinceLast: sinceLast,
742749
targetCount: bucket.count ?? 0
743750
};
751+
752+
if (bucket.count != null && bucket.count < atLast + sinceLast) {
753+
// Either due to a defrag / sync rule deploy or a compaction operation, the size
754+
// of the bucket shrank so much that the local ops exceed the ops in the updated
755+
// bucket. We can't prossibly report progress in this case (it would overshoot 100%).
756+
invalidated = true;
757+
}
758+
}
759+
760+
if (invalidated) {
761+
for (const bucket in progress) {
762+
const bucketProgress = progress[bucket];
763+
bucketProgress.atLast = 0;
764+
bucketProgress.sinceLast = 0;
765+
}
744766
}
745767

746768
this.updateSyncStatus({

packages/node/tests/sync.test.ts

Lines changed: 137 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -169,77 +169,124 @@ describe('Sync', () => {
169169
});
170170

171171
mockSyncServiceTest('different priorities', async ({ syncService }) => {
172-
let database = await syncService.createDatabase();
173-
database.connect(new TestConnector(), { connectionMethod: SyncStreamConnectionMethod.HTTP });
174-
await vi.waitFor(() => expect(syncService.connectedListeners).toHaveLength(1));
175-
176-
syncService.pushLine({
177-
checkpoint: {
178-
last_op_id: '10',
179-
buckets: [
180-
bucket('a', 5, {priority: 0}),
181-
bucket('b', 5, {priority: 2}),
182-
]
183-
}
184-
});
185-
186-
// Should be at 0/10 for total progress (which is the same as the progress for prio 2), and a 0/5 towards prio 0.
187-
await waitForProgress(database, [0, 10], [[0, [0, 5]], [2, [0, 10]]]);
188-
189-
pushDataLine(syncService, 'a', 5);
190-
await waitForProgress(database, [5, 10], [[0, [5, 5]], [2, [5, 10]]]);
191-
192-
pushCheckpointComplete(syncService, 0);
193-
await waitForProgress(database, [5, 10], [[0, [5, 5]], [2, [5, 10]]]);
172+
let database = await syncService.createDatabase();
173+
database.connect(new TestConnector(), { connectionMethod: SyncStreamConnectionMethod.HTTP });
174+
await vi.waitFor(() => expect(syncService.connectedListeners).toHaveLength(1));
194175

195-
pushDataLine(syncService, 'b', 2);
196-
await waitForProgress(database, [7, 10], [[0, [5, 5]], [2, [7, 10]]]);
176+
syncService.pushLine({
177+
checkpoint: {
178+
last_op_id: '10',
179+
buckets: [bucket('a', 5, { priority: 0 }), bucket('b', 5, { priority: 2 })]
180+
}
181+
});
197182

198-
// Before syncing b fully, send a new checkpoint
199-
syncService.pushLine({
200-
checkpoint: {
201-
last_op_id: '14',
202-
buckets: [
203-
bucket('a', 8, {priority: 0}),
204-
bucket('b', 6, {priority: 2}),
205-
]
206-
}
207-
});
208-
await waitForProgress(database, [7, 14], [[0, [5, 8]], [2, [7, 14]]]);
183+
// Should be at 0/10 for total progress (which is the same as the progress for prio 2), and a 0/5 towards prio 0.
184+
await waitForProgress(
185+
database,
186+
[0, 10],
187+
[
188+
[0, [0, 5]],
189+
[2, [0, 10]]
190+
]
191+
);
209192

210-
pushDataLine(syncService, 'a', 3);
211-
await waitForProgress(database, [10, 14], [[0, [8, 8]], [2, [10, 14]]]);
193+
pushDataLine(syncService, 'a', 5);
194+
await waitForProgress(
195+
database,
196+
[5, 10],
197+
[
198+
[0, [5, 5]],
199+
[2, [5, 10]]
200+
]
201+
);
212202

213-
pushCheckpointComplete(syncService, 0);
214-
await waitForProgress(database, [10, 14], [[0, [8, 8]], [2, [10, 14]]]);
203+
pushCheckpointComplete(syncService, 0);
204+
await waitForProgress(
205+
database,
206+
[5, 10],
207+
[
208+
[0, [5, 5]],
209+
[2, [5, 10]]
210+
]
211+
);
212+
213+
pushDataLine(syncService, 'b', 2);
214+
await waitForProgress(
215+
database,
216+
[7, 10],
217+
[
218+
[0, [5, 5]],
219+
[2, [7, 10]]
220+
]
221+
);
222+
223+
// Before syncing b fully, send a new checkpoint
224+
syncService.pushLine({
225+
checkpoint: {
226+
last_op_id: '14',
227+
buckets: [bucket('a', 8, { priority: 0 }), bucket('b', 6, { priority: 2 })]
228+
}
229+
});
230+
await waitForProgress(
231+
database,
232+
[7, 14],
233+
[
234+
[0, [5, 8]],
235+
[2, [7, 14]]
236+
]
237+
);
238+
239+
pushDataLine(syncService, 'a', 3);
240+
await waitForProgress(
241+
database,
242+
[10, 14],
243+
[
244+
[0, [8, 8]],
245+
[2, [10, 14]]
246+
]
247+
);
215248

216-
pushDataLine(syncService, 'b', 4);
217-
await waitForProgress(database, [14, 14], [[0, [8, 8]], [2, [14, 14]]]);
249+
pushCheckpointComplete(syncService, 0);
250+
await waitForProgress(
251+
database,
252+
[10, 14],
253+
[
254+
[0, [8, 8]],
255+
[2, [10, 14]]
256+
]
257+
);
258+
259+
pushDataLine(syncService, 'b', 4);
260+
await waitForProgress(
261+
database,
262+
[14, 14],
263+
[
264+
[0, [8, 8]],
265+
[2, [14, 14]]
266+
]
267+
);
218268

219-
pushCheckpointComplete(syncService);
220-
await waitForSyncStatus(database, (s) => s.downloadProgress == null);
269+
pushCheckpointComplete(syncService);
270+
await waitForSyncStatus(database, (s) => s.downloadProgress == null);
221271
});
222272

223-
mockSyncServiceTest('uses correct state when reconnecting', async ({syncService}) => {
273+
mockSyncServiceTest('uses correct state when reconnecting', async ({ syncService }) => {
224274
let database = await syncService.createDatabase();
225275
database.connect(new TestConnector(), { connectionMethod: SyncStreamConnectionMethod.HTTP });
226276
await vi.waitFor(() => expect(syncService.connectedListeners).toHaveLength(1));
227277

228278
syncService.pushLine({
229279
checkpoint: {
230280
last_op_id: '10',
231-
buckets: [
232-
bucket('a', 5, {priority: 0}),
233-
bucket('b', 5, {priority: 3}),
234-
]
281+
buckets: [bucket('a', 5, { priority: 0 }), bucket('b', 5, { priority: 3 })]
235282
}
236283
});
237284

238285
// Sync priority 0 completely, start with rest
239286
pushDataLine(syncService, 'a', 5);
240287
pushDataLine(syncService, 'b', 1);
241288
pushCheckpointComplete(syncService, 0);
242-
await database.waitForFirstSync({priority: 0});
289+
await database.waitForFirstSync({ priority: 0 });
243290

244291
await database.close();
245292
await vi.waitFor(() => expect(syncService.connectedListeners).toHaveLength(0));
@@ -248,19 +295,56 @@ describe('Sync', () => {
248295
await vi.waitFor(() => expect(syncService.connectedListeners).toHaveLength(1));
249296

250297
expect(syncService.connectedListeners[0].buckets).toStrictEqual([
251-
{"name": "a", "after": "10"},
252-
{"name": "b", "after": "6"},
298+
{ name: 'a', after: '10' },
299+
{ name: 'b', after: '6' }
253300
]);
254301
});
302+
303+
mockSyncServiceTest('interrupt and defrag', async ({ syncService }) => {
304+
let database = await syncService.createDatabase();
305+
database.connect(new TestConnector(), { connectionMethod: SyncStreamConnectionMethod.HTTP });
306+
await vi.waitFor(() => expect(syncService.connectedListeners).toHaveLength(1));
307+
308+
syncService.pushLine({
309+
checkpoint: {
310+
last_op_id: '10',
311+
buckets: [bucket('a', 10)]
312+
}
313+
});
314+
315+
await waitForProgress(database, [0, 10]);
316+
pushDataLine(syncService, 'a', 5);
317+
await waitForProgress(database, [5, 10]);
318+
319+
// Re-open database
320+
await database.close();
321+
await vi.waitFor(() => expect(syncService.connectedListeners).toHaveLength(0));
322+
database = await syncService.createDatabase();
323+
database.connect(new TestConnector(), { connectionMethod: SyncStreamConnectionMethod.HTTP });
324+
await vi.waitFor(() => expect(syncService.connectedListeners).toHaveLength(1));
325+
326+
// A sync rule deploy could reset buckets, making the new bucket smaller than the existing one.
327+
syncService.pushLine({
328+
checkpoint: {
329+
last_op_id: '14',
330+
buckets: [bucket('a', 4)]
331+
}
332+
});
333+
334+
// In this special case, don't report 5/4 as progress.
335+
await waitForProgress(database, [0, 4]);
336+
pushCheckpointComplete(syncService);
337+
await waitForSyncStatus(database, (s) => s.downloadProgress == null);
338+
});
255339
});
256340
});
257341

258-
function bucket(name: string, count: number, options: {priority: number} = {priority: 3}): BucketChecksum {
342+
function bucket(name: string, count: number, options: { priority: number } = { priority: 3 }): BucketChecksum {
259343
return {
260344
bucket: name,
261345
count,
262346
checksum: 0,
263-
priority: options.priority,
347+
priority: options.priority
264348
};
265349
}
266350

0 commit comments

Comments
 (0)