Skip to content

Commit 7d1d378

Browse files
committed
Add tests
1 parent 2480604 commit 7d1d378

File tree

9 files changed

+242
-28
lines changed

9 files changed

+242
-28
lines changed

packages/common/src/client/AbstractPowerSyncDatabase.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ import { DEFAULT_WATCH_THROTTLE_MS, WatchCompatibleQuery } from './watched/Watch
4040
import { OnChangeQueryProcessor } from './watched/processors/OnChangeQueryProcessor.js';
4141
import { WatchedQueryComparator } from './watched/processors/comparators.js';
4242
import { coreStatusToJs, CoreSyncStatus } from './sync/stream/core-instruction.js';
43+
import { SyncStream } from './sync/sync-streams.js';
4344

4445
export interface DisconnectAndClearOptions {
4546
/** When set to false, data in local-only tables is preserved. */
@@ -536,7 +537,9 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
536537
this.iterateListeners((l) => l.statusChanged?.(this.currentStatus));
537538
}
538539

539-
syncStream(name: string, params?: Record<string, any>) {}
540+
syncStream(name: string, params?: Record<string, any>): SyncStream {
541+
return this.connectionManager.stream(name, params ?? null);
542+
}
540543

541544
/**
542545
* Close the database, releasing resources.

packages/common/src/client/ConnectionManager.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -292,7 +292,7 @@ export class ConnectionManager extends BaseObserver<ConnectionManagerListener> {
292292
}
293293
});
294294

295-
if (!this.pendingConnectionOptions) {
295+
if (!this.syncStreamImplementation) {
296296
// We're not connected. So, update the offline sync status to reflect the new subscription.
297297
// (With an active iteration, the sync client would include it in its state).
298298
await this.options.resolveOfflineSyncStatus();

packages/common/src/client/sync/bucket/BucketStorageAdapter.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ export interface Checkpoint {
1212
last_op_id: OpId;
1313
buckets: BucketChecksum[];
1414
write_checkpoint?: string;
15+
streams?: any[];
1516
}
1617

1718
export interface BucketState {
@@ -49,6 +50,7 @@ export interface BucketChecksum {
4950
* Count of operations - informational only.
5051
*/
5152
count?: number;
53+
subscriptions?: any;
5254
}
5355

5456
export enum PSInternalTable {

packages/common/src/client/sync/stream/AbstractStreamingSyncImplementation.ts

Lines changed: 32 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -548,11 +548,13 @@ The next upload iteration will be delayed.`);
548548
while (true) {
549549
this.updateSyncStatus({ connecting: true });
550550
let shouldDelayRetry = true;
551+
let result: RustIterationResult | null = null;
552+
551553
try {
552554
if (signal?.aborted) {
553555
break;
554556
}
555-
await this.streamingSyncIteration(nestedAbortController.signal, options);
557+
result = await this.streamingSyncIteration(nestedAbortController.signal, options);
556558
// Continue immediately, streamingSyncIteration will wait before completing if necessary.
557559
} catch (ex) {
558560
/**
@@ -586,14 +588,16 @@ The next upload iteration will be delayed.`);
586588
nestedAbortController = new AbortController();
587589
}
588590

589-
this.updateSyncStatus({
590-
connected: false,
591-
connecting: true // May be unnecessary
592-
});
591+
if (result?.immediateRestart != true) {
592+
this.updateSyncStatus({
593+
connected: false,
594+
connecting: true // May be unnecessary
595+
});
593596

594-
// On error, wait a little before retrying
595-
if (shouldDelayRetry) {
596-
await this.delayRetry(nestedAbortController.signal);
597+
// On error, wait a little before retrying
598+
if (shouldDelayRetry) {
599+
await this.delayRetry(nestedAbortController.signal);
600+
}
597601
}
598602
}
599603
}
@@ -641,8 +645,11 @@ The next upload iteration will be delayed.`);
641645
}
642646
}
643647

644-
protected async streamingSyncIteration(signal: AbortSignal, options?: PowerSyncConnectionOptions): Promise<void> {
645-
await this.obtainLock({
648+
protected streamingSyncIteration(
649+
signal: AbortSignal,
650+
options?: PowerSyncConnectionOptions
651+
): Promise<RustIterationResult | null> {
652+
return this.obtainLock({
646653
type: LockType.SYNC,
647654
signal,
648655
callback: async () => {
@@ -655,9 +662,10 @@ The next upload iteration will be delayed.`);
655662

