diff --git a/.changeset/wicked-wolves-greet.md b/.changeset/wicked-wolves-greet.md new file mode 100644 index 000000000..401773815 --- /dev/null +++ b/.changeset/wicked-wolves-greet.md @@ -0,0 +1,5 @@ +--- +'@powersync/node': patch +--- + +Throw when database is used after being closed. diff --git a/packages/node/package.json b/packages/node/package.json index 6b9db8100..8479f835f 100644 --- a/packages/node/package.json +++ b/packages/node/package.json @@ -70,7 +70,7 @@ "async-mutex": "^0.5.0", "comlink": "^4.4.2", "undici": "^7.11.0", - "bson": "^6.10.4" + "bson": "^6.10.4" }, "devDependencies": { "@powersync/drizzle-driver": "workspace:*", diff --git a/packages/node/src/db/RemoteConnection.ts b/packages/node/src/db/RemoteConnection.ts index fc8d94f4d..08f335952 100644 --- a/packages/node/src/db/RemoteConnection.ts +++ b/packages/node/src/db/RemoteConnection.ts @@ -11,49 +11,73 @@ export class RemoteConnection implements LockContext { private readonly worker: Worker; private readonly comlink: Remote; - readonly database: Remote; + private readonly database: Remote; + + private readonly notifyWorkerClosed = new AbortController(); constructor(worker: Worker, comlink: Remote, database: Remote) { this.worker = worker; this.comlink = comlink; this.database = database; + + this.worker.once('exit', (_) => { + this.notifyWorkerClosed.abort(); + }); } /** * Runs the inner function, but appends the stack trace where this function was called. This is useful for workers * because stack traces from worker errors are otherwise unrelated to the application issue that has caused them. */ - private async recoverTrace(inner: () => Promise): Promise { + private withRemote(inner: () => Promise): Promise { const trace = {}; Error.captureStackTrace(trace); + const controller = this.notifyWorkerClosed; - try { - return await inner(); - } catch (e) { - if (e instanceof Error && e.stack) { - e.stack += (trace as any).stack; + return new Promise((resolve, reject) => { + if (controller.signal.aborted) { + reject(new Error('Called operation on closed remote')); } - throw e; - } + function handleAbort() { + reject(new Error('Remote peer closed with request in flight')); + } + + function completePromise(action: () => void) { + controller!.signal.removeEventListener('abort', handleAbort); + action(); + } + + controller.signal.addEventListener('abort', handleAbort); + + inner() + .then((data) => completePromise(() => resolve(data))) + .catch((e) => { + if (e instanceof Error && e.stack) { + e.stack += (trace as any).stack; + } + + return completePromise(() => reject(e)); + }); + }); } executeBatch(query: string, params: any[][] = []): Promise { - return this.recoverTrace(async () => { + return this.withRemote(async () => { const result = await this.database.executeBatch(query, params ?? []); return RemoteConnection.wrapQueryResult(result); }); } execute(query: string, params?: any[] | undefined): Promise { - return this.recoverTrace(async () => { + return this.withRemote(async () => { const result = await this.database.execute(query, params ?? []); return RemoteConnection.wrapQueryResult(result); }); } executeRaw(query: string, params?: any[] | undefined): Promise { - return this.recoverTrace(async () => { + return this.withRemote(async () => { return await this.database.executeRaw(query, params ?? []); }); } diff --git a/packages/node/src/db/WorkerConnectionPool.ts b/packages/node/src/db/WorkerConnectionPool.ts index d05c21725..696c9a70a 100644 --- a/packages/node/src/db/WorkerConnectionPool.ts +++ b/packages/node/src/db/WorkerConnectionPool.ts @@ -213,10 +213,7 @@ export class WorkerConnectionPool extends BaseObserver implem try { return await fn(this.writeConnection); } finally { - const serializedUpdates = await this.writeConnection.database.executeRaw( - "SELECT powersync_update_hooks('get');", - [] - ); + const serializedUpdates = await this.writeConnection.executeRaw("SELECT powersync_update_hooks('get');", []); const updates = JSON.parse(serializedUpdates[0][0] as string) as string[]; if (updates.length > 0) {