Skip to content

Commit 409cb76

Browse files
js2702simolus3
authored andcommitted
pass schema to sync worker
1 parent 3e98407 commit 409cb76

File tree

7 files changed

+40
-21
lines changed

7 files changed

+40
-21
lines changed

packages/powersync_core/lib/src/database/native/native_powersync_database.dart

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import 'dart:async';
2+
import 'dart:convert';
23
import 'dart:isolate';
34
import 'package:meta/meta.dart';
45

@@ -266,7 +267,7 @@ class PowerSyncDatabaseImpl
266267
options,
267268
crudMutex.shared,
268269
syncMutex.shared,
269-
schema,
270+
jsonEncode(schema),
270271
),
271272
debugName: 'Sync ${database.openFactory.path}',
272273
onError: receiveUnhandledErrors.sendPort,
@@ -310,15 +311,15 @@ class _PowerSyncDatabaseIsolateArgs {
310311
final ResolvedSyncOptions options;
311312
final SerializedMutex crudMutex;
312313
final SerializedMutex syncMutex;
313-
final Schema schema;
314+
final String schemaJson;
314315

315316
_PowerSyncDatabaseIsolateArgs(
316317
this.sPort,
317318
this.dbRef,
318319
this.options,
319320
this.crudMutex,
320321
this.syncMutex,
321-
this.schema,
322+
this.schemaJson,
322323
);
323324
}
324325

