Skip to content

Commit 07254dc

Browse files
committed
Only set connected on first line
1 parent 413206f commit 07254dc

File tree

1 file changed

+16
-17
lines changed

1 file changed

+16
-17
lines changed

packages/powersync_core/lib/src/sync/streaming_sync.dart

Lines changed: 16 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -48,8 +48,7 @@ class StreamingSyncImplementation implements StreamingSync {
4848

4949
final http.Client _client;
5050

51-
@visibleForTesting
52-
final SyncStatusStateStream state = SyncStatusStateStream();
51+
final SyncStatusStateStream _state = SyncStatusStateStream();
5352

5453
AbortController? _abort;
5554

@@ -81,7 +80,7 @@ class StreamingSyncImplementation implements StreamingSync {
8180
Duration get _retryDelay => options.retryDelay;
8281

8382
@override
84-
Stream<SyncStatus> get statusStream => state.statusStream;
83+
Stream<SyncStatus> get statusStream => _state.statusStream;
8584

8685
@override
8786
Future<void> abort() async {
@@ -105,7 +104,7 @@ class StreamingSyncImplementation implements StreamingSync {
105104
await _nonLineSyncEvents.close();
106105

107106
_client.close();
108-
state.close();
107+
_state.close();
109108
}
110109
}
111110

@@ -121,7 +120,7 @@ class StreamingSyncImplementation implements StreamingSync {
121120
_crudLoop();
122121
var invalidCredentials = false;
123122
while (!aborted) {
124-
state.updateStatus((s) => s.setConnectingIfNotConnected());
123+
_state.updateStatus((s) => s.setConnectingIfNotConnected());
125124
try {
126125
if (invalidCredentials) {
127126
// This may error. In that case it will be retried again on the next
@@ -143,7 +142,7 @@ class StreamingSyncImplementation implements StreamingSync {
143142
logger.warning('Sync error: $message', e, stacktrace);
144143
invalidCredentials = true;
145144

146-
state.updateStatus((s) => s.applyDownloadError(e));
145+
_state.updateStatus((s) => s.applyDownloadError(e));
147146

148147
// On error, wait a little before retrying
149148
// When aborting, don't wait
@@ -189,7 +188,7 @@ class StreamingSyncImplementation implements StreamingSync {
189188
// This is the first item in the FIFO CRUD queue.
190189
CrudEntry? nextCrudItem = await adapter.nextCrudItem();
191190
if (nextCrudItem != null) {
192-
state.updateStatus((s) => s.uploading = true);
191+
_state.updateStatus((s) => s.uploading = true);
193192
if (nextCrudItem.clientId == checkedCrudItem?.clientId) {
194193
// This will force a higher log level than exceptions which are caught here.
195194
logger.warning(
@@ -202,7 +201,7 @@ class StreamingSyncImplementation implements StreamingSync {
202201

203202
checkedCrudItem = nextCrudItem;
204203
await connector.uploadCrud();
205-
state.updateStatus((s) => s.uploadError = null);
204+
_state.updateStatus((s) => s.uploadError = null);
206205
} else {
207206
// Uploading is completed
208207
await adapter.updateLocalTarget(() => getWriteCheckpoint());
@@ -211,10 +210,10 @@ class StreamingSyncImplementation implements StreamingSync {
211210
} catch (e, stacktrace) {
212211
checkedCrudItem = null;
213212
logger.warning('Data upload error', e, stacktrace);
214-
state.updateStatus((s) => s.applyUploadError(e));
213+
_state.updateStatus((s) => s.applyUploadError(e));
215214
await _delayRetry();
216215

217-
if (!state.status.connected) {
216+
if (!_state.status.connected) {
218217
// Exit the upload loop if the sync stream is no longer connected
219218
break;
220219
}
@@ -223,7 +222,7 @@ class StreamingSyncImplementation implements StreamingSync {
223222
e,
224223
stacktrace);
225224
} finally {
226-
state.updateStatus((s) => s.uploading = false);
225+
_state.updateStatus((s) => s.uploading = false);
227226
}
228227
}
229228
}, timeout: _retryDelay).whenComplete(() {
@@ -264,7 +263,7 @@ class StreamingSyncImplementation implements StreamingSync {
264263
}
265264

266265
void _updateStatusForPriority(SyncPriorityStatus completed) {
267-
state.updateStatus((s) {
266+
_state.updateStatus((s) {
268267
// All status entries with a higher priority can be deleted since this
269268
// partial sync includes them.
270269
s.priorityStatusEntries = [
@@ -325,7 +324,7 @@ class StreamingSyncImplementation implements StreamingSync {
325324
bucketMap = newBuckets;
326325
await adapter.removeBuckets([...bucketsToDelete]);
327326
final initialProgress = await adapter.getBucketOperationProgress();
328-
state.updateStatus(
327+
_state.updateStatus(
329328
(s) => s.applyCheckpointStarted(initialProgress, line));
330329
case StreamingSyncCheckpointComplete():
331330
final result = await _applyCheckpoint(targetCheckpoint!, _abort);
@@ -376,7 +375,7 @@ class StreamingSyncImplementation implements StreamingSync {
376375
writeCheckpoint: diff.writeCheckpoint);
377376
targetCheckpoint = newCheckpoint;
378377
final initialProgress = await adapter.getBucketOperationProgress();
379-
state.updateStatus(
378+
_state.updateStatus(
380379
(s) => s.applyCheckpointStarted(initialProgress, newCheckpoint));
381380

382381
bucketMap = newBuckets.map((name, checksum) =>
@@ -386,7 +385,7 @@ class StreamingSyncImplementation implements StreamingSync {
386385
case SyncDataBatch():
387386
// TODO: This increments the counters before actually saving sync
388387
// data. Might be fine though?
389-
state.updateStatus((s) => s.applyBatchReceived(line));
388+
_state.updateStatus((s) => s.applyBatchReceived(line));
390389
await adapter.saveSyncData(line);
391390
case StreamingSyncKeepalive(:final tokenExpiresIn):
392391
if (tokenExpiresIn == 0) {
@@ -421,6 +420,7 @@ class StreamingSyncImplementation implements StreamingSync {
421420

422421
switch (line) {
423422
case ReceivedLine(:final line):
423+
_state.updateStatus((s) => s.setConnected());
424424
await handleLine(line as StreamingSyncLine);
425425
case UploadCompleted():
426426
// Only relevant for the Rust sync implementation.
@@ -433,7 +433,6 @@ class StreamingSyncImplementation implements StreamingSync {
433433
throw AssertionError('unreachable');
434434
}
435435

436-
state.updateStatus((s) => s.setConnected());
437436
if (haveInvalidated) {
438437
// Stop this connection, so that a new one will be started
439438
break;
@@ -473,7 +472,7 @@ class StreamingSyncImplementation implements StreamingSync {
473472
if (result.checkpointValid && result.ready) {
474473
logger.fine('validated checkpoint: $targetCheckpoint');
475474

476-
state.updateStatus((s) => s.applyCheckpointReached(targetCheckpoint));
475+
_state.updateStatus((s) => s.applyCheckpointReached(targetCheckpoint));
477476

478477
return const (abort: false, didApply: true);
479478
} else {

0 commit comments

Comments
 (0)