Skip to content

Commit f5de535

Browse files
authored
Merge pull request #279 from powersync-ja/test-disconnect-while-fetching-credentials
Allow disconnecting in `fetchCredentials` callback
2 parents 82c774f + fb45a1a commit f5de535

8 files changed

+76
-15
lines changed

packages/powersync_core/lib/src/abort_controller.dart

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ class AbortController {
2525
_abortRequested.complete();
2626
}
2727

28-
await _abortCompleter.future;
28+
await onCompletion;
2929
}
3030

3131
/// Signal that an abort has completed.

packages/powersync_core/lib/src/database/native/native_powersync_database.dart

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,7 @@ class PowerSyncDatabaseImpl
120120
required PowerSyncBackendConnector connector,
121121
required Duration crudThrottleTime,
122122
required AbortController abort,
123+
required Zone asyncWorkZone,
123124
Map<String, dynamic>? params,
124125
}) async {
125126
final dbRef = database.isolateConnectionFactory();
@@ -157,7 +158,7 @@ class PowerSyncDatabaseImpl
157158
await waitForShutdown();
158159
}
159160

160-
receiveMessages.listen((data) async {
161+
Future<void> handleMessage(Object? data) async {
161162
if (data is List) {
162163
String action = data[0] as String;
163164
if (action == "getCredentials") {
@@ -192,7 +193,14 @@ class PowerSyncDatabaseImpl
192193
record.level, record.message, record.error, record.stackTrace);
193194
}
194195
}
195-
});
196+
}
197+
198+
// This function is called in a Zone marking the connection lock as locked.
199+
// This is used to prevent reentrant calls to the lock (which would be a
200+
// deadlock). However, the lock is returned as soon as this function
201+
// returns - and handleMessage may run later. So, make sure we run those
202+
// callbacks in the parent zone.
203+
receiveMessages.listen(asyncWorkZone.bindUnaryCallback(handleMessage));
196204

197205
receiveUnhandledErrors.listen((message) async {
198206
// Sample error:

packages/powersync_core/lib/src/database/powersync_database_impl_stub.dart

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,7 @@ class PowerSyncDatabaseImpl
114114
{required PowerSyncBackendConnector connector,
115115
required Duration crudThrottleTime,
116116
required AbortController abort,
117+
required Zone asyncWorkZone,
117118
Map<String, dynamic>? params}) {
118119
throw UnimplementedError();
119120
}

packages/powersync_core/lib/src/database/powersync_db_mixin.dart

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -282,6 +282,7 @@ mixin PowerSyncDatabaseMixin implements SqliteConnection {
282282

283283
clientParams = params;
284284
var thisConnectAborter = AbortController();
285+
final zone = Zone.current;
285286

286287
late void Function() retryHandler;
287288

@@ -296,6 +297,9 @@ mixin PowerSyncDatabaseMixin implements SqliteConnection {
296297
crudThrottleTime: crudThrottleTime,
297298
params: params,
298299
abort: thisConnectAborter,
300+
// Run follow-up async tasks in the parent zone, a new one is introduced
301+
// while we hold the lock (and async tasks won't hold the sync lock).
302+
asyncWorkZone: zone,
299303
);
300304

301305
thisConnectAborter.onCompletion.whenComplete(retryHandler);
@@ -347,6 +351,7 @@ mixin PowerSyncDatabaseMixin implements SqliteConnection {
347351
required PowerSyncBackendConnector connector,
348352
required Duration crudThrottleTime,
349353
required AbortController abort,
354+
required Zone asyncWorkZone,
350355
Map<String, dynamic>? params,
351356
});
352357

@@ -372,7 +377,7 @@ mixin PowerSyncDatabaseMixin implements SqliteConnection {
372377
_abortActiveSync = null;
373378
} else {
374379
/// Wait for the abort to complete. Continue updating the sync status after completed
375-
await disconnector.onAbort;
380+
await disconnector.onCompletion;
376381
}
377382
}
378383
}

packages/powersync_core/lib/src/database/web/web_powersync_database.dart

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,7 @@ class PowerSyncDatabaseImpl
116116
required PowerSyncBackendConnector connector,
117117
required Duration crudThrottleTime,
118118
required AbortController abort,
119+
required Zone asyncWorkZone,
119120
Map<String, dynamic>? params,
120121
}) async {
121122
final crudStream =

packages/powersync_core/test/streaming_sync_test.dart

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import 'package:logging/logging.dart';
99
import 'package:powersync_core/powersync_core.dart';
1010
import 'package:test/test.dart';
1111

12+
import 'server/sync_server/in_memory_sync_server.dart';
1213
import 'test_server.dart';
1314
import 'utils/abstract_test_utils.dart';
1415
import 'utils/test_utils_impl.dart';
@@ -61,6 +62,45 @@ void main() {
6162
server.close();
6263
});
6364

