Skip to content

Commit 081649b

Browse files
committed
Refactor connection logic into separate class
1 parent b9b5d18 commit 081649b

File tree

2 files changed

+168
-126
lines changed

2 files changed

+168
-126
lines changed

packages/powersync_core/lib/src/database/powersync_db_mixin.dart

Lines changed: 12 additions & 126 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import 'package:powersync_core/src/powersync_update_notification.dart';
1313
import 'package:powersync_core/src/schema.dart';
1414
import 'package:powersync_core/src/schema_logic.dart';
1515
import 'package:powersync_core/src/schema_logic.dart' as schema_logic;
16+
import 'package:powersync_core/src/sync/connection_manager.dart';
1617
import 'package:powersync_core/src/sync/options.dart';
1718
import 'package:powersync_core/src/sync/sync_status.dart';
1819

@@ -43,16 +44,13 @@ mixin PowerSyncDatabaseMixin implements SqliteConnection {
4344
@Deprecated("This field is unused, pass params to connect() instead")
4445
Map<String, dynamic>? clientParams;
4546

47+
late final ConnectionManager _connections;
48+
4649
/// Current connection status.
47-
SyncStatus currentStatus =
48-
const SyncStatus(connected: false, lastSyncedAt: null);
50+
SyncStatus get currentStatus => _connections.currentStatus;
4951

5052
/// Use this stream to subscribe to connection status updates.
51-
late final Stream<SyncStatus> statusStream;
52-
53-
@protected
54-
StreamController<SyncStatus> statusStreamController =
55-
StreamController<SyncStatus>.broadcast();
53+
Stream<SyncStatus> get statusStream => _connections.statusStream;
5654

5755
late final ActiveDatabaseGroup _activeGroup;
5856

@@ -82,15 +80,6 @@ mixin PowerSyncDatabaseMixin implements SqliteConnection {
8280
@protected
8381
Future<void> get isInitialized;
8482

85-
/// The abort controller for the current sync iteration.
86-
///
87-
/// null when disconnected, present when connecting or connected.
88-
///
89-
/// The controller must only be accessed from within a critical section of the
90-
/// sync mutex.
91-
@protected
92-
AbortController? _abortActiveSync;
93-
9483
@protected
9584
Future<void> baseInit() async {
9685
String identifier = 'memory';
@@ -108,8 +97,7 @@ mixin PowerSyncDatabaseMixin implements SqliteConnection {
10897
'instantiation logic if this is not intentional',
10998
);
11099
}
111-
112-
statusStream = statusStreamController.stream;
100+
_connections = ConnectionManager(this);
113101
updates = powerSyncUpdateNotifications(database.updates);
114102

115103
await database.initialize();
@@ -216,33 +204,7 @@ mixin PowerSyncDatabaseMixin implements SqliteConnection {
216204
@protected
217205
@visibleForTesting
218206
void setStatus(SyncStatus status) {
219-
if (status != currentStatus) {
220-
final newStatus = SyncStatus(
221-
connected: status.connected,
222-
downloading: status.downloading,
223-
uploading: status.uploading,
224-
connecting: status.connecting,
225-
uploadError: status.uploadError,
226-
downloadError: status.downloadError,
227-
priorityStatusEntries: status.priorityStatusEntries,
228-
downloadProgress: status.downloadProgress,
229-
// Note that currently the streaming sync implementation will never set
230-
// hasSynced. lastSyncedAt implies that syncing has completed at some
231-
// point (hasSynced = true).
232-
// The previous values of hasSynced should be preserved here.
233-
lastSyncedAt: status.lastSyncedAt ?? currentStatus.lastSyncedAt,
234-
hasSynced: status.lastSyncedAt != null
235-
? true
236-
: status.hasSynced ?? currentStatus.hasSynced,
237-
);
238-
239-
// If the absence of hasSynced was the only difference, the new states
240-
// would be equal and don't require an event. So, check again.
241-
if (newStatus != currentStatus) {
242-
currentStatus = newStatus;
243-
statusStreamController.add(currentStatus);
244-
}
245-
}
207+
_connections.manuallyChangeSyncStatus(status);
246208
}
247209

248210
@override
@@ -269,7 +231,7 @@ mixin PowerSyncDatabaseMixin implements SqliteConnection {
269231

270232
// If there are paused subscriptionso n the status stream, don't delay
271233
// closing the database because of that.
272-
unawaited(statusStreamController.close());
234+
_connections.close();
273235
await _activeGroup.close();
274236
}
275237
}
@@ -303,60 +265,7 @@ mixin PowerSyncDatabaseMixin implements SqliteConnection {
303265
params: params,
304266
);
305267

306-
// ignore: deprecated_member_use_from_same_package
307-
clientParams = params;
308-
var thisConnectAborter = AbortController();
309-
final zone = Zone.current;
310-
311-
late void Function() retryHandler;
312-
313-
Future<void> connectWithSyncLock() async {
314-
// Ensure there has not been a subsequent connect() call installing a new
315-
// sync client.
316-
assert(identical(_abortActiveSync, thisConnectAborter));
317-
assert(!thisConnectAborter.aborted);
318-
319-
await connectInternal(
320-
connector: connector,
321-
options: resolvedOptions,
322-
abort: thisConnectAborter,
323-
// Run follow-up async tasks in the parent zone, a new one is introduced
324-
// while we hold the lock (and async tasks won't hold the sync lock).
325-
asyncWorkZone: zone,
326-
);
327-
328-
thisConnectAborter.onCompletion.whenComplete(retryHandler);
329-
}
330-
331-
// If the sync encounters a failure without being aborted, retry
332-
retryHandler = Zone.current.bindCallback(() async {
333-
_activeGroup.syncConnectMutex.lock(() async {
334-
// Is this still supposed to be active? (abort is only called within
335-
// mutex)
336-
if (!thisConnectAborter.aborted) {
337-
// We only change _abortActiveSync after disconnecting, which resets
338-
// the abort controller.
339-
assert(identical(_abortActiveSync, thisConnectAborter));
340-
341-
// We need a new abort controller for this attempt
342-
_abortActiveSync = thisConnectAborter = AbortController();
343-
344-
logger.warning('Sync client failed, retrying...');
345-
await connectWithSyncLock();
346-
}
347-
});
348-
});
349-
350-
await _activeGroup.syncConnectMutex.lock(() async {
351-
// Disconnect a previous sync client, if one is active.
352-
await _abortCurrentSync();
353-
assert(_abortActiveSync == null);
354-
355-
// Install the abort controller for this particular connect call, allowing
356-
// it to be disconnected.
357-
_abortActiveSync = thisConnectAborter;
358-
await connectWithSyncLock();
359-
});
268+
await _connections.connect(connector: connector, options: resolvedOptions);
360269
}
361270

362271
/// Internal method to establish a sync client connection.
@@ -378,27 +287,7 @@ mixin PowerSyncDatabaseMixin implements SqliteConnection {
378287
///
379288
/// Use [connect] to connect again.
380289
Future<void> disconnect() async {
381-
// Also wrap this in the sync mutex to ensure there's no race between us
382-
// connecting and disconnecting.
383-
await _activeGroup.syncConnectMutex.lock(_abortCurrentSync);
384-
385-
setStatus(
386-
SyncStatus(connected: false, lastSyncedAt: currentStatus.lastSyncedAt));
387-
}
388-
389-
Future<void> _abortCurrentSync() async {
390-
if (_abortActiveSync case final disconnector?) {
391-
/// Checking `disconnecter.aborted` prevents race conditions
392-
/// where multiple calls to `disconnect` can attempt to abort
393-
/// the controller more than once before it has finished aborting.
394-
if (disconnector.aborted == false) {
395-
await disconnector.abort();
396-
_abortActiveSync = null;
397-
} else {
398-
/// Wait for the abort to complete. Continue updating the sync status after completed
399-
await disconnector.onCompletion;
400-
}
401-
}
290+
await _connections.disconnect();
402291
}
403292

404293
/// Disconnect and clear the database.
@@ -416,8 +305,7 @@ mixin PowerSyncDatabaseMixin implements SqliteConnection {
416305
await tx.execute('select powersync_clear(?)', [clearLocal ? 1 : 0]);
417306
});
418307
// The data has been deleted - reset these
419-
currentStatus = SyncStatus(lastSyncedAt: null, hasSynced: false);
420-
statusStreamController.add(currentStatus);
308+
setStatus(SyncStatus(lastSyncedAt: null, hasSynced: false));
421309
}
422310

