Skip to content

Commit 2c7bb0c

Browse files
committed
Two more tests
1 parent 2099f25 commit 2c7bb0c

File tree

2 files changed

+43
-2
lines changed

2 files changed

+43
-2
lines changed

packages/common/src/client/ConnectionManager.ts

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -355,6 +355,8 @@ class ActiveSubscription {
355355
}
356356

357357
class SyncStreamSubscriptionHandle implements SyncStreamSubscription {
358+
private active: boolean = false;
359+
358360
constructor(readonly subscription: ActiveSubscription) {
359361
subscription.refcount++;
360362
_finalizer?.register(this, subscription);
@@ -373,8 +375,11 @@ class SyncStreamSubscriptionHandle implements SyncStreamSubscription {
373375
}
374376

375377
unsubscribe(): void {
376-
_finalizer?.unregister(this);
377-
this.subscription.decrementRefCount();
378+
if (this.active) {
379+
this.active = false;
380+
_finalizer?.unregister(this);
381+
this.subscription.decrementRefCount();
382+
}
378383
}
379384
}
380385

packages/node/tests/sync-stream.test.ts

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -146,4 +146,40 @@ describe('Sync streams', () => {
146146
let status = await statusPromise;
147147
expect(status.forStream(subscription)).not.toBeNull();
148148
});
149+
150+
mockSyncServiceTest('unsubscribing multiple times has no effect', async ({ syncService }) => {
151+
const database = await syncService.createDatabase();
152+
const a = await database.syncStream('a').subscribe();
153+
const aAgain = await database.syncStream('a').subscribe();
154+
a.unsubscribe();
155+
a.unsubscribe();
156+
157+
// Pretend the streams are expired - they should still be requested because the core extension extends the lifetime
158+
// of streams currently referenced before connecting.
159+
await database.execute('UPDATE ps_stream_subscriptions SET expires_at = unixepoch() - 1000');
160+
await database.connect(new TestConnector(), defaultOptions);
161+
162+
expect(syncService.connectedListeners[0]).toMatchObject({
163+
streams: {
164+
include_defaults: true,
165+
subscriptions: [{}]
166+
}
167+
});
168+
aAgain.unsubscribe();
169+
});
170+
171+
mockSyncServiceTest('unsubscribeAll', async ({ syncService }) => {
172+
const database = await syncService.createDatabase();
173+
const a = await database.syncStream('a').subscribe();
174+
database.syncStream('a').unsubscribeAll();
175+
176+
await database.connect(new TestConnector(), defaultOptions);
177+
expect(syncService.connectedListeners[0]).toMatchObject({
178+
streams: {
179+
include_defaults: true,
180+
subscriptions: []
181+
}
182+
});
183+
a.unsubscribe();
184+
});
149185
});

0 commit comments

Comments
 (0)