diff --git a/packages/stream_chat/CHANGELOG.md b/packages/stream_chat/CHANGELOG.md index 63b13cb05..a9fe56004 100644 --- a/packages/stream_chat/CHANGELOG.md +++ b/packages/stream_chat/CHANGELOG.md @@ -1,5 +1,10 @@ ## Upcoming +🐞 Fixed + +- Improved sync reliability and error handling with enhanced `lastSyncAt` initialization, 400 + error recovery, and automatic flushing of stale persistence data after 30 days of inactivity. + ✅ Added - Added support for `Channel.messageCount` field. diff --git a/packages/stream_chat/lib/src/client/client.dart b/packages/stream_chat/lib/src/client/client.dart index fee7eef0e..0fa074b47 100644 --- a/packages/stream_chat/lib/src/client/client.dart +++ b/packages/stream_chat/lib/src/client/client.dart @@ -1,5 +1,6 @@ import 'dart:async'; +import 'package:collection/collection.dart'; import 'package:dio/dio.dart'; import 'package:logging/logging.dart'; import 'package:meta/meta.dart'; @@ -186,9 +187,6 @@ class StreamChatClient { late final RetryPolicy _retryPolicy; - /// the last dateTime at the which all the channels were synced - DateTime? _lastSyncedAt; - /// The retry policy options getter RetryPolicy get retryPolicy => _retryPolicy; @@ -519,25 +517,17 @@ class StreamChatClient { if (connectionRecovered) { // connection recovered - final cids = state.channels.keys.toList(growable: false); + final cids = [...state.channels.keys.toSet()]; if (cids.isNotEmpty) { await queryChannelsOnline( filter: Filter.in_('cid', cids), paginationParams: const PaginationParams(limit: 30), ); - if (persistenceEnabled) { - await sync(cids: cids, lastSyncAt: _lastSyncedAt); - } - } else { - // channels are empty, assuming it's a fresh start - // and making sure `lastSyncAt` is initialized - if (persistenceEnabled) { - final lastSyncAt = await chatPersistenceClient?.getLastSyncAt(); - if (lastSyncAt == null) { - await chatPersistenceClient?.updateLastSyncAt(DateTime.now()); - } - } + + // Sync the persistence client if available + if (persistenceEnabled) await sync(cids: cids); } + handleEvent(Event( type: EventType.connectionRecovered, online: true, @@ -569,34 +559,45 @@ class StreamChatClient { Future sync({List? cids, DateTime? lastSyncAt}) { return _syncLock.synchronized(() async { final channels = cids ?? await chatPersistenceClient?.getChannelCids(); - if (channels == null || channels.isEmpty) { - return; - } + if (channels == null || channels.isEmpty) return; final syncAt = lastSyncAt ?? await chatPersistenceClient?.getLastSyncAt(); if (syncAt == null) { - return; + logger.info('Fresh sync start: lastSyncAt initialized to now.'); + return chatPersistenceClient?.updateLastSyncAt(DateTime.now()); } try { + logger.info('Syncing events since $syncAt for channels: $channels'); + final res = await _chatApi.general.sync(channels, syncAt); - final events = res.events - ..sort((a, b) => a.createdAt.compareTo(b.createdAt)); + final events = res.events.sorted( + (a, b) => a.createdAt.compareTo(b.createdAt), + ); for (final event in events) { - logger.fine('event.type: ${event.type}'); - final messageText = event.message?.text; - if (messageText != null) { - logger.fine('event.message.text: $messageText'); - } + logger.fine('Syncing event: ${event.type}'); handleEvent(event); } - final now = DateTime.now(); - _lastSyncedAt = now; - chatPersistenceClient?.updateLastSyncAt(now); - } catch (e, stk) { - logger.severe('Error during sync', e, stk); + final updatedSyncAt = events.lastOrNull?.createdAt ?? DateTime.now(); + return chatPersistenceClient?.updateLastSyncAt(updatedSyncAt); + } catch (error, stk) { + // If we got a 400 error, it means that either the sync time is too + // old or the channel list is too long or too many events need to be + // synced. In this case, we should just flush the persistence client + // and start over. + if (error is StreamChatNetworkError && error.statusCode == 400) { + logger.warning( + 'Failed to sync events due to stale or oversized state. ' + 'Resetting the persistence client to enable a fresh start.', + ); + + await chatPersistenceClient?.flush(); + return chatPersistenceClient?.updateLastSyncAt(DateTime.now()); + } + + logger.warning('Error syncing events', error, stk); } }); } @@ -2102,7 +2103,6 @@ class StreamChatClient { // resetting state. state.dispose(); state = ClientState(this); - _lastSyncedAt = null; // resetting credentials. _tokenManager.reset(); diff --git a/packages/stream_chat/lib/src/db/chat_persistence_client.dart b/packages/stream_chat/lib/src/db/chat_persistence_client.dart index f63536d1b..80fe63875 100644 --- a/packages/stream_chat/lib/src/db/chat_persistence_client.dart +++ b/packages/stream_chat/lib/src/db/chat_persistence_client.dart @@ -34,6 +34,9 @@ abstract class ChatPersistenceClient { /// If [flush] is true, the data will also be deleted Future disconnect({bool flush = false}); + /// Clears all the data stored in the persistence client. + Future flush(); + /// Get stored replies by messageId Future> getReplies( String parentId, { diff --git a/packages/stream_chat/test/src/client/client_test.dart b/packages/stream_chat/test/src/client/client_test.dart index 50182411b..bd7b9b455 100644 --- a/packages/stream_chat/test/src/client/client_test.dart +++ b/packages/stream_chat/test/src/client/client_test.dart @@ -1,3 +1,5 @@ +// ignore_for_file: avoid_redundant_argument_values + import 'package:mocktail/mocktail.dart'; import 'package:stream_chat/src/core/http/token.dart'; import 'package:stream_chat/stream_chat.dart'; @@ -3668,5 +3670,69 @@ void main() { ); }, ); + + group('Sync Method Tests', () { + test( + 'should retrieve data from persistence client and sync successfully', + () async { + final cids = ['channel1', 'channel2']; + final lastSyncAt = DateTime.now().subtract(const Duration(hours: 1)); + final fakeClient = FakePersistenceClient( + channelCids: cids, + lastSyncAt: lastSyncAt, + ); + + client.chatPersistenceClient = fakeClient; + when(() => api.general.sync(cids, lastSyncAt)).thenAnswer( + (_) async => SyncResponse()..events = [], + ); + + await client.sync(); + + verify(() => api.general.sync(cids, lastSyncAt)).called(1); + + final newLastSyncAt = await fakeClient.getLastSyncAt(); + expect(newLastSyncAt?.isAfter(lastSyncAt), isTrue); + }, + ); + + test('should set lastSyncAt on first sync when null', () async { + final fakeClient = FakePersistenceClient( + channelCids: ['channel1'], + lastSyncAt: null, + ); + + client.chatPersistenceClient = fakeClient; + + await client.sync(); + + expectLater(fakeClient.getLastSyncAt(), completion(isNotNull)); + verifyNever(() => api.general.sync(any(), any())); + }); + + test('should flush persistence client on 400 error', () async { + final cids = ['channel1']; + final lastSyncAt = DateTime.now().subtract(const Duration(hours: 1)); + final fakeClient = FakePersistenceClient( + channelCids: cids, + lastSyncAt: lastSyncAt, + ); + + client.chatPersistenceClient = fakeClient; + when(() => api.general.sync(cids, lastSyncAt)).thenThrow( + StreamChatNetworkError.raw( + code: 4, + statusCode: 400, + message: 'Too many events', + ), + ); + + await client.sync(); + + expect(await fakeClient.getChannelCids(), isEmpty); // Should be flushed + + verify(() => api.general.sync(cids, lastSyncAt)).called(1); + }); + }); }); } diff --git a/packages/stream_chat/test/src/db/chat_persistence_client_test.dart b/packages/stream_chat/test/src/db/chat_persistence_client_test.dart index 3ebdfc32e..faf69c799 100644 --- a/packages/stream_chat/test/src/db/chat_persistence_client_test.dart +++ b/packages/stream_chat/test/src/db/chat_persistence_client_test.dart @@ -67,6 +67,9 @@ class TestPersistenceClient extends ChatPersistenceClient { @override Future disconnect({bool flush = false}) => throw UnimplementedError(); + @override + Future flush() => throw UnimplementedError(); + @override Future getChannelByCid(String cid) async => ChannelModel(cid: cid); diff --git a/packages/stream_chat/test/src/fakes.dart b/packages/stream_chat/test/src/fakes.dart index b1873d1ea..c55c444ee 100644 --- a/packages/stream_chat/test/src/fakes.dart +++ b/packages/stream_chat/test/src/fakes.dart @@ -43,6 +43,63 @@ class FakeTokenManager extends Fake implements TokenManager { class FakeMultiPartFile extends Fake implements MultipartFile {} +/// Fake persistence client for testing persistence client reliability features +class FakePersistenceClient extends Fake implements ChatPersistenceClient { + FakePersistenceClient({ + DateTime? lastSyncAt, + List? channelCids, + }) : _lastSyncAt = lastSyncAt, + _channelCids = channelCids ?? []; + + String? _userId; + bool _isConnected = false; + DateTime? _lastSyncAt; + List _channelCids; + + // Track method calls for testing + int connectCallCount = 0; + int disconnectCallCount = 0; + + @override + bool get isConnected => _isConnected; + + @override + String? get userId => _userId; + + @override + Future connect(String userId) async { + _userId = userId; + _isConnected = true; + connectCallCount++; + } + + @override + Future disconnect({bool flush = false}) async { + if (flush) await this.flush(); + + _userId = null; + _isConnected = false; + disconnectCallCount++; + } + + @override + Future flush() async { + _lastSyncAt = null; + _channelCids = []; + } + + @override + Future getLastSyncAt() async => _lastSyncAt; + + @override + Future updateLastSyncAt(DateTime lastSyncAt) async { + _lastSyncAt = lastSyncAt; + } + + @override + Future> getChannelCids() async => _channelCids; +} + class FakeChatApi extends Fake implements StreamChatApi { UserApi? _user; diff --git a/packages/stream_chat_persistence/CHANGELOG.md b/packages/stream_chat_persistence/CHANGELOG.md index 8d343e8a1..33ef8a8a2 100644 --- a/packages/stream_chat_persistence/CHANGELOG.md +++ b/packages/stream_chat_persistence/CHANGELOG.md @@ -1,5 +1,8 @@ ## Upcoming +✅ Added + +- Added support for `client.flush()` method to clear database. - Added support for `Channel.messageCount` field. ## 9.17.0 diff --git a/packages/stream_chat_persistence/lib/src/db/drift_chat_database.dart b/packages/stream_chat_persistence/lib/src/db/drift_chat_database.dart index e7f4bd141..4934f4e0b 100644 --- a/packages/stream_chat_persistence/lib/src/db/drift_chat_database.dart +++ b/packages/stream_chat_persistence/lib/src/db/drift_chat_database.dart @@ -73,11 +73,18 @@ class DriftChatDatabase extends _$DriftChatDatabase { ); /// Deletes all the tables - Future flush() => batch((batch) { - allTables.forEach((table) { - delete(table).go(); - }); + Future flush() async { + await customStatement('PRAGMA foreign_keys = OFF'); + try { + await transaction(() async { + for (final table in allTables) { + await delete(table).go(); + } }); + } finally { + await customStatement('PRAGMA foreign_keys = ON'); + } + } /// Closes the database instance Future disconnect() => close(); diff --git a/packages/stream_chat_persistence/lib/src/stream_chat_persistence_client.dart b/packages/stream_chat_persistence/lib/src/stream_chat_persistence_client.dart index f136a9fe5..588510381 100644 --- a/packages/stream_chat_persistence/lib/src/stream_chat_persistence_client.dart +++ b/packages/stream_chat_persistence/lib/src/stream_chat_persistence_client.dart @@ -461,6 +461,13 @@ class StreamChatPersistenceClient extends ChatPersistenceClient { return db!.transaction(() => super.updateChannelStates(channelStates)); } + @override + Future flush() { + assert(_debugIsConnected, ''); + _logger.info('flush'); + return db!.flush(); + } + @override Future disconnect({bool flush = false}) async { _logger.info('disconnect'); diff --git a/packages/stream_chat_persistence/test/stream_chat_persistence_client_test.dart b/packages/stream_chat_persistence/test/stream_chat_persistence_client_test.dart index 3b0cb7314..018327e92 100644 --- a/packages/stream_chat_persistence/test/stream_chat_persistence_client_test.dart +++ b/packages/stream_chat_persistence/test/stream_chat_persistence_client_test.dart @@ -54,6 +54,37 @@ void main() { expect(client.isConnected, false); }); + test('flush', () async { + const userId = 'testUserId'; + final client = StreamChatPersistenceClient(logLevel: Level.ALL); + + await client.connect(userId, databaseProvider: testDatabaseProvider); + addTearDown(() async => client.disconnect()); + + final connectionEvent = Event( + type: EventType.healthCheck, + createdAt: DateTime.timestamp(), + me: OwnUser(id: userId, name: 'Test User'), + ); + + await client.updateConnectionInfo(connectionEvent); + + // Add some test data + final testDate = DateTime.now(); + await client.updateLastSyncAt(testDate); + + // Verify data exists + final lastSyncAtBeforeFlush = await client.getLastSyncAt(); + expect(lastSyncAtBeforeFlush, isNotNull); + + // Flush the database + await client.flush(); + + // Verify data is cleared + final lastSyncAtAfterFlush = await client.getLastSyncAt(); + expect(lastSyncAtAfterFlush, isNull); + }); + test('client function throws stateError if db is not yet connected', () { final client = StreamChatPersistenceClient(logLevel: Level.ALL); expect(