423311
@Deprecated('Use [disconnectAndClear] instead.')
@@ -439,9 +327,7 @@ mixin PowerSyncDatabaseMixin implements SqliteConnection {
439327
schema.validate();
440328

441329
await _activeGroup.syncConnectMutex.lock(() async {
442-
if (_abortActiveSync != null) {
443-
throw AssertionError('Cannot update schema while connected');
444-
}
330+
_connections.checkNotConnected();
445331

446332
this.schema = schema;
447333
await database.writeLock((tx) => schema_logic.updateSchema(tx, schema));
Lines changed: 156 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,156 @@
1+
import 'dart:async';
2+
3+
import 'package:meta/meta.dart';
4+
import 'package:powersync_core/powersync_core.dart';
5+
import 'package:powersync_core/src/abort_controller.dart';
6+
import 'package:powersync_core/src/database/active_instances.dart';
7+
import 'package:powersync_core/src/database/powersync_db_mixin.dart';
8+
import 'package:powersync_core/src/sync/options.dart';
9+
10+
@internal
11+
final class ConnectionManager {
12+
final PowerSyncDatabaseMixin db;
13+
final StreamController<SyncStatus> _statusController = StreamController();
14+
15+
SyncStatus _currentStatus =
16+
const SyncStatus(connected: false, lastSyncedAt: null);
17+
18+
SyncStatus get currentStatus => _currentStatus;
19+
Stream<SyncStatus> get statusStream => _statusController.stream;
20+
21+
final ActiveDatabaseGroup _activeGroup;
22+
23+
ConnectionManager(this.db) : _activeGroup = db.group;
24+
25+
/// The abort controller for the current sync iteration.
26+
///
27+
/// null when disconnected, present when connecting or connected.
28+
///
29+
/// The controller must only be accessed from within a critical section of the
30+
/// sync mutex.
31+
@protected
32+
AbortController? _abortActiveSync;
33+
34+
void checkNotConnected() {
35+
if (_abortActiveSync != null) {
36+
throw StateError('Cannot update schema while connected');
37+
}
38+
}
39+
40+
Future<void> _abortCurrentSync() async {
41+
if (_abortActiveSync case final disconnector?) {
42+
/// Checking `disconnecter.aborted` prevents race conditions
43+
/// where multiple calls to `disconnect` can attempt to abort
44+
/// the controller more than once before it has finished aborting.
45+
if (disconnector.aborted == false) {
46+
await disconnector.abort();
47+
_abortActiveSync = null;
48+
} else {
49+
/// Wait for the abort to complete. Continue updating the sync status after completed
50+
await disconnector.onCompletion;
51+
}
52+
}
53+
}
54+
55+
Future<void> disconnect() async {
56+
// Also wrap this in the sync mutex to ensure there's no race between us
57+
// connecting and disconnecting.
58+
await _activeGroup.syncConnectMutex.lock(_abortCurrentSync);
59+
60+
manuallyChangeSyncStatus(
61+
SyncStatus(connected: false, lastSyncedAt: currentStatus.lastSyncedAt));
62+
}
63+
64+
Future<void> connect({
65+
required PowerSyncBackendConnector connector,
66+
required ResolvedSyncOptions options,
67+
}) async {
68+
var thisConnectAborter = AbortController();
69+
final zone = Zone.current;
70+
71+
late void Function() retryHandler;
72+
73+
Future<void> connectWithSyncLock() async {
74+
// Ensure there has not been a subsequent connect() call installing a new
75+
// sync client.
76+
assert(identical(_abortActiveSync, thisConnectAborter));
77+
assert(!thisConnectAborter.aborted);
78+
79+
// ignore: invalid_use_of_protected_member
80+
await db.connectInternal(
81+
connector: connector,
82+
options: options,
83+
abort: thisConnectAborter,
84+
// Run follow-up async tasks in the parent zone, a new one is introduced
85+
// while we hold the lock (and async tasks won't hold the sync lock).
86+
asyncWorkZone: zone,
87+
);
88+
89+
thisConnectAborter.onCompletion.whenComplete(retryHandler);
90+
}
91+
92+
// If the sync encounters a failure without being aborted, retry
93+
retryHandler = Zone.current.bindCallback(() async {
94+
_activeGroup.syncConnectMutex.lock(() async {
95+
// Is this still supposed to be active? (abort is only called within
96+
// mutex)
97+
if (!thisConnectAborter.aborted) {
98+
// We only change _abortActiveSync after disconnecting, which resets
99+
// the abort controller.
100+
assert(identical(_abortActiveSync, thisConnectAborter));
101+
102+
// We need a new abort controller for this attempt
103+
_abortActiveSync = thisConnectAborter = AbortController();
104+
105+
db.logger.warning('Sync client failed, retrying...');
106+
await connectWithSyncLock();
107+
}
108+
});
109+
});
110+
111+
await _activeGroup.syncConnectMutex.lock(() async {
112+
// Disconnect a previous sync client, if one is active.
113+
await _abortCurrentSync();
114+
assert(_abortActiveSync == null);
115+
116+
// Install the abort controller for this particular connect call, allowing
117+
// it to be disconnected.
118+
_abortActiveSync = thisConnectAborter;
119+
await connectWithSyncLock();
120+
});
121+
}
122+
123+
void manuallyChangeSyncStatus(SyncStatus status) {
124+
if (status != currentStatus) {
125+
final newStatus = SyncStatus(
126+
connected: status.connected,
127+
downloading: status.downloading,
128+
uploading: status.uploading,
129+
connecting: status.connecting,
130+
uploadError: status.uploadError,
131+
downloadError: status.downloadError,
132+
priorityStatusEntries: status.priorityStatusEntries,
133+
downloadProgress: status.downloadProgress,
134+
// Note that currently the streaming sync implementation will never set
135+
// hasSynced. lastSyncedAt implies that syncing has completed at some
136+
// point (hasSynced = true).
137+
// The previous values of hasSynced should be preserved here.
138+
lastSyncedAt: status.lastSyncedAt ?? currentStatus.lastSyncedAt,
139+
hasSynced: status.lastSyncedAt != null
140+
? true
141+
: status.hasSynced ?? currentStatus.hasSynced,
142+
);
143+
144+
// If the absence of hasSynced was the only difference, the new states
145+
// would be equal and don't require an event. So, check again.
146+
if (newStatus != currentStatus) {
147+
_currentStatus = newStatus;
148+
_statusController.add(currentStatus);
149+
}
150+
}
151+
}
152+
153+
void close() {
154+
_statusController.close();
155+
}
156+
}

0 commit comments

Comments
 (0)