From efb9d09396d0fefe93f46fead72c23c19babb312 Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Thu, 6 Nov 2025 17:03:47 +0100 Subject: [PATCH 1/2] Debug node test failures --- packages/node/package.json | 2 +- packages/node/src/db/RemoteConnection.ts | 48 +++++++++++++++----- packages/node/src/db/WorkerConnectionPool.ts | 18 ++++++-- 3 files changed, 50 insertions(+), 18 deletions(-) diff --git a/packages/node/package.json b/packages/node/package.json index 6b9db8100..8479f835f 100644 --- a/packages/node/package.json +++ b/packages/node/package.json @@ -70,7 +70,7 @@ "async-mutex": "^0.5.0", "comlink": "^4.4.2", "undici": "^7.11.0", - "bson": "^6.10.4" + "bson": "^6.10.4" }, "devDependencies": { "@powersync/drizzle-driver": "workspace:*", diff --git a/packages/node/src/db/RemoteConnection.ts b/packages/node/src/db/RemoteConnection.ts index fc8d94f4d..08f335952 100644 --- a/packages/node/src/db/RemoteConnection.ts +++ b/packages/node/src/db/RemoteConnection.ts @@ -11,49 +11,73 @@ export class RemoteConnection implements LockContext { private readonly worker: Worker; private readonly comlink: Remote; - readonly database: Remote; + private readonly database: Remote; + + private readonly notifyWorkerClosed = new AbortController(); constructor(worker: Worker, comlink: Remote, database: Remote) { this.worker = worker; this.comlink = comlink; this.database = database; + + this.worker.once('exit', (_) => { + this.notifyWorkerClosed.abort(); + }); } /** * Runs the inner function, but appends the stack trace where this function was called. This is useful for workers * because stack traces from worker errors are otherwise unrelated to the application issue that has caused them. */ - private async recoverTrace(inner: () => Promise): Promise { + private withRemote(inner: () => Promise): Promise { const trace = {}; Error.captureStackTrace(trace); + const controller = this.notifyWorkerClosed; - try { - return await inner(); - } catch (e) { - if (e instanceof Error && e.stack) { - e.stack += (trace as any).stack; + return new Promise((resolve, reject) => { + if (controller.signal.aborted) { + reject(new Error('Called operation on closed remote')); } - throw e; - } + function handleAbort() { + reject(new Error('Remote peer closed with request in flight')); + } + + function completePromise(action: () => void) { + controller!.signal.removeEventListener('abort', handleAbort); + action(); + } + + controller.signal.addEventListener('abort', handleAbort); + + inner() + .then((data) => completePromise(() => resolve(data))) + .catch((e) => { + if (e instanceof Error && e.stack) { + e.stack += (trace as any).stack; + } + + return completePromise(() => reject(e)); + }); + }); } executeBatch(query: string, params: any[][] = []): Promise { - return this.recoverTrace(async () => { + return this.withRemote(async () => { const result = await this.database.executeBatch(query, params ?? []); return RemoteConnection.wrapQueryResult(result); }); } execute(query: string, params?: any[] | undefined): Promise { - return this.recoverTrace(async () => { + return this.withRemote(async () => { const result = await this.database.execute(query, params ?? []); return RemoteConnection.wrapQueryResult(result); }); } executeRaw(query: string, params?: any[] | undefined): Promise { - return this.recoverTrace(async () => { + return this.withRemote(async () => { return await this.database.executeRaw(query, params ?? []); }); } diff --git a/packages/node/src/db/WorkerConnectionPool.ts b/packages/node/src/db/WorkerConnectionPool.ts index d05c21725..551c33f9f 100644 --- a/packages/node/src/db/WorkerConnectionPool.ts +++ b/packages/node/src/db/WorkerConnectionPool.ts @@ -92,11 +92,22 @@ export class WorkerConnectionPool extends BaseObserver implem const listeners = new WeakMap void>(); const comlink = Comlink.wrap({ - postMessage: worker.postMessage.bind(worker), + postMessage: (message: any, transfer?: any) => { + console.log('to worker', message); + return worker.postMessage(message, transfer); + }, addEventListener: (type, listener) => { let resolved: (event: any) => void = 'handleEvent' in listener ? listener.handleEvent.bind(listener) : listener; + { + const original = resolved; + resolved = (event) => { + console.log('from worker', event); + return original(event); + }; + } + // Comlink wants message events, but the message event on workers in Node returns the data only. if (type === 'message') { const original = resolved; @@ -213,10 +224,7 @@ export class WorkerConnectionPool extends BaseObserver implem try { return await fn(this.writeConnection); } finally { - const serializedUpdates = await this.writeConnection.database.executeRaw( - "SELECT powersync_update_hooks('get');", - [] - ); + const serializedUpdates = await this.writeConnection.executeRaw("SELECT powersync_update_hooks('get');", []); const updates = JSON.parse(serializedUpdates[0][0] as string) as string[]; if (updates.length > 0) { From 3b2a2c946bc8c606a3c70beb3a6aadfc09bd7298 Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Thu, 6 Nov 2025 17:12:15 +0100 Subject: [PATCH 2/2] Remove debug log --- .changeset/wicked-wolves-greet.md | 5 +++++ packages/node/src/db/WorkerConnectionPool.ts | 13 +------------ 2 files changed, 6 insertions(+), 12 deletions(-) create mode 100644 .changeset/wicked-wolves-greet.md diff --git a/.changeset/wicked-wolves-greet.md b/.changeset/wicked-wolves-greet.md new file mode 100644 index 000000000..401773815 --- /dev/null +++ b/.changeset/wicked-wolves-greet.md @@ -0,0 +1,5 @@ +--- +'@powersync/node': patch +--- + +Throw when database is used after being closed. diff --git a/packages/node/src/db/WorkerConnectionPool.ts b/packages/node/src/db/WorkerConnectionPool.ts index 551c33f9f..696c9a70a 100644 --- a/packages/node/src/db/WorkerConnectionPool.ts +++ b/packages/node/src/db/WorkerConnectionPool.ts @@ -92,22 +92,11 @@ export class WorkerConnectionPool extends BaseObserver implem const listeners = new WeakMap void>(); const comlink = Comlink.wrap({ - postMessage: (message: any, transfer?: any) => { - console.log('to worker', message); - return worker.postMessage(message, transfer); - }, + postMessage: worker.postMessage.bind(worker), addEventListener: (type, listener) => { let resolved: (event: any) => void = 'handleEvent' in listener ? listener.handleEvent.bind(listener) : listener; - { - const original = resolved; - resolved = (event) => { - console.log('from worker', event); - return original(event); - }; - } - // Comlink wants message events, but the message event on workers in Node returns the data only. if (type === 'message') { const original = resolved;