Skip to content

Commit 8b5c59f

Browse files
committed
Fix more tests
1 parent 7d1d378 commit 8b5c59f

13 files changed

+203
-76
lines changed

packages/common/src/client/AbstractPowerSyncDatabase.ts

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,11 @@ import { Schema } from '../db/schema/Schema.js';
1515
import { BaseObserver } from '../utils/BaseObserver.js';
1616
import { ControlledExecutor } from '../utils/ControlledExecutor.js';
1717
import { symbolAsyncIterator, throttleTrailing } from '../utils/async.js';
18-
import { ConnectionManager, CreateSyncImplementationOptions } from './ConnectionManager.js';
18+
import {
19+
ConnectionManager,
20+
CreateSyncImplementationOptions,
21+
InternalSubscriptionAdapter
22+
} from './ConnectionManager.js';
1923
import { CustomQuery } from './CustomQuery.js';
2024
import { ArrayQueryDefinition, Query } from './Query.js';
2125
import { SQLOpenFactory, SQLOpenOptions, isDBAdapter, isSQLOpenFactory, isSQLOpenOptions } from './SQLOpenFactory.js';
@@ -183,6 +187,7 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
183187
protected bucketStorageAdapter: BucketStorageAdapter;
184188
protected _isReadyPromise: Promise<void>;
185189
protected connectionManager: ConnectionManager;
190+
private subscriptions: InternalSubscriptionAdapter;
186191

