Skip to content

Commit b44e949

Browse files
committed
Remove subscriptions when tab is removed
1 parent de04c27 commit b44e949

File tree

3 files changed

+49
-14
lines changed

3 files changed

+49
-14
lines changed

packages/web/src/db/sync/SharedWebStreamingSyncImplementation.ts

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,18 +7,15 @@ import {
77
} from '@powersync/common';
88
import * as Comlink from 'comlink';
99
import { AbstractSharedSyncClientProvider } from '../../worker/sync/AbstractSharedSyncClientProvider';
10-
import {
11-
ManualSharedSyncPayload,
12-
SharedSyncClientEvent,
13-
SharedSyncImplementation
14-
} from '../../worker/sync/SharedSyncImplementation';
10+
import { ManualSharedSyncPayload, SharedSyncClientEvent } from '../../worker/sync/SharedSyncImplementation';
1511
import { DEFAULT_CACHE_SIZE_KB, resolveWebSQLFlags, TemporaryStorageOption } from '../adapters/web-sql-flags';
1612
import { WebDBAdapter } from '../adapters/WebDBAdapter';
1713
import {
1814
WebStreamingSyncImplementation,
1915
WebStreamingSyncImplementationOptions
2016
} from './WebStreamingSyncImplementation';
2117
import { WorkerClient } from '../../worker/sync/WorkerClient';
18+
import { getNavigatorLocks } from '../../shared/navigator';
2219

2320
/**
2421
* The shared worker will trigger methods on this side of the message port
@@ -111,6 +108,7 @@ export class SharedWebStreamingSyncImplementation extends WebStreamingSyncImplem
111108

112109
protected isInitialized: Promise<void>;
113110
protected dbAdapter: WebDBAdapter;
111+
private abortOnClose = new AbortController();
114112

115113
constructor(options: SharedWebStreamingSyncImplementationOptions) {
116114
super(options);
@@ -192,6 +190,19 @@ export class SharedWebStreamingSyncImplementation extends WebStreamingSyncImplem
192190
* This performs bi-directional method calling.
193191
*/
194192
Comlink.expose(this.clientProvider, this.messagePort);
193+
194+
// Request a random lock until this client is disposed. The name of the lock is sent to the shared worker, which
195+
// will also attempt to acquire it. Since the lock is returned when the tab is closed, this allows the share worker
196+
// to free resources associated with this tab.
197+
getNavigatorLocks().request(`tab-close-signal-${crypto.randomUUID()}`, async (lock) => {
198+
if (!this.abortOnClose.signal.aborted) {
199+
this.syncManager.addLockBasedCloseSignal(lock!.name);
200+
201+
await new Promise<void>((r) => {
202+
this.abortOnClose.signal.onabort = () => r();
203+
});
204+
}
205+
});
195206
}
196207

197208
/**
@@ -238,6 +249,7 @@ export class SharedWebStreamingSyncImplementation extends WebStreamingSyncImplem
238249
};
239250
this.messagePort.postMessage(closeMessagePayload);
240251
});
252+
this.abortOnClose.abort();
241253

242254
// Release the proxy
243255
this.syncManager[Comlink.releaseProxy]();

packages/web/src/worker/sync/SharedSyncImplementation.ts

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -292,12 +292,12 @@ export class SharedSyncImplementation extends BaseObserver<SharedSyncImplementat
292292
* Removes a message port client from this manager's managed
293293
* clients.
294294
*/
295-
async removePort(port: MessagePort) {
295+
async removePort(port: WrappedSyncPort) {
296296
// Remove the port within a mutex context.
297297
// Warns if the port is not found. This should not happen in practice.
298298
// We return early if the port is not found.
299299
const { trackedPort, shouldReconnect } = await this.portMutex.runExclusive(async () => {
300-
const index = this.ports.findIndex((p) => p.port == port);
300+
const index = this.ports.findIndex((p) => p == port);
301301
if (index < 0) {
302302
this.logger.warn(`Could not remove port ${port} since it is not present in active ports.`);
303303
return {};
@@ -312,7 +312,7 @@ export class SharedSyncImplementation extends BaseObserver<SharedSyncImplementat
312312
* not resolve. Abort them here.
313313
*/
314314
[this.fetchCredentialsController, this.uploadDataController].forEach((abortController) => {
315-
if (abortController?.activePort.port == port) {
315+
if (abortController?.activePort == port) {
316316
abortController!.controller.abort(
317317
new AbortOperation('Closing pending requests after client port is removed')
318318
);
@@ -347,6 +347,10 @@ export class SharedSyncImplementation extends BaseObserver<SharedSyncImplementat
347347
if (trackedPort.db) {
348348
await trackedPort.db.close();
349349
}
350+
351+
// Re-index subscriptions, the subscriptions of the removed port would no longer be considered.
352+
this.collectActiveSubscriptions();
353+
350354
// Release proxy
351355
return () => trackedPort.clientProvider[Comlink.releaseProxy]();
352356
}

packages/web/src/worker/sync/WorkerClient.ts

Lines changed: 25 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import {
77
WrappedSyncPort
88
} from './SharedSyncImplementation';
99
import { ILogLevel, PowerSyncConnectionOptions, SubscribedStream, SyncStatusOptions } from '@powersync/common';
10+
import { getNavigatorLocks } from '../../shared/navigator';
1011

1112
/**
1213
* A client to the shared sync worker.
@@ -30,19 +31,37 @@ export class WorkerClient {
3031
this.port.addEventListener('message', async (event) => {
3132
const payload = event.data as ManualSharedSyncPayload;
3233
if (payload?.event == SharedSyncClientEvent.CLOSE_CLIENT) {
33-
const release = await this.sync.removePort(this.port);
34-
this.port.postMessage({
35-
event: SharedSyncClientEvent.CLOSE_ACK,
36-
data: {}
37-
} satisfies ManualSharedSyncPayload);
38-
release?.();
34+
await this.removePort();
3935
}
4036
});
4137

4238
this.resolvedPort = await this.sync.addPort(this.port);
4339
Comlink.expose(this, this.port);
4440
}
4541

42+
private async removePort() {
43+
if (this.resolvedPort) {
44+
const release = await this.sync.removePort(this.resolvedPort);
45+
this.port.postMessage({
46+
event: SharedSyncClientEvent.CLOSE_ACK,
47+
data: {}
48+
} satisfies ManualSharedSyncPayload);
49+
release?.();
50+
}
51+
}
52+
53+
/**
54+
* Called by a client after obtaining a lock with a random name.
55+
*
56+
* When the client tab is closed, its lock will be returned. So when the shared worker attempts to acquire the lock,
57+
* it can consider the connection to be closed.
58+
*/
59+
addLockBasedCloseSignal(name: string) {
60+
getNavigatorLocks().request(name, async () => {
61+
await this.removePort();
62+
});
63+
}
64+
4665
setLogLevel(level: ILogLevel) {
4766
this.sync.setLogLevel(level);
4867
}

0 commit comments

Comments
 (0)