656663
if (clientImplementation == SyncClientImplementation.JAVASCRIPT) {
657664
await this.legacyStreamingSyncIteration(signal, resolvedOptions);
665+
return null;
658666
} else {
659667
await this.requireKeyFormat(true);
660-
await this.rustSyncIteration(signal, resolvedOptions);
668+
return await this.rustSyncIteration(signal, resolvedOptions);
661669
}
662670
}
663671
});
@@ -912,12 +920,16 @@ The next upload iteration will be delayed.`);
912920
return;
913921
}
914922

915-
private async rustSyncIteration(signal: AbortSignal, resolvedOptions: RequiredPowerSyncConnectionOptions) {
923+
private async rustSyncIteration(
924+
signal: AbortSignal,
925+
resolvedOptions: RequiredPowerSyncConnectionOptions
926+
): Promise<RustIterationResult> {
916927
const syncImplementation = this;
917928
const adapter = this.options.adapter;
918929
const remote = this.options.remote;
919930
let receivingLines: Promise<void> | null = null;
920931
let hadSyncLine = false;
932+
let hideDisconnectOnRestart = false;
921933

922934
if (signal.aborted) {
923935
throw new AbortOperation('Connection request has been aborted');
@@ -1046,6 +1058,7 @@ The next upload iteration will be delayed.`);
10461058
}
10471059
} else if ('CloseSyncStream' in instruction) {
10481060
abortController.abort();
1061+
hideDisconnectOnRestart = instruction.CloseSyncStream.hide_disconnect;
10491062
} else if ('FlushFileSystem' in instruction) {
10501063
// Not necessary on JS platforms.
10511064
} else if ('DidCompleteSync' in instruction) {
@@ -1084,7 +1097,7 @@ The next upload iteration will be delayed.`);
10841097
if (controlInvocations && !controlInvocations?.closed) {
10851098
controlInvocations.enqueueData({
10861099
command: PowerSyncControlCommand.UPDATE_SUBSCRIPTIONS,
1087-
payload: JSON.stringify({ active_streams: this.activeStreams })
1100+
payload: JSON.stringify(this.activeStreams)
10881101
});
10891102
}
10901103
};
@@ -1093,6 +1106,8 @@ The next upload iteration will be delayed.`);
10931106
this.notifyCompletedUploads = this.handleActiveStreamsChange = undefined;
10941107
await stop();
10951108
}
1109+
1110+
return { immediateRestart: hideDisconnectOnRestart };
10961111
}
10971112

