diff --git a/.changeset/giant-camels-speak.md b/.changeset/giant-camels-speak.md new file mode 100644 index 000000000..ebbc2db68 --- /dev/null +++ b/.changeset/giant-camels-speak.md @@ -0,0 +1,7 @@ +--- +'@powersync/common': minor +'@powersync/node': minor +'@powersync/web': minor +--- + +PowerSyncDatabase.onChange does a best effort to provide the table name, DML operation, and rowid in the change event. Previously, only table names were emitted. diff --git a/.changeset/moody-hats-sleep.md b/.changeset/moody-hats-sleep.md new file mode 100644 index 000000000..269fe0731 --- /dev/null +++ b/.changeset/moody-hats-sleep.md @@ -0,0 +1,5 @@ +--- +'@powersync/common': patch +--- + +Use addEventListener instead of overwriting the onabort property, preventing interference with outside users also setting the property on the same signal. Remove event listener when race settles to avoid memory leak. diff --git a/packages/common/src/client/AbstractPowerSyncDatabase.ts b/packages/common/src/client/AbstractPowerSyncDatabase.ts index 590747969..a7d707071 100644 --- a/packages/common/src/client/AbstractPowerSyncDatabase.ts +++ b/packages/common/src/client/AbstractPowerSyncDatabase.ts @@ -7,6 +7,8 @@ import { QueryResult, Transaction, UpdateNotification, + convertToBatchedUpdateNotification, + convertToUpdateNotifications, isBatchedUpdateNotification } from '../db/DBAdapter.js'; import { FULL_SYNC_PRIORITY } from '../db/crud/SyncProgress.js'; @@ -14,8 +16,7 @@ import { SyncPriorityStatus, SyncStatus } from '../db/crud/SyncStatus.js'; import { UploadQueueStats } from '../db/crud/UploadQueueStatus.js'; import { Schema } from '../db/schema/Schema.js'; import { BaseObserver } from '../utils/BaseObserver.js'; -import { ControlledExecutor } from '../utils/ControlledExecutor.js'; -import { throttleTrailing } from '../utils/async.js'; +import { DisposeManager } from '../utils/DisposeManager.js'; import { mutexRunExclusive } from '../utils/mutex.js'; import { ConnectionManager } from './ConnectionManager.js'; import { SQLOpenFactory, SQLOpenOptions, isDBAdapter, isSQLOpenFactory, isSQLOpenOptions } from './SQLOpenFactory.js'; @@ -34,6 +35,7 @@ import { type PowerSyncConnectionOptions, type RequiredAdditionalConnectionOptions } from './sync/stream/AbstractStreamingSyncImplementation.js'; +import { sleep } from '../utils/async.js'; export interface DisconnectAndClearOptions { /** When set to false, data in local-only tables is preserved. */ @@ -86,7 +88,8 @@ export interface SQLWatchOptions { } export interface WatchOnChangeEvent { - changedTables: string[]; + changedTables: string[]; // kept for backwards compatibility + update: BatchedUpdateNotification; } export interface WatchHandler { @@ -1038,7 +1041,7 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver void { - const { onChange, onError = (e: Error) => this.options.logger?.error(e) } = handler ?? {}; + const { onChange, onError = (error: Error) => this.options.logger?.error(error) } = handler ?? {}; if (!onChange) { throw new Error('onChange is required'); } @@ -1047,40 +1050,102 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver( (resolvedOptions?.tables ?? []).flatMap((table) => [table, `ps_data__${table}`, `ps_data_local__${table}`]) ); - - const changedTables = new Set(); + const updatedTables = new Array(); const throttleMs = resolvedOptions.throttleMs ?? DEFAULT_WATCH_THROTTLE_MS; - const executor = new ControlledExecutor(async (e: WatchOnChangeEvent) => { - await onChange(e); - }); + const disposeManager = new DisposeManager(); - const flushTableUpdates = throttleTrailing( - () => - this.handleTableChanges(changedTables, watchedTables, (intersection) => { - if (resolvedOptions?.signal?.aborted) return; - executor.schedule({ changedTables: intersection }); - }), - throttleMs - ); + const dispose = () => disposeManager.dispose(); + + if (resolvedOptions.signal?.aborted || this.closed) { + return dispose; + } - const dispose = this.database.registerListener({ - tablesUpdated: async (update) => { + let isFlushing = false; + let lastFlushTime = 0; + + // Don't flush more often than the throttle interval. + const throttleFlush = async () => { + const timeSinceLastFlush = Date.now() - lastFlushTime; + if (timeSinceLastFlush < throttleMs) { + await sleep(throttleMs - timeSinceLastFlush); + } + lastFlushTime = Date.now(); + }; + + // Periodically flush the accumulated updates from the db listener. + const triggerFlush = async () => { + // Skip if we're already flushing. + // Will retry when more updates arrive. + if (isFlushing) { + return; + } + try { + isFlushing = true; + // Keep flushing until no more changes are pending + while (updatedTables.length > 0) { + await throttleFlush(); + await executeFlush(); + } + } catch (error) { + onError?.(error); + } finally { + // Allow future flush attempts. + isFlushing = false; + } + }; + + const executeFlush = async () => { + // Get snapshot of the updated tables to avoid race conditions + // between async operations here and the listener that adds updates. + const updatesToFlush = [...updatedTables]; + // Reset the queue to begin collecting new updates by the listener. + updatedTables.length = 0; + // Skip if we're already disposed. + if (disposeManager.isDisposed()) { + return; + } + // Dispose then skip if we're closed. + if (this.closed) { + disposeManager.dispose(); + return; + } + // Broadcast the updates. + const update = convertToBatchedUpdateNotification(updatesToFlush); + if (update.tables.length > 0) { + await onChange({ changedTables: update.tables, update }); + } + }; + + const disposeListener = this.database.registerListener({ + tablesUpdated: (update) => { try { - this.processTableUpdates(update, changedTables); - flushTableUpdates(); + if (isBatchedUpdateNotification(update)) { + const rawUpdates = convertToUpdateNotifications(update); + for (const rawUpdate of rawUpdates) { + if (watchedTables.has(rawUpdate.table)) { + updatedTables.push(rawUpdate); + } + } + } else { + if (watchedTables.has(update.table)) { + updatedTables.push(update); + } + } + triggerFlush(); } catch (error) { onError?.(error); } } }); - resolvedOptions.signal?.addEventListener('abort', () => { - executor.dispose(); - dispose(); - }); + disposeManager.add(() => disposeListener()); + + if (resolvedOptions.signal) { + disposeManager.disposeOnAbort(resolvedOptions.signal); + } - return () => dispose(); + return dispose; } /** @@ -1119,33 +1184,6 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver, - watchedTables: Set, - onDetectedChanges: (changedTables: string[]) => void - ): void { - if (changedTables.size > 0) { - const intersection = Array.from(changedTables.values()).filter((change) => watchedTables.has(change)); - if (intersection.length) { - onDetectedChanges(intersection); - } - } - changedTables.clear(); - } - - private processTableUpdates( - updateNotification: BatchedUpdateNotification | UpdateNotification, - changedTables: Set - ): void { - const tables = isBatchedUpdateNotification(updateNotification) - ? updateNotification.tables - : [updateNotification.table]; - - for (const table of tables) { - changedTables.add(table); - } - } - /** * @ignore */ diff --git a/packages/common/src/client/sync/stream/AbstractStreamingSyncImplementation.ts b/packages/common/src/client/sync/stream/AbstractStreamingSyncImplementation.ts index 85d493ef5..f66f02c60 100644 --- a/packages/common/src/client/sync/stream/AbstractStreamingSyncImplementation.ts +++ b/packages/common/src/client/sync/stream/AbstractStreamingSyncImplementation.ts @@ -6,7 +6,7 @@ import { FULL_SYNC_PRIORITY, InternalProgressInformation } from '../../../db/cru import * as sync_status from '../../../db/crud/SyncStatus.js'; import { AbortOperation } from '../../../utils/AbortOperation.js'; import { BaseListener, BaseObserver, Disposable } from '../../../utils/BaseObserver.js'; -import { onAbortPromise, throttleLeadingTrailing } from '../../../utils/async.js'; +import { resolveEarlyOnAbort, throttleLeadingTrailing } from '../../../utils/async.js'; import { BucketChecksum, BucketDescription, @@ -1062,7 +1062,7 @@ The next upload iteration will be delayed.`); }); } - private async applyCheckpoint(checkpoint: Checkpoint, abort: AbortSignal) { + private async applyCheckpoint(checkpoint: Checkpoint, signal: AbortSignal) { let result = await this.options.adapter.syncLocalDatabase(checkpoint); const pending = this.pendingCrudUpload; @@ -1079,9 +1079,9 @@ The next upload iteration will be delayed.`); this.logger.debug( 'Could not apply checkpoint due to local data. Waiting for in-progress upload before retrying.' ); - await Promise.race([pending, onAbortPromise(abort)]); + await resolveEarlyOnAbort(pending, signal); - if (abort.aborted) { + if (signal.aborted) { return { applied: false, endIteration: true }; } diff --git a/packages/common/src/db/DBAdapter.ts b/packages/common/src/db/DBAdapter.ts index 580603c4a..2d3a7382a 100644 --- a/packages/common/src/db/DBAdapter.ts +++ b/packages/common/src/db/DBAdapter.ts @@ -76,10 +76,12 @@ export enum RowUpdateType { SQLITE_DELETE = 9, SQLITE_UPDATE = 23 } + export interface TableUpdateOperation { opType: RowUpdateType; rowId: number; } + /** * Notification of an update to one or more tables, for the purpose of realtime change notifications. */ @@ -129,6 +131,40 @@ export function isBatchedUpdateNotification( return 'tables' in update; } +export function convertToBatchedUpdateNotification(updates: UpdateNotification[]): BatchedUpdateNotification { + const groupedUpdates: BatchedUpdateNotification['groupedUpdates'] = {}; + + for (const update of updates) { + groupedUpdates[update.table] ??= []; + groupedUpdates[update.table].push(update); + } + + return { + tables: Object.keys(groupedUpdates), + rawUpdates: updates, + groupedUpdates + }; +} + +export function convertToUpdateNotifications(update: BatchedUpdateNotification): UpdateNotification[] { + // Not all implementations emit a complete batched update. + // Some only emit the table names, or not even that. + if (update.rawUpdates?.length) { + return update.rawUpdates; + } + if (Object.keys(update.groupedUpdates ?? {}).length) { + return Object.entries(update.groupedUpdates).flatMap(([table, updates]) => + updates.map((update) => ({ ...update, table })) + ); + } + if (update.tables?.length) { + return update.tables.map((table) => { + return { table } as unknown as UpdateNotification; + }); + } + return []; +} + export function extractTableUpdates(update: BatchedUpdateNotification | UpdateNotification) { return isBatchedUpdateNotification(update) ? update.tables : [update.table]; } diff --git a/packages/common/src/utils/DisposeManager.ts b/packages/common/src/utils/DisposeManager.ts new file mode 100644 index 000000000..e32dfa442 --- /dev/null +++ b/packages/common/src/utils/DisposeManager.ts @@ -0,0 +1,107 @@ +export type Disposer = () => unknown | PromiseLike; + +export interface DisposeManagerOptions { + /** + * Initial set of disposers to add to the manager. + * Add more disposers later using {@link DisposeManager.add}. + */ + disposers: Array; +} + +/** + * Use a DisposeManager to centralize the management of disposers. + * Add one or more disposers to the manager, and use the manager's + * dispose function to invoke them all in order. + * + * Once disposed, trying to dispose again is a no-op. + */ +export class DisposeManager { + private disposed: boolean; + private disposers: Array; + + constructor(options?: DisposeManagerOptions) { + this.disposed = false; + this.disposers = [...(options?.disposers ?? [])]; + } + + public isDisposed(): boolean { + return this.disposed; + } + + /** + * Add a callback to be invoked when the manager is disposed. + */ + public add(disposer: Disposer): void { + if (this.disposed || !disposer) { + return; + } + this.disposers.push(disposer); + } + + /** + * Add one or more signals that when any abort then disposes the manager. + */ + public disposeOnAbort(signal: AbortSignal): void { + if (this.disposed || !signal) { + return; + } + + if (signal.aborted) { + this.dispose(); + return; + } + + const dispose = () => this.dispose(); + + signal.addEventListener('abort', dispose, { once: true }); + + // Add cleanup for the event listener itself to avoid memory leaks + this.add(() => { + signal.removeEventListener('abort', dispose); + }); + } + + /** + * Invokes each disposer in order. + * If any are async then they are not awaited. + */ + public dispose(): void { + return this.disposeInternal('sync'); + } + + /** + * Invokes each disposer in order. + * If any are async then they are awaited before calling the next disposer. + */ + public async disposeAsync(): Promise { + return this.disposeInternal('async'); + } + + private disposeInternal(mode: 'sync'): void; + private disposeInternal(mode: 'async'): Promise; + private disposeInternal(mode: 'sync' | 'async'): void | Promise { + if (this.disposed) { + return mode === 'async' ? Promise.resolve() : undefined; + } + this.disposed = true; + if (mode === 'async') { + return Promise.resolve() + .then(async () => { + for (const disposer of this.disposers) { + await disposer(); + } + }) + .finally(() => { + this.disposers.length = 0; + }); + } else { + try { + for (const disposer of this.disposers) { + disposer(); + } + } finally { + this.disposers.length = 0; + } + } + } +} diff --git a/packages/common/src/utils/async.ts b/packages/common/src/utils/async.ts index c6fe822d8..bd8fd63a7 100644 --- a/packages/common/src/utils/async.ts +++ b/packages/common/src/utils/async.ts @@ -49,12 +49,60 @@ export function throttleLeadingTrailing(func: () => void, wait: number) { }; } -export function onAbortPromise(signal: AbortSignal): Promise { - return new Promise((resolve) => { +/** + * Sleep for a given number of milliseconds. + */ +export function sleep(ms: number) { + return new Promise((resolve) => setTimeout(resolve, ms)); +} + +/** + * Race a promise against an abort signal. + * Returns a promise that resolves early if the signal is aborted before the + * original promise resolves. + * + * Note: The signal does not cancel the promise. To cancel the promise then + * its logic needs to explicitly check the signal. + */ +export function resolveEarlyOnAbort( + promise: Promise, + signal: AbortSignal +): Promise> { + return new Promise((resolve, reject) => { + const resolveWith = (result: ResolveEarlyOnAbortResult) => { + removeAbortHandler(); + resolve(result); + }; + + const rejectWith = (error: Error) => { + removeAbortHandler(); + reject(error); + }; + + const abortHandler = () => { + resolveWith({ aborted: true }); + }; + + const addAbortHandler = () => { + // Use an event listener to avoid interfering with the onabort + // property where other code may have registered a handler. + signal.addEventListener('abort', abortHandler); + }; + + const removeAbortHandler = () => { + // Remove the abort handler to avoid memory leaks. + signal.removeEventListener('abort', abortHandler); + }; + + addAbortHandler(); + if (signal.aborted) { - resolve(); - } else { - signal.onabort = () => resolve(); + abortHandler(); + return; } + + promise.then((result) => resolveWith({ result, aborted: false })).catch((error) => rejectWith(error)); }); } + +type ResolveEarlyOnAbortResult = { result: T; aborted: false } | { aborted: true }; diff --git a/packages/common/tests/db/DBAdapter.test.ts b/packages/common/tests/db/DBAdapter.test.ts new file mode 100644 index 000000000..7869424db --- /dev/null +++ b/packages/common/tests/db/DBAdapter.test.ts @@ -0,0 +1,227 @@ +import { describe, it, expect } from 'vitest'; +import { + convertToBatchedUpdateNotification, + convertToUpdateNotifications, + RowUpdateType, + UpdateNotification, + BatchedUpdateNotification, + TableUpdateOperation +} from '../../src/db/DBAdapter.js'; + +describe('DBAdapter', () => { + describe('convertToBatchedUpdateNotification', () => { + it('should convert empty array to empty batched notification', () => { + const result = convertToBatchedUpdateNotification([]); + + expect(result).toEqual({ + rawUpdates: [], + tables: [], + groupedUpdates: {} + }); + }); + + it('should convert single update notification', () => { + const updates: UpdateNotification[] = [ + { + table: 'users', + opType: RowUpdateType.SQLITE_INSERT, + rowId: 1 + } + ]; + + const result = convertToBatchedUpdateNotification(updates); + + expect(result.rawUpdates).toEqual(updates); + expect(result.tables).toEqual(['users']); + expect(result.groupedUpdates).toEqual({ + users: [updates[0]] + }); + }); + + it('should group multiple updates for same table', () => { + const updates: UpdateNotification[] = [ + { + table: 'users', + opType: RowUpdateType.SQLITE_INSERT, + rowId: 1 + }, + { + table: 'users', + opType: RowUpdateType.SQLITE_UPDATE, + rowId: 2 + }, + { + table: 'users', + opType: RowUpdateType.SQLITE_DELETE, + rowId: 3 + } + ]; + + const result = convertToBatchedUpdateNotification(updates); + + expect(result.rawUpdates).toEqual(updates); + expect(result.tables).toEqual(['users']); + expect(result.groupedUpdates.users).toHaveLength(3); + expect(result.groupedUpdates.users).toEqual(updates); + }); + + it('should handle updates for multiple tables', () => { + const updates: UpdateNotification[] = [ + { + table: 'users', + opType: RowUpdateType.SQLITE_INSERT, + rowId: 1 + }, + { + table: 'posts', + opType: RowUpdateType.SQLITE_INSERT, + rowId: 2 + }, + { + table: 'users', + opType: RowUpdateType.SQLITE_UPDATE, + rowId: 3 + }, + { + table: 'comments', + opType: RowUpdateType.SQLITE_DELETE, + rowId: 4 + } + ]; + + const result = convertToBatchedUpdateNotification(updates); + + expect(result.rawUpdates).toEqual(updates); + expect(result.tables).toHaveLength(3); + expect(result.tables).toContain('users'); + expect(result.tables).toContain('posts'); + expect(result.tables).toContain('comments'); + + expect(result.groupedUpdates.users).toHaveLength(2); + expect(result.groupedUpdates.users).toEqual([updates[0], updates[2]]); + + expect(result.groupedUpdates.posts).toHaveLength(1); + expect(result.groupedUpdates.posts).toEqual([updates[1]]); + + expect(result.groupedUpdates.comments).toHaveLength(1); + expect(result.groupedUpdates.comments).toEqual([updates[3]]); + }); + }); + + describe('convertToUpdateNotifications', () => { + it('should return empty array for empty batched notification', () => { + const batchedUpdate: BatchedUpdateNotification = { + // will be ignored because empty + rawUpdates: [], + // will be ignored because empty + groupedUpdates: {}, + // will be ignored because empty + tables: [] + }; + + const result = convertToUpdateNotifications(batchedUpdate); + + expect(result).toEqual([]); + }); + + it('should return rawUpdates when available', () => { + const rawUpdates: UpdateNotification[] = [ + { + table: 'users', + opType: RowUpdateType.SQLITE_INSERT, + rowId: 1 + }, + { + table: 'posts', + opType: RowUpdateType.SQLITE_UPDATE, + rowId: 2 + } + ]; + + const batchedUpdate: BatchedUpdateNotification = { + // will be used as priority + rawUpdates, + // will be ignored because rawUpdates is available + groupedUpdates: { + comments: [ + { + opType: RowUpdateType.SQLITE_INSERT, + rowId: 3 + } + ] + }, + // will be ignored because rawUpdates is available + tables: ['users', 'posts', 'comments'] + }; + + const result = convertToUpdateNotifications(batchedUpdate); + + expect(result).toEqual(rawUpdates); + }); + + it('should convert groupedUpdates when rawUpdates is empty', () => { + const batchedUpdate: BatchedUpdateNotification = { + // will be ignored because empty + rawUpdates: [], + // will be used as priority + groupedUpdates: { + users: [ + { + opType: RowUpdateType.SQLITE_INSERT, + rowId: 1 + } + ], + posts: [ + { + opType: RowUpdateType.SQLITE_UPDATE, + rowId: 2 + }, + { + opType: RowUpdateType.SQLITE_DELETE, + rowId: 3 + } + ] + }, + // will be ignored because groupedUpdates is available + tables: ['users', 'posts', 'comments'] + }; + + const result = convertToUpdateNotifications(batchedUpdate); + + expect(result).toHaveLength(3); + expect(result).toContainEqual({ + table: 'users', + opType: RowUpdateType.SQLITE_INSERT, + rowId: 1 + }); + expect(result).toContainEqual({ + table: 'posts', + opType: RowUpdateType.SQLITE_UPDATE, + rowId: 2 + }); + expect(result).toContainEqual({ + table: 'posts', + opType: RowUpdateType.SQLITE_DELETE, + rowId: 3 + }); + }); + + it('should create minimal notifications from tables when no updates available', () => { + const batchedUpdate: BatchedUpdateNotification = { + // will be ignored because empty + rawUpdates: [], + // will be ignored because empty + groupedUpdates: {}, + // will be used as priority + tables: ['users', 'posts', 'comments'] + }; + + const result = convertToUpdateNotifications(batchedUpdate); + + expect(result).toHaveLength(3); + expect(result[0]).toEqual({ table: 'users' }); + expect(result[1]).toEqual({ table: 'posts' }); + expect(result[2]).toEqual({ table: 'comments' }); + }); + }); +}); diff --git a/packages/common/tests/utils/DisposeManager.test.ts b/packages/common/tests/utils/DisposeManager.test.ts new file mode 100644 index 000000000..8cb65c65a --- /dev/null +++ b/packages/common/tests/utils/DisposeManager.test.ts @@ -0,0 +1,348 @@ +import { describe, expect, it, vi, beforeEach, afterEach } from 'vitest'; +import { DisposeManager, type Disposer } from '../../src/utils/DisposeManager.js'; + +describe('DisposeManager', () => { + let disposeManager: DisposeManager; + let mockDisposer1: Disposer; + let mockDisposer2: Disposer; + + beforeEach(() => { + mockDisposer1 = vi.fn(); + mockDisposer2 = vi.fn(); + + disposeManager = new DisposeManager(); + }); + + afterEach(() => { + vi.clearAllMocks(); + }); + + describe('Basic Functionality', () => { + it('should create an empty DisposeManager by default', () => { + disposeManager = new DisposeManager(); + + expect(disposeManager.isDisposed()).toBe(false); + }); + + it('should create DisposeManager with initial disposers', () => { + disposeManager = new DisposeManager({ + disposers: [mockDisposer1, mockDisposer2] + }); + + expect(disposeManager.isDisposed()).toBe(false); + + disposeManager.dispose(); + + expect(mockDisposer1).toHaveBeenCalledOnce(); + expect(mockDisposer2).toHaveBeenCalledOnce(); + expect(disposeManager.isDisposed()).toBe(true); + }); + + it('should add disposers after construction', () => { + disposeManager.add(mockDisposer1); + disposeManager.add(mockDisposer2); + + disposeManager.dispose(); + + expect(mockDisposer1).toHaveBeenCalledOnce(); + expect(mockDisposer2).toHaveBeenCalledOnce(); + }); + + it('should correctly report disposal state', () => { + expect(disposeManager.isDisposed()).toBe(false); + + disposeManager.dispose(); + + expect(disposeManager.isDisposed()).toBe(true); + }); + + it('should handle empty disposer array', () => { + disposeManager.dispose(); + expect(disposeManager.isDisposed()).toBe(true); + }); + }); + + describe('Disposal Behavior', () => { + it('should call disposers in order they were added', () => { + const callOrder = new Array(); + const disposer1 = vi.fn(() => callOrder.push(1)); + const disposer2 = vi.fn(() => callOrder.push(2)); + const disposer3 = vi.fn(() => callOrder.push(3)); + + disposeManager.add(disposer1); + disposeManager.add(disposer2); + disposeManager.add(disposer3); + + disposeManager.dispose(); + + expect(callOrder).toEqual([1, 2, 3]); + }); + + it('should handle multiple dispose calls (idempotent)', () => { + disposeManager.add(mockDisposer1); + + disposeManager.dispose(); + disposeManager.dispose(); + disposeManager.dispose(); + + expect(mockDisposer1).toHaveBeenCalledOnce(); + expect(disposeManager.isDisposed()).toBe(true); + }); + + it('should not add disposers after disposal', () => { + disposeManager.dispose(); + + expect(disposeManager.isDisposed()).toBe(true); + + disposeManager.add(mockDisposer1); + + // Should not be called since manager is already disposed + disposeManager.dispose(); + + expect(mockDisposer1).not.toHaveBeenCalled(); + }); + + it('should stop execution if disposer throws an error', () => { + const errorDisposer = vi.fn(() => { + throw new Error('Test Error'); + }); + const workingDisposer = vi.fn(); + + disposeManager.add(errorDisposer); + disposeManager.add(workingDisposer); + + expect(() => disposeManager.dispose()).toThrow(); + expect(errorDisposer).toHaveBeenCalledOnce(); + expect(workingDisposer).not.toHaveBeenCalled(); + expect(disposeManager.isDisposed()).toBe(true); + }); + }); + + describe('AbortSignal Integration', () => { + it('should dispose when AbortSignal is aborted', () => { + const controller = new AbortController(); + + disposeManager.add(mockDisposer1); + disposeManager.disposeOnAbort(controller.signal); + + expect(disposeManager.isDisposed()).toBe(false); + + controller.abort(); + + expect(mockDisposer1).toHaveBeenCalledOnce(); + expect(disposeManager.isDisposed()).toBe(true); + }); + + it('should dispose immediately if AbortSignal is already aborted', () => { + const controller = new AbortController(); + controller.abort(); // Abort before setting up listener + + disposeManager.add(mockDisposer1); + expect(disposeManager.isDisposed()).toBe(false); + + disposeManager.disposeOnAbort(controller.signal); + expect(disposeManager.isDisposed()).toBe(true); + + expect(mockDisposer1).toHaveBeenCalledOnce(); + }); + + it('should dispose immediately if AbortSignal is already aborted', () => { + const controller = new AbortController(); + controller.abort(); // Abort before setting up listener + + disposeManager.disposeOnAbort(controller.signal); + expect(disposeManager.isDisposed()).toBe(true); + + // Disposer added after adding aborted signal which + // would already have disposed the manager, so this won't be called. + disposeManager.add(mockDisposer1); + expect(disposeManager.isDisposed()).toBe(true); + + expect(mockDisposer1).not.toHaveBeenCalledOnce(); + }); + + it('should handle multiple abort signals', () => { + const controller1 = new AbortController(); + const controller2 = new AbortController(); + + disposeManager.add(mockDisposer1); + disposeManager.disposeOnAbort(controller1.signal); + disposeManager.disposeOnAbort(controller2.signal); + + controller1.abort(); + + expect(mockDisposer1).toHaveBeenCalledOnce(); + expect(disposeManager.isDisposed()).toBe(true); + + // Second abort should have no effect + controller2.abort(); + + expect(mockDisposer1).toHaveBeenCalledOnce(); + }); + + it('should not set up listeners on already disposed manager', () => { + const controller = new AbortController(); + + const addEventListenerSpy = vi.spyOn(controller.signal, 'addEventListener'); + const removeEventListenerSpy = vi.spyOn(controller.signal, 'removeEventListener'); + + disposeManager.dispose(); + + expect(disposeManager.isDisposed()).toBe(true); + + // Should be a no-op + disposeManager.disposeOnAbort(controller.signal); + + controller.abort(); + + // No additional effects should occur + expect(addEventListenerSpy).not.toHaveBeenCalled(); + expect(removeEventListenerSpy).not.toHaveBeenCalled(); + }); + + it('should clean up event listeners to prevent memory leaks', () => { + const controller = new AbortController(); + + const addEventListenerSpy = vi.spyOn(controller.signal, 'addEventListener'); + const removeEventListenerSpy = vi.spyOn(controller.signal, 'removeEventListener'); + + disposeManager.disposeOnAbort(controller.signal); + disposeManager.dispose(); + + expect(addEventListenerSpy).toHaveBeenCalledWith('abort', expect.any(Function), { once: true }); + expect(removeEventListenerSpy).toHaveBeenCalledWith('abort', expect.any(Function)); + }); + }); + + describe('Async Disposal', () => { + it('should handle async disposers with disposeAsync', async () => { + const asyncDisposer = vi.fn().mockResolvedValue(undefined); + + disposeManager.add(asyncDisposer); + + await disposeManager.disposeAsync(); + + expect(asyncDisposer).toHaveBeenCalledOnce(); + expect(disposeManager.isDisposed()).toBe(true); + }); + + it('should wait for async disposers in sequence', async () => { + const callOrder = new Array(); + const asyncDisposer1 = vi.fn(async () => { + await new Promise((resolve) => setTimeout(resolve, 10)); + callOrder.push(1); + }); + const asyncDisposer2 = vi.fn(async () => { + await new Promise((resolve) => setTimeout(resolve, 5)); + callOrder.push(2); + }); + + disposeManager.add(asyncDisposer1); + disposeManager.add(asyncDisposer2); + + await disposeManager.disposeAsync(); + + expect(callOrder).toEqual([1, 2]); + expect(asyncDisposer1).toHaveBeenCalledOnce(); + expect(asyncDisposer2).toHaveBeenCalledOnce(); + }); + + it('should handle mixed sync and async disposers', async () => { + const callOrder = new Array(); + const syncDisposer = vi.fn(() => { + callOrder.push(1); + }); + const asyncDisposer = vi.fn(async () => { + await new Promise((resolve) => setTimeout(resolve, 5)); + callOrder.push(2); + }); + + disposeManager.add(syncDisposer); + disposeManager.add(asyncDisposer); + + await disposeManager.disposeAsync(); + + expect(callOrder).toEqual([1, 2]); + }); + + it('should handle mixed async and sync disposers', async () => { + const callOrder = new Array(); + const asyncDisposer = vi.fn(async () => { + await new Promise((resolve) => setTimeout(resolve, 5)); + callOrder.push(1); + }); + const syncDisposer = vi.fn(() => { + callOrder.push(2); + }); + + disposeManager.add(asyncDisposer); + disposeManager.add(syncDisposer); + + await disposeManager.disposeAsync(); + + expect(callOrder).toEqual([1, 2]); + }); + + it('should not await async disposers with sync dispose', () => { + const callOrder = new Array(); + const asyncDisposer = vi.fn(async () => { + await new Promise((resolve) => setTimeout(resolve, 100)); + callOrder.push(1); + }); + + disposeManager.add(asyncDisposer); + + disposeManager.dispose(); // Sync dispose + + expect(callOrder).toEqual([]); // Async disposer won't have gotten this far yet + expect(asyncDisposer).toHaveBeenCalled(); + expect(disposeManager.isDisposed()).toBe(true); + }); + + it('should handle async disposer rejections gracefully', async () => { + const errorAsyncDisposer = vi.fn().mockRejectedValue(new Error('Test Error')); + const workingDisposer = vi.fn(); + + disposeManager.add(errorAsyncDisposer); + disposeManager.add(workingDisposer); + + await expect(disposeManager.disposeAsync()).rejects.toThrow(); + + expect(errorAsyncDisposer).toHaveBeenCalledOnce(); + expect(workingDisposer).not.toHaveBeenCalled(); + expect(disposeManager.isDisposed()).toBe(true); + }); + }); + + describe('Edge Cases and Error Handling', () => { + it('should handle undefined/null disposers gracefully', () => { + disposeManager = new DisposeManager(); + + // These should be no-ops and not throw + disposeManager.add(undefined as any); + disposeManager.add(null as any); + disposeManager.add(mockDisposer1); + + expect(() => disposeManager.dispose()).not.toThrow(); + expect(mockDisposer1).toHaveBeenCalledOnce(); + }); + + it('should maintain state consistency after errors', () => { + const errorDisposer = vi.fn(() => { + throw new Error('Test Error'); + }); + + disposeManager = new DisposeManager(); + disposeManager.add(errorDisposer); + + expect(() => disposeManager.dispose()).toThrow(); + expect(disposeManager.isDisposed()).toBe(true); + + // Subsequent operations should be no-ops + disposeManager.add(mockDisposer1); + disposeManager.dispose(); + + expect(mockDisposer1).not.toHaveBeenCalled(); + }); + }); +}); diff --git a/packages/common/tests/utils/async.test.ts b/packages/common/tests/utils/async.test.ts new file mode 100644 index 000000000..892c5d9fb --- /dev/null +++ b/packages/common/tests/utils/async.test.ts @@ -0,0 +1,85 @@ +import { describe, expect, it, vi } from 'vitest'; +import { resolveEarlyOnAbort, sleep } from '../../src/utils/async'; + +describe('async', () => { + describe('sleep', () => { + it('should sleep for the given number of milliseconds', async () => { + const start = Date.now(); + await sleep(100); + const end = Date.now(); + expect(end - start).toBeGreaterThanOrEqual(100); + }); + }); + + describe('resolveEarlyOnAbort', () => { + it('should resolve early when signal is aborted', async () => { + const controller = new AbortController(); + + const slowPromise = new Promise((resolve) => { + setTimeout(() => resolve('completed'), 100); + }); + + const racePromise = resolveEarlyOnAbort(slowPromise, controller.signal); + + // Abort after a short delay + setTimeout(() => controller.abort(), 10); + + const result = await racePromise; + expect(result).toEqual({ aborted: true }); + }); + + it('should resolve immediately if signal is already aborted', async () => { + const controller = new AbortController(); + controller.abort(); // Abort before creating the race + + const slowPromise = new Promise((resolve) => { + setTimeout(() => resolve('completed'), 100); + }); + + const result = await resolveEarlyOnAbort(slowPromise, controller.signal); + expect(result).toEqual({ aborted: true }); + }); + + it('should resolve with the result if the promise resolves before the signal is aborted', async () => { + const controller = new AbortController(); + + const slowPromise = new Promise((resolve) => { + setTimeout(() => resolve('completed'), 100); + }); + + const result = await resolveEarlyOnAbort(slowPromise, controller.signal); + expect(result).toEqual({ result: 'completed', aborted: false }); + }); + + it('should show that resolveEarlyOnAbort does not interfere with onabort property or other event listeners', async () => { + const controller = new AbortController(); + let onabortCalled = false; + let eventListenerCalled = false; + + // Set onabort property + controller.signal.onabort = () => { + onabortCalled = true; + }; + + // Add another event listener + controller.signal.addEventListener('abort', () => { + eventListenerCalled = true; + }); + + const slowPromise = new Promise((resolve) => { + setTimeout(() => resolve('completed'), 100); + }); + + const racePromise = resolveEarlyOnAbort(slowPromise, controller.signal); + + // Abort after a short delay + setTimeout(() => controller.abort(), 10); + + const result = await racePromise; + + expect(result).toEqual({ aborted: true }); + expect(onabortCalled).toBe(true); + expect(eventListenerCalled).toBe(true); + }); + }); +}); diff --git a/packages/node/src/db/AsyncDatabase.ts b/packages/node/src/db/AsyncDatabase.ts index 20816ab89..ae183db6d 100644 --- a/packages/node/src/db/AsyncDatabase.ts +++ b/packages/node/src/db/AsyncDatabase.ts @@ -1,4 +1,4 @@ -import { QueryResult } from '@powersync/common'; +import { BatchedUpdateNotification, QueryResult } from '@powersync/common'; export type ProxiedQueryResult = Omit & { rows?: { @@ -20,5 +20,5 @@ export interface AsyncDatabase { // This happens on the worker because we otherwise get race conditions when wrapping // callbacks to invoke on the main thread (we need a guarantee that collectCommittedUpdates // contains entries immediately after calling COMMIT). - collectCommittedUpdates: () => Promise; + collectCommittedUpdates: () => Promise; } diff --git a/packages/node/src/db/BetterSQLite3DBAdapter.ts b/packages/node/src/db/BetterSQLite3DBAdapter.ts index 83caa716a..9cc0600fb 100644 --- a/packages/node/src/db/BetterSQLite3DBAdapter.ts +++ b/packages/node/src/db/BetterSQLite3DBAdapter.ts @@ -190,13 +190,8 @@ export class BetterSQLite3DBAdapter extends BaseObserver impl } finally { const updates = await this.writeConnection.database.collectCommittedUpdates(); - if (updates.length > 0) { - const event: BatchedUpdateNotification = { - tables: updates, - groupedUpdates: {}, - rawUpdates: [] - }; - this.iterateListeners((cb) => cb.tablesUpdated?.(event)); + if (updates.tables.length > 0) { + this.iterateListeners((cb) => cb.tablesUpdated?.(updates)); } } } finally { diff --git a/packages/node/src/db/SqliteWorker.ts b/packages/node/src/db/SqliteWorker.ts index 2bb3f8e3e..95d5b30f4 100644 --- a/packages/node/src/db/SqliteWorker.ts +++ b/packages/node/src/db/SqliteWorker.ts @@ -1,5 +1,6 @@ import * as path from 'node:path'; import BetterSQLite3Database, { Database } from '@powersync/better-sqlite3'; +import { BatchedUpdateNotification, RowUpdateType, TableUpdateOperation, UpdateNotification } from '@powersync/common'; import * as Comlink from 'comlink'; import { parentPort, threadId } from 'node:worker_threads'; import OS from 'node:os'; @@ -9,8 +10,8 @@ import { AsyncDatabase, AsyncDatabaseOpener } from './AsyncDatabase.js'; class BlockingAsyncDatabase implements AsyncDatabase { private readonly db: Database; - private readonly uncommittedUpdatedTables = new Set(); - private readonly committedUpdatedTables = new Set(); + private readonly uncommittedUpdates = new Array(); + private readonly committedUpdates = new Array(); constructor(db: Database) { this.db = db; @@ -18,27 +19,60 @@ class BlockingAsyncDatabase implements AsyncDatabase { db.function('node_thread_id', () => threadId); } - collectCommittedUpdates() { - const resolved = Promise.resolve([...this.committedUpdatedTables]); - this.committedUpdatedTables.clear(); - return resolved; + async collectCommittedUpdates() { + const rawUpdates: UpdateNotification[] = []; + const groupedUpdates: Record = {}; + + for (const rawUpdate of this.committedUpdates) { + rawUpdates.push(rawUpdate); + groupedUpdates[rawUpdate.table] ??= []; + groupedUpdates[rawUpdate.table].push(rawUpdate); + } + + const result: BatchedUpdateNotification = { + tables: Object.keys(groupedUpdates), + rawUpdates, + groupedUpdates + }; + + this.committedUpdates.length = 0; + + return result; } installUpdateHooks() { - this.db.updateHook((_op: string, _dbName: string, tableName: string, _rowid: bigint) => { - this.uncommittedUpdatedTables.add(tableName); - }); + this.db.updateHook( + ( + operation: 'SQLITE_INSERT' | 'SQLITE_UPDATE' | 'SQLITE_DELETE', + _dbName: string, + table: string, + rowId: number + ) => { + let opType: RowUpdateType; + switch (operation) { + case 'SQLITE_INSERT': + opType = RowUpdateType.SQLITE_INSERT; + break; + case 'SQLITE_UPDATE': + opType = RowUpdateType.SQLITE_UPDATE; + break; + case 'SQLITE_DELETE': + opType = RowUpdateType.SQLITE_DELETE; + break; + } - this.db.commitHook(() => { - for (const tableName of this.uncommittedUpdatedTables) { - this.committedUpdatedTables.add(tableName); + this.uncommittedUpdates.push({ table, opType, rowId }); } - this.uncommittedUpdatedTables.clear(); + ); + + this.db.commitHook(() => { + this.committedUpdates.push(...this.uncommittedUpdates); + this.uncommittedUpdates.length = 0; return true; }); this.db.rollbackHook(() => { - this.uncommittedUpdatedTables.clear(); + this.uncommittedUpdates.length = 0; }); } diff --git a/packages/node/tests/PowerSyncDatabase.test.ts b/packages/node/tests/PowerSyncDatabase.test.ts index 28ab4b31f..da15e5699 100644 --- a/packages/node/tests/PowerSyncDatabase.test.ts +++ b/packages/node/tests/PowerSyncDatabase.test.ts @@ -1,9 +1,8 @@ import * as path from 'node:path'; import { Worker } from 'node:worker_threads'; - import { vi, expect, test } from 'vitest'; import { AppSchema, databaseTest, tempDirectoryTest } from './utils'; -import { PowerSyncDatabase } from '../lib'; +import { PowerSyncDatabase, RowUpdateType, WatchOnChangeEvent } from '../lib'; import { WorkerOpener } from '../lib/db/options'; test('validates options', async () => { @@ -65,28 +64,46 @@ databaseTest('runs queries on multiple threads', async ({ database }) => { databaseTest('can watch tables', async ({ database }) => { const fn = vi.fn(); const disposeWatch = database.onChangeWithCallback( - { - onChange: () => { - fn(); - } - }, + { onChange: (event: WatchOnChangeEvent) => fn(event) }, { tables: ['todos'], throttleMs: 0 } ); await database.execute('INSERT INTO todos (id, content) VALUES (uuid(), ?)', ['first']); await expect.poll(() => fn).toHaveBeenCalledOnce(); + expect(fn).toHaveBeenNthCalledWith(1, { + changedTables: ['ps_data__todos'], + update: { + tables: ['ps_data__todos'], + rawUpdates: [{ table: 'ps_data__todos', opType: RowUpdateType.SQLITE_INSERT, rowId: 1n }], + groupedUpdates: { + ps_data__todos: [{ table: 'ps_data__todos', opType: RowUpdateType.SQLITE_INSERT, rowId: 1n }] + } + } + }); await database.writeTransaction(async (tx) => { await tx.execute('INSERT INTO todos (id, content) VALUES (uuid(), ?)', ['second']); }); await expect.poll(() => fn).toHaveBeenCalledTimes(2); + expect(fn).toHaveBeenNthCalledWith(2, { + changedTables: ['ps_data__todos'], + update: { + tables: ['ps_data__todos'], + rawUpdates: [{ table: 'ps_data__todos', opType: RowUpdateType.SQLITE_INSERT, rowId: 2n }], + groupedUpdates: { + ps_data__todos: [{ table: 'ps_data__todos', opType: RowUpdateType.SQLITE_INSERT, rowId: 2n }] + } + } + }); + // Assert that rolled back changes aren't emitted await database.writeTransaction(async (tx) => { await tx.execute('DELETE FROM todos;'); await tx.rollback(); }); await expect.poll(() => fn).toHaveBeenCalledTimes(2); + // Assert that unwatched changes aren't emitted disposeWatch(); await database.execute('INSERT INTO todos (id, content) VALUES (uuid(), ?)', ['fourth']); await expect.poll(() => fn).toHaveBeenCalledTimes(2);