Skip to content

Commit 699ae01

Browse files
committed
Ensure queued updates aren't skipped if occur quicker than ControlledExecutor can run them. Ensure resources and listeners are disposed.
1 parent f20f6c7 commit 699ae01

File tree

1 file changed

+76
-53
lines changed

1 file changed

+76
-53
lines changed

packages/common/src/client/AbstractPowerSyncDatabase.ts

Lines changed: 76 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -7,15 +7,16 @@ import {
77
QueryResult,
88
Transaction,
99
UpdateNotification,
10+
convertToBatchedUpdateNotification,
11+
convertToUpdateNotifications,
1012
isBatchedUpdateNotification
1113
} from '../db/DBAdapter.js';
1214
import { FULL_SYNC_PRIORITY } from '../db/crud/SyncProgress.js';
1315
import { SyncPriorityStatus, SyncStatus } from '../db/crud/SyncStatus.js';
1416
import { UploadQueueStats } from '../db/crud/UploadQueueStatus.js';
1517
import { Schema } from '../db/schema/Schema.js';
1618
import { BaseObserver } from '../utils/BaseObserver.js';
17-
import { ControlledExecutor } from '../utils/ControlledExecutor.js';
18-
import { throttleTrailing } from '../utils/async.js';
19+
import { DisposeManager } from '../utils/DisposeManager.js';
1920
import { mutexRunExclusive } from '../utils/mutex.js';
2021
import { ConnectionManager } from './ConnectionManager.js';
2122
import { SQLOpenFactory, SQLOpenOptions, isDBAdapter, isSQLOpenFactory, isSQLOpenOptions } from './SQLOpenFactory.js';
@@ -86,7 +87,8 @@ export interface SQLWatchOptions {
8687
}
8788

8889
export interface WatchOnChangeEvent {
89-
changedTables: string[];
90+
changedTables: string[]; // kept for backwards compatibility
91+
update: BatchedUpdateNotification;
9092
}
9193

