@@ -47,7 +47,9 @@ class StreamingSyncImplementation implements StreamingSync {
4747 StreamController <Null >.broadcast ();
4848
4949 final http.Client _client;
50- final SyncStatusStateStream _state = SyncStatusStateStream ();
50+
51+ @visibleForTesting
52+ final SyncStatusStateStream state = SyncStatusStateStream ();
5153
5254 AbortController ? _abort;
5355
@@ -79,13 +81,14 @@ class StreamingSyncImplementation implements StreamingSync {
7981 Duration get _retryDelay => options.retryDelay;
8082
8183 @override
82- Stream <SyncStatus > get statusStream => _state .statusStream;
84+ Stream <SyncStatus > get statusStream => state .statusStream;
8385
8486 @override
8587 Future <void > abort () async {
8688 // If streamingSync() hasn't been called yet, _abort will be null.
8789 if (_abort case final abort? ) {
8890 final future = abort.abort ();
91+ _internalCrudTriggerController.close ();
8992
9093 // This immediately triggers a new iteration in the merged stream, allowing us
9194 // to break immediately.
@@ -95,11 +98,14 @@ class StreamingSyncImplementation implements StreamingSync {
9598
9699 // Wait for the abort to complete, which also guarantees that no requests
97100 // are pending.
98- await future;
101+ await Future .wait ([
102+ future,
103+ if (_activeCrudUpload case final activeUpload? ) activeUpload.future,
104+ ]);
99105 await _nonLineSyncEvents.close ();
100106
101107 _client.close ();
102- _state .close ();
108+ state .close ();
103109 }
104110 }
105111
@@ -115,7 +121,7 @@ class StreamingSyncImplementation implements StreamingSync {
115121 _crudLoop ();
116122 var invalidCredentials = false ;
117123 while (! aborted) {
118- _state .updateStatus ((s) => s.setConnectingIfNotConnected ());
124+ state .updateStatus ((s) => s.setConnectingIfNotConnected ());
119125 try {
120126 if (invalidCredentials) {
121127 // This may error. In that case it will be retried again on the next
@@ -137,7 +143,7 @@ class StreamingSyncImplementation implements StreamingSync {
137143 logger.warning ('Sync error: $message ' , e, stacktrace);
138144 invalidCredentials = true ;
139145
140- _state .updateStatus ((s) => s.applyDownloadError (e));
146+ state .updateStatus ((s) => s.applyDownloadError (e));
141147
142148 // On error, wait a little before retrying
143149 // When aborting, don't wait
@@ -183,7 +189,7 @@ class StreamingSyncImplementation implements StreamingSync {
183189 // This is the first item in the FIFO CRUD queue.
184190 CrudEntry ? nextCrudItem = await adapter.nextCrudItem ();
185191 if (nextCrudItem != null ) {
186- _state .updateStatus ((s) => s.uploading = true );
192+ state .updateStatus ((s) => s.uploading = true );
187193 if (nextCrudItem.clientId == checkedCrudItem? .clientId) {
188194 // This will force a higher log level than exceptions which are caught here.
189195 logger.warning (
@@ -196,7 +202,7 @@ class StreamingSyncImplementation implements StreamingSync {
196202
197203 checkedCrudItem = nextCrudItem;
198204 await connector.uploadCrud ();
199- _state .updateStatus ((s) => s.uploadError = null );
205+ state .updateStatus ((s) => s.uploadError = null );
200206 } else {
201207 // Uploading is completed
202208 await adapter.updateLocalTarget (() => getWriteCheckpoint ());
@@ -205,10 +211,10 @@ class StreamingSyncImplementation implements StreamingSync {
205211 } catch (e, stacktrace) {
206212 checkedCrudItem = null ;
207213 logger.warning ('Data upload error' , e, stacktrace);
208- _state .updateStatus ((s) => s.applyUploadError (e));
214+ state .updateStatus ((s) => s.applyUploadError (e));
209215 await _delayRetry ();
210216
211- if (! _state .status.connected) {
217+ if (! state .status.connected) {
212218 // Exit the upload loop if the sync stream is no longer connected
213219 break ;
214220 }
@@ -217,12 +223,15 @@ class StreamingSyncImplementation implements StreamingSync {
217223 e,
218224 stacktrace);
219225 } finally {
220- _state .updateStatus ((s) => s.uploading = false );
226+ state .updateStatus ((s) => s.uploading = false );
221227 }
222228 }
223229 }, timeout: _retryDelay).whenComplete (() {
230+ if (! aborted) {
231+ _nonLineSyncEvents.add (const UploadCompleted ());
232+ }
233+
224234 assert (identical (_activeCrudUpload, completer));
225- _nonLineSyncEvents.add (const UploadCompleted ());
226235 _activeCrudUpload = null ;
227236 completer.complete ();
228237 });
@@ -255,7 +264,7 @@ class StreamingSyncImplementation implements StreamingSync {
255264 }
256265
257266 void _updateStatusForPriority (SyncPriorityStatus completed) {
258- _state .updateStatus ((s) {
267+ state .updateStatus ((s) {
259268 // All status entries with a higher priority can be deleted since this
260269 // partial sync includes them.
261270 s.priorityStatusEntries = [
@@ -316,7 +325,7 @@ class StreamingSyncImplementation implements StreamingSync {
316325 bucketMap = newBuckets;
317326 await adapter.removeBuckets ([...bucketsToDelete]);
318327 final initialProgress = await adapter.getBucketOperationProgress ();
319- _state .updateStatus (
328+ state .updateStatus (
320329 (s) => s.applyCheckpointStarted (initialProgress, line));
321330 case StreamingSyncCheckpointComplete ():
322331 final result = await _applyCheckpoint (targetCheckpoint! , _abort);
@@ -367,7 +376,7 @@ class StreamingSyncImplementation implements StreamingSync {
367376 writeCheckpoint: diff.writeCheckpoint);
368377 targetCheckpoint = newCheckpoint;
369378 final initialProgress = await adapter.getBucketOperationProgress ();
370- _state .updateStatus (
379+ state .updateStatus (
371380 (s) => s.applyCheckpointStarted (initialProgress, newCheckpoint));
372381
373382 bucketMap = newBuckets.map ((name, checksum) =>
@@ -377,7 +386,7 @@ class StreamingSyncImplementation implements StreamingSync {
377386 case SyncDataBatch ():
378387 // TODO: This increments the counters before actually saving sync
379388 // data. Might be fine though?
380- _state .updateStatus ((s) => s.applyBatchReceived (line));
389+ state .updateStatus ((s) => s.applyBatchReceived (line));
381390 await adapter.saveSyncData (line);
382391 case StreamingSyncKeepalive (: final tokenExpiresIn):
383392 if (tokenExpiresIn == 0 ) {
@@ -392,7 +401,9 @@ class StreamingSyncImplementation implements StreamingSync {
392401 haveInvalidated = true ;
393402 // trigger next loop iteration ASAP, don't wait for another
394403 // message from the server.
395- _nonLineSyncEvents.add (TokenRefreshComplete ());
404+ if (! aborted) {
405+ _nonLineSyncEvents.add (TokenRefreshComplete ());
406+ }
396407 }, onError: (_) {
397408 // Token refresh failed - retry on next keepalive.
398409 credentialsInvalidation = null ;
@@ -422,7 +433,7 @@ class StreamingSyncImplementation implements StreamingSync {
422433 throw AssertionError ('unreachable' );
423434 }
424435
425- _state .updateStatus ((s) => s.setConnected ());
436+ state .updateStatus ((s) => s.setConnected ());
426437 if (haveInvalidated) {
427438 // Stop this connection, so that a new one will be started
428439 break ;
@@ -462,7 +473,7 @@ class StreamingSyncImplementation implements StreamingSync {
462473 if (result.checkpointValid && result.ready) {
463474 logger.fine ('validated checkpoint: $targetCheckpoint ' );
464475
465- _state .updateStatus ((s) => s.applyCheckpointReached (targetCheckpoint));
476+ state .updateStatus ((s) => s.applyCheckpointReached (targetCheckpoint));
466477
467478 return const (abort: false , didApply: true );
468479 } else {
@@ -501,22 +512,18 @@ class StreamingSyncImplementation implements StreamingSync {
501512 return res;
502513 }
503514
504- Stream <String > _rawStreamingSyncRequest (Object ? data) {
505- return Stream .fromFuture (_postStreamRequest (data)).asyncExpand ((stream) {
506- if (stream == null ) {
507- return const Stream .empty ();
508- }
509-
510- return stream.stream.lines;
511- });
515+ Stream <String > _rawStreamingSyncRequest (Object ? data) async * {
516+ final response = await _postStreamRequest (data);
517+ if (response != null ) {
518+ yield * response.stream.lines;
519+ }
512520 }
513521
514522 Stream <StreamingSyncLine > _streamingSyncRequest (StreamingSyncRequest data) {
515523 return _rawStreamingSyncRequest (data)
516524 .parseJson
517525 .cast <Map <String , dynamic >>()
518- .transform (StreamingSyncLine .reader)
519- .takeWhile ((_) => ! aborted);
526+ .transform (StreamingSyncLine .reader);
520527 }
521528
522529 /// Delays the standard `retryDelay` Duration, but exits early if
0 commit comments