Skip to content

Commit a706aa0

Browse files
authored
Merge pull request #283 from powersync-ja/rust-core-sync
Option to use core exension for sync logic
2 parents 6547d44 + 87735fd commit a706aa0

16 files changed

+482
-147
lines changed

packages/powersync_core/lib/src/database/core_version.dart

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ extension type const PowerSyncCoreVersion((int, int, int) _tuple) {
6060
// Note: When updating this, also update the download URL in
6161
// scripts/init_powersync_core_binary.dart and the version ref in
6262
// packages/sqlite3_wasm_build/build.sh
63-
static const minimum = PowerSyncCoreVersion((0, 3, 14));
63+
static const minimum = PowerSyncCoreVersion((0, 4, 0));
6464

6565
/// The first version of the core extensions that this version of the Dart
6666
/// SDK doesn't support.

packages/powersync_core/lib/src/database/native/native_powersync_database.dart

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,7 @@ class PowerSyncDatabaseImpl
120120
@internal
121121
Future<void> connectInternal({
122122
required PowerSyncBackendConnector connector,
123-
required SyncOptions options,
123+
required ResolvedSyncOptions options,
124124
required AbortController abort,
125125
required Zone asyncWorkZone,
126126
}) async {
@@ -135,7 +135,6 @@ class PowerSyncDatabaseImpl
135135
SendPort? initPort;
136136
final hasInitPort = Completer<void>();
137137
final receivedIsolateExit = Completer<void>();
138-
final resolved = ResolvedSyncOptions(options);
139138

140139
Future<void> waitForShutdown() async {
141140
// Only complete the abortion signal after the isolate shuts down. This
@@ -183,7 +182,7 @@ class PowerSyncDatabaseImpl
183182
final port = initPort = data[1] as SendPort;
184183
hasInitPort.complete();
185184
var crudStream = database
186-
.onChange(['ps_crud'], throttle: resolved.crudThrottleTime);
185+
.onChange(['ps_crud'], throttle: options.crudThrottleTime);
187186
crudUpdateSubscription = crudStream.listen((event) {
188187
port.send(['update']);
189188
});
@@ -245,7 +244,7 @@ class PowerSyncDatabaseImpl
245244
_PowerSyncDatabaseIsolateArgs(
246245
receiveMessages.sendPort,
247246
dbRef,
248-
resolved,
247+
options,
249248
crudMutex.shared,
250249
syncMutex.shared,
251250
),

packages/powersync_core/lib/src/database/powersync_database_impl_stub.dart

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,7 @@ class PowerSyncDatabaseImpl
115115
required PowerSyncBackendConnector connector,
116116
required AbortController abort,
117117
required Zone asyncWorkZone,
118-
required SyncOptions options,
118+
required ResolvedSyncOptions options,
119119
}) {
120120
throw UnimplementedError();
121121
}

packages/powersync_core/lib/src/database/powersync_db_mixin.dart

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -289,11 +289,12 @@ mixin PowerSyncDatabaseMixin implements SqliteConnection {
289289
// the lock for the connection.
290290
await initialize();
291291

292-
final resolvedOptions = SyncOptions(
293-
crudThrottleTime: options?.crudThrottleTime ?? crudThrottleTime,
292+
final resolvedOptions = ResolvedSyncOptions.resolve(
293+
options,
294+
crudThrottleTime: crudThrottleTime,
294295
// ignore: deprecated_member_use_from_same_package
295-
retryDelay: options?.retryDelay ?? retryDelay,
296-
params: options?.params ?? params,
296+
retryDelay: retryDelay,
297+
params: params,
297298
);
298299

299300
// ignore: deprecated_member_use_from_same_package
@@ -362,7 +363,7 @@ mixin PowerSyncDatabaseMixin implements SqliteConnection {
362363
@internal
363364
Future<void> connectInternal({
364365
required PowerSyncBackendConnector connector,
365-
required SyncOptions options,
366+
required ResolvedSyncOptions options,
366367
required AbortController abort,
367368
required Zone asyncWorkZone,
368369
});

packages/powersync_core/lib/src/database/web/web_powersync_database.dart

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -118,10 +118,8 @@ class PowerSyncDatabaseImpl
118118
required PowerSyncBackendConnector connector,
119119
required AbortController abort,
120120
required Zone asyncWorkZone,
121-
required SyncOptions options,
121+
required ResolvedSyncOptions options,
122122
}) async {
123-
final resolved = ResolvedSyncOptions(options);
124-
125123
final storage = BucketStorage(database);
126124
StreamingSync sync;
127125
// Try using a shared worker for the synchronization implementation to avoid
@@ -130,7 +128,7 @@ class PowerSyncDatabaseImpl
130128
sync = await SyncWorkerHandle.start(
131129
database: this,
132130
connector: connector,
133-
options: options,
131+
options: options.source,
134132
workerUri: Uri.base.resolve('/powersync_sync.worker.js'),
135133
);
136134
} catch (e) {
@@ -139,13 +137,13 @@ class PowerSyncDatabaseImpl
139137
e,
140138
);
141139
final crudStream =
142-
database.onChange(['ps_crud'], throttle: resolved.crudThrottleTime);
140+
database.onChange(['ps_crud'], throttle: options.crudThrottleTime);
143141

144142
sync = StreamingSyncImplementation(
145143
adapter: storage,
146144
connector: InternalConnector.wrap(connector, this),
147145
crudUpdateTriggerStream: crudStream,
148-
options: resolved,
146+
options: options,
149147
client: BrowserClient(),
150148
// Only allows 1 sync implementation to run at a time per database
151149
// This should be global (across tabs) when using Navigator locks.

packages/powersync_core/lib/src/sync/bucket_storage.dart

Lines changed: 16 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,6 @@ typedef LocalOperationCounters = ({int atLast, int sinceLast});
2020
class BucketStorage {
2121
final SqliteConnection _internalDb;
2222
bool _hasCompletedSync = false;
23-
bool _pendingBucketDeletes = false;
24-
int _compactCounter = compactOperationInterval;
2523

2624
BucketStorage(SqliteConnection db) : _internalDb = db {
2725
_init();
@@ -67,11 +65,8 @@ class BucketStorage {
6765
}
6866

6967
Future<void> saveSyncData(SyncDataBatch batch) async {
70-
var count = 0;
71-
7268
await writeTransaction((tx) async {
7369
for (var b in batch.buckets) {
74-
count += b.data.length;
7570
await _updateBucket2(
7671
tx,
7772
jsonEncode({
@@ -82,7 +77,6 @@ class BucketStorage {
8277
// We get major initial sync performance improvements with IndexedDB by
8378
// not flushing here.
8479
}, flush: false);
85-
_compactCounter += count;
8680
}
8781

8882
Future<void> _updateBucket2(SqliteWriteContext tx, String json) async {
@@ -103,8 +97,6 @@ class BucketStorage {
10397
['delete_bucket', bucket]);
10498
// No need to flush - not directly visible to the user
10599
}, flush: false);
106-
107-
_pendingBucketDeletes = true;
108100
}
109101

110102
Future<bool> hasCompletedSync() async {
@@ -154,8 +146,6 @@ class BucketStorage {
154146
return SyncLocalDatabaseResult(ready: false);
155147
}
156148

157-
await forceCompact();
158-
159149
return SyncLocalDatabaseResult(ready: true);
160150
}
161151

@@ -226,52 +216,6 @@ UPDATE ps_buckets SET count_since_last = 0, count_at_last = ?1->name
226216
}
227217
}
228218

229-
Future<void> forceCompact() async {
230-
_compactCounter = compactOperationInterval;
231-
_pendingBucketDeletes = true;
232-
233-
await autoCompact();
234-
}
235-
236-
Future<void> autoCompact() async {
237-
// This is a no-op since powersync-sqlite-core v0.3.0
238-
239-
// 1. Delete buckets
240-
await _deletePendingBuckets();
241-
242-
// 2. Clear REMOVE operations, only keeping PUT ones
243-
await _clearRemoveOps();
244-
}
245-
246-
Future<void> _deletePendingBuckets() async {
247-
// This is a no-op since powersync-sqlite-core v0.3.0
248-
if (_pendingBucketDeletes) {
249-
// Executed once after start-up, and again when there are pending deletes.
250-
await writeTransaction((tx) async {
251-
await tx.execute(
252-
'INSERT INTO powersync_operations(op, data) VALUES (?, ?)',
253-
['delete_pending_buckets', '']);
254-
// No need to flush - not directly visible to the user
255-
}, flush: false);
256-
_pendingBucketDeletes = false;
257-
}
258-
}
259-
260-
Future<void> _clearRemoveOps() async {
261-
if (_compactCounter < compactOperationInterval) {
262-
return;
263-
}
264-
265-
// This is a no-op since powersync-sqlite-core v0.3.0
266-
await writeTransaction((tx) async {
267-
await tx.execute(
268-
'INSERT INTO powersync_operations(op, data) VALUES (?, ?)',
269-
['clear_remove_ops', '']);
270-
// No need to flush - not directly visible to the user
271-
}, flush: false);
272-
_compactCounter = 0;
273-
}
274-
275219
void setTargetCheckpoint(Checkpoint checkpoint) {
276220
// No-op for now
277221
}
@@ -365,6 +309,22 @@ UPDATE ps_buckets SET count_since_last = 0, count_at_last = ?1->name
365309
});
366310
}
367311

312+
Future<String> control(String op, [Object? payload]) async {
313+
return await writeTransaction(
314+
(tx) async {
315+
final [row] =
316+
await tx.execute('SELECT powersync_control(?, ?)', [op, payload]);
317+
return row.columnAt(0) as String;
318+
},
319+
// We flush when powersync_control yields an instruction to do so.
320+
flush: false,
321+
);
322+
}
323+
324+
Future<void> flushFileSystem() async {
325+
// Noop outside of web.
326+
}
327+
368328
/// Note: The asynchronous nature of this is due to this needing a global
369329
/// lock. The actual database operations are still synchronous, and it
370330
/// is assumed that multiple functions on this instance won't be called
Lines changed: 153 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,153 @@
1+
import 'sync_status.dart';
2+
3+
/// An internal instruction emitted by the sync client in the core extension in
4+
/// response to the Dart SDK passing sync data into the extension.
5+
sealed class Instruction {
6+
factory Instruction.fromJson(Map<String, Object?> json) {
7+
return switch (json) {
8+
{'LogLine': final logLine} =>
9+
LogLine.fromJson(logLine as Map<String, Object?>),
10+
{'UpdateSyncStatus': final updateStatus} =>
11+
UpdateSyncStatus.fromJson(updateStatus as Map<String, Object?>),
12+
{'EstablishSyncStream': final establish} =>
13+
EstablishSyncStream.fromJson(establish as Map<String, Object?>),
14+
{'FetchCredentials': final creds} =>
15+
FetchCredentials.fromJson(creds as Map<String, Object?>),
16+
{'CloseSyncStream': _} => const CloseSyncStream(),
17+
{'FlushFileSystem': _} => const FlushFileSystem(),
18+
{'DidCompleteSync': _} => const DidCompleteSync(),
19+
_ => UnknownSyncInstruction(json)
20+
};
21+
}
22+
}
23+
24+
final class LogLine implements Instruction {
25+
final String severity;
26+
final String line;
27+
28+
LogLine({required this.severity, required this.line});
29+
30+
factory LogLine.fromJson(Map<String, Object?> json) {
31+
return LogLine(
32+
severity: json['severity'] as String,
33+
line: json['line'] as String,
34+
);
35+
}
36+
}
37+
38+
final class EstablishSyncStream implements Instruction {
39+
final Map<String, Object?> request;
40+
41+
EstablishSyncStream(this.request);
42+
43+
factory EstablishSyncStream.fromJson(Map<String, Object?> json) {
44+
return EstablishSyncStream(json['request'] as Map<String, Object?>);
45+
}
46+
}
47+
48+
final class UpdateSyncStatus implements Instruction {
49+
final CoreSyncStatus status;
50+
51+
UpdateSyncStatus({required this.status});
52+
53+
factory UpdateSyncStatus.fromJson(Map<String, Object?> json) {
54+
return UpdateSyncStatus(
55+
status:
56+
CoreSyncStatus.fromJson(json['status'] as Map<String, Object?>));
57+
}
58+
}
59+
60+
final class CoreSyncStatus {
61+
final bool connected;
62+
final bool connecting;
63+
final List<SyncPriorityStatus> priorityStatus;
64+
final DownloadProgress? downloading;
65+
66+
CoreSyncStatus({
67+
required this.connected,
68+
required this.connecting,
69+
required this.priorityStatus,
70+
required this.downloading,
71+
});
72+
73+
factory CoreSyncStatus.fromJson(Map<String, Object?> json) {
74+
return CoreSyncStatus(
75+
connected: json['connected'] as bool,
76+
connecting: json['connecting'] as bool,
77+
priorityStatus: [
78+
for (final entry in json['priority_status'] as List)
79+
_priorityStatusFromJson(entry as Map<String, Object?>)
80+
],
81+
downloading: switch (json['downloading']) {
82+
null => null,
83+
final raw as Map<String, Object?> => DownloadProgress.fromJson(raw),
84+
},
85+
);
86+
}
87+
88+
static SyncPriorityStatus _priorityStatusFromJson(Map<String, Object?> json) {
89+
return (
90+
priority: BucketPriority(json['priority'] as int),
91+
hasSynced: json['has_synced'] as bool?,
92+
lastSyncedAt: switch (json['last_synced_at']) {
93+
null => null,
94+
final lastSyncedAt as int =>
95+
DateTime.fromMillisecondsSinceEpoch(lastSyncedAt * 1000),
96+
},
97+
);
98+
}
99+
}
100+
101+
final class DownloadProgress {
102+
final Map<String, BucketProgress> buckets;
103+
104+
DownloadProgress(this.buckets);
105+
106+
factory DownloadProgress.fromJson(Map<String, Object?> line) {
107+
final rawBuckets = line['buckets'] as Map<String, Object?>;
108+
109+
return DownloadProgress(rawBuckets.map((k, v) {
110+
return MapEntry(
111+
k,
112+
_bucketProgressFromJson(v as Map<String, Object?>),
113+
);
114+
}));
115+
}
116+
117+
static BucketProgress _bucketProgressFromJson(Map<String, Object?> json) {
118+
return (
119+
priority: BucketPriority(json['priority'] as int),
120+
atLast: json['at_last'] as int,
121+
sinceLast: json['since_last'] as int,
122+
targetCount: json['target_count'] as int,
123+
);
124+
}
125+
}
126+
127+
final class FetchCredentials implements Instruction {
128+
final bool didExpire;
129+
130+
FetchCredentials(this.didExpire);
131+
132+
factory FetchCredentials.fromJson(Map<String, Object?> line) {
133+
return FetchCredentials(line['did_expire'] as bool);
134+
}
135+
}
136+
137+
final class CloseSyncStream implements Instruction {
138+
const CloseSyncStream();
139+
}
140+
141+
final class FlushFileSystem implements Instruction {
142+
const FlushFileSystem();
143+
}
144+
145+
final class DidCompleteSync implements Instruction {
146+
const DidCompleteSync();
147+
}
148+
149+
final class UnknownSyncInstruction implements Instruction {
150+
final Map<String, Object?> source;
151+
152+
UnknownSyncInstruction(this.source);
153+
}

0 commit comments

Comments
 (0)