From cd7464aea7c13d4322af48bab7772ba5ee17b752 Mon Sep 17 00:00:00 2001 From: Sahil Kumar Date: Mon, 22 Sep 2025 23:34:57 +0200 Subject: [PATCH 1/4] fix(llc): improve sync reliability and error handling This commit enhances the `sync` method in `StreamChatClient` to be more robust and handle potential issues during the synchronization process. Key changes include: - **Initialization of `lastSyncAt`:** If `lastSyncAt` is null (e.g., on a fresh app start or after a flush), it's now initialized to the current time. This prevents unnecessary sync attempts with an invalid timestamp. - **Handling 400 Errors:** If the sync operation returns a 400 error (often indicating that the sync time is too old or the data to sync is too large), the persistence client is now flushed, and `lastSyncAt` is reset. This allows the client to recover by performing a fresh sync. - **`lastSyncAt` Update Logic:** `lastSyncAt` is now updated to the `createdAt` timestamp of the last synced event, or to the current time if no events were synced. This ensures more accurate tracking of the synchronization state. - **Removal of `_lastSyncedAt`:** The internal `_lastSyncedAt` property in `StreamChatClient` has been removed, as its functionality is now covered by the persistence client's `lastSyncAt`. - **`flush` method in `ChatPersistenceClient`:** A new `flush` method has been added to the `ChatPersistenceClient` interface to allow for clearing all stored data. Additionally, tests have been added to cover these new behaviors, including scenarios with null `lastSyncAt` and 400 errors during sync. A `FakePersistenceClient` has been introduced for testing purposes. --- .../stream_chat/lib/src/client/client.dart | 66 +++++++++---------- .../lib/src/db/chat_persistence_client.dart | 3 + .../test/src/client/client_test.dart | 66 +++++++++++++++++++ .../src/db/chat_persistence_client_test.dart | 3 + packages/stream_chat/test/src/fakes.dart | 57 ++++++++++++++++ 5 files changed, 162 insertions(+), 33 deletions(-) diff --git a/packages/stream_chat/lib/src/client/client.dart b/packages/stream_chat/lib/src/client/client.dart index 7897b83ec..68939b8e0 100644 --- a/packages/stream_chat/lib/src/client/client.dart +++ b/packages/stream_chat/lib/src/client/client.dart @@ -1,5 +1,6 @@ import 'dart:async'; +import 'package:collection/collection.dart'; import 'package:dio/dio.dart'; import 'package:logging/logging.dart'; import 'package:meta/meta.dart'; @@ -186,9 +187,6 @@ class StreamChatClient { late final RetryPolicy _retryPolicy; - /// the last dateTime at the which all the channels were synced - DateTime? _lastSyncedAt; - /// The retry policy options getter RetryPolicy get retryPolicy => _retryPolicy; @@ -519,25 +517,17 @@ class StreamChatClient { if (connectionRecovered) { // connection recovered - final cids = state.channels.keys.toList(growable: false); + final cids = [...state.channels.keys.toSet()]; if (cids.isNotEmpty) { await queryChannelsOnline( filter: Filter.in_('cid', cids), paginationParams: const PaginationParams(limit: 30), ); - if (persistenceEnabled) { - await sync(cids: cids, lastSyncAt: _lastSyncedAt); - } - } else { - // channels are empty, assuming it's a fresh start - // and making sure `lastSyncAt` is initialized - if (persistenceEnabled) { - final lastSyncAt = await chatPersistenceClient?.getLastSyncAt(); - if (lastSyncAt == null) { - await chatPersistenceClient?.updateLastSyncAt(DateTime.now()); - } - } + + // Sync the persistence client if available + if (persistenceEnabled) await sync(cids: cids); } + handleEvent(Event( type: EventType.connectionRecovered, online: true, @@ -569,34 +559,45 @@ class StreamChatClient { Future sync({List? cids, DateTime? lastSyncAt}) { return _syncLock.synchronized(() async { final channels = cids ?? await chatPersistenceClient?.getChannelCids(); - if (channels == null || channels.isEmpty) { - return; - } + if (channels == null || channels.isEmpty) return; final syncAt = lastSyncAt ?? await chatPersistenceClient?.getLastSyncAt(); if (syncAt == null) { - return; + logger.info('Fresh sync start: lastSyncAt initialized to now.'); + return chatPersistenceClient?.updateLastSyncAt(DateTime.now()); } try { + logger.info('Syncing events since $syncAt for channels: $channels'); + final res = await _chatApi.general.sync(channels, syncAt); - final events = res.events - ..sort((a, b) => a.createdAt.compareTo(b.createdAt)); + final events = res.events.sorted( + (a, b) => a.createdAt.compareTo(b.createdAt), + ); for (final event in events) { - logger.fine('event.type: ${event.type}'); - final messageText = event.message?.text; - if (messageText != null) { - logger.fine('event.message.text: $messageText'); - } + logger.fine('Syncing event: ${event.type}'); handleEvent(event); } - final now = DateTime.now(); - _lastSyncedAt = now; - chatPersistenceClient?.updateLastSyncAt(now); - } catch (e, stk) { - logger.severe('Error during sync', e, stk); + final updatedSyncAt = events.lastOrNull?.createdAt ?? DateTime.now(); + return chatPersistenceClient?.updateLastSyncAt(updatedSyncAt); + } catch (error, stk) { + // If we got a 400 error, it means that either the sync time is too + // old or the channel list is too long or too many events need to be + // synced. In this case, we should just flush the persistence client + // and start over. + if (error is StreamChatNetworkError && error.statusCode == 400) { + logger.warning( + 'Failed to sync events due to stale or oversized state. ' + 'Resetting the persistence client to enable a fresh start.', + ); + + await chatPersistenceClient?.flush(); + return chatPersistenceClient?.updateLastSyncAt(DateTime.now()); + } + + logger.warning('Error syncing events', error, stk); } }); } @@ -2071,7 +2072,6 @@ class StreamChatClient { // resetting state. state.dispose(); state = ClientState(this); - _lastSyncedAt = null; // resetting credentials. _tokenManager.reset(); diff --git a/packages/stream_chat/lib/src/db/chat_persistence_client.dart b/packages/stream_chat/lib/src/db/chat_persistence_client.dart index c91a5ffa1..68dd33e22 100644 --- a/packages/stream_chat/lib/src/db/chat_persistence_client.dart +++ b/packages/stream_chat/lib/src/db/chat_persistence_client.dart @@ -34,6 +34,9 @@ abstract class ChatPersistenceClient { /// If [flush] is true, the data will also be deleted Future disconnect({bool flush = false}); + /// Clears all the data stored in the persistence client. + Future flush(); + /// Get stored replies by messageId Future> getReplies( String parentId, { diff --git a/packages/stream_chat/test/src/client/client_test.dart b/packages/stream_chat/test/src/client/client_test.dart index 4eaad9f83..2b2606746 100644 --- a/packages/stream_chat/test/src/client/client_test.dart +++ b/packages/stream_chat/test/src/client/client_test.dart @@ -1,3 +1,5 @@ +// ignore_for_file: avoid_redundant_argument_values + import 'package:mocktail/mocktail.dart'; import 'package:stream_chat/src/core/http/token.dart'; import 'package:stream_chat/stream_chat.dart'; @@ -3590,5 +3592,69 @@ void main() { ); }, ); + + group('Sync Method Tests', () { + test( + 'should retrieve data from persistence client and sync successfully', + () async { + final cids = ['channel1', 'channel2']; + final lastSyncAt = DateTime.now().subtract(const Duration(hours: 1)); + final fakeClient = FakePersistenceClient( + channelCids: cids, + lastSyncAt: lastSyncAt, + ); + + client.chatPersistenceClient = fakeClient; + when(() => api.general.sync(cids, lastSyncAt)).thenAnswer( + (_) async => SyncResponse()..events = [], + ); + + await client.sync(); + + verify(() => api.general.sync(cids, lastSyncAt)).called(1); + + final newLastSyncAt = await fakeClient.getLastSyncAt(); + expect(newLastSyncAt?.isAfter(lastSyncAt), isTrue); + }, + ); + + test('should set lastSyncAt on first sync when null', () async { + final fakeClient = FakePersistenceClient( + channelCids: ['channel1'], + lastSyncAt: null, + ); + + client.chatPersistenceClient = fakeClient; + + await client.sync(); + + expectLater(fakeClient.getLastSyncAt(), completion(isNotNull)); + verifyNever(() => api.general.sync(any(), any())); + }); + + test('should flush persistence client on 400 error', () async { + final cids = ['channel1']; + final lastSyncAt = DateTime.now().subtract(const Duration(hours: 1)); + final fakeClient = FakePersistenceClient( + channelCids: cids, + lastSyncAt: lastSyncAt, + ); + + client.chatPersistenceClient = fakeClient; + when(() => api.general.sync(cids, lastSyncAt)).thenThrow( + StreamChatNetworkError.raw( + code: 4, + statusCode: 400, + message: 'Too many events', + ), + ); + + await client.sync(); + + expect(await fakeClient.getChannelCids(), isEmpty); // Should be flushed + + verify(() => api.general.sync(cids, lastSyncAt)).called(1); + }); + }); }); } diff --git a/packages/stream_chat/test/src/db/chat_persistence_client_test.dart b/packages/stream_chat/test/src/db/chat_persistence_client_test.dart index 3ebdfc32e..faf69c799 100644 --- a/packages/stream_chat/test/src/db/chat_persistence_client_test.dart +++ b/packages/stream_chat/test/src/db/chat_persistence_client_test.dart @@ -67,6 +67,9 @@ class TestPersistenceClient extends ChatPersistenceClient { @override Future disconnect({bool flush = false}) => throw UnimplementedError(); + @override + Future flush() => throw UnimplementedError(); + @override Future getChannelByCid(String cid) async => ChannelModel(cid: cid); diff --git a/packages/stream_chat/test/src/fakes.dart b/packages/stream_chat/test/src/fakes.dart index b1873d1ea..c55c444ee 100644 --- a/packages/stream_chat/test/src/fakes.dart +++ b/packages/stream_chat/test/src/fakes.dart @@ -43,6 +43,63 @@ class FakeTokenManager extends Fake implements TokenManager { class FakeMultiPartFile extends Fake implements MultipartFile {} +/// Fake persistence client for testing persistence client reliability features +class FakePersistenceClient extends Fake implements ChatPersistenceClient { + FakePersistenceClient({ + DateTime? lastSyncAt, + List? channelCids, + }) : _lastSyncAt = lastSyncAt, + _channelCids = channelCids ?? []; + + String? _userId; + bool _isConnected = false; + DateTime? _lastSyncAt; + List _channelCids; + + // Track method calls for testing + int connectCallCount = 0; + int disconnectCallCount = 0; + + @override + bool get isConnected => _isConnected; + + @override + String? get userId => _userId; + + @override + Future connect(String userId) async { + _userId = userId; + _isConnected = true; + connectCallCount++; + } + + @override + Future disconnect({bool flush = false}) async { + if (flush) await this.flush(); + + _userId = null; + _isConnected = false; + disconnectCallCount++; + } + + @override + Future flush() async { + _lastSyncAt = null; + _channelCids = []; + } + + @override + Future getLastSyncAt() async => _lastSyncAt; + + @override + Future updateLastSyncAt(DateTime lastSyncAt) async { + _lastSyncAt = lastSyncAt; + } + + @override + Future> getChannelCids() async => _channelCids; +} + class FakeChatApi extends Fake implements StreamChatApi { UserApi? _user; From a7b4f41b3da507bbbc29d1e031c303655814c71b Mon Sep 17 00:00:00 2001 From: Sahil Kumar Date: Mon, 22 Sep 2025 23:35:40 +0200 Subject: [PATCH 2/4] feat(persistence): add client.flush() method to clear database This commit introduces a new `flush()` method to the `StreamChatPersistenceClient`. This method allows users to clear the entire database. A corresponding test case has been added to verify that the `flush()` method correctly removes all data from the database. --- packages/stream_chat_persistence/CHANGELOG.md | 33 ++++++++++++++----- .../src/stream_chat_persistence_client.dart | 7 ++++ .../stream_chat_persistence_client_test.dart | 31 +++++++++++++++++ 3 files changed, 62 insertions(+), 9 deletions(-) diff --git a/packages/stream_chat_persistence/CHANGELOG.md b/packages/stream_chat_persistence/CHANGELOG.md index 599aec84f..7999a67bb 100644 --- a/packages/stream_chat_persistence/CHANGELOG.md +++ b/packages/stream_chat_persistence/CHANGELOG.md @@ -1,3 +1,9 @@ +## Upcoming + +✅ Added + +- Added support for `client.flush()` method to clear database. + ## 9.16.0 - Updated `stream_chat` dependency to [`9.16.0`](https://pub.dev/packages/stream_chat/changelog). @@ -110,7 +116,8 @@ ## 7.2.0-hotfix.1 -- Updated `stream_chat` dependency to [`7.2.0-hotfix.1`](https://pub.dev/packages/stream_chat/changelog). +- Updated `stream_chat` dependency to [ + `7.2.0-hotfix.1`](https://pub.dev/packages/stream_chat/changelog). ## 7.2.0 @@ -131,7 +138,8 @@ ## 7.0.0 - Updated minimum supported `SDK` version to Flutter 3.13/Dart 3.1 -- 🛑 **BREAKING** Removed deprecated `getChannelStates.sort` parameter. Use `getChannelStates.channelStateSort` instead. +- 🛑 **BREAKING** Removed deprecated `getChannelStates.sort` parameter. Use + `getChannelStates.channelStateSort` instead. ## 6.10.0 @@ -148,7 +156,8 @@ ## 6.7.0 -- [[#1683]](https://github.com/GetStream/stream-chat-flutter/issues/1683) Fixed SqliteException no such column `messages.state`. +- [[#1683]](https://github.com/GetStream/stream-chat-flutter/issues/1683) Fixed SqliteException no + such column `messages.state`. - Updated `stream_chat` dependency to [`6.7.0`](https://pub.dev/packages/stream_chat/changelog). ## 6.6.0 @@ -169,12 +178,14 @@ ## 6.2.0 -- Added support for `StreamChatPersistenceClient.isConnected` for checking if the client is connected to the database. +- Added support for `StreamChatPersistenceClient.isConnected` for checking if the client is + connected to the database. - [[#1422]](https://github.com/GetStream/stream-chat-flutter/issues/1422) Removed default values from `UserEntity` `createdAt` and `updatedAt` fields. - Updated `stream_chat` dependency to [`6.2.0`](https://pub.dev/packages/stream_chat/changelog). - Added support for `StreamChatPersistenceClient.openPersistenceConnection` - and `StreamChatPersistenceClient.closePersistenceConnection` for opening and closing the database connection. + and `StreamChatPersistenceClient.closePersistenceConnection` for opening and closing the database + connection. ## 6.1.0 @@ -202,7 +213,8 @@ ## 5.0.0-beta.1 -- Updated `stream_chat` dependency to [`5.0.0-beta.1`](https://pub.dev/packages/stream_chat/changelog). +- Updated `stream_chat` dependency to [ + `5.0.0-beta.1`](https://pub.dev/packages/stream_chat/changelog). ## 4.4.0 @@ -234,7 +246,8 @@ ## 4.0.0-beta.0 -- Updated `stream_chat` dependency to [`4.0.0-beta.0`](https://pub.dev/packages/stream_chat/changelog). +- Updated `stream_chat` dependency to [ + `4.0.0-beta.0`](https://pub.dev/packages/stream_chat/changelog). ## 3.1.0 @@ -243,8 +256,10 @@ ## 3.0.0 - Updated `stream_chat` dependency to [`3.0.0`](https://pub.dev/packages/stream_chat/changelog). -- [[#604]](https://github.com/GetStream/stream-chat-flutter/issues/604) Fix cascade deletion by enabling `pragma foreign_keys`. -- Added a new table `PinnedMessageReactions` and dao `PinnedMessageReactionDao` specifically for pinned messages. +- [[#604]](https://github.com/GetStream/stream-chat-flutter/issues/604) Fix cascade deletion by + enabling `pragma foreign_keys`. +- Added a new table `PinnedMessageReactions` and dao `PinnedMessageReactionDao` specifically for + pinned messages. ## 2.2.0 diff --git a/packages/stream_chat_persistence/lib/src/stream_chat_persistence_client.dart b/packages/stream_chat_persistence/lib/src/stream_chat_persistence_client.dart index f136a9fe5..588510381 100644 --- a/packages/stream_chat_persistence/lib/src/stream_chat_persistence_client.dart +++ b/packages/stream_chat_persistence/lib/src/stream_chat_persistence_client.dart @@ -461,6 +461,13 @@ class StreamChatPersistenceClient extends ChatPersistenceClient { return db!.transaction(() => super.updateChannelStates(channelStates)); } + @override + Future flush() { + assert(_debugIsConnected, ''); + _logger.info('flush'); + return db!.flush(); + } + @override Future disconnect({bool flush = false}) async { _logger.info('disconnect'); diff --git a/packages/stream_chat_persistence/test/stream_chat_persistence_client_test.dart b/packages/stream_chat_persistence/test/stream_chat_persistence_client_test.dart index 3b0cb7314..018327e92 100644 --- a/packages/stream_chat_persistence/test/stream_chat_persistence_client_test.dart +++ b/packages/stream_chat_persistence/test/stream_chat_persistence_client_test.dart @@ -54,6 +54,37 @@ void main() { expect(client.isConnected, false); }); + test('flush', () async { + const userId = 'testUserId'; + final client = StreamChatPersistenceClient(logLevel: Level.ALL); + + await client.connect(userId, databaseProvider: testDatabaseProvider); + addTearDown(() async => client.disconnect()); + + final connectionEvent = Event( + type: EventType.healthCheck, + createdAt: DateTime.timestamp(), + me: OwnUser(id: userId, name: 'Test User'), + ); + + await client.updateConnectionInfo(connectionEvent); + + // Add some test data + final testDate = DateTime.now(); + await client.updateLastSyncAt(testDate); + + // Verify data exists + final lastSyncAtBeforeFlush = await client.getLastSyncAt(); + expect(lastSyncAtBeforeFlush, isNotNull); + + // Flush the database + await client.flush(); + + // Verify data is cleared + final lastSyncAtAfterFlush = await client.getLastSyncAt(); + expect(lastSyncAtAfterFlush, isNull); + }); + test('client function throws stateError if db is not yet connected', () { final client = StreamChatPersistenceClient(logLevel: Level.ALL); expect( From 46d07dcdde8166028cac3c521c63cf164cf1bca7 Mon Sep 17 00:00:00 2001 From: Sahil Kumar Date: Mon, 22 Sep 2025 23:43:52 +0200 Subject: [PATCH 3/4] chore: update CHANGELOG.md --- packages/stream_chat/CHANGELOG.md | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/packages/stream_chat/CHANGELOG.md b/packages/stream_chat/CHANGELOG.md index fb5e53a74..9d242fae9 100644 --- a/packages/stream_chat/CHANGELOG.md +++ b/packages/stream_chat/CHANGELOG.md @@ -1,3 +1,10 @@ +## Upcoming + +🐞 Fixed + +- Improved sync reliability and error handling with enhanced `lastSyncAt` initialization, 400 + error recovery, and automatic flushing of stale persistence data after 30 days of inactivity. + ## 9.16.0 🐞 Fixed From ef27ce5b0be2a0d3926340b531d641d5c3e2e5b3 Mon Sep 17 00:00:00 2001 From: Sahil Kumar Date: Tue, 23 Sep 2025 17:08:26 +0200 Subject: [PATCH 4/4] fix(persistence): disable foreign key constraints during flush This commit modifies the `flush` method in `DriftChatDatabase` to temporarily disable foreign key constraints using `PRAGMA foreign_keys = OFF` before deleting all table data. This prevents foreign key constraint violations that could occur when deleting tables in an order that doesn't respect dependencies. The constraints are re-enabled using `PRAGMA foreign_keys = ON` in a `finally` block to ensure they are restored even if an error occurs during the deletion process. --- .../lib/src/db/drift_chat_database.dart | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/packages/stream_chat_persistence/lib/src/db/drift_chat_database.dart b/packages/stream_chat_persistence/lib/src/db/drift_chat_database.dart index 5dc89d8a7..b7e761d78 100644 --- a/packages/stream_chat_persistence/lib/src/db/drift_chat_database.dart +++ b/packages/stream_chat_persistence/lib/src/db/drift_chat_database.dart @@ -73,11 +73,18 @@ class DriftChatDatabase extends _$DriftChatDatabase { ); /// Deletes all the tables - Future flush() => batch((batch) { - allTables.forEach((table) { - delete(table).go(); - }); + Future flush() async { + await customStatement('PRAGMA foreign_keys = OFF'); + try { + await transaction(() async { + for (final table in allTables) { + await delete(table).go(); + } }); + } finally { + await customStatement('PRAGMA foreign_keys = ON'); + } + } /// Closes the database instance Future disconnect() => close();