Skip to content

Conversation

xsahil03x
Copy link
Member

@xsahil03x xsahil03x commented Sep 22, 2025

Submit a pull request

Fixes: FLU-264

Description of the pull request

This commit enhances the sync method in StreamChatClient to be more robust and handle potential issues during the synchronization process.

Key changes include:

  • Initialization of lastSyncAt: If lastSyncAt is null (e.g., on a fresh app start or after a flush), it's now initialized to the current time. This prevents unnecessary sync attempts with an invalid timestamp.
  • Handling 400 Errors: If the sync operation returns a 400 error (often indicating that the sync time is too old or the data to sync is too large), the persistence client is now flushed, and lastSyncAt is reset. This allows the client to recover by performing a fresh sync.
  • lastSyncAt Update Logic: lastSyncAt is now updated to the createdAt timestamp of the last synced event, or to the current time if no events were synced. This ensures more accurate tracking of the synchronization state.
  • Removal of _lastSyncedAt: The internal _lastSyncedAt property in StreamChatClient has been removed, as its functionality is now covered by the persistence client's lastSyncAt.
  • flush method in ChatPersistenceClient: A new flush method has been added to the ChatPersistenceClient interface to allow for clearing all stored data.

Additionally, tests have been added to cover these new behaviors, including scenarios with null lastSyncAt and 400 errors during sync. A FakePersistenceClient has been introduced for testing purposes.

Summary by CodeRabbit

  • New Features
    • Added a cache flush capability to persistence clients to clear locally stored data without disconnecting.
  • Bug Fixes
    • Improved sync reliability with smarter first-sync initialization, better error handling (recover from certain 400 errors) and reduced duplicate channel handling during recovery.
  • Documentation
    • Updated changelogs with an Upcoming section outlining reliability improvements.
  • Tests
    • Added tests for first-sync behavior, cache flushing, and error-recovery during sync; test fakes added for persistence.

This commit enhances the `sync` method in `StreamChatClient` to be more robust and handle potential issues during the synchronization process.

Key changes include:
- **Initialization of `lastSyncAt`:** If `lastSyncAt` is null (e.g., on a fresh app start or after a flush), it's now initialized to the current time. This prevents unnecessary sync attempts with an invalid timestamp.
- **Handling 400 Errors:** If the sync operation returns a 400 error (often indicating that the sync time is too old or the data to sync is too large), the persistence client is now flushed, and `lastSyncAt` is reset. This allows the client to recover by performing a fresh sync.
- **`lastSyncAt` Update Logic:** `lastSyncAt` is now updated to the `createdAt` timestamp of the last synced event, or to the current time if no events were synced. This ensures more accurate tracking of the synchronization state.
- **Removal of `_lastSyncedAt`:** The internal `_lastSyncedAt` property in `StreamChatClient` has been removed, as its functionality is now covered by the persistence client's `lastSyncAt`.
- **`flush` method in `ChatPersistenceClient`:** A new `flush` method has been added to the `ChatPersistenceClient` interface to allow for clearing all stored data.

Additionally, tests have been added to cover these new behaviors, including scenarios with null `lastSyncAt` and 400 errors during sync. A `FakePersistenceClient` has been introduced for testing purposes.
This commit introduces a new `flush()` method to the `StreamChatPersistenceClient`. This method allows users to clear the entire database.

A corresponding test case has been added to verify that the `flush()` method correctly removes all data from the database.
Copy link
Contributor

coderabbitai bot commented Sep 22, 2025

Walkthrough

Adds a persistence flush API and integrates it into StreamChatClient sync logic: initializes lastSyncAt on first run, sorts and applies sync events, and on a 400 sync error flushes persistence and resets sync state. Tests and fakes updated; database flush implementation changed to transactional deletes.

Changes

