Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions packages/stream_chat/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,10 @@
## Upcoming

🐞 Fixed

- Improved sync reliability and error handling with enhanced `lastSyncAt` initialization, 400
error recovery, and automatic flushing of stale persistence data after 30 days of inactivity.

## 9.16.0

🐞 Fixed
Expand Down
66 changes: 33 additions & 33 deletions packages/stream_chat/lib/src/client/client.dart
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import 'dart:async';

import 'package:collection/collection.dart';
import 'package:dio/dio.dart';
import 'package:logging/logging.dart';
import 'package:meta/meta.dart';
Expand Down Expand Up @@ -186,9 +187,6 @@ class StreamChatClient {

late final RetryPolicy _retryPolicy;

/// the last dateTime at the which all the channels were synced
DateTime? _lastSyncedAt;

/// The retry policy options getter
RetryPolicy get retryPolicy => _retryPolicy;

Expand Down Expand Up @@ -519,25 +517,17 @@ class StreamChatClient {

if (connectionRecovered) {
// connection recovered
final cids = state.channels.keys.toList(growable: false);
final cids = [...state.channels.keys.toSet()];
if (cids.isNotEmpty) {
await queryChannelsOnline(
filter: Filter.in_('cid', cids),
paginationParams: const PaginationParams(limit: 30),
);
if (persistenceEnabled) {
await sync(cids: cids, lastSyncAt: _lastSyncedAt);
}
} else {
// channels are empty, assuming it's a fresh start
// and making sure `lastSyncAt` is initialized
if (persistenceEnabled) {
final lastSyncAt = await chatPersistenceClient?.getLastSyncAt();
if (lastSyncAt == null) {
await chatPersistenceClient?.updateLastSyncAt(DateTime.now());
}
}

// Sync the persistence client if available
if (persistenceEnabled) await sync(cids: cids);
}

handleEvent(Event(
type: EventType.connectionRecovered,
online: true,
Expand Down Expand Up @@ -569,34 +559,45 @@ class StreamChatClient {
Future<void> sync({List<String>? cids, DateTime? lastSyncAt}) {
return _syncLock.synchronized(() async {
final channels = cids ?? await chatPersistenceClient?.getChannelCids();
if (channels == null || channels.isEmpty) {
return;
}
if (channels == null || channels.isEmpty) return;

final syncAt = lastSyncAt ?? await chatPersistenceClient?.getLastSyncAt();
if (syncAt == null) {
return;
logger.info('Fresh sync start: lastSyncAt initialized to now.');
return chatPersistenceClient?.updateLastSyncAt(DateTime.now());
}

try {
logger.info('Syncing events since $syncAt for channels: $channels');

final res = await _chatApi.general.sync(channels, syncAt);
final events = res.events
..sort((a, b) => a.createdAt.compareTo(b.createdAt));
final events = res.events.sorted(
(a, b) => a.createdAt.compareTo(b.createdAt),
);

for (final event in events) {
logger.fine('event.type: ${event.type}');
final messageText = event.message?.text;
if (messageText != null) {
logger.fine('event.message.text: $messageText');
}
logger.fine('Syncing event: ${event.type}');
handleEvent(event);
}

final now = DateTime.now();
_lastSyncedAt = now;
chatPersistenceClient?.updateLastSyncAt(now);
} catch (e, stk) {
logger.severe('Error during sync', e, stk);
final updatedSyncAt = events.lastOrNull?.createdAt ?? DateTime.now();
return chatPersistenceClient?.updateLastSyncAt(updatedSyncAt);
} catch (error, stk) {
// If we got a 400 error, it means that either the sync time is too
// old or the channel list is too long or too many events need to be
// synced. In this case, we should just flush the persistence client
// and start over.
if (error is StreamChatNetworkError && error.statusCode == 400) {
logger.warning(
'Failed to sync events due to stale or oversized state. '
'Resetting the persistence client to enable a fresh start.',
);

await chatPersistenceClient?.flush();
return chatPersistenceClient?.updateLastSyncAt(DateTime.now());
}

logger.warning('Error syncing events', error, stk);
}
});
}
Expand Down Expand Up @@ -2071,7 +2072,6 @@ class StreamChatClient {
// resetting state.
state.dispose();
state = ClientState(this);
_lastSyncedAt = null;

// resetting credentials.
_tokenManager.reset();
Expand Down
3 changes: 3 additions & 0 deletions packages/stream_chat/lib/src/db/chat_persistence_client.dart
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ abstract class ChatPersistenceClient {
/// If [flush] is true, the data will also be deleted
Future<void> disconnect({bool flush = false});

/// Clears all the data stored in the persistence client.
Future<void> flush();

/// Get stored replies by messageId
Future<List<Message>> getReplies(
String parentId, {
Expand Down
66 changes: 66 additions & 0 deletions packages/stream_chat/test/src/client/client_test.dart
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
// ignore_for_file: avoid_redundant_argument_values

import 'package:mocktail/mocktail.dart';
import 'package:stream_chat/src/core/http/token.dart';
import 'package:stream_chat/stream_chat.dart';
Expand Down Expand Up @@ -3590,5 +3592,69 @@ void main() {
);
},
);

group('Sync Method Tests', () {
test(
'should retrieve data from persistence client and sync successfully',
() async {
final cids = ['channel1', 'channel2'];
final lastSyncAt = DateTime.now().subtract(const Duration(hours: 1));
final fakeClient = FakePersistenceClient(
channelCids: cids,
lastSyncAt: lastSyncAt,
);

client.chatPersistenceClient = fakeClient;
when(() => api.general.sync(cids, lastSyncAt)).thenAnswer(
(_) async => SyncResponse()..events = [],
);

await client.sync();

verify(() => api.general.sync(cids, lastSyncAt)).called(1);

final newLastSyncAt = await fakeClient.getLastSyncAt();
expect(newLastSyncAt?.isAfter(lastSyncAt), isTrue);
},
);

test('should set lastSyncAt on first sync when null', () async {
final fakeClient = FakePersistenceClient(
channelCids: ['channel1'],
lastSyncAt: null,
);

client.chatPersistenceClient = fakeClient;

await client.sync();

expectLater(fakeClient.getLastSyncAt(), completion(isNotNull));
verifyNever(() => api.general.sync(any(), any()));
});

test('should flush persistence client on 400 error', () async {
final cids = ['channel1'];
final lastSyncAt = DateTime.now().subtract(const Duration(hours: 1));
final fakeClient = FakePersistenceClient(
channelCids: cids,
lastSyncAt: lastSyncAt,
);

client.chatPersistenceClient = fakeClient;
when(() => api.general.sync(cids, lastSyncAt)).thenThrow(
StreamChatNetworkError.raw(
code: 4,
statusCode: 400,
message: 'Too many events',
),
);

await client.sync();

expect(await fakeClient.getChannelCids(), isEmpty); // Should be flushed

verify(() => api.general.sync(cids, lastSyncAt)).called(1);
});
});
});
}
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,9 @@ class TestPersistenceClient extends ChatPersistenceClient {
@override
Future<void> disconnect({bool flush = false}) => throw UnimplementedError();

@override
Future<void> flush() => throw UnimplementedError();

@override
Future<ChannelModel?> getChannelByCid(String cid) async =>
ChannelModel(cid: cid);
Expand Down
57 changes: 57 additions & 0 deletions packages/stream_chat/test/src/fakes.dart
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,63 @@ class FakeTokenManager extends Fake implements TokenManager {

class FakeMultiPartFile extends Fake implements MultipartFile {}

/// Fake persistence client for testing persistence client reliability features
class FakePersistenceClient extends Fake implements ChatPersistenceClient {
FakePersistenceClient({
DateTime? lastSyncAt,
List<String>? channelCids,
}) : _lastSyncAt = lastSyncAt,
_channelCids = channelCids ?? [];

String? _userId;
bool _isConnected = false;
DateTime? _lastSyncAt;
List<String> _channelCids;

// Track method calls for testing
int connectCallCount = 0;
int disconnectCallCount = 0;

@override
bool get isConnected => _isConnected;

@override
String? get userId => _userId;

@override
Future<void> connect(String userId) async {
_userId = userId;
_isConnected = true;
connectCallCount++;
}

@override
Future<void> disconnect({bool flush = false}) async {
if (flush) await this.flush();

_userId = null;
_isConnected = false;
disconnectCallCount++;
}

@override
Future<void> flush() async {
_lastSyncAt = null;
_channelCids = [];
}

@override
Future<DateTime?> getLastSyncAt() async => _lastSyncAt;

@override
Future<void> updateLastSyncAt(DateTime lastSyncAt) async {
_lastSyncAt = lastSyncAt;
}

@override
Future<List<String>> getChannelCids() async => _channelCids;
}

class FakeChatApi extends Fake implements StreamChatApi {
UserApi? _user;

Expand Down
33 changes: 24 additions & 9 deletions packages/stream_chat_persistence/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
## Upcoming

✅ Added

- Added support for `client.flush()` method to clear database.

## 9.16.0

- Updated `stream_chat` dependency to [`9.16.0`](https://pub.dev/packages/stream_chat/changelog).
Expand Down Expand Up @@ -110,7 +116,8 @@

## 7.2.0-hotfix.1

- Updated `stream_chat` dependency to [`7.2.0-hotfix.1`](https://pub.dev/packages/stream_chat/changelog).
- Updated `stream_chat` dependency to [
`7.2.0-hotfix.1`](https://pub.dev/packages/stream_chat/changelog).

## 7.2.0

Expand All @@ -131,7 +138,8 @@
## 7.0.0

- Updated minimum supported `SDK` version to Flutter 3.13/Dart 3.1
- 🛑 **BREAKING** Removed deprecated `getChannelStates.sort` parameter. Use `getChannelStates.channelStateSort` instead.
- 🛑 **BREAKING** Removed deprecated `getChannelStates.sort` parameter. Use
`getChannelStates.channelStateSort` instead.

## 6.10.0

Expand All @@ -148,7 +156,8 @@

## 6.7.0

- [[#1683]](https://github.com/GetStream/stream-chat-flutter/issues/1683) Fixed SqliteException no such column `messages.state`.
- [[#1683]](https://github.com/GetStream/stream-chat-flutter/issues/1683) Fixed SqliteException no
such column `messages.state`.
- Updated `stream_chat` dependency to [`6.7.0`](https://pub.dev/packages/stream_chat/changelog).

## 6.6.0
Expand All @@ -169,12 +178,14 @@

## 6.2.0

- Added support for `StreamChatPersistenceClient.isConnected` for checking if the client is connected to the database.
- Added support for `StreamChatPersistenceClient.isConnected` for checking if the client is
connected to the database.
- [[#1422]](https://github.com/GetStream/stream-chat-flutter/issues/1422) Removed default values
from `UserEntity` `createdAt` and `updatedAt` fields.
- Updated `stream_chat` dependency to [`6.2.0`](https://pub.dev/packages/stream_chat/changelog).
- Added support for `StreamChatPersistenceClient.openPersistenceConnection`
and `StreamChatPersistenceClient.closePersistenceConnection` for opening and closing the database connection.
and `StreamChatPersistenceClient.closePersistenceConnection` for opening and closing the database
connection.

## 6.1.0

Expand Down Expand Up @@ -202,7 +213,8 @@

## 5.0.0-beta.1

- Updated `stream_chat` dependency to [`5.0.0-beta.1`](https://pub.dev/packages/stream_chat/changelog).
- Updated `stream_chat` dependency to [
`5.0.0-beta.1`](https://pub.dev/packages/stream_chat/changelog).

## 4.4.0

Expand Down Expand Up @@ -234,7 +246,8 @@

## 4.0.0-beta.0

- Updated `stream_chat` dependency to [`4.0.0-beta.0`](https://pub.dev/packages/stream_chat/changelog).
- Updated `stream_chat` dependency to [
`4.0.0-beta.0`](https://pub.dev/packages/stream_chat/changelog).

## 3.1.0

Expand All @@ -243,8 +256,10 @@
## 3.0.0

- Updated `stream_chat` dependency to [`3.0.0`](https://pub.dev/packages/stream_chat/changelog).
- [[#604]](https://github.com/GetStream/stream-chat-flutter/issues/604) Fix cascade deletion by enabling `pragma foreign_keys`.
- Added a new table `PinnedMessageReactions` and dao `PinnedMessageReactionDao` specifically for pinned messages.
- [[#604]](https://github.com/GetStream/stream-chat-flutter/issues/604) Fix cascade deletion by
enabling `pragma foreign_keys`.
- Added a new table `PinnedMessageReactions` and dao `PinnedMessageReactionDao` specifically for
pinned messages.

## 2.2.0

Expand Down
Loading