9294
export interface WatchHandler {
@@ -1038,7 +1040,7 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
10381040
* @returns A dispose function to stop watching for changes
10391041
*/
10401042
onChangeWithCallback(handler?: WatchOnChangeHandler, options?: SQLWatchOptions): () => void {
1041-
const { onChange, onError = (e: Error) => this.options.logger?.error(e) } = handler ?? {};
1043+
const { onChange, onError = (error: Error) => this.options.logger?.error(error) } = handler ?? {};
10421044
if (!onChange) {
10431045
throw new Error('onChange is required');
10441046
}
@@ -1047,40 +1049,88 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
10471049
const watchedTables = new Set<string>(
10481050
(resolvedOptions?.tables ?? []).flatMap((table) => [table, `ps_data__${table}`, `ps_data_local__${table}`])
10491051
);
1050-
1051-
const changedTables = new Set<string>();
1052+
const updatedTables = new Array<UpdateNotification>();
10521053
const throttleMs = resolvedOptions.throttleMs ?? DEFAULT_WATCH_THROTTLE_MS;
10531054

1054-
const executor = new ControlledExecutor(async (e: WatchOnChangeEvent) => {
1055-
await onChange(e);
1056-
});
1055+
const disposeManager = new DisposeManager();
10571056

1058-
const flushTableUpdates = throttleTrailing(
1059-
() =>
1060-
this.handleTableChanges(changedTables, watchedTables, (intersection) => {
1061-
if (resolvedOptions?.signal?.aborted) return;
1062-
executor.schedule({ changedTables: intersection });
1063-
}),
1064-
throttleMs
1065-
);
1057+
const dispose = () => disposeManager.dispose();
1058+
1059+
if (resolvedOptions.signal?.aborted || this.closed) {
1060+
return dispose;
1061+
}
1062+
1063+
// Periodically flush the accumulated updates from the db listener.
1064+
let isFlushing = false;
1065+
const flushIntervalId = setInterval(async () => {
1066+
// Skip if we're already flushing.
1067+
// Will retry in the next interval.
1068+
if (isFlushing) {
1069+
return;
1070+
}
1071+
try {
1072+
// Prevent concurrent flushes.
1073+
isFlushing = true;
1074+
await flushTableUpdates();
1075+
} catch (error) {
1076+
onError?.(error);
1077+
} finally {
1078+
// Allow future flush attempts.
1079+
isFlushing = false;
1080+
}
1081+
}, throttleMs);
1082+
1083+
const flushTableUpdates = async () => {
1084+
// Get snapshot of the updated tables to avoid race conditions
1085+
// between async operations here and the listener that adds updates.
1086+
const updatesToFlush = [...updatedTables];
1087+
// Reset the queue to begin collecting new updates by the listener.
1088+
updatedTables.length = 0;
1089+
// Skip if we're already disposed.
1090+
if (disposeManager.isDisposed()) {
1091+
return;
1092+
}
1093+
// Dispose then skip if we're closed.
1094+
if (this.closed) {
1095+
disposeManager.dispose();
1096+
return;
1097+
}
1098+
// Broadcast the updates.
1099+
const update = convertToBatchedUpdateNotification(updatesToFlush);
1100+
if (update.tables.length > 0) {
1101+
await onChange({ changedTables: update.tables, update });
1102+
}
1103+
};
10661104

1067-
const dispose = this.database.registerListener({
1068-
tablesUpdated: async (update) => {
1105+
const disposeListener = this.database.registerListener({
1106+
tablesUpdated: (update) => {
10691107
try {
1070-
this.processTableUpdates(update, changedTables);
1071-
flushTableUpdates();
1108+
if (isBatchedUpdateNotification(update)) {
1109+
const rawUpdates = convertToUpdateNotifications(update);
1110+
for (const rawUpdate of rawUpdates) {
1111+
if (watchedTables.has(rawUpdate.table)) {
1112+
updatedTables.push(rawUpdate);
1113+
}
1114+
}
1115+
} else {
1116+
if (watchedTables.has(update.table)) {
1117+
updatedTables.push(update);
1118+
}
1119+
}
10721120
} catch (error) {
10731121
onError?.(error);
10741122
}
10751123
}
10761124
});
10771125

1078-
resolvedOptions.signal?.addEventListener('abort', () => {
1079-
executor.dispose();
1080-
dispose();
1081-
});
1126+
disposeManager.add(() => disposeListener());
1127+
disposeManager.add(() => clearInterval(flushIntervalId));
1128+
1129+
if (resolvedOptions.signal) {
1130+
disposeManager.disposeOnAbort(resolvedOptions.signal);
1131+
}
10821132

1083-
return () => dispose();
1133+
return dispose;
10841134
}
10851135

10861136
/**
@@ -1119,33 +1169,6 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
11191169
});
11201170
}
11211171

1122-
private handleTableChanges(
1123-
changedTables: Set<string>,
1124-
watchedTables: Set<string>,
1125-
onDetectedChanges: (changedTables: string[]) => void
1126-
): void {
1127-
if (changedTables.size > 0) {
1128-
const intersection = Array.from(changedTables.values()).filter((change) => watchedTables.has(change));
1129-
if (intersection.length) {
1130-
onDetectedChanges(intersection);
1131-
}
1132-
}
1133-
changedTables.clear();
1134-
}
1135-
1136-
private processTableUpdates(
1137-
updateNotification: BatchedUpdateNotification | UpdateNotification,
1138-
changedTables: Set<string>
1139-
): void {
1140-
const tables = isBatchedUpdateNotification(updateNotification)
1141-
? updateNotification.tables
1142-
: [updateNotification.table];
1143-
1144-
for (const table of tables) {
1145-
changedTables.add(table);
1146-
}
1147-
}
1148-
11491172
/**
11501173
* @ignore
11511174
*/

0 commit comments

Comments
 (0)