Cohort / File(s) Summary
Client sync logic
packages/stream_chat/lib/src/client/client.dart
Removes private _lastSyncedAt, deduplicates channel IDs, logs sync phases, initializes lastSyncAt when null (avoiding /sync), sorts events with collection.sorted, updates lastSyncAt on success, and on 400 errors calls persistence.flush() and resets sync state.
Persistence interface
packages/stream_chat/lib/src/db/chat_persistence_client.dart
Adds public method Future<void> flush() to clear all persisted data.
Persistence implementations
packages/stream_chat_persistence/lib/src/stream_chat_persistence_client.dart, packages/stream_chat_persistence/lib/src/db/drift_chat_database.dart
Implements flush() on StreamChatPersistenceClient delegating to db!.flush(); drift DB flush replaced with FK-toggle + transactional per-table deletes and proper finally cleanup.
Tests — client & persistence
packages/stream_chat/test/src/client/client_test.dart, packages/stream_chat_persistence/test/stream_chat_persistence_client_test.dart
Adds sync tests (initial sync, successful sync, 400 handling flushing persistence) and a persistence flush test verifying lastSyncAt cleared.
Test helpers / fakes
packages/stream_chat/test/src/fakes.dart, packages/stream_chat/test/src/db/chat_persistence_client_test.dart
Adds FakePersistenceClient implementing ChatPersistenceClient (including flush()); adds flush() stub to TestPersistenceClient.
Changelogs
packages/stream_chat/CHANGELOG.md, packages/stream_chat_persistence/CHANGELOG.md
Adds Upcoming section documenting sync reliability and 400 handling (stream_chat) and formatting/Upcoming edits (persistence).

Sequence Diagram(s)

sequenceDiagram
    autonumber
    participant App
    participant Client as StreamChatClient
    participant Persist as ChatPersistenceClient
    participant API as /sync API

    App->>Client: connect / recover
    Client->>Persist: getLastSyncAt()
    alt lastSyncAt is null
        Client->>Persist: updateLastSyncAt(now)
        Note right of Client: initialize and exit (no /sync)
    else lastSyncAt present
        Client->>Persist: getChannelCids()
        Client->>API: sync(cids, lastSyncAt)
        alt 200 OK
            API-->>Client: events
            Client->>Client: sort & process events
            Client->>Persist: updateLastSyncAt(from last event or now)
        else 400 Bad Request
            API-->>Client: error(code=4)
            Client->>Persist: flush()
            Client->>Persist: updateLastSyncAt(null)
            Note right of Client: cleared persistence for fresh sync
        end
    end
Loading

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

Possibly related PRs

Suggested reviewers

  • renefloor

Poem

A rabbit hops, nose to the log,
I clear the crumbs and tidy the bog.
Thirty moons old? I sweep the lane,
Fresh tracks ahead, we sync again.
Thump, hop—persistence neat as a carrot. 🥕🐇

Pre-merge checks and finishing touches