10981113
private async updateSyncStatusForStartingCheckpoint(checkpoint: Checkpoint) {
@@ -1228,3 +1243,7 @@ interface EnqueuedCommand {
12281243
command: PowerSyncControlCommand;
12291244
payload?: Uint8Array | string;
12301245
}
1246+
1247+
interface RustIterationResult {
1248+
immediateRestart: boolean;
1249+
}

packages/common/src/client/sync/stream/core-instruction.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,8 @@ export function coreStatusToJs(status: CoreSyncStatus): sync_status.SyncStatusOp
8787
connecting: status.connecting,
8888
dataFlow: {
8989
downloading: status.downloading != null,
90-
downloadProgress: status.downloading?.buckets
90+
downloadProgress: status.downloading?.buckets,
91+
internalStreamSubscriptions: status.streams
9192
},
9293
lastSyncedAt: completeSync?.lastSyncedAt,
9394
hasSynced: completeSync?.hasSynced,

packages/common/src/db/crud/SyncStatus.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import { CoreStreamSubscription } from '../../client/sync/stream/core-instruction.js';
22
import { SyncClientImplementation } from '../../client/sync/stream/AbstractStreamingSyncImplementation.js';
33
import { InternalProgressInformation, ProgressWithOperations, SyncProgress } from './SyncProgress.js';
4-
import { SyncStreamDescription, SyncSubscriptionDescription } from 'src/client/sync/sync-streams.js';
4+
import { SyncStreamDescription, SyncSubscriptionDescription } from '../../client/sync/sync-streams.js';
55

66
export type SyncDataFlowStatus = Partial<{
77
downloading: boolean;

packages/node/tests/stream.test.ts

Lines changed: 153 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,153 @@
1+
import { describe, vi, expect, beforeEach } from 'vitest';
2+
import { SyncClientImplementation, SyncStreamConnectionMethod } from '@powersync/common';
3+
import Logger from 'js-logger';
4+
import { bucket, checkpoint, mockSyncServiceTest, nextStatus, stream, TestConnector } from './utils';
5+
6+
Logger.useDefaults({ defaultLevel: Logger.WARN });
7+
8+
describe('Sync streams', () => {
9+
const defaultOptions = {
10+
clientImplementation: SyncClientImplementation.RUST,
11+
connectionMethod: SyncStreamConnectionMethod.HTTP
12+
};
13+
14+
mockSyncServiceTest('can disable default streams', async ({ syncService }) => {
15+
const database = await syncService.createDatabase();
16+
database.connect(new TestConnector(), {
17+
includeDefaultStreams: false,
18+
...defaultOptions
19+
});
20+
21+
await vi.waitFor(() => expect(syncService.connectedListeners).toHaveLength(1));
22+
expect(syncService.connectedListeners[0]).toMatchObject({
23+
streams: {
24+
include_defaults: false,
25+
subscriptions: []
26+
}
27+
});
28+
});
29+
30+
mockSyncServiceTest('subscribes with streams', async ({ syncService }) => {
31+
const database = await syncService.createDatabase();
32+
const a = await database.syncStream('stream', { foo: 'a' }).subscribe();
33+
const b = await database.syncStream('stream', { foo: 'b' }).subscribe({ priority: 1 });
34+
35+
database.connect(new TestConnector(), defaultOptions);
36+
await vi.waitFor(() => expect(syncService.connectedListeners).toHaveLength(1));
37+
38+
expect(syncService.connectedListeners[0]).toMatchObject({
39+
streams: {
40+
include_defaults: true,
41+
subscriptions: [
42+
{
43+
stream: 'stream',
44+
parameters: { foo: 'a' },
45+
override_priority: null
46+
},
47+
{
48+
stream: 'stream',
49+
parameters: { foo: 'b' },
50+
override_priority: 1
51+
}
52+
]
53+
}
54+
});
55+
56+
let statusPromise = nextStatus(database);
57+
syncService.pushLine(
58+
checkpoint({
59+
last_op_id: 0,
60+
buckets: [
61+
bucket('a', 0, { priority: 3, subscriptions: [{ sub: 0 }] }),
62+
bucket('b', 0, { priority: 1, subscriptions: [{ sub: 1 }] })
63+
],
64+
streams: [stream('stream', false)]
65+
})
66+
);
67+
let status = await statusPromise;
68+
for (const subscription of [a, b]) {
69+
expect(status.statusFor(subscription).subscription.active).toBeTruthy();
70+
expect(status.statusFor(subscription).subscription.lastSyncedAt).toBeNull();
71+
expect(status.statusFor(subscription).subscription.hasExplicitSubscription).toBeTruthy();
72+
}
73+
74+
statusPromise = nextStatus(database);
75+
syncService.pushLine({ partial_checkpoint_complete: { last_op_id: '0', priority: 1 } });
76+
status = await statusPromise;
77+
expect(status.statusFor(a).subscription.lastSyncedAt).toBeNull();
78+
expect(status.statusFor(b).subscription.lastSyncedAt).not.toBeNull();
79+
await b.waitForFirstSync();
80+
81+
syncService.pushLine({ checkpoint_complete: { last_op_id: '0' } });
82+
await a.waitForFirstSync();
83+
});
84+
85+
mockSyncServiceTest('reports default streams', async ({ syncService }) => {
86+
const database = await syncService.createDatabase();
87+
database.connect(new TestConnector(), defaultOptions);
88+
await vi.waitFor(() => expect(syncService.connectedListeners).toHaveLength(1));
89+
90+
let statusPromise = nextStatus(database);
91+
syncService.pushLine(
92+
checkpoint({
93+
last_op_id: 0,
94+
buckets: [],
95+
streams: [stream('default_stream', true)]
96+
})
97+
);
98+
let status = await statusPromise;
99+
100+
expect(status.subscriptions).toHaveLength(1);
101+
expect(status.subscriptions[0]).toMatchObject({
102+
subscription: {
103+
name: 'default_stream',
104+
parameters: null,
105+
isDefault: true,
106+
hasExplicitSubscription: false
107+
}
108+
});
109+
});
110+
111+
mockSyncServiceTest('changes subscriptions dynamically', async ({ syncService }) => {
112+
const database = await syncService.createDatabase();
113+
database.connect(new TestConnector(), defaultOptions);
114+
await vi.waitFor(() => expect(syncService.connectedListeners).toHaveLength(1));
115+
116+
syncService.pushLine(
117+
checkpoint({
118+
last_op_id: 0,
119+
buckets: []
120+
})
121+
);
122+
const subscription = await database.syncStream('a').subscribe();
123+
124+
await vi.waitFor(() =>
125+
expect(syncService.connectedListeners[0]).toMatchObject({
126+
streams: {
127+
include_defaults: true,
128+
subscriptions: [
129+
{
130+
stream: 'a',
131+
parameters: null,
132+
override_priority: null
133+
}
134+
]
135+
}
136+
})
137+
);
138+
139+
// Given that the subscription has a TTL, dropping the handle should not re-subscribe.
140+
subscription.unsubscribe();
141+
await new Promise((r) => setTimeout(r, 100));
142+
expect(syncService.connectedListeners[0].streams.subscriptions).toHaveLength(1);
143+
});
144+
145+
mockSyncServiceTest('subscriptions update while offline', async ({ syncService }) => {
146+
const database = await syncService.createDatabase();
147+
148+
let statusPromise = nextStatus(database);
149+
const subscription = await database.syncStream('foo').subscribe();
150+
let status = await statusPromise;
151+
expect(status.statusFor(subscription)).not.toBeNull();
152+
});
153+
});

packages/node/tests/sync.test.ts

Lines changed: 1 addition & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import { describe, vi, expect, beforeEach } from 'vitest';
22
import util from 'node:util';
33

4-
import { MockSyncService, mockSyncServiceTest, TestConnector, waitForSyncStatus } from './utils';
4+
import { bucket, MockSyncService, mockSyncServiceTest, TestConnector, waitForSyncStatus } from './utils';
55
import {
66
AbstractPowerSyncDatabase,
77
BucketChecksum,
@@ -925,15 +925,6 @@ function defineSyncTests(impl: SyncClientImplementation) {
925925
});
926926
}
927927

928-
function bucket(name: string, count: number, options: { priority: number } = { priority: 3 }): BucketChecksum {
929-
return {
930-
bucket: name,
931-
count,
932-
checksum: 0,
933-
priority: options.priority
934-
};
935-
}
936-
937928
async function waitForProgress(
938929
database: AbstractPowerSyncDatabase,
939930
total: [number, number],

0 commit comments

Comments
 (0)