187192
get syncStreamImplementation() {
188193
return this.connectionManager.syncStreamImplementation;
@@ -237,14 +242,16 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
237242
this.runExclusiveMutex = new Mutex();
238243

239244
// Start async init
240-
this.connectionManager = new ConnectionManager({
241-
firstStatusMatching: (predicate) => this.waitForStatus(predicate),
245+
this.subscriptions = {
246+
firstStatusMatching: (predicate, abort) => this.waitForStatus(predicate, abort),
242247
resolveOfflineSyncStatus: () => this.resolveOfflineSyncStatus(),
243248
rustSubscriptionsCommand: async (payload) => {
244249
await this.writeTransaction((tx) => {
245250
return tx.execute('select powersync_control(?,?)', ['subscriptions', JSON.stringify(payload)]);
246251
});
247-
},
252+
}
253+
};
254+
this.connectionManager = new ConnectionManager({
248255
createSyncImplementation: async (connector, options) => {
249256
await this.waitForReady();
250257
return this.runExclusive(async () => {
@@ -538,7 +545,7 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
538545
}
539546

540547
syncStream(name: string, params?: Record<string, any>): SyncStream {
541-
return this.connectionManager.stream(name, params ?? null);
548+
return this.connectionManager.stream(this.subscriptions, name, params ?? null);
542549
}
543550

544551
/**

packages/common/src/client/ConnectionManager.ts

Lines changed: 16 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,12 @@ export interface CreateSyncImplementationOptions extends AdditionalConnectionOpt
3636
subscriptions: SubscribedStream[];
3737
}
3838

39+
export interface InternalSubscriptionAdapter {
40+
firstStatusMatching(predicate: (status: SyncStatus) => any, abort?: AbortSignal): Promise<void>;
41+
resolveOfflineSyncStatus(): Promise<void>;
42+
rustSubscriptionsCommand(payload: any): Promise<void>;
43+
}
44+
3945
/**
4046
* @internal
4147
*/
@@ -44,9 +50,7 @@ export interface ConnectionManagerOptions {
4450
connector: PowerSyncBackendConnector,
4551
options: CreateSyncImplementationOptions
4652
): Promise<ConnectionManagerSyncImplementationResult>;
47-
firstStatusMatching(predicate: (status: SyncStatus) => any): Promise<void>;
48-
resolveOfflineSyncStatus(): Promise<void>;
49-
rustSubscriptionsCommand(payload: any): Promise<void>;
53+
5054
logger: ILogger;
5155
}
5256

@@ -269,19 +273,19 @@ export class ConnectionManager extends BaseObserver<ConnectionManagerListener> {
269273
await disposer?.();
270274
}
271275

272-
stream(name: string, parameters: Record<string, any> | null): SyncStream {
276+
stream(adapter: InternalSubscriptionAdapter, name: string, parameters: Record<string, any> | null): SyncStream {
273277
const desc = { name, parameters } satisfies SyncStreamDescription;
274278

275-
const waitForFirstSync = () => {
276-
return this.options.firstStatusMatching((s) => s.statusFor(desc)?.subscription.hasSynced ?? false);
279+
const waitForFirstSync = (abort?: AbortSignal) => {
280+
return adapter.firstStatusMatching((s) => s.statusFor(desc)?.subscription.hasSynced, abort);
277281
};
278282

279283
return {
280284
...desc,
281285
subscribe: async (options?: SyncStreamSubscribeOptions) => {
282286
// NOTE: We also run this command if a subscription already exists, because this increases the expiry date
283287
// (relevant if the app is closed before connecting again, where the last subscribe call determines the ttl).
284-
await this.options.rustSubscriptionsCommand({
288+
await adapter.rustSubscriptionsCommand({
285289
subscribe: {
286290
stream: {
287291
name,
@@ -295,7 +299,7 @@ export class ConnectionManager extends BaseObserver<ConnectionManagerListener> {
295299
if (!this.syncStreamImplementation) {
296300
// We're not connected. So, update the offline sync status to reflect the new subscription.
297301
// (With an active iteration, the sync client would include it in its state).
298-
await this.options.resolveOfflineSyncStatus();
302+
await adapter.resolveOfflineSyncStatus();
299303
}
300304

301305
const key = `${name}|${JSON.stringify(parameters)}`;
@@ -314,7 +318,7 @@ export class ConnectionManager extends BaseObserver<ConnectionManagerListener> {
314318
return new SyncStreamSubscriptionHandle(subscription);
315319
},
316320
unsubscribeAll: async () => {
317-
await this.options.rustSubscriptionsCommand({ unsubscribe: { name, params: parameters } });
321+
await adapter.rustSubscriptionsCommand({ unsubscribe: { name, params: parameters } });
318322
this.subscriptionsMayHaveChanged();
319323
}
320324
};
@@ -337,7 +341,7 @@ class ActiveSubscription {
337341
constructor(
338342
readonly name: string,
339343
readonly parameters: Record<string, any> | null,
340-
readonly waitForFirstSync: () => Promise<void>,
344+
readonly waitForFirstSync: (abort?: AbortSignal) => Promise<void>,
341345
private clearSubscription: () => void
342346
) {}
343347

@@ -362,8 +366,8 @@ class SyncStreamSubscriptionHandle implements SyncStreamSubscription {
362366
return this.subscription.parameters;
363367
}
364368

365-
waitForFirstSync(): Promise<void> {
366-
return this.subscription.waitForFirstSync();
369+
waitForFirstSync(abort?: AbortSignal): Promise<void> {
370+
return this.subscription.waitForFirstSync(abort);
367371
}
368372

369373
unsubscribe(): void {

packages/common/src/client/sync/stream/AbstractStreamingSyncImplementation.ts

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ export interface LockOptions<T> {
9595
signal?: AbortSignal;
9696
}
9797

98-
export interface AbstractStreamingSyncImplementationOptions extends AdditionalConnectionOptions {
98+
export interface AbstractStreamingSyncImplementationOptions extends RequiredAdditionalConnectionOptions {
9999
adapter: BucketStorageAdapter;
100100
subscriptions: SubscribedStream[];
101101
uploadCrud: () => Promise<void>;
@@ -185,7 +185,9 @@ export interface AdditionalConnectionOptions {
185185
}
186186

187187
/** @internal */
188-
export type RequiredAdditionalConnectionOptions = Required<AdditionalConnectionOptions>;
188+
export interface RequiredAdditionalConnectionOptions extends Required<AdditionalConnectionOptions> {
189+
subscriptions: SubscribedStream[];
190+
}
189191

190192
export interface StreamingSyncImplementation
191193
extends BaseObserverInterface<StreamingSyncImplementationListener>,
@@ -265,7 +267,7 @@ export abstract class AbstractStreamingSyncImplementation
265267

266268
constructor(options: AbstractStreamingSyncImplementationOptions) {
267269
super();
268-
this.options = { ...DEFAULT_STREAMING_SYNC_OPTIONS, ...options };
270+
this.options = options;
269271
this.activeStreams = options.subscriptions;
270272
this.logger = options.logger ?? Logger.get('PowerSyncStream');
271273

packages/common/src/client/sync/sync-streams.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ export interface SyncStreamSubscription extends SyncStreamDescription {
9898
/**
9999
* A promise that resolves once data from in this sync stream has been synced and applied.
100100
*/
101-
waitForFirstSync(): Promise<void>;
101+
waitForFirstSync(abort?: AbortSignal): Promise<void>;
102102

103103
/**
104104
* Removes this stream subscription.

packages/node/src/db/PowerSyncDatabase.ts

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,12 @@ import {
44
AbstractStreamingSyncImplementation,
55
AdditionalConnectionOptions,
66
BucketStorageAdapter,
7-
CreateSyncImplementationOptions,
87
DBAdapter,
98
PowerSyncBackendConnector,
109
PowerSyncConnectionOptions,
1110
PowerSyncDatabaseOptions,
1211
PowerSyncDatabaseOptionsWithSettings,
12+
RequiredAdditionalConnectionOptions,
1313
SqliteBucketStorage,
1414
SQLOpenFactory
1515
} from '@powersync/common';
@@ -77,7 +77,7 @@ export class PowerSyncDatabase extends AbstractPowerSyncDatabase {
7777

7878
protected generateSyncStreamImplementation(
7979
connector: PowerSyncBackendConnector,
80-
options: CreateSyncImplementationOptions & NodeAdditionalConnectionOptions
80+
options: RequiredAdditionalConnectionOptions & NodeAdditionalConnectionOptions
8181
): AbstractStreamingSyncImplementation {
8282
const logger = this.logger;
8383
const remote = new NodeRemote(connector, logger, {
@@ -86,15 +86,13 @@ export class PowerSyncDatabase extends AbstractPowerSyncDatabase {
8686
});
8787

8888
return new NodeStreamingSyncImplementation({
89-
subscriptions: options.subscriptions,
9089
adapter: this.bucketStorageAdapter,
9190
remote,
9291
uploadCrud: async () => {
9392
await this.waitForReady();
9493
await connector.uploadData(this);
9594
},
96-
retryDelayMs: this.options.retryDelayMs,
97-
crudUploadThrottleMs: this.options.crudUploadThrottleMs,
95+
...options,
9896
identifier: this.database.name,
9997
logger
10098
});

packages/web/src/db/PowerSyncDatabase.ts

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -189,8 +189,7 @@ export class PowerSyncDatabase extends AbstractPowerSyncDatabase {
189189
const remote = new WebRemote(connector, this.logger);
190190
const syncOptions: WebStreamingSyncImplementationOptions = {
191191
...(this.options as {}),
192-
retryDelayMs: options.retryDelayMs,
193-
crudUploadThrottleMs: options.crudUploadThrottleMs,
192+
...options,
194193
flags: this.resolvedFlags,
195194
adapter: this.bucketStorageAdapter,
196195
remote,

packages/web/src/db/sync/SSRWebStreamingSyncImplementation.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import {
55
LockType,
66
PowerSyncConnectionOptions,
77
StreamingSyncImplementation,
8+
SubscribedStream,
89
SyncStatus,
910
SyncStatusOptions
1011
} from '@powersync/common';
@@ -80,4 +81,9 @@ export class SSRStreamingSyncImplementation extends BaseObserver implements Stre
8081
* This is a no-op in SSR mode.
8182
*/
8283
triggerCrudUpload() {}
84+
85+
/**
86+
* No-op in SSR mode.
87+
*/
88+
updateSubscriptions(): void {}
8389
}

packages/web/src/db/sync/SharedWebStreamingSyncImplementation.ts

Lines changed: 29 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,10 @@
1-
import { PowerSyncConnectionOptions, PowerSyncCredentials, SyncStatus, SyncStatusOptions } from '@powersync/common';
1+
import {
2+
PowerSyncConnectionOptions,
3+
PowerSyncCredentials,
4+
SubscribedStream,
5+
SyncStatus,
6+
SyncStatusOptions
7+
} from '@powersync/common';
28
import * as Comlink from 'comlink';
39
import { AbstractSharedSyncClientProvider } from '../../worker/sync/AbstractSharedSyncClientProvider';
410
import {
@@ -12,6 +18,7 @@ import {
1218
WebStreamingSyncImplementation,
1319
WebStreamingSyncImplementationOptions
1420
} from './WebStreamingSyncImplementation';
21+
import { WorkerClient } from '../../worker/sync/WorkerClient';
1522

1623
/**
1724
* The shared worker will trigger methods on this side of the message port
@@ -94,8 +101,11 @@ export interface SharedWebStreamingSyncImplementationOptions extends WebStreamin
94101
db: WebDBAdapter;
95102
}
96103

104+
/**
105+
* The local part of the sync implementation on the web, which talks to a sync implementation hosted in a shared worker.
106+
*/
97107
export class SharedWebStreamingSyncImplementation extends WebStreamingSyncImplementation {
98-
protected syncManager: Comlink.Remote<SharedSyncImplementation>;
108+
protected syncManager: Comlink.Remote<WorkerClient>;
99109
protected clientProvider: SharedSyncClientProvider;
100110
protected messagePort: MessagePort;
101111

@@ -138,7 +148,7 @@ export class SharedWebStreamingSyncImplementation extends WebStreamingSyncImplem
138148
).port;
139149
}
140150

141-
this.syncManager = Comlink.wrap<SharedSyncImplementation>(this.messagePort);
151+
this.syncManager = Comlink.wrap<WorkerClient>(this.messagePort);
142152
this.syncManager.setLogLevel(this.logger.getLevel());
143153

144154
this.triggerCrudUpload = this.syncManager.triggerCrudUpload;
@@ -152,15 +162,18 @@ export class SharedWebStreamingSyncImplementation extends WebStreamingSyncImplem
152162
const { crudUploadThrottleMs, identifier, retryDelayMs } = this.options;
153163
const flags = { ...this.webOptions.flags, workers: undefined };
154164

155-
this.isInitialized = this.syncManager.setParams({
156-
dbParams: this.dbAdapter.getConfiguration(),
157-
streamOptions: {
158-
crudUploadThrottleMs,
159-
identifier,
160-
retryDelayMs,
161-
flags: flags
162-
}
163-
});
165+
this.isInitialized = this.syncManager.setParams(
166+
{
167+
dbParams: this.dbAdapter.getConfiguration(),
168+
streamOptions: {
169+
crudUploadThrottleMs,
170+
identifier,
171+
retryDelayMs,
172+
flags: flags
173+
}
174+
},
175+
options.subscriptions
176+
);
164177

165178
/**
166179
* Pass along any sync status updates to this listener
@@ -235,6 +248,10 @@ export class SharedWebStreamingSyncImplementation extends WebStreamingSyncImplem
235248
return this.isInitialized;
236249
}
237250

251+
updateSubscriptions(subscriptions: SubscribedStream[]): void {
252+
this.syncManager.updateSubscriptions(subscriptions);
253+
}
254+
238255
/**
239256
* Used in tests to force a connection states
240257
*/

0 commit comments

Comments
 (0)