✅ Passed checks (5 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title Check ✅ Passed The PR title "fix(llc, persistence): improve sync reliability and error handling" concisely and accurately summarizes the primary changes to sync and persistence error handling, matching the implemented behavior (flush on 400, lastSyncAt initialization and updates, removal of _lastSyncedAt). It is specific, focused, and readable without noisy details, so a reviewer can quickly grasp the main intent. The title aligns with the code changes and PR objectives.
Linked Issues Check ✅ Passed The changes directly implement the recovery behavior requested in [FLU-264] by detecting sync failures returning HTTP 400 and invoking the new persistence flush(), resetting or initializing lastSyncAt, and allowing a fresh sync start; the PR also updates lastSyncAt from the last event's createdAt (or now) and adds tests (including a FakePersistenceClient) to validate null-lastSyncAt and 400-error flows. The flush API is added to the persistence interface and implemented in persistence clients, and the persistence DB flush is implemented transactionally. These coding changes address the linked issue's requirements.
Out of Scope Changes Check ✅ Passed The diff stays focused on sync and persistence behavior with only minor, related refactors (sorting via collection, channel deduplication in recovery) and changelog formatting; these are consistent with improving sync reliability. One test-only item to note is TestPersistenceClient.flush currently throwing UnimplementedError (a test stub), which is unrelated to production behavior but should be addressed to avoid confusion. No broad or unrelated feature additions were introduced.
Docstring Coverage ✅ Passed No functions found in the changes. Docstring coverage check skipped.
✨ Finishing touches
🧪 Generate unit tests
  • Create PR with unit tests
  • Post copyable unit tests in a comment
  • Commit unit tests in branch fix/sync-errors

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 3

🧹 Nitpick comments (13)
packages/stream_chat/CHANGELOG.md (1)

5-6: Clarify behavior; avoid implying a time-based auto-flush.

The client flushes only when sync returns HTTP 400 (e.g., “Too old last_sync_at”), not “after 30 days” unconditionally. Suggest rewording for accuracy.

- - Improved sync reliability and error handling with enhanced `lastSyncAt` initialization, 400
-   error recovery, and automatic flushing of stale persistence data after 30 days of inactivity.
+ - Improved sync reliability and error handling: initialize `lastSyncAt` when null; on HTTP 400
+   "Too old last_sync_at" responses, flush local persistence and reset `lastSyncAt`; set it to the
+   last synced event time or to "now" when no events are returned.
packages/stream_chat/lib/src/db/chat_persistence_client.dart (1)

37-39: Document preconditions and scope of flush().

Make it explicit that flush operates on the currently connected user, requires an active connection, is idempotent, and keeps the connection open (so follow-up writes are possible).

-  /// Clears all the data stored in the persistence client.
+  /// Clears all data stored for the currently connected user.
+  ///
+  /// Preconditions: the client must be connected; implementations SHOULD
+  /// throw a StateError otherwise. The operation SHOULD be idempotent and
+  /// leave the client connected so follow-up calls (e.g. `updateLastSyncAt`)
+  /// can execute.
   Future<void> flush();
packages/stream_chat_persistence/CHANGELOG.md (4)

5-5: Name the API precisely.

This package adds ChatPersistenceClient.flush()/StreamChatPersistenceClient.flush(), not a generic “client.flush()”.

-- Added support for `client.flush()` method to clear database.
+- Added `ChatPersistenceClient.flush()` (implemented by `StreamChatPersistenceClient`) to clear the database.

119-121: Fix markdownlint MD039: remove spaces/newlines inside link text.

-- Updated `stream_chat` dependency to [
-  `7.2.0-hotfix.1`](https://pub.dev/packages/stream_chat/changelog).
+- Updated `stream_chat` dependency to [`7.2.0-hotfix.1`](https://pub.dev/packages/stream_chat/changelog).

216-218: Fix markdownlint MD039: remove spaces/newlines inside link text.

-- Updated `stream_chat` dependency to [
-  `5.0.0-beta.1`](https://pub.dev/packages/stream_chat/changelog).
+- Updated `stream_chat` dependency to [`5.0.0-beta.1`](https://pub.dev/packages/stream_chat/changelog).

249-251: Fix markdownlint MD039: remove spaces/newlines inside link text.

-- Updated `stream_chat` dependency to [
-  `4.0.0-beta.0`](https://pub.dev/packages/stream_chat/changelog).
+- Updated `stream_chat` dependency to [`4.0.0-beta.0`](https://pub.dev/packages/stream_chat/changelog).
packages/stream_chat_persistence/test/stream_chat_persistence_client_test.dart (1)

57-86: Strengthen assertions post-flush.

Also assert the connection event is cleared to fully validate “clean slate”.

     // Verify data is cleared
     final lastSyncAtAfterFlush = await client.getLastSyncAt();
     expect(lastSyncAtAfterFlush, isNull);
+    final connectionInfoAfterFlush = await client.getConnectionInfo();
+    expect(connectionInfoAfterFlush, isNull);
packages/stream_chat/test/src/db/chat_persistence_client_test.dart (1)

70-72: Avoid throwing in test stub; make flush a no-op.

Throwing here can cascade failures in tests that now expect flush() to exist.

   @override
-  Future<void> flush() => throw UnimplementedError();
+  Future<void> flush() => Future.value();
packages/stream_chat/test/src/client/client_test.dart (2)

1-1: Prefer targeted ignore over file-level.

Scope the lint suppression to the minimal offending lines instead of the whole file.


3596-3658: Add test for “Too old last_sync_at” path.

Current 400 path test uses “Too many events”. Add a case for the commonly returned “Too old last_sync_at” to cover the primary recovery flow.

[ suggest_recommended_refactor ]

+      test('should flush persistence client on 400 "Too old last_sync_at"', () async {
+        final cids = ['channel1'];
+        final lastSyncAt = DateTime.now().subtract(const Duration(days: 40));
+        final fakeClient = FakePersistenceClient(channelCids: cids, lastSyncAt: lastSyncAt);
+        client.chatPersistenceClient = fakeClient;
+        when(() => api.general.sync(cids, lastSyncAt)).thenThrow(
+          StreamChatNetworkError.raw(
+            code: 4,
+            statusCode: 400,
+            message: 'Too old last_sync_at time, please have last_sync_at argument no later than 30 days ago',
+          ),
+        );
+        await client.sync();
+        expect(await fakeClient.getChannelCids(), isEmpty);
+      });
packages/stream_chat/lib/src/client/client.dart (2)

520-529: Dedup CIDs is good; minor: consider stable ordering.

toSet() loses order; if downstream relies on deterministic order (e.g., logs, tests), consider sorting: final cids = state.channels.keys.toSet().toList()..sort();


566-568: Consider storing UTC timestamps.

Using DateTime.now().toUtc() prevents timezone drift across devices/backends.

packages/stream_chat/test/src/fakes.dart (1)

48-52: Consider defensive copying for channelCids parameter.

The constructor directly assigns the channelCids parameter to the internal field without creating a defensive copy. This could lead to unexpected mutations if the caller modifies the original list after construction.

  FakePersistenceClient({
    DateTime? lastSyncAt,
    List<String>? channelCids,
  })  : _lastSyncAt = lastSyncAt,
-       _channelCids = channelCids ?? [];
+       _channelCids = List.from(channelCids ?? []);
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

Disabled knowledge base sources:

  • Linear integration is disabled by default for public repositories

You can enable these sources in your CodeRabbit configuration.

📥 Commits

Reviewing files that changed from the base of the PR and between ee78069 and 46d07dc.

📒 Files selected for processing (9)
  • packages/stream_chat/CHANGELOG.md (1 hunks)
  • packages/stream_chat/lib/src/client/client.dart (3 hunks)
  • packages/stream_chat/lib/src/db/chat_persistence_client.dart (1 hunks)
  • packages/stream_chat/test/src/client/client_test.dart (2 hunks)
  • packages/stream_chat/test/src/db/chat_persistence_client_test.dart (1 hunks)
  • packages/stream_chat/test/src/fakes.dart (1 hunks)
  • packages/stream_chat_persistence/CHANGELOG.md (8 hunks)
  • packages/stream_chat_persistence/lib/src/stream_chat_persistence_client.dart (1 hunks)
  • packages/stream_chat_persistence/test/stream_chat_persistence_client_test.dart (1 hunks)
🧰 Additional context used
🪛 markdownlint-cli2 (0.18.1)
packages/stream_chat_persistence/CHANGELOG.md

120-120: Spaces inside link text

(MD039, no-space-in-links)


217-217: Spaces inside link text

(MD039, no-space-in-links)


250-250: Spaces inside link text

(MD039, no-space-in-links)

⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (10)
  • GitHub Check: stream_chat_localizations
  • GitHub Check: analyze_legacy_versions
  • GitHub Check: stream_chat_persistence
  • GitHub Check: stream_chat_flutter
  • GitHub Check: stream_chat_flutter_core
  • GitHub Check: stream_chat
  • GitHub Check: test
  • GitHub Check: build (android)
  • GitHub Check: build (ios)
  • GitHub Check: analyze
🔇 Additional comments (8)
packages/stream_chat/lib/src/client/client.dart (4)

3-3: Import looks good.

Used for sorted and lastOrNull. ✔


563-563: Early return on empty channels is correct.

Prevents unnecessary work.


571-577: Sorting events by createdAt is appropriate.

This ensures deterministic processing order.


583-585: Updating lastSyncAt from last event or now is correct.

Matches intended semantics.

packages/stream_chat/test/src/fakes.dart (4)

46-101: Well-structured fake implementation for persistence testing.

The FakePersistenceClient correctly implements the ChatPersistenceClient interface and provides all the necessary methods for testing sync reliability features. The implementation is clean and follows proper testing patterns.


85-89: LGTM! Flush implementation aligns with sync requirements.

The flush method correctly clears both lastSyncAt and channelCids, which aligns with the PR objective of handling 400 errors by resetting local persistence state for a fresh sync.


94-97: LGTM! updateLastSyncAt supports sync timestamp tracking.

The method correctly updates the sync timestamp, which is essential for the enhanced sync reliability features described in the PR objectives.


59-61: Useful call tracking for test verification.

The public counters for tracking connect and disconnect calls are well-designed for test assertions and debugging test scenarios.

Comment on lines +464 to +470
@override
Future<void> flush() {
assert(_debugIsConnected, '');
_logger.info('flush');
return db!.flush();
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Make flush atomic.

Wrap the destructive flush in a DB transaction to avoid partial states if other operations run concurrently.

   @override
   Future<void> flush() {
     assert(_debugIsConnected, '');
     _logger.info('flush');
-    return db!.flush();
+    // Ensure atomicity.
+    return db!.transaction(() => db!.flush());
   }

Committable suggestion skipped: line range outside the PR's diff.

🤖 Prompt for AI Agents
In packages/stream_chat_persistence/lib/src/stream_chat_persistence_client.dart
around lines 464–470, the flush() method calls db!.flush() directly which can
leave the DB in a partial state if concurrent operations run; wrap the
destructive flush inside a database transaction instead. Change the
implementation to begin a transaction, perform the flush inside the transaction
(await it), commit on success and roll back on failure (rethrow the error), keep
the existing assert and _logger.info call, and ensure the Future<void> returned
awaits the transactional operation so callers observe completion or errors.

Copy link

codecov bot commented Sep 22, 2025

Codecov Report

❌ Patch coverage is 90.00000% with 2 lines in your changes missing coverage. Please review.
✅ Project coverage is 63.77%. Comparing base (ee78069) to head (46d07dc).

Files with missing lines Patch % Lines
packages/stream_chat/lib/src/client/client.dart 87.50% 2 Missing ⚠️
Additional details and impacted files
@@           Coverage Diff           @@
##           master    #2390   +/-   ##
=======================================
  Coverage   63.76%   63.77%           
=======================================
  Files         412      412           
  Lines       25821    25824    +3     
=======================================
+ Hits        16466    16470    +4     
+ Misses       9355     9354    -1     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.

renefloor
renefloor previously approved these changes Sep 23, 2025
This commit modifies the `flush` method in `DriftChatDatabase` to temporarily disable foreign key constraints using `PRAGMA foreign_keys = OFF` before deleting all table data.

This prevents foreign key constraint violations that could occur when deleting tables in an order that doesn't respect dependencies. The constraints are re-enabled using `PRAGMA foreign_keys = ON` in a `finally` block to ensure they are restored even if an error occurs during the deletion process.
Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

🧹 Nitpick comments (2)
packages/stream_chat_persistence/lib/src/db/drift_chat_database.dart (2)

77-78: Restore the previous FK setting instead of hard‑forcing ON

If a caller had FK enforcement intentionally disabled (e.g., tests, migrations), this method will force-enable it. Safer to snapshot and restore the prior state.

Apply this diff:

   Future<void> flush() async {
-    await customStatement('PRAGMA foreign_keys = OFF');
+    // Snapshot current FK mode to restore later.
+    final fkRow =
+        await customSelect('PRAGMA foreign_keys').getSingle();
+    final wasFkOn = fkRow.read<int>('foreign_keys') == 1;
+    await customStatement('PRAGMA foreign_keys = OFF');
     try {
       await transaction(() async {
         for (final table in allTables) {
           await delete(table).go();
         }
       });
     } finally {
-      await customStatement('PRAGMA foreign_keys = ON');
+      if (wasFkOn) {
+        await customStatement('PRAGMA foreign_keys = ON');
+      }
     }
   }

Also applies to: 85-86


87-87: Optional: Reclaim file space after a full wipe

Consider running a checkpoint+VACUUM after the flush to reclaim disk space (outside the transaction). Gate it behind a flag if startup latency is a concern.

Example:

   } finally {
     if (wasFkOn) {
       await customStatement('PRAGMA foreign_keys = ON');
     }
   }
+
+  // Optional: reclaim disk space after a full wipe.
+  // If using WAL, checkpoint first.
+  // await customStatement('PRAGMA wal_checkpoint(TRUNCATE)');
+  // await customStatement('VACUUM');
 }

Please verify whether you're on WAL mode and whether VACUUM is acceptable for your startup path.

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

Disabled knowledge base sources:

  • Linear integration is disabled by default for public repositories

You can enable these sources in your CodeRabbit configuration.

📥 Commits

Reviewing files that changed from the base of the PR and between 46d07dc and ef27ce5.

📒 Files selected for processing (1)
  • packages/stream_chat_persistence/lib/src/db/drift_chat_database.dart (1 hunks)
🔇 Additional comments (2)
packages/stream_chat_persistence/lib/src/db/drift_chat_database.dart (2)

76-87: Transactional flush with FK restoration looks good

Nice improvement: wrapping deletes in a single transaction and ensuring FK restoration in a finally block is solid.


76-87: Tiny window with FK checks disabled; confirm executor serialization

FKs are turned OFF before the transaction starts. On single-connection, serialized executors (Drift default), this is effectively atomic. If you ever use multiple executors or isolates hitting the same DB, a brief window exists where other statements could run with FKs disabled.

Please confirm:

  • All DB access goes through this Drift instance’s single connection.
  • No other queries can interleave between the OFF and transaction calls.

If either is not guaranteed, we should add a higher-level mutex around flush or switch to a defer strategy.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants