Skip to content

Commit fca98b5

Browse files
Merge branch 'main' into probes
2 parents 1c2f908 + 05b9593 commit fca98b5

File tree

4 files changed

+77
-15
lines changed

4 files changed

+77
-15
lines changed

.changeset/sixty-melons-bow.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
---
2+
'@powersync/service-module-postgres-storage': patch
3+
'@powersync/service-core-tests': patch
4+
'@powersync/service-image': patch
5+
---
6+
7+
[Postgres Storage] Fix op_id_sequence initialization edge case

modules/module-postgres-storage/src/storage/PostgresSyncRulesStorage.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -336,7 +336,7 @@ export class PostgresSyncRulesStorage
336336

337337
await callback(batch);
338338
await batch.flush();
339-
if (batch.last_flushed_op) {
339+
if (batch.last_flushed_op != null) {
340340
return { flushed_op: batch.last_flushed_op };
341341
} else {
342342
return null;

modules/module-postgres-storage/src/storage/batch/PostgresBucketBatch.ts

Lines changed: 19 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -208,13 +208,7 @@ export class PostgresBucketBatch
208208
return null;
209209
}
210210

211-
const currentSequence = await this.db.sql`
212-
SELECT
213-
LAST_VALUE AS value
214-
FROM
215-
op_id_sequence;
216-
`.first<{ value: bigint }>();
217-
return currentSequence!.value;
211+
return this.getLastOpIdSequence(this.db);
218212
}
219213

220214
async drop(sourceTables: storage.SourceTable[]): Promise<storage.FlushedResult | null> {
@@ -262,13 +256,7 @@ export class PostgresBucketBatch
262256
const lastOp = await this.withReplicationTransaction(async (db) => {
263257
resumeBatch = await this.replicateBatch(db, batch);
264258

265-
const sequence = await db.sql`
266-
SELECT
267-
LAST_VALUE AS value
268-
FROM
269-
op_id_sequence;
270-
`.first<{ value: bigint }>();
271-
return sequence!.value;
259+
return this.getLastOpIdSequence(db);
272260
});
273261

274262
// null if done, set if we need another flush
@@ -895,6 +883,23 @@ export class PostgresBucketBatch
895883
`.execute();
896884
}
897885
}
886+
887+
private async getLastOpIdSequence(db: lib_postgres.AbstractPostgresConnection) {
888+
// When no op_id has been generated, last_value = 1 and nextval() will be 1.
889+
// To cater for this case, we check is_called, and default to 0 if no value has been generated.
890+
const sequence = await db.sql`
891+
SELECT
892+
(
893+
CASE
894+
WHEN is_called THEN last_value
895+
ELSE 0
896+
END
897+
) AS value
898+
FROM
899+
op_id_sequence;
900+
`.first<{ value: bigint }>();
901+
return sequence!.value;
902+
}
898903
}
899904

900905
/**

packages/service-core-tests/src/tests/register-data-storage-tests.ts

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1878,4 +1878,54 @@ bucket_definitions:
18781878
}
18791879
});
18801880
});
1881+
1882+
test('op_id initialization edge case', async () => {
1883+
// Test syncing a batch of data that is small in count,
1884+
// but large enough in size to be split over multiple returned chunks.
1885+
// Similar to the above test, but splits over 1MB chunks.
1886+
const sync_rules = test_utils.testRules(
1887+
`
1888+
bucket_definitions:
1889+
global:
1890+
data:
1891+
- SELECT id FROM test
1892+
- SELECT id FROM test_ignore WHERE false
1893+
`
1894+
);
1895+
await using factory = await generateStorageFactory();
1896+
const bucketStorage = factory.getInstance(sync_rules);
1897+
1898+
const sourceTable = test_utils.makeTestTable('test', ['id']);
1899+
const sourceTableIgnore = test_utils.makeTestTable('test_ignore', ['id']);
1900+
1901+
const result1 = await bucketStorage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => {
1902+
// This saves a record to current_data, but not bucket_data.
1903+
// This causes a checkpoint to be created without increasing the op_id sequence.
1904+
await batch.save({
1905+
sourceTable: sourceTableIgnore,
1906+
tag: storage.SaveOperationTag.INSERT,
1907+
after: {
1908+
id: 'test1'
1909+
},
1910+
afterReplicaId: test_utils.rid('test1')
1911+
});
1912+
});
1913+
1914+
const checkpoint1 = result1!.flushed_op;
1915+
1916+
const result2 = await bucketStorage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => {
1917+
await batch.save({
1918+
sourceTable: sourceTable,
1919+
tag: storage.SaveOperationTag.INSERT,
1920+
after: {
1921+
id: 'test2'
1922+
},
1923+
afterReplicaId: test_utils.rid('test2')
1924+
});
1925+
});
1926+
1927+
const checkpoint2 = result2!.flushed_op;
1928+
// we expect 0n and 1n, or 1n and 2n.
1929+
expect(checkpoint2).toBeGreaterThan(checkpoint1);
1930+
});
18811931
}

0 commit comments

Comments
 (0)