diff --git a/packages/powersync_core/lib/src/database/native/native_powersync_database.dart b/packages/powersync_core/lib/src/database/native/native_powersync_database.dart index 2f846fd5..d55b69db 100644 --- a/packages/powersync_core/lib/src/database/native/native_powersync_database.dart +++ b/packages/powersync_core/lib/src/database/native/native_powersync_database.dart @@ -1,4 +1,5 @@ import 'dart:async'; +import 'dart:convert'; import 'dart:isolate'; import 'package:meta/meta.dart'; @@ -83,8 +84,12 @@ class PowerSyncDatabaseImpl DefaultSqliteOpenFactory factory = // ignore: deprecated_member_use_from_same_package PowerSyncOpenFactory(path: path, sqliteSetup: sqliteSetup); - return PowerSyncDatabaseImpl.withFactory(factory, - schema: schema, maxReaders: maxReaders, logger: logger); + return PowerSyncDatabaseImpl.withFactory( + factory, + schema: schema, + maxReaders: maxReaders, + logger: logger, + ); } /// Open a [PowerSyncDatabase] with a [PowerSyncOpenFactory]. @@ -96,13 +101,17 @@ class PowerSyncDatabaseImpl /// /// [logger] defaults to [autoLogger], which logs to the console in debug builds. factory PowerSyncDatabaseImpl.withFactory( - DefaultSqliteOpenFactory openFactory, - {required Schema schema, - int maxReaders = SqliteDatabase.defaultMaxReaders, - Logger? logger}) { + DefaultSqliteOpenFactory openFactory, { + required Schema schema, + int maxReaders = SqliteDatabase.defaultMaxReaders, + Logger? logger, + }) { final db = SqliteDatabase.withFactory(openFactory, maxReaders: maxReaders); return PowerSyncDatabaseImpl.withDatabase( - schema: schema, database: db, logger: logger); + schema: schema, + database: db, + logger: logger, + ); } /// Open a PowerSyncDatabase on an existing [SqliteDatabase]. @@ -110,8 +119,11 @@ class PowerSyncDatabaseImpl /// Migrations are run on the database when this constructor is called. /// /// [logger] defaults to [autoLogger], which logs to the console in debug builds.s - PowerSyncDatabaseImpl.withDatabase( - {required this.schema, required this.database, Logger? logger}) { + PowerSyncDatabaseImpl.withDatabase({ + required this.schema, + required this.database, + Logger? logger, + }) { this.logger = logger ?? autoLogger; isInitialized = baseInit(); } @@ -247,6 +259,7 @@ class PowerSyncDatabaseImpl options, crudMutex.shared, syncMutex.shared, + jsonEncode(schema), ), debugName: 'Sync ${database.openFactory.path}', onError: receiveUnhandledErrors.sendPort, @@ -290,6 +303,7 @@ class _PowerSyncDatabaseIsolateArgs { final ResolvedSyncOptions options; final SerializedMutex crudMutex; final SerializedMutex syncMutex; + final String schemaJson; _PowerSyncDatabaseIsolateArgs( this.sPort, @@ -297,6 +311,7 @@ class _PowerSyncDatabaseIsolateArgs { this.options, this.crudMutex, this.syncMutex, + this.schemaJson, ); } @@ -392,6 +407,7 @@ Future _syncIsolate(_PowerSyncDatabaseIsolateArgs args) async { final storage = BucketStorage(connection); final sync = StreamingSyncImplementation( adapter: storage, + schemaJson: args.schemaJson, connector: InternalConnector( getCredentialsCached: getCredentialsCached, prefetchCredentials: prefetchCredentials, diff --git a/packages/powersync_core/lib/src/database/powersync_database.dart b/packages/powersync_core/lib/src/database/powersync_database.dart index 796bdefc..4de7ea92 100644 --- a/packages/powersync_core/lib/src/database/powersync_database.dart +++ b/packages/powersync_core/lib/src/database/powersync_database.dart @@ -32,19 +32,21 @@ abstract class PowerSyncDatabase /// A maximum of [maxReaders] concurrent read transactions are allowed. /// /// [logger] defaults to [autoLogger], which logs to the console in debug builds. - factory PowerSyncDatabase( - {required Schema schema, - required String path, - Logger? logger, - @Deprecated("Use [PowerSyncDatabase.withFactory] instead.") - // ignore: deprecated_member_use_from_same_package - SqliteConnectionSetup? sqliteSetup}) { + factory PowerSyncDatabase({ + required Schema schema, + required String path, + Logger? logger, + @Deprecated("Use [PowerSyncDatabase.withFactory] instead.") + // ignore: deprecated_member_use_from_same_package + SqliteConnectionSetup? sqliteSetup, + }) { return PowerSyncDatabaseImpl( - schema: schema, - path: path, - logger: logger, - // ignore: deprecated_member_use_from_same_package - sqliteSetup: sqliteSetup); + schema: schema, + path: path, + logger: logger, + // ignore: deprecated_member_use_from_same_package + sqliteSetup: sqliteSetup, + ); } /// Open a [PowerSyncDatabase] with a [PowerSyncOpenFactory]. @@ -55,12 +57,18 @@ abstract class PowerSyncDatabase /// Subclass [PowerSyncOpenFactory] to add custom logic to this process. /// /// [logger] defaults to [autoLogger], which logs to the console in debug builds. - factory PowerSyncDatabase.withFactory(DefaultSqliteOpenFactory openFactory, - {required Schema schema, - int maxReaders = SqliteDatabase.defaultMaxReaders, - Logger? logger}) { - return PowerSyncDatabaseImpl.withFactory(openFactory, - schema: schema, maxReaders: maxReaders, logger: logger); + factory PowerSyncDatabase.withFactory( + DefaultSqliteOpenFactory openFactory, { + required Schema schema, + int maxReaders = SqliteDatabase.defaultMaxReaders, + Logger? logger, + }) { + return PowerSyncDatabaseImpl.withFactory( + openFactory, + schema: schema, + maxReaders: maxReaders, + logger: logger, + ); } /// Open a PowerSyncDatabase on an existing [SqliteDatabase]. diff --git a/packages/powersync_core/lib/src/database/powersync_database_impl_stub.dart b/packages/powersync_core/lib/src/database/powersync_database_impl_stub.dart index 2a795497..a4f0b419 100644 --- a/packages/powersync_core/lib/src/database/powersync_database_impl_stub.dart +++ b/packages/powersync_core/lib/src/database/powersync_database_impl_stub.dart @@ -82,10 +82,11 @@ class PowerSyncDatabaseImpl /// Migrations are run on the database when this constructor is called. /// /// [logger] defaults to [autoLogger], which logs to the console in debug builds.s - factory PowerSyncDatabaseImpl.withDatabase( - {required Schema schema, - required SqliteDatabase database, - Logger? logger}) { + factory PowerSyncDatabaseImpl.withDatabase({ + required Schema schema, + required SqliteDatabase database, + Logger? logger, + }) { throw UnimplementedError(); } diff --git a/packages/powersync_core/lib/src/database/powersync_db_mixin.dart b/packages/powersync_core/lib/src/database/powersync_db_mixin.dart index 808efc71..dc4b2ddb 100644 --- a/packages/powersync_core/lib/src/database/powersync_db_mixin.dart +++ b/packages/powersync_core/lib/src/database/powersync_db_mixin.dart @@ -297,6 +297,13 @@ mixin PowerSyncDatabaseMixin implements SqliteConnection { params: params, ); + if (schema.rawTables.isNotEmpty && + resolvedOptions.source.syncImplementation != + SyncClientImplementation.rust) { + throw UnsupportedError( + 'Raw tables are only supported by the Rust client.'); + } + // ignore: deprecated_member_use_from_same_package clientParams = params; var thisConnectAborter = AbortController(); diff --git a/packages/powersync_core/lib/src/database/web/web_powersync_database.dart b/packages/powersync_core/lib/src/database/web/web_powersync_database.dart index 6b40a6a2..4af2821e 100644 --- a/packages/powersync_core/lib/src/database/web/web_powersync_database.dart +++ b/packages/powersync_core/lib/src/database/web/web_powersync_database.dart @@ -1,4 +1,5 @@ import 'dart:async'; +import 'dart:convert'; import 'package:meta/meta.dart'; import 'package:http/browser_client.dart'; import 'package:logging/logging.dart'; @@ -75,8 +76,12 @@ class PowerSyncDatabaseImpl SqliteConnectionSetup? sqliteSetup}) { // ignore: deprecated_member_use_from_same_package DefaultSqliteOpenFactory factory = PowerSyncOpenFactory(path: path); - return PowerSyncDatabaseImpl.withFactory(factory, - maxReaders: maxReaders, logger: logger, schema: schema); + return PowerSyncDatabaseImpl.withFactory( + factory, + maxReaders: maxReaders, + logger: logger, + schema: schema, + ); } /// Open a [PowerSyncDatabase] with a [PowerSyncOpenFactory]. @@ -94,7 +99,10 @@ class PowerSyncDatabaseImpl Logger? logger}) { final db = SqliteDatabase.withFactory(openFactory, maxReaders: 1); return PowerSyncDatabaseImpl.withDatabase( - schema: schema, logger: logger, database: db); + schema: schema, + logger: logger, + database: db, + ); } /// Open a PowerSyncDatabase on an existing [SqliteDatabase]. @@ -102,8 +110,11 @@ class PowerSyncDatabaseImpl /// Migrations are run on the database when this constructor is called. /// /// [logger] defaults to [autoLogger], which logs to the console in debug builds. - PowerSyncDatabaseImpl.withDatabase( - {required this.schema, required this.database, Logger? logger}) { + PowerSyncDatabaseImpl.withDatabase({ + required this.schema, + required this.database, + Logger? logger, + }) { if (logger != null) { this.logger = logger; } else { @@ -141,6 +152,7 @@ class PowerSyncDatabaseImpl sync = StreamingSyncImplementation( adapter: storage, + schemaJson: jsonEncode(schema), connector: InternalConnector.wrap(connector, this), crudUpdateTriggerStream: crudStream, options: options, diff --git a/packages/powersync_core/lib/src/schema.dart b/packages/powersync_core/lib/src/schema.dart index 4892ee6c..1289cae0 100644 --- a/packages/powersync_core/lib/src/schema.dart +++ b/packages/powersync_core/lib/src/schema.dart @@ -7,11 +7,23 @@ import 'schema_logic.dart'; /// No migrations are required on the client. class Schema { /// List of tables in the schema. + /// + /// When opening a PowerSync database, these tables will be created and + /// migrated automatically. final List tables; - const Schema(this.tables); + /// A list of [RawTable]s in addition to PowerSync-managed [tables]. + /// + /// Raw tables give users full control over the SQLite tables, but that + /// includes the responsibility to create those tables and to write migrations + /// for them. + /// + /// For more information on raw tables, see [RawTable] and [the documentation](https://docs.powersync.com/usage/use-case-examples/raw-tables). + final List rawTables; + + const Schema(this.tables, {this.rawTables = const []}); - Map toJson() => {'tables': tables}; + Map toJson() => {'raw_tables': rawTables, 'tables': tables}; void validate() { Set tableNames = {}; @@ -315,6 +327,120 @@ class Column { Map toJson() => {'name': name, 'type': type.sqlite}; } +/// A raw table, defined by the user instead of being managed by PowerSync. +/// +/// Any ordinary SQLite table can be defined as a raw table, which enables: +/// +/// - More performant queries, since data is stored in typed rows instead of the +/// schemaless JSON view PowerSync uses by default. +/// - More control over the table, since custom column constraints can be used +/// in its definition. +/// +/// PowerSync doesn't know anything about the internal structure of raw tables - +/// instead, it relies on user-defined [put] and [delete] statements to sync +/// data into them. +/// +/// When using raw tables, you are responsible for creating and migrating them +/// when they've changed. Further, triggers are necessary to collect local +/// writes to those tables. For more information, see +/// [the documentation](https://docs.powersync.com/usage/use-case-examples/raw-tables). +/// +/// Note that raw tables are only supported by the Rust sync client, which needs +/// to be enabled when connecting with raw tables. +final class RawTable { + /// The name of the table as used by the sync service. + /// + /// This doesn't necessarily have to match the name of the SQLite table that + /// [put] and [delete] write to. Instead, it's used by the sync client to + /// identify which statements to use when it encounters sync operations for + /// this table. + final String name; + + /// A statement responsible for inserting or updating a row in this raw table + /// based on data from the sync service. + /// + /// See [PendingStatement] for details. + final PendingStatement put; + + /// A statement responsible for deleting a row based on its PowerSync id. + /// + /// See [PendingStatement] for details. Note that [PendingStatementValue]s + /// used here must all be [PendingStatementValue.id]. + final PendingStatement delete; + + const RawTable({ + required this.name, + required this.put, + required this.delete, + }); + + Map toJson() => { + 'name': name, + 'put': put, + 'delete': delete, + }; +} + +/// An SQL statement to be run by the sync client against raw tables. +/// +/// Since raw tables are managed by the user, PowerSync can't know how to apply +/// serverside changes to them. These statements bridge raw tables and PowerSync +/// by providing upserts and delete statements. +/// +/// For more information, see [the documentation](https://docs.powersync.com/usage/use-case-examples/raw-tables) +final class PendingStatement { + /// The SQL statement to run to upsert or delete data from a raw table. + final String sql; + + /// A list of value identifiers for parameters in [sql]. + /// + /// Put statements can use both [PendingStatementValue.id] and + /// [PendingStatementValue.column], whereas delete statements can only use + /// [PendingStatementValue.id]. + final List params; + + PendingStatement({required this.sql, required this.params}); + + Map toJson() => { + 'sql': sql, + 'params': params, + }; +} + +/// A description of a value that will be resolved in the sync client when +/// running a [PendingStatement] for a [RawTable]. +sealed class PendingStatementValue { + /// A value that is bound to the textual id used in the PowerSync protocol. + factory PendingStatementValue.id() = _PendingStmtValueId; + + /// A value that is bound to the value of a column in a replace (`PUT`) + /// operation of the PowerSync protocol. + factory PendingStatementValue.column(String column) = _PendingStmtValueColumn; + + dynamic toJson(); +} + +class _PendingStmtValueColumn implements PendingStatementValue { + final String column; + const _PendingStmtValueColumn(this.column); + + @override + dynamic toJson() { + return { + 'Column': column, + }; + } +} + +class _PendingStmtValueId implements PendingStatementValue { + const _PendingStmtValueId(); + + @override + dynamic toJson() { + return 'Id'; + } +} + /// Type of column. enum ColumnType { /// TEXT column. diff --git a/packages/powersync_core/lib/src/sync/streaming_sync.dart b/packages/powersync_core/lib/src/sync/streaming_sync.dart index 1f8091b3..bbdcb85a 100644 --- a/packages/powersync_core/lib/src/sync/streaming_sync.dart +++ b/packages/powersync_core/lib/src/sync/streaming_sync.dart @@ -32,6 +32,7 @@ abstract interface class StreamingSync { @internal class StreamingSyncImplementation implements StreamingSync { + final String schemaJson; final BucketStorage adapter; final InternalConnector connector; final ResolvedSyncOptions options; @@ -62,6 +63,7 @@ class StreamingSyncImplementation implements StreamingSync { String? clientId; StreamingSyncImplementation({ + required this.schemaJson, required this.adapter, required this.connector, required this.crudUpdateTriggerStream, @@ -596,7 +598,12 @@ final class _ActiveRustStreamingIteration { Future syncIteration() async { try { await _control( - 'start', convert.json.encode({'parameters': sync.options.params})); + 'start', + convert.json.encode({ + 'parameters': sync.options.params, + 'schema': convert.json.decode(sync.schemaJson), + }), + ); assert(_completedStream.isCompleted, 'Should have started streaming'); await _completedStream.future; } finally { diff --git a/packages/powersync_core/lib/src/web/sync_controller.dart b/packages/powersync_core/lib/src/web/sync_controller.dart index 0c26252e..7f05cff3 100644 --- a/packages/powersync_core/lib/src/web/sync_controller.dart +++ b/packages/powersync_core/lib/src/web/sync_controller.dart @@ -113,6 +113,9 @@ class SyncWorkerHandle implements StreamingSync { @override Future streamingSync() async { await _channel.startSynchronization( - database.database.openFactory.path, ResolvedSyncOptions(options)); + database.database.openFactory.path, + ResolvedSyncOptions(options), + database.schema, + ); } } diff --git a/packages/powersync_core/lib/src/web/sync_worker.dart b/packages/powersync_core/lib/src/web/sync_worker.dart index b5e8ed63..ddc4eaf0 100644 --- a/packages/powersync_core/lib/src/web/sync_worker.dart +++ b/packages/powersync_core/lib/src/web/sync_worker.dart @@ -45,12 +45,16 @@ class _SyncWorker { }); } - _SyncRunner referenceSyncTask( - String databaseIdentifier, SyncOptions options, _ConnectedClient client) { + _SyncRunner referenceSyncTask(String databaseIdentifier, SyncOptions options, + String schemaJson, _ConnectedClient client) { return _requestedSyncTasks.putIfAbsent(databaseIdentifier, () { return _SyncRunner(databaseIdentifier); }) - ..registerClient(client, options); + ..registerClient( + client, + options, + schemaJson, + ); } } @@ -86,8 +90,8 @@ class _ConnectedClient { }, ); - _runner = _worker.referenceSyncTask( - request.databaseName, recoveredOptions, this); + _runner = _worker.referenceSyncTask(request.databaseName, + recoveredOptions, request.schemaJson, this); return (JSObject(), null); case SyncWorkerMessageType.abortSynchronization: _runner?.disconnectClient(this); @@ -128,6 +132,7 @@ class _ConnectedClient { class _SyncRunner { final String identifier; ResolvedSyncOptions options = ResolvedSyncOptions(SyncOptions()); + String schemaJson = '{}'; final StreamGroup<_RunnerEvent> _group = StreamGroup(); final StreamController<_RunnerEvent> _mainEvents = StreamController(); @@ -146,10 +151,12 @@ class _SyncRunner { case _AddConnection( :final client, :final options, + :final schemaJson, ): connections.add(client); final (newOptions, reconnect) = this.options.applyFrom(options); this.options = newOptions; + this.schemaJson = schemaJson; if (sync == null) { await _requestDatabase(client); @@ -264,6 +271,7 @@ class _SyncRunner { sync = StreamingSyncImplementation( adapter: WebBucketStorage(database), + schemaJson: client._runner!.schemaJson, connector: InternalConnector( getCredentialsCached: client.channel.credentialsCallback, prefetchCredentials: ({required bool invalidate}) async { @@ -286,8 +294,9 @@ class _SyncRunner { sync!.streamingSync(); } - void registerClient(_ConnectedClient client, SyncOptions options) { - _mainEvents.add(_AddConnection(client, options)); + void registerClient( + _ConnectedClient client, SyncOptions options, String schemaJson) { + _mainEvents.add(_AddConnection(client, options, schemaJson)); } /// Remove a client, disconnecting if no clients remain.. @@ -306,8 +315,9 @@ sealed class _RunnerEvent {} final class _AddConnection implements _RunnerEvent { final _ConnectedClient client; final SyncOptions options; + final String schemaJson; - _AddConnection(this.client, this.options); + _AddConnection(this.client, this.options, this.schemaJson); } final class _RemoveConnection implements _RunnerEvent { diff --git a/packages/powersync_core/lib/src/web/sync_worker_protocol.dart b/packages/powersync_core/lib/src/web/sync_worker_protocol.dart index 2b859e53..3c64d90f 100644 --- a/packages/powersync_core/lib/src/web/sync_worker_protocol.dart +++ b/packages/powersync_core/lib/src/web/sync_worker_protocol.dart @@ -3,6 +3,7 @@ import 'dart:convert'; import 'dart:js_interop'; import 'package:logging/logging.dart'; +import 'package:powersync_core/src/schema.dart'; import 'package:powersync_core/src/sync/options.dart'; import 'package:web/web.dart'; @@ -71,6 +72,7 @@ extension type StartSynchronization._(JSObject _) implements JSObject { required int requestId, required int retryDelayMs, required String implementationName, + required String schemaJson, String? syncParamsEncoded, }); @@ -79,6 +81,7 @@ extension type StartSynchronization._(JSObject _) implements JSObject { external int get crudThrottleTimeMs; external int? get retryDelayMs; external String? get implementationName; + external String get schemaJson; external String? get syncParamsEncoded; } @@ -410,7 +413,7 @@ final class WorkerCommunicationChannel { } Future startSynchronization( - String databaseName, ResolvedSyncOptions options) async { + String databaseName, ResolvedSyncOptions options, Schema schema) async { final (id, completion) = _newRequest(); port.postMessage(SyncWorkerMessage( type: SyncWorkerMessageType.startSynchronization.name, @@ -420,6 +423,7 @@ final class WorkerCommunicationChannel { retryDelayMs: options.retryDelay.inMilliseconds, requestId: id, implementationName: options.source.syncImplementation.name, + schemaJson: jsonEncode(schema), syncParamsEncoded: switch (options.source.params) { null => null, final params => jsonEncode(params), diff --git a/packages/powersync_core/test/in_memory_sync_test.dart b/packages/powersync_core/test/in_memory_sync_test.dart index b6432014..1819795d 100644 --- a/packages/powersync_core/test/in_memory_sync_test.dart +++ b/packages/powersync_core/test/in_memory_sync_test.dart @@ -48,7 +48,7 @@ void _declareTests(String name, SyncOptions options) { var credentialsCallbackCount = 0; Future Function(PowerSyncDatabase) uploadData = (db) async {}; - void createSyncClient() { + void createSyncClient({Schema? schema}) { final (client, server) = inMemoryServer(); server.mount(syncService.router.call); @@ -63,6 +63,7 @@ void _declareTests(String name, SyncOptions options) { ); }, uploadData: (db) => uploadData(db)), options: options, + customSchema: schema, ); addTearDown(() async { @@ -207,6 +208,103 @@ void _declareTests(String name, SyncOptions options) { // because the messages were received in quick succession. expect(commits, 1); }); + } else { + // raw tables are only supported by the rust sync client + test('raw tables', () async { + final schema = Schema(const [], rawTables: [ + RawTable( + name: 'lists', + put: PendingStatement( + sql: 'INSERT OR REPLACE INTO lists (id, name) VALUES (?, ?)', + params: [ + PendingStatementValue.id(), + PendingStatementValue.column('name'), + ], + ), + delete: PendingStatement( + sql: 'DELETE FROM lists WHERE id = ?', + params: [ + PendingStatementValue.id(), + ], + ), + ), + ]); + + await database.execute( + 'CREATE TABLE lists (id TEXT NOT NULL PRIMARY KEY, name TEXT);'); + final query = StreamQueue( + database.watch('SELECT * FROM lists', throttle: Duration.zero)); + await expectLater(query, emits(isEmpty)); + + createSyncClient(schema: schema); + await waitForConnection(); + + syncService + ..addLine({ + 'checkpoint': Checkpoint( + lastOpId: '1', + writeCheckpoint: null, + checksums: [ + BucketChecksum(bucket: 'a', priority: 3, checksum: 0) + ], + ) + }) + ..addLine({ + 'data': { + 'bucket': 'a', + 'data': [ + { + 'checksum': 0, + 'data': json.encode({'name': 'custom list'}), + 'op': 'PUT', + 'op_id': '1', + 'object_id': 'my_list', + 'object_type': 'lists' + } + ] + } + }) + ..addLine({ + 'checkpoint_complete': {'last_op_id': '1'} + }); + + await expectLater( + query, + emits([ + {'id': 'my_list', 'name': 'custom list'} + ]), + ); + + syncService + ..addLine({ + 'checkpoint': Checkpoint( + lastOpId: '2', + writeCheckpoint: null, + checksums: [ + BucketChecksum(bucket: 'a', priority: 3, checksum: 0) + ], + ) + }) + ..addLine({ + 'data': { + 'bucket': 'a', + 'data': [ + { + 'checksum': 0, + 'op': 'REMOVE', + 'op_id': '2', + 'object_id': 'my_list', + 'object_type': 'lists' + } + ] + } + }) + ..addLine({ + 'checkpoint_complete': {'last_op_id': '2'} + }); + + await expectLater(query, emits(isEmpty)); + }); } group('partial sync', () { diff --git a/packages/powersync_core/test/utils/abstract_test_utils.dart b/packages/powersync_core/test/utils/abstract_test_utils.dart index 5cfb8405..6a1a90aa 100644 --- a/packages/powersync_core/test/utils/abstract_test_utils.dart +++ b/packages/powersync_core/test/utils/abstract_test_utils.dart @@ -1,3 +1,5 @@ +import 'dart:convert'; + import 'package:http/http.dart'; import 'package:logging/logging.dart'; import 'package:powersync_core/powersync_core.dart'; @@ -151,9 +153,11 @@ extension MockSync on PowerSyncDatabase { PowerSyncBackendConnector connector, { Logger? logger, SyncOptions options = const SyncOptions(retryDelay: Duration(seconds: 5)), + Schema? customSchema, }) { final impl = StreamingSyncImplementation( adapter: BucketStorage(this), + schemaJson: jsonEncode(customSchema ?? schema), client: client, options: ResolvedSyncOptions(options), connector: InternalConnector.wrap(connector, this), diff --git a/packages/powersync_core/test/watch_test.dart b/packages/powersync_core/test/watch_test.dart index 00e7ea49..28617e2e 100644 --- a/packages/powersync_core/test/watch_test.dart +++ b/packages/powersync_core/test/watch_test.dart @@ -53,7 +53,7 @@ void main() { 'INSERT INTO customers(id, name) VALUES (?, ?)', [id, 'a customer']); var done = false; - inserts() async { + Future inserts() async { while (!done) { await powersync.execute( 'INSERT INTO assets(id, make, customer_id) VALUES (uuid(), ?, ?)', @@ -65,7 +65,7 @@ void main() { const numberOfQueries = 10; - inserts(); + final insertsFuture = inserts(); try { List times = []; final results = await stream.take(numberOfQueries).map((e) { @@ -100,6 +100,8 @@ void main() { } finally { done = true; } + + await insertsFuture; }); test('onChange', () async { @@ -111,7 +113,7 @@ void main() { const throttleDuration = Duration(milliseconds: baseTime); var done = false; - inserts() async { + Future inserts() async { while (!done) { await powersync.execute( 'INSERT INTO assets(id, make) VALUES (uuid(), ?)', ['test']); @@ -120,7 +122,7 @@ void main() { } } - inserts(); + final insertsFuture = inserts(); final stream = powersync.onChange({'assets', 'customers'}, throttle: throttleDuration).asyncMap((event) async { @@ -138,6 +140,7 @@ void main() { UpdateNotification.single('assets'), UpdateNotification.single('assets') ])); + await insertsFuture; }); test('emits update events with friendly names', () async {