Skip to content

Commit fa64ce5

Browse files
committed
Emit the data for the BatchedUpdateNotification event that's available in the updateHook rather than emitting a partial object.
1 parent 699ae01 commit fa64ce5

File tree

4 files changed

+76
-30
lines changed

4 files changed

+76
-30
lines changed

packages/node/src/db/AsyncDatabase.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { QueryResult } from '@powersync/common';
1+
import { BatchedUpdateNotification, QueryResult } from '@powersync/common';
22

33
export type ProxiedQueryResult = Omit<QueryResult, 'rows'> & {
44
rows?: {
@@ -20,5 +20,5 @@ export interface AsyncDatabase {
2020
// This happens on the worker because we otherwise get race conditions when wrapping
2121
// callbacks to invoke on the main thread (we need a guarantee that collectCommittedUpdates
2222
// contains entries immediately after calling COMMIT).
23-
collectCommittedUpdates: () => Promise<string[]>;
23+
collectCommittedUpdates: () => Promise<BatchedUpdateNotification>;
2424
}

packages/node/src/db/BetterSQLite3DBAdapter.ts

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -190,13 +190,8 @@ export class BetterSQLite3DBAdapter extends BaseObserver<DBAdapterListener> impl
190190
} finally {
191191
const updates = await this.writeConnection.database.collectCommittedUpdates();
192192

193-
if (updates.length > 0) {
194-
const event: BatchedUpdateNotification = {
195-
tables: updates,
196-
groupedUpdates: {},
197-
rawUpdates: []
198-
};
199-
this.iterateListeners((cb) => cb.tablesUpdated?.(event));
193+
if (updates.tables.length > 0) {
194+
this.iterateListeners((cb) => cb.tablesUpdated?.(updates));
200195
}
201196
}
202197
} finally {

packages/node/src/db/SqliteWorker.ts

Lines changed: 48 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import * as path from 'node:path';
22
import BetterSQLite3Database, { Database } from '@powersync/better-sqlite3';
3+
import { BatchedUpdateNotification, RowUpdateType, TableUpdateOperation, UpdateNotification } from '@powersync/common';
34
import * as Comlink from 'comlink';
45
import { parentPort, threadId } from 'node:worker_threads';
56
import OS from 'node:os';
@@ -9,36 +10,69 @@ import { AsyncDatabase, AsyncDatabaseOpener } from './AsyncDatabase.js';
910
class BlockingAsyncDatabase implements AsyncDatabase {
1011
private readonly db: Database;
1112

12-
private readonly uncommittedUpdatedTables = new Set<string>();
13-
private readonly committedUpdatedTables = new Set<string>();
13+
private readonly uncommittedUpdates = new Array<UpdateNotification>();
14+
private readonly committedUpdates = new Array<UpdateNotification>();
1415

1516
constructor(db: Database) {
1617
this.db = db;
1718

1819
db.function('node_thread_id', () => threadId);
1920
}
2021

21-
collectCommittedUpdates() {
22-
const resolved = Promise.resolve([...this.committedUpdatedTables]);
23-
this.committedUpdatedTables.clear();
24-
return resolved;
22+
async collectCommittedUpdates() {
23+
const rawUpdates: UpdateNotification[] = [];
24+
const groupedUpdates: Record<string, TableUpdateOperation[]> = {};
25+
26+
for (const rawUpdate of this.committedUpdates) {
27+
rawUpdates.push(rawUpdate);
28+
groupedUpdates[rawUpdate.table] ??= [];
29+
groupedUpdates[rawUpdate.table].push(rawUpdate);
30+
}
31+
32+
const result: BatchedUpdateNotification = {
33+
tables: Object.keys(groupedUpdates),
34+
rawUpdates,
35+
groupedUpdates
36+
};
37+
38+
this.committedUpdates.length = 0;
39+
40+
return result;
2541
}
2642

2743
installUpdateHooks() {
28-
this.db.updateHook((_op: string, _dbName: string, tableName: string, _rowid: bigint) => {
29-
this.uncommittedUpdatedTables.add(tableName);
30-
});
44+
this.db.updateHook(
45+
(
46+
operation: 'SQLITE_INSERT' | 'SQLITE_UPDATE' | 'SQLITE_DELETE',
47+
_dbName: string,
48+
table: string,
49+
rowId: number
50+
) => {
51+
let opType: RowUpdateType;
52+
switch (operation) {
53+
case 'SQLITE_INSERT':
54+
opType = RowUpdateType.SQLITE_INSERT;
55+
break;
56+
case 'SQLITE_UPDATE':
57+
opType = RowUpdateType.SQLITE_UPDATE;
58+
break;
59+
case 'SQLITE_DELETE':
60+
opType = RowUpdateType.SQLITE_DELETE;
61+
break;
62+
}
3163

32-
this.db.commitHook(() => {
33-
for (const tableName of this.uncommittedUpdatedTables) {
34-
this.committedUpdatedTables.add(tableName);
64+
this.uncommittedUpdates.push({ table, opType, rowId });
3565
}
36-
this.uncommittedUpdatedTables.clear();
66+
);
67+
68+
this.db.commitHook(() => {
69+
this.committedUpdates.push(...this.uncommittedUpdates);
70+
this.uncommittedUpdates.length = 0;
3771
return true;
3872
});
3973

4074
this.db.rollbackHook(() => {
41-
this.uncommittedUpdatedTables.clear();
75+
this.uncommittedUpdates.length = 0;
4276
});
4377
}
4478

packages/node/tests/PowerSyncDatabase.test.ts

Lines changed: 24 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,8 @@
11
import * as path from 'node:path';
22
import { Worker } from 'node:worker_threads';
3-
43
import { vi, expect, test } from 'vitest';
54
import { AppSchema, databaseTest, tempDirectoryTest } from './utils';
6-
import { PowerSyncDatabase } from '../lib';
5+
import { PowerSyncDatabase, RowUpdateType, WatchOnChangeEvent } from '../lib';
76
import { WorkerOpener } from '../lib/db/options';
87

98
test('validates options', async () => {
@@ -65,28 +64,46 @@ databaseTest('runs queries on multiple threads', async ({ database }) => {
6564
databaseTest('can watch tables', async ({ database }) => {
6665
const fn = vi.fn();
6766
const disposeWatch = database.onChangeWithCallback(
68-
{
69-
onChange: () => {
70-
fn();
71-
}
72-
},
67+
{ onChange: (event: WatchOnChangeEvent) => fn(event) },
7368
{ tables: ['todos'], throttleMs: 0 }
7469
);
7570

7671
await database.execute('INSERT INTO todos (id, content) VALUES (uuid(), ?)', ['first']);
7772
await expect.poll(() => fn).toHaveBeenCalledOnce();
73+
expect(fn).toHaveBeenNthCalledWith(1, {
74+
changedTables: ['ps_data__todos'],
75+
update: {
76+
tables: ['ps_data__todos'],
77+
rawUpdates: [{ table: 'ps_data__todos', opType: RowUpdateType.SQLITE_INSERT, rowId: 1n }],
78+
groupedUpdates: {
79+
ps_data__todos: [{ table: 'ps_data__todos', opType: RowUpdateType.SQLITE_INSERT, rowId: 1n }]
80+
}
81+
}
82+
});
7883

7984
await database.writeTransaction(async (tx) => {
8085
await tx.execute('INSERT INTO todos (id, content) VALUES (uuid(), ?)', ['second']);
8186
});
8287
await expect.poll(() => fn).toHaveBeenCalledTimes(2);
88+
expect(fn).toHaveBeenNthCalledWith(2, {
89+
changedTables: ['ps_data__todos'],
90+
update: {
91+
tables: ['ps_data__todos'],
92+
rawUpdates: [{ table: 'ps_data__todos', opType: RowUpdateType.SQLITE_INSERT, rowId: 2n }],
93+
groupedUpdates: {
94+
ps_data__todos: [{ table: 'ps_data__todos', opType: RowUpdateType.SQLITE_INSERT, rowId: 2n }]
95+
}
96+
}
97+
});
8398

99+
// Assert that rolled back changes aren't emitted
84100
await database.writeTransaction(async (tx) => {
85101
await tx.execute('DELETE FROM todos;');
86102
await tx.rollback();
87103
});
88104
await expect.poll(() => fn).toHaveBeenCalledTimes(2);
89105

106+
// Assert that unwatched changes aren't emitted
90107
disposeWatch();
91108
await database.execute('INSERT INTO todos (id, content) VALUES (uuid(), ?)', ['fourth']);
92109
await expect.poll(() => fn).toHaveBeenCalledTimes(2);

0 commit comments

Comments
 (0)