65+
test('can disconnect in fetchCredentials', () async {
66+
final service = MockSyncService();
67+
final server = await createServer(mockSyncService: service);
68+
final ignoreLogger = Logger.detached('powersync.test');
69+
70+
final pdb =
71+
await testUtils.setupPowerSync(path: path, logger: ignoreLogger);
72+
pdb.retryDelay = Duration(milliseconds: 50);
73+
final connector = TestConnector(expectAsync0(() async {
74+
return PowerSyncCredentials(endpoint: server.endpoint, token: 'token');
75+
}));
76+
77+
await pdb.connect(connector: connector);
78+
while (server.connectionCount != 1) {
79+
await Future<void>.delayed(const Duration(milliseconds: 100));
80+
}
81+
82+
service.addKeepAlive(60);
83+
84+
final didDisconnect = Completer<void>();
85+
86+
connector.fetchCredentialsCallback = expectAsync0(() async {
87+
didDisconnect.complete(pdb.disconnect());
88+
89+
throw 'deliberate disconnect';
90+
});
91+
92+
service.addKeepAlive(0);
93+
await didDisconnect.future;
94+
expect(pdb.currentStatus.connected, isFalse);
95+
// The error should be cleared after calling disconnect
96+
expect(pdb.currentStatus.downloadError, isNull);
97+
98+
// Wait for a short while to make sure the database doesn't reconnect.
99+
for (var i = 0; i < 10; i++) {
100+
expect(pdb.currentStatus.connecting, isFalse);
101+
}
102+
});
103+
64104
test('can connect as initial operation', () async {
65105
final server = await createServer();
66106
final ignoreLogger = Logger.detached('powersync.test');

packages/powersync_core/test/test_server.dart

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,18 +8,21 @@ import 'package:shelf/shelf.dart';
88
import 'package:shelf/shelf_io.dart' as shelf_io;
99
import 'package:shelf_router/shelf_router.dart';
1010

11-
class TestServer {
11+
import 'server/sync_server/in_memory_sync_server.dart';
12+
13+
final class TestServer {
1214
late HttpServer server;
1315
Router app = Router();
1416
int maxConnectionCount = 0;
1517
int tokenExpiresIn;
1618

1719
TestServer({this.tokenExpiresIn = 65});
1820

19-
Future<void> init() async {
21+
Future<void> init({MockSyncService? mockSyncService}) async {
2022
app.post('/sync/stream', handleSyncStream);
2123
// Open on an arbitrary open port
22-
server = await shelf_io.serve(app.call, 'localhost', 0);
24+
server = await shelf_io.serve(
25+
mockSyncService?.router.call ?? app.call, 'localhost', 0);
2326
}
2427

2528
String get endpoint {
@@ -34,6 +37,9 @@ class TestServer {
3437
return server.connectionsInfo();
3538
}
3639

40+
/// The default response if no [MockSyncService] has been passed to [init].
41+
///
42+
/// This will emit keepalive messages frequently.
3743
Future<Response> handleSyncStream(Request request) async {
3844
maxConnectionCount = max(connectionCount, maxConnectionCount);
3945

@@ -61,9 +67,9 @@ class TestServer {
6167
}
6268
}
6369

64-
Future<TestServer> createServer() async {
70+
Future<TestServer> createServer({MockSyncService? mockSyncService}) async {
6571
var server = TestServer();
66-
await server.init();
72+
await server.init(mockSyncService: mockSyncService);
6773
return server;
6874
}
6975

packages/powersync_core/test/utils/abstract_test_utils.dart

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -125,21 +125,21 @@ abstract class AbstractTestUtils {
125125
}
126126

127127
class TestConnector extends PowerSyncBackendConnector {
128-
final Future<PowerSyncCredentials> Function() _fetchCredentials;
129-
final Future<void> Function(PowerSyncDatabase)? _uploadData;
128+
Future<PowerSyncCredentials> Function() fetchCredentialsCallback;
129+
Future<void> Function(PowerSyncDatabase)? uploadDataCallback;
130130

131-
TestConnector(this._fetchCredentials,
131+
TestConnector(this.fetchCredentialsCallback,
132132
{Future<void> Function(PowerSyncDatabase)? uploadData})
133-
: _uploadData = uploadData;
133+
: uploadDataCallback = uploadData;
134134

135135
@override
136136
Future<PowerSyncCredentials?> fetchCredentials() {
137-
return _fetchCredentials();
137+
return fetchCredentialsCallback();
138138
}
139139

140140
@override
141141
Future<void> uploadData(PowerSyncDatabase database) async {
142-
await _uploadData?.call(database);
142+
await uploadDataCallback?.call(database);
143143
}
144144
}
145145

0 commit comments

Comments
 (0)