Skip to content

Option to use core exension for sync logic #283

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 10 commits into from
Jun 19, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion packages/powersync_core/lib/src/database/core_version.dart
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ extension type const PowerSyncCoreVersion((int, int, int) _tuple) {
// Note: When updating this, also update the download URL in
// scripts/init_powersync_core_binary.dart and the version ref in
// packages/sqlite3_wasm_build/build.sh
static const minimum = PowerSyncCoreVersion((0, 3, 14));
static const minimum = PowerSyncCoreVersion((0, 4, 0));

/// The first version of the core extensions that this version of the Dart
/// SDK doesn't support.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ class PowerSyncDatabaseImpl
@internal
Future<void> connectInternal({
required PowerSyncBackendConnector connector,
required SyncOptions options,
required ResolvedSyncOptions options,
required AbortController abort,
required Zone asyncWorkZone,
}) async {
Expand All @@ -135,7 +135,6 @@ class PowerSyncDatabaseImpl
SendPort? initPort;
final hasInitPort = Completer<void>();
final receivedIsolateExit = Completer<void>();
final resolved = ResolvedSyncOptions(options);

Future<void> waitForShutdown() async {
// Only complete the abortion signal after the isolate shuts down. This
Expand Down Expand Up @@ -183,7 +182,7 @@ class PowerSyncDatabaseImpl
final port = initPort = data[1] as SendPort;
hasInitPort.complete();
var crudStream = database
.onChange(['ps_crud'], throttle: resolved.crudThrottleTime);
.onChange(['ps_crud'], throttle: options.crudThrottleTime);
crudUpdateSubscription = crudStream.listen((event) {
port.send(['update']);
});
Expand Down Expand Up @@ -245,7 +244,7 @@ class PowerSyncDatabaseImpl
_PowerSyncDatabaseIsolateArgs(
receiveMessages.sendPort,
dbRef,
resolved,
options,
crudMutex.shared,
syncMutex.shared,
),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ class PowerSyncDatabaseImpl
required PowerSyncBackendConnector connector,
required AbortController abort,
required Zone asyncWorkZone,
required SyncOptions options,
required ResolvedSyncOptions options,
}) {
throw UnimplementedError();
}
Expand Down
11 changes: 6 additions & 5 deletions packages/powersync_core/lib/src/database/powersync_db_mixin.dart
Original file line number Diff line number Diff line change
Expand Up @@ -289,11 +289,12 @@ mixin PowerSyncDatabaseMixin implements SqliteConnection {
// the lock for the connection.
await initialize();

final resolvedOptions = SyncOptions(
crudThrottleTime: options?.crudThrottleTime ?? crudThrottleTime,
final resolvedOptions = ResolvedSyncOptions.resolve(
options,
crudThrottleTime: crudThrottleTime,
// ignore: deprecated_member_use_from_same_package
retryDelay: options?.retryDelay ?? retryDelay,
params: options?.params ?? params,
retryDelay: retryDelay,
params: params,
);

// ignore: deprecated_member_use_from_same_package
Expand Down Expand Up @@ -362,7 +363,7 @@ mixin PowerSyncDatabaseMixin implements SqliteConnection {
@internal
Future<void> connectInternal({
required PowerSyncBackendConnector connector,
required SyncOptions options,
required ResolvedSyncOptions options,
required AbortController abort,
required Zone asyncWorkZone,
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,10 +118,8 @@ class PowerSyncDatabaseImpl
required PowerSyncBackendConnector connector,
required AbortController abort,
required Zone asyncWorkZone,
required SyncOptions options,
required ResolvedSyncOptions options,
}) async {
final resolved = ResolvedSyncOptions(options);

final storage = BucketStorage(database);
StreamingSync sync;
// Try using a shared worker for the synchronization implementation to avoid
Expand All @@ -130,7 +128,7 @@ class PowerSyncDatabaseImpl
sync = await SyncWorkerHandle.start(
database: this,
connector: connector,
options: options,
options: options.source,
workerUri: Uri.base.resolve('/powersync_sync.worker.js'),
);
} catch (e) {
Expand All @@ -139,13 +137,13 @@ class PowerSyncDatabaseImpl
e,
);
final crudStream =
database.onChange(['ps_crud'], throttle: resolved.crudThrottleTime);
database.onChange(['ps_crud'], throttle: options.crudThrottleTime);

sync = StreamingSyncImplementation(
adapter: storage,
connector: InternalConnector.wrap(connector, this),
crudUpdateTriggerStream: crudStream,
options: resolved,
options: options,
client: BrowserClient(),
// Only allows 1 sync implementation to run at a time per database
// This should be global (across tabs) when using Navigator locks.
Expand Down
72 changes: 16 additions & 56 deletions packages/powersync_core/lib/src/sync/bucket_storage.dart
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@ typedef LocalOperationCounters = ({int atLast, int sinceLast});
class BucketStorage {
final SqliteConnection _internalDb;
bool _hasCompletedSync = false;
bool _pendingBucketDeletes = false;
int _compactCounter = compactOperationInterval;

BucketStorage(SqliteConnection db) : _internalDb = db {
_init();
Expand Down Expand Up @@ -67,11 +65,8 @@ class BucketStorage {
}

Future<void> saveSyncData(SyncDataBatch batch) async {
var count = 0;

await writeTransaction((tx) async {
for (var b in batch.buckets) {
count += b.data.length;
await _updateBucket2(
tx,
jsonEncode({
Expand All @@ -82,7 +77,6 @@ class BucketStorage {
// We get major initial sync performance improvements with IndexedDB by
// not flushing here.
}, flush: false);
_compactCounter += count;
}

Future<void> _updateBucket2(SqliteWriteContext tx, String json) async {
Expand All @@ -103,8 +97,6 @@ class BucketStorage {
['delete_bucket', bucket]);
// No need to flush - not directly visible to the user
}, flush: false);

_pendingBucketDeletes = true;
}

Future<bool> hasCompletedSync() async {
Expand Down Expand Up @@ -154,8 +146,6 @@ class BucketStorage {
return SyncLocalDatabaseResult(ready: false);
}

await forceCompact();

return SyncLocalDatabaseResult(ready: true);
}

Expand Down Expand Up @@ -226,52 +216,6 @@ UPDATE ps_buckets SET count_since_last = 0, count_at_last = ?1->name
}
}

Future<void> forceCompact() async {
_compactCounter = compactOperationInterval;
_pendingBucketDeletes = true;

await autoCompact();
}

Future<void> autoCompact() async {
// This is a no-op since powersync-sqlite-core v0.3.0

// 1. Delete buckets
await _deletePendingBuckets();

// 2. Clear REMOVE operations, only keeping PUT ones
await _clearRemoveOps();
}

Future<void> _deletePendingBuckets() async {
// This is a no-op since powersync-sqlite-core v0.3.0
if (_pendingBucketDeletes) {
// Executed once after start-up, and again when there are pending deletes.
await writeTransaction((tx) async {
await tx.execute(
'INSERT INTO powersync_operations(op, data) VALUES (?, ?)',
['delete_pending_buckets', '']);
// No need to flush - not directly visible to the user
}, flush: false);
_pendingBucketDeletes = false;
}
}

Future<void> _clearRemoveOps() async {
if (_compactCounter < compactOperationInterval) {
return;
}

// This is a no-op since powersync-sqlite-core v0.3.0
await writeTransaction((tx) async {
await tx.execute(
'INSERT INTO powersync_operations(op, data) VALUES (?, ?)',
['clear_remove_ops', '']);
// No need to flush - not directly visible to the user
}, flush: false);
_compactCounter = 0;
}

void setTargetCheckpoint(Checkpoint checkpoint) {
// No-op for now
}
Expand Down Expand Up @@ -365,6 +309,22 @@ UPDATE ps_buckets SET count_since_last = 0, count_at_last = ?1->name
});
}

Future<String> control(String op, [Object? payload]) async {
return await writeTransaction(
(tx) async {
final [row] =
await tx.execute('SELECT powersync_control(?, ?)', [op, payload]);
return row.columnAt(0) as String;
},
// We flush when powersync_control yields an instruction to do so.
flush: false,
);
}

Future<void> flushFileSystem() async {
// Noop outside of web.
}

/// Note: The asynchronous nature of this is due to this needing a global
/// lock. The actual database operations are still synchronous, and it
/// is assumed that multiple functions on this instance won't be called
Expand Down
153 changes: 153 additions & 0 deletions packages/powersync_core/lib/src/sync/instruction.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
import 'sync_status.dart';

/// An internal instruction emitted by the sync client in the core extension in
/// response to the Dart SDK passing sync data into the extension.
sealed class Instruction {
factory Instruction.fromJson(Map<String, Object?> json) {
return switch (json) {
{'LogLine': final logLine} =>
LogLine.fromJson(logLine as Map<String, Object?>),
{'UpdateSyncStatus': final updateStatus} =>
UpdateSyncStatus.fromJson(updateStatus as Map<String, Object?>),
{'EstablishSyncStream': final establish} =>
EstablishSyncStream.fromJson(establish as Map<String, Object?>),
{'FetchCredentials': final creds} =>
FetchCredentials.fromJson(creds as Map<String, Object?>),
{'CloseSyncStream': _} => const CloseSyncStream(),
{'FlushFileSystem': _} => const FlushFileSystem(),
{'DidCompleteSync': _} => const DidCompleteSync(),
_ => UnknownSyncInstruction(json)
};
}
}

final class LogLine implements Instruction {
final String severity;
final String line;

LogLine({required this.severity, required this.line});

factory LogLine.fromJson(Map<String, Object?> json) {
return LogLine(
severity: json['severity'] as String,
line: json['line'] as String,
);
}
}

final class EstablishSyncStream implements Instruction {
final Map<String, Object?> request;

EstablishSyncStream(this.request);

factory EstablishSyncStream.fromJson(Map<String, Object?> json) {
return EstablishSyncStream(json['request'] as Map<String, Object?>);
}
}

final class UpdateSyncStatus implements Instruction {
final CoreSyncStatus status;

UpdateSyncStatus({required this.status});

factory UpdateSyncStatus.fromJson(Map<String, Object?> json) {
return UpdateSyncStatus(
status:
CoreSyncStatus.fromJson(json['status'] as Map<String, Object?>));
}
}

final class CoreSyncStatus {
final bool connected;
final bool connecting;
final List<SyncPriorityStatus> priorityStatus;
final DownloadProgress? downloading;

CoreSyncStatus({
required this.connected,
required this.connecting,
required this.priorityStatus,
required this.downloading,
});

factory CoreSyncStatus.fromJson(Map<String, Object?> json) {
return CoreSyncStatus(
connected: json['connected'] as bool,
connecting: json['connecting'] as bool,
priorityStatus: [
for (final entry in json['priority_status'] as List)
_priorityStatusFromJson(entry as Map<String, Object?>)
],
downloading: switch (json['downloading']) {
null => null,
final raw as Map<String, Object?> => DownloadProgress.fromJson(raw),
},
);
}

static SyncPriorityStatus _priorityStatusFromJson(Map<String, Object?> json) {
return (
priority: BucketPriority(json['priority'] as int),
hasSynced: json['has_synced'] as bool?,
lastSyncedAt: switch (json['last_synced_at']) {
null => null,
final lastSyncedAt as int =>
DateTime.fromMillisecondsSinceEpoch(lastSyncedAt * 1000),
},
);
}
}

final class DownloadProgress {
final Map<String, BucketProgress> buckets;

DownloadProgress(this.buckets);

factory DownloadProgress.fromJson(Map<String, Object?> line) {
final rawBuckets = line['buckets'] as Map<String, Object?>;

return DownloadProgress(rawBuckets.map((k, v) {
return MapEntry(
k,
_bucketProgressFromJson(v as Map<String, Object?>),
);
}));
}

static BucketProgress _bucketProgressFromJson(Map<String, Object?> json) {
return (
priority: BucketPriority(json['priority'] as int),
atLast: json['at_last'] as int,
sinceLast: json['since_last'] as int,
targetCount: json['target_count'] as int,
);
}
}

final class FetchCredentials implements Instruction {
final bool didExpire;

FetchCredentials(this.didExpire);

factory FetchCredentials.fromJson(Map<String, Object?> line) {
return FetchCredentials(line['did_expire'] as bool);
}
}

final class CloseSyncStream implements Instruction {
const CloseSyncStream();
}

final class FlushFileSystem implements Instruction {
const FlushFileSystem();
}

final class DidCompleteSync implements Instruction {
const DidCompleteSync();
}

final class UnknownSyncInstruction implements Instruction {
final Map<String, Object?> source;

UnknownSyncInstruction(this.source);
}
Loading
Loading