@@ -414,7 +415,7 @@ Future<void> _syncIsolate(_PowerSyncDatabaseIsolateArgs args) async {
414415
final storage = BucketStorage(connection);
415416
final sync = StreamingSyncImplementation(
416417
adapter: storage,
417-
schema: args.schema,
418+
schemaJson: args.schemaJson,
418419
connector: InternalConnector(
419420
getCredentialsCached: getCredentialsCached,
420421
prefetchCredentials: prefetchCredentials,

packages/powersync_core/lib/src/database/web/web_powersync_database.dart

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import 'dart:async';
2+
import 'dart:convert';
23
import 'package:meta/meta.dart';
34
import 'package:http/browser_client.dart';
45
import 'package:logging/logging.dart';
@@ -159,7 +160,7 @@ class PowerSyncDatabaseImpl
159160

160161
sync = StreamingSyncImplementation(
161162
adapter: storage,
162-
schema: schema,
163+
schemaJson: jsonEncode(schema),
163164
connector: InternalConnector.wrap(connector, this),
164165
crudUpdateTriggerStream: crudStream,
165166
options: options,

packages/powersync_core/lib/src/sync/streaming_sync.dart

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ import 'dart:typed_data';
55
import 'package:http/http.dart' as http;
66
import 'package:logging/logging.dart';
77
import 'package:meta/meta.dart';
8-
import 'package:powersync_core/powersync_core.dart';
98
import 'package:powersync_core/src/abort_controller.dart';
109
import 'package:powersync_core/src/exceptions.dart';
1110
import 'package:powersync_core/src/log_internal.dart';
@@ -33,7 +32,7 @@ abstract interface class StreamingSync {
3332

3433
@internal
3534
class StreamingSyncImplementation implements StreamingSync {
36-
final Schema? schema; //TODO(SkillDevs): pass in all implementations
35+
final String schemaJson;
3736
final BucketStorage adapter;
3837
final InternalConnector connector;
3938
final ResolvedSyncOptions options;
@@ -64,7 +63,7 @@ class StreamingSyncImplementation implements StreamingSync {
6463
String? clientId;
6564

6665
StreamingSyncImplementation({
67-
required this.schema,
66+
required this.schemaJson,
6867
required this.adapter,
6968
required this.connector,
7069
required this.crudUpdateTriggerStream,
@@ -602,7 +601,7 @@ final class _ActiveRustStreamingIteration {
602601
'start',
603602
convert.json.encode({
604603
'parameters': sync.options.params,
605-
'schema': sync.schema,
604+
'schema': convert.json.decode(sync.schemaJson),
606605
}),
607606
);
608607
assert(_completedStream.isCompleted, 'Should have started streaming');

packages/powersync_core/lib/src/web/sync_controller.dart

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,9 @@ class SyncWorkerHandle implements StreamingSync {
113113
@override
114114
Future<void> streamingSync() async {
115115
await _channel.startSynchronization(
116-
database.database.openFactory.path, ResolvedSyncOptions(options));
116+
database.database.openFactory.path,
117+
ResolvedSyncOptions(options),
118+
database.schema,
119+
);
117120
}
118121
}

packages/powersync_core/lib/src/web/sync_worker.dart

Lines changed: 18 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -45,12 +45,16 @@ class _SyncWorker {
4545
});
4646
}
4747

48-
_SyncRunner referenceSyncTask(
49-
String databaseIdentifier, SyncOptions options, _ConnectedClient client) {
48+
_SyncRunner referenceSyncTask(String databaseIdentifier, SyncOptions options,
49+
String schemaJson, _ConnectedClient client) {
5050
return _requestedSyncTasks.putIfAbsent(databaseIdentifier, () {
5151
return _SyncRunner(databaseIdentifier);
5252
})
53-
..registerClient(client, options);
53+
..registerClient(
54+
client,
55+
options,
56+
schemaJson,
57+
);
5458
}
5559
}
5660

@@ -86,8 +90,8 @@ class _ConnectedClient {
8690
},
8791
);
8892

89-
_runner = _worker.referenceSyncTask(
90-
request.databaseName, recoveredOptions, this);
93+
_runner = _worker.referenceSyncTask(request.databaseName,
94+
recoveredOptions, request.schemaJson, this);
9195
return (JSObject(), null);
9296
case SyncWorkerMessageType.abortSynchronization:
9397
_runner?.disconnectClient(this);
@@ -128,6 +132,7 @@ class _ConnectedClient {
128132
class _SyncRunner {
129133
final String identifier;
130134
ResolvedSyncOptions options = ResolvedSyncOptions(SyncOptions());
135+
String schemaJson = '{}';
131136

132137
final StreamGroup<_RunnerEvent> _group = StreamGroup();
133138
final StreamController<_RunnerEvent> _mainEvents = StreamController();
@@ -146,10 +151,12 @@ class _SyncRunner {
146151
case _AddConnection(
147152
:final client,
148153
:final options,
154+
:final schemaJson,
149155
):
150156
connections.add(client);
151157
final (newOptions, reconnect) = this.options.applyFrom(options);
152158
this.options = newOptions;
159+
this.schemaJson = schemaJson;
153160

154161
if (sync == null) {
155162
await _requestDatabase(client);
@@ -264,7 +271,7 @@ class _SyncRunner {
264271

265272
sync = StreamingSyncImplementation(
266273
adapter: WebBucketStorage(database),
267-
schema: null,
274+
schemaJson: client._runner!.schemaJson,
268275
connector: InternalConnector(
269276
getCredentialsCached: client.channel.credentialsCallback,
270277
prefetchCredentials: ({required bool invalidate}) async {
@@ -287,8 +294,9 @@ class _SyncRunner {
287294
sync!.streamingSync();
288295
}
289296

290-
void registerClient(_ConnectedClient client, SyncOptions options) {
291-
_mainEvents.add(_AddConnection(client, options));
297+
void registerClient(
298+
_ConnectedClient client, SyncOptions options, String schemaJson) {
299+
_mainEvents.add(_AddConnection(client, options, schemaJson));
292300
}
293301

294302
/// Remove a client, disconnecting if no clients remain..
@@ -307,8 +315,9 @@ sealed class _RunnerEvent {}
307315
final class _AddConnection implements _RunnerEvent {
308316
final _ConnectedClient client;
309317
final SyncOptions options;
318+
final String schemaJson;
310319

311-
_AddConnection(this.client, this.options);
320+
_AddConnection(this.client, this.options, this.schemaJson);
312321
}
313322

314323
final class _RemoveConnection implements _RunnerEvent {

packages/powersync_core/lib/src/web/sync_worker_protocol.dart

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ import 'dart:convert';
33
import 'dart:js_interop';
44

55
import 'package:logging/logging.dart';
6+
import 'package:powersync_core/src/schema.dart';
67
import 'package:powersync_core/src/sync/options.dart';
78
import 'package:web/web.dart';
89

@@ -71,6 +72,7 @@ extension type StartSynchronization._(JSObject _) implements JSObject {
7172
required int requestId,
7273
required int retryDelayMs,
7374
required String implementationName,
75+
required String schemaJson,
7476
String? syncParamsEncoded,
7577
});
7678

@@ -79,6 +81,7 @@ extension type StartSynchronization._(JSObject _) implements JSObject {
7981
external int get crudThrottleTimeMs;
8082
external int? get retryDelayMs;
8183
external String? get implementationName;
84+
external String get schemaJson;
8285
external String? get syncParamsEncoded;
8386
}
8487

@@ -410,7 +413,7 @@ final class WorkerCommunicationChannel {
410413
}
411414

412415
Future<void> startSynchronization(
413-
String databaseName, ResolvedSyncOptions options) async {
416+
String databaseName, ResolvedSyncOptions options, Schema schema) async {
414417
final (id, completion) = _newRequest();
415418
port.postMessage(SyncWorkerMessage(
416419
type: SyncWorkerMessageType.startSynchronization.name,
@@ -420,6 +423,7 @@ final class WorkerCommunicationChannel {
420423
retryDelayMs: options.retryDelay.inMilliseconds,
421424
requestId: id,
422425
implementationName: options.source.syncImplementation.name,
426+
schemaJson: jsonEncode(schema),
423427
syncParamsEncoded: switch (options.source.params) {
424428
null => null,
425429
final params => jsonEncode(params),

packages/powersync_core/test/utils/abstract_test_utils.dart

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
import 'dart:convert';
2+
13
import 'package:http/http.dart';
24
import 'package:logging/logging.dart';
35
import 'package:powersync_core/powersync_core.dart';
@@ -154,7 +156,7 @@ extension MockSync on PowerSyncDatabase {
154156
}) {
155157
final impl = StreamingSyncImplementation(
156158
adapter: BucketStorage(this),
157-
schema: schema,
159+
schemaJson: jsonEncode(schema),
158160
client: client,
159161
options: ResolvedSyncOptions(options),
160162
connector: InternalConnector.wrap(connector, this),

0 commit comments

Comments
 (0)