Skip to content

Commit c20ac77

Browse files
committed
Initial API
1 parent 081649b commit c20ac77

File tree

3 files changed

+196
-15
lines changed

3 files changed

+196
-15
lines changed

packages/powersync_core/lib/src/database/powersync_db_mixin.dart

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -191,14 +191,7 @@ mixin PowerSyncDatabaseMixin implements SqliteConnection {
191191
}
192192
}
193193

194-
if (matches(currentStatus)) {
195-
return;
196-
}
197-
await for (final result in statusStream) {
198-
if (matches(result)) {
199-
break;
200-
}
201-
}
194+
return _connections.firstStatusMatching(matches);
202195
}
203196

204197
@protected
@@ -514,6 +507,10 @@ mixin PowerSyncDatabaseMixin implements SqliteConnection {
514507
Future<void> refreshSchema() async {
515508
await database.refreshSchema();
516509
}
510+
511+
SyncStream syncStream(String name, [Map<String, Object?>? parameters]) {
512+
return _connections.syncStream(name, parameters);
513+
}
517514
}
518515

519516
Stream<UpdateNotification> powerSyncUpdateNotifications(

packages/powersync_core/lib/src/sync/connection_manager.dart

Lines changed: 189 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,36 +1,39 @@
11
import 'dart:async';
2+
import 'dart:convert';
23

34
import 'package:meta/meta.dart';
45
import 'package:powersync_core/powersync_core.dart';
56
import 'package:powersync_core/src/abort_controller.dart';
67
import 'package:powersync_core/src/database/active_instances.dart';
78
import 'package:powersync_core/src/database/powersync_db_mixin.dart';
89
import 'package:powersync_core/src/sync/options.dart';
10+
import 'package:powersync_core/src/sync/stream.dart';
911

1012
@internal
1113
final class ConnectionManager {
1214
final PowerSyncDatabaseMixin db;
13-
final StreamController<SyncStatus> _statusController = StreamController();
15+
final ActiveDatabaseGroup _activeGroup;
1416

17+
final StreamController<SyncStatus> _statusController = StreamController();
1518
SyncStatus _currentStatus =
1619
const SyncStatus(connected: false, lastSyncedAt: null);
1720

1821
SyncStatus get currentStatus => _currentStatus;
1922
Stream<SyncStatus> get statusStream => _statusController.stream;
2023

21-
final ActiveDatabaseGroup _activeGroup;
22-
23-
ConnectionManager(this.db) : _activeGroup = db.group;
24-
2524
/// The abort controller for the current sync iteration.
2625
///
2726
/// null when disconnected, present when connecting or connected.
2827
///
2928
/// The controller must only be accessed from within a critical section of the
3029
/// sync mutex.
31-
@protected
3230
AbortController? _abortActiveSync;
3331

32+
/// Only to be called in the sync mutex.
33+
Future<void> Function()? _connectWithLastOptions;
34+
35+
ConnectionManager(this.db) : _activeGroup = db.group;
36+
3437
void checkNotConnected() {
3538
if (_abortActiveSync != null) {
3639
throw StateError('Cannot update schema while connected');
@@ -55,12 +58,39 @@ final class ConnectionManager {
5558
Future<void> disconnect() async {
5659
// Also wrap this in the sync mutex to ensure there's no race between us
5760
// connecting and disconnecting.
58-
await _activeGroup.syncConnectMutex.lock(_abortCurrentSync);
61+
await _activeGroup.syncConnectMutex.lock(() async {
62+
await _abortCurrentSync();
63+
_connectWithLastOptions = null;
64+
});
5965

6066
manuallyChangeSyncStatus(
6167
SyncStatus(connected: false, lastSyncedAt: currentStatus.lastSyncedAt));
6268
}
6369

70+
Future<void> firstStatusMatching(bool Function(SyncStatus) predicate) async {
71+
if (predicate(currentStatus)) {
72+
return;
73+
}
74+
await for (final result in statusStream) {
75+
if (predicate(result)) {
76+
break;
77+
}
78+
}
79+
}
80+
81+
Future<void> reconnect() async {
82+
// Also wrap this in the sync mutex to ensure there's no race between us
83+
// connecting and disconnecting.
84+
await _activeGroup.syncConnectMutex.lock(() async {
85+
if (_connectWithLastOptions case final activeSync?) {
86+
await _abortCurrentSync();
87+
assert(_abortActiveSync == null);
88+
89+
await activeSync();
90+
}
91+
});
92+
}
93+
6494
Future<void> connect({
6595
required PowerSyncBackendConnector connector,
6696
required ResolvedSyncOptions options,
@@ -112,6 +142,7 @@ final class ConnectionManager {
112142
// Disconnect a previous sync client, if one is active.
113143
await _abortCurrentSync();
114144
assert(_abortActiveSync == null);
145+
_connectWithLastOptions = connectWithSyncLock;
115146

116147
// Install the abort controller for this particular connect call, allowing
117148
// it to be disconnected.
@@ -150,7 +181,158 @@ final class ConnectionManager {
150181
}
151182
}
152183

184+
Future<void> _subscriptionsCommand(Object? command) async {
185+
await db.writeTransaction((tx) {
186+
return db.execute(
187+
'SELECT powersync_control(?, ?)',
188+
['subscriptions', json.encode(command)],
189+
);
190+
});
191+
192+
await reconnect();
193+
}
194+
195+
Future<void> subscribe({
196+
required String stream,
197+
required Object? parameters,
198+
Duration? ttl,
199+
BucketPriority? priority,
200+
}) async {
201+
await _subscriptionsCommand({
202+
'subscribe': {
203+
'stream': stream,
204+
'params': parameters,
205+
'ttl': ttl?.inSeconds,
206+
'priority': priority,
207+
},
208+
});
209+
}
210+
211+
Future<void> unsubscribe({
212+
required String stream,
213+
required Object? parameters,
214+
required bool immediate,
215+
}) async {
216+
await _subscriptionsCommand({
217+
'unsubscribe': {
218+
'stream': stream,
219+
'params': parameters,
220+
'immediate': immediate,
221+
},
222+
});
223+
}
224+
225+
Future<SyncStreamSubscription?> resolveCurrent(
226+
String name, Map<String, Object?>? parameters) async {
227+
final row = await db.getOptional(
228+
'SELECT stream_name, active, is_default, local_priority, local_params, expires_at, last_synced_at FROM ps_stream_subscriptions WHERE stream_name = ? AND local_params = ?',
229+
[name, json.encode(parameters)],
230+
);
231+
232+
if (row == null) {
233+
return null;
234+
}
235+
236+
return _SyncStreamSubscription(
237+
this,
238+
name: name,
239+
parameters:
240+
json.decode(row['local_params'] as String) as Map<String, Object?>?,
241+
active: row['active'] != 0,
242+
expiresAt: switch (row['expires_at']) {
243+
null => null,
244+
final expiresAt as int =>
245+
DateTime.fromMicrosecondsSinceEpoch(expiresAt * 1000),
246+
},
247+
hasSynced: row['has_synced'] != 0,
248+
lastSyncedAt: switch (row['last_synced_at']) {
249+
null => null,
250+
final lastSyncedAt as int =>
251+
DateTime.fromMicrosecondsSinceEpoch(lastSyncedAt * 1000),
252+
},
253+
);
254+
}
255+
256+
SyncStream syncStream(String name, Map<String, Object?>? parameters) {
257+
return _SyncStreamImplementation(this, name, parameters);
258+
}
259+
153260
void close() {
154261
_statusController.close();
155262
}
156263
}
264+
265+
final class _SyncStreamImplementation implements SyncStream {
266+
@override
267+
final String name;
268+
269+
@override
270+
final Map<String, Object?>? parameters;
271+
272+
final ConnectionManager _connections;
273+
274+
_SyncStreamImplementation(this._connections, this.name, this.parameters);
275+
276+
@override
277+
Future<SyncStreamSubscription?> get current {
278+
return _connections.resolveCurrent(name, parameters);
279+
}
280+
281+
@override
282+
Future<void> subscribe(
283+
{Duration? ttl,
284+
BucketPriority? priority,
285+
Map<String, Object?>? parameters}) async {
286+
await _connections.subscribe(
287+
stream: name,
288+
parameters: parameters,
289+
ttl: ttl,
290+
priority: priority,
291+
);
292+
}
293+
}
294+
295+
final class _SyncStreamSubscription implements SyncStreamSubscription {
296+
final ConnectionManager _connections;
297+
298+
@override
299+
final String name;
300+
@override
301+
final Map<String, Object?>? parameters;
302+
303+
@override
304+
final bool active;
305+
@override
306+
final DateTime? expiresAt;
307+
@override
308+
final bool hasSynced;
309+
@override
310+
final DateTime? lastSyncedAt;
311+
312+
_SyncStreamSubscription(
313+
this._connections, {
314+
required this.name,
315+
required this.parameters,
316+
required this.active,
317+
required this.expiresAt,
318+
required this.hasSynced,
319+
required this.lastSyncedAt,
320+
});
321+
322+
@override
323+
Future<void> unsubscribe({bool immediately = false}) async {
324+
await _connections.unsubscribe(
325+
stream: name, parameters: parameters, immediate: immediately);
326+
}
327+
328+
@override
329+
Future<void> waitForFirstSync() async {
330+
if (hasSynced) {
331+
return;
332+
}
333+
return _connections.firstStatusMatching((status) {
334+
final currentProgress = status.statusFor(this);
335+
return currentProgress?.subscription.hasSynced ?? false;
336+
});
337+
}
338+
}

packages/powersync_core/lib/src/sync/stream.dart

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@ abstract interface class SyncStream extends SyncStreamDescription {
2121
BucketPriority? priority,
2222
Map<String, Object?>? parameters,
2323
});
24+
25+
Future<SyncStreamSubscription?> get current;
2426
}
2527

2628
abstract interface class SyncStreamSubscription

0 commit comments

Comments
 (0)