Skip to content

Handle message moves for recent senders data model #1418

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 4 commits into
base: main
Choose a base branch
from
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
126 changes: 113 additions & 13 deletions lib/model/recent_senders.dart
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import 'package:collection/collection.dart';
import 'package:collection/collection.dart' hide binarySearch;
import 'package:flutter/foundation.dart';

import '../api/model/events.dart';
@@ -68,21 +68,73 @@ class RecentSenders {
[senderId] ??= MessageIdTracker()).add(messageId);
}

/// Handles channel/topic updates when messages are moved.
///
/// [cachedMessages] should just be a map of messages we know about, i.e.
/// [MessageStore.messages]. It doesn't matter whether the same
/// [UpdateMessageEvent] has been handled by the [MessageStore],
/// since only the sender IDs, which do not change, are looked at.
///
/// This is a no-op if no message move happened.
void handleUpdateMessageEvent(UpdateMessageEvent event, Map<int, Message> cachedMessages) {
if (event.moveData == null) {
return;
}
final UpdateMessageMoveData(
:origStreamId, :newStreamId, :origTopic, :newTopic) = event.moveData!;

final messagesBySender = _groupStreamMessageIdsBySender(event.messageIds, cachedMessages);
final sendersInStream = streamSenders[origStreamId];
final topicsInStream = topicSenders[origStreamId];
final sendersInTopic = topicsInStream?[origTopic];
for (final MapEntry(key: senderId, value: messages) in messagesBySender.entries) {
// The later `popAll` calls require the message IDs to be sorted in
// ascending order. Only sort as many as we need: the message IDs
// with the same sender, instead of all of them in `event.messageIds`.
// TOOD(server) make this an API guarantee. CZO discussion:
// https://chat.zulip.org/#narrow/channel/412-api-documentation/topic/Make.20message_ids.20from.20message.20update.20event.20sorted/near/2143785
messages.sort();

if (newStreamId != origStreamId) {
final streamTracker = sendersInStream?[senderId];
// All messages from both `messages` and `streamTracker` are from the
// same sender and the same channel. `messages` contain only messages
// known to `store.messages`; all of them should have made there way
// to the recent senders data structure as well.
assert(messages.every((id) => streamTracker!.ids.contains(id)));
streamTracker?.removeAll(messages);
if (streamTracker?.maxId == null) sendersInStream?.remove(senderId);
if (messages.isNotEmpty) {
((streamSenders[newStreamId] ??= {})
[senderId] ??= MessageIdTracker()).addAll(messages);
}
}

// This does not need a check like the stream trackers one above,
// because the conversation is guaranteed to have moved. This is an
// invariant [UpdateMessageMoveData] offers.
final topicTracker = sendersInTopic?[senderId];
final movedMessagesInTopicTracker = topicTracker?.popAll(messages);
if (topicTracker?.maxId == null) sendersInTopic?.remove(senderId);
if (movedMessagesInTopicTracker != null) {
(((topicSenders[newStreamId] ??= {})[newTopic] ??= {})
[senderId] ??= MessageIdTracker()).addAll(movedMessagesInTopicTracker);
}
}
if (sendersInStream?.isEmpty ?? false) streamSenders.remove(origStreamId);
if (sendersInTopic?.isEmpty ?? false) topicsInStream?.remove(origTopic);
if (topicsInStream?.isEmpty ?? false) topicSenders.remove(origStreamId);
}

void handleDeleteMessageEvent(DeleteMessageEvent event, Map<int, Message> cachedMessages) {
if (event.messageType != MessageType.stream) return;

final messagesByUser = <int, List<int>>{};
for (final id in event.messageIds) {
final message = cachedMessages[id] as StreamMessage?;
if (message == null) continue;
(messagesByUser[message.senderId] ??= []).add(id);
}

final messagesBySender = _groupStreamMessageIdsBySender(event.messageIds, cachedMessages);
final DeleteMessageEvent(:streamId!, :topic!) = event;
final sendersInStream = streamSenders[streamId];
final topicsInStream = topicSenders[streamId];
final sendersInTopic = topicsInStream?[topic];
for (final entry in messagesByUser.entries) {
for (final entry in messagesBySender.entries) {
final MapEntry(key: senderId, value: messages) = entry;

final streamTracker = sendersInStream?[senderId];
@@ -97,6 +149,19 @@ class RecentSenders {
if (sendersInTopic?.isEmpty ?? false) topicsInStream?.remove(topic);
if (topicsInStream?.isEmpty ?? false) topicSenders.remove(streamId);
}

Map<int, QueueList<int>> _groupStreamMessageIdsBySender(
Iterable<int> messageIds,
Map<int, Message> cachedMessages,
) {
final messagesBySender = <int, QueueList<int>>{};
for (final id in messageIds) {
final message = cachedMessages[id] as StreamMessage?;
if (message == null) continue;
(messagesBySender[message.senderId] ??= QueueList()).add(id);
}
return messagesBySender;
}
}

@visibleForTesting
@@ -130,18 +195,53 @@ class MessageIdTracker {
///
/// [newIds] should be sorted ascending.
void addAll(QueueList<int> newIds) {
assert(isSortedWithoutDuplicates(newIds));
if (ids.isEmpty) {
ids = newIds;
return;
}
ids = setUnion(ids, newIds);
}

/// Remove message IDs found in [idsToRemove] from the tracker list.
///
/// [idsToRemove] should be sorted ascending.
void removeAll(List<int> idsToRemove) {
ids.removeWhere((id) {
final i = lowerBound(idsToRemove, id);
return i < idsToRemove.length && idsToRemove[i] == id;
});
assert(isSortedWithoutDuplicates(idsToRemove));
ids.removeWhere((id) => binarySearch(idsToRemove, id) != -1);
}

/// Remove message IDs found in [idsToRemove] from the tracker list.
///
/// Returns the removed message IDs sorted in ascending order, or `null` if
/// nothing is removed.
///
/// [idsToRemove] should be sorted ascending.
///
/// Consider using [removeAll] if the returned message IDs are not needed.
// Part of this is adapted from [ListBase.removeWhere].
QueueList<int>? popAll(List<int> idsToRemove) {
assert(isSortedWithoutDuplicates(idsToRemove));
final retainedMessageIds =
ids.where((id) => binarySearch(idsToRemove, id) == -1).toList();

if (retainedMessageIds.isEmpty) {
// All message IDs in this tracker are removed; this is an optimization
// to clear all ids and return the removed ones without making a new copy.
final result = ids;
ids = QueueList();
return result;
}

QueueList<int>? poppedMessageIds;
if (retainedMessageIds.length != ids.length) {
poppedMessageIds = QueueList.from(
ids.where((id) => binarySearch(idsToRemove, id) != -1));
ids.setRange(0, retainedMessageIds.length, retainedMessageIds);
ids.length = retainedMessageIds.length;
assert(isSortedWithoutDuplicates(poppedMessageIds));
}
return poppedMessageIds;
}

@override
1 change: 1 addition & 0 deletions lib/model/store.dart
Original file line number Diff line number Diff line change
@@ -767,6 +767,7 @@ class PerAccountStore extends ChangeNotifier with EmojiStore, UserStore, Channel

case UpdateMessageEvent():
assert(debugLog("server event: update_message ${event.messageId}"));
recentSenders.handleUpdateMessageEvent(event, messages);
_messages.handleUpdateMessageEvent(event);
Comment on lines +770 to 771
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe it's essential that these lines come in this order. Right?

Let's tweak the interface to make that evident here at the call site:

Suggested change
recentSenders.handleUpdateMessageEvent(event, messages);
_messages.handleUpdateMessageEvent(event);
recentSenders.handleUpdateMessageEvent(event, oldMessages: messages);
_messages.handleUpdateMessageEvent(event);

Hmm but tests don't break if I swap these lines. Maybe it doesn't matter? If it does matter, let's add a test that would catch that.

Copy link
Member Author

@PIG208 PIG208 Apr 2, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it shouldn't matter because we only need the message IDs and their corresponding sender user IDs; those are not subject to updates.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, that makes sense.

Let's I guess write that conclusion down in dartdoc on that handleUpdateMessageEvent method: the messages parameter should be store.messages, and it doesn't matter whether the method is called before or after MessageStore itself is updated.

unreads.handleUpdateMessageEvent(event);

153 changes: 153 additions & 0 deletions test/model/recent_senders_test.dart
Original file line number Diff line number Diff line change
@@ -2,12 +2,15 @@ import 'package:checks/checks.dart';
import 'package:flutter_test/flutter_test.dart';
import 'package:zulip/api/model/model.dart';
import 'package:zulip/model/recent_senders.dart';
import 'package:zulip/model/store.dart';
import '../example_data.dart' as eg;
import 'test_store.dart';

/// [messages] should be sorted by [id] ascending.
void checkMatchesMessages(RecentSenders model, List<Message> messages) {
final Map<int, Map<int, Set<int>>> messagesByUserInStream = {};
final Map<int, Map<TopicName, Map<int, Set<int>>>> messagesByUserInTopic = {};
messages.sort((a, b) => a.id - b.id);
for (final message in messages) {
if (message is! StreamMessage) {
throw UnsupportedError('Message of type ${message.runtimeType} is not expected.');
@@ -142,6 +145,156 @@ void main() {
});
});

group('RecentSenders.handleUpdateMessageEvent', () {
late PerAccountStore store;
late RecentSenders model;

final origChannel = eg.stream(); final newChannel = eg.stream();
final origTopic = 'origTopic'; final newTopic = 'newTopic';
final userX = eg.user(); final userY = eg.user();

Future<void> prepare(List<Message> messages) async {
store = eg.store();
await store.addMessages(messages);
await store.addStreams([origChannel, newChannel]);
await store.addUsers([userX, userY]);
model = store.recentSenders;
}

List<StreamMessage> copyMessagesWith(Iterable<StreamMessage> messages, {
ZulipStream? newChannel,
String? newTopic,
}) {
assert(newChannel != null || newTopic != null);
return messages.map((message) => StreamMessage.fromJson(
message.toJson()
..['stream_id'] = newChannel?.streamId ?? message.streamId
// See [StreamMessage.displayRecipient] for why this is needed.
..['display_recipient'] = newChannel?.name ?? message.displayRecipient!

..['subject'] = newTopic ?? message.topic
)).toList();
}

test('move a conversation entirely, with additional unknown messages', () async {
final messages = List.generate(10, (i) => eg.streamMessage(
stream: origChannel, topic: origTopic, sender: userX));
await prepare(messages);
final unknownMessages = List.generate(10, (i) => eg.streamMessage(
stream: origChannel, topic: origTopic, sender: userX));
checkMatchesMessages(model, messages);

final messageIdsByUserInTopicBefore =
model.topicSenders[origChannel.streamId]![eg.t(origTopic)]![userX.userId]!.ids;

await store.handleEvent(eg.updateMessageEventMoveFrom(
origMessages: messages + unknownMessages,
newStreamId: newChannel.streamId));
checkMatchesMessages(model, copyMessagesWith(
messages, newChannel: newChannel));

// Check we avoided creating a new list for the moved message IDs.
check(messageIdsByUserInTopicBefore).identicalTo(
model.topicSenders[newChannel.streamId]![eg.t(origTopic)]![userX.userId]!.ids);
});

test('move a conversation exactly', () async {
final messages = List.generate(10, (i) => eg.streamMessage(
stream: origChannel, topic: origTopic, sender: userX));
await prepare(messages);

final messageIdsByUserInTopicBefore =
model.topicSenders[origChannel.streamId]![eg.t(origTopic)]![userX.userId]!.ids;

await store.handleEvent(eg.updateMessageEventMoveFrom(
origMessages: messages,
newStreamId: newChannel.streamId,
newTopicStr: newTopic));
checkMatchesMessages(model, copyMessagesWith(
messages, newChannel: newChannel, newTopic: newTopic));

// Check we avoided creating a new list for the moved message IDs.
check(messageIdsByUserInTopicBefore).identicalTo(
model.topicSenders[newChannel.streamId]![eg.t(newTopic)]![userX.userId]!.ids);
});

test('move a conversation partially to a different channel', () async {
final messages = List.generate(10, (i) => eg.streamMessage(
stream: origChannel, topic: origTopic));
final movedMessages = messages.take(5).toList();
final otherMessages = messages.skip(5);
await prepare(messages);

await store.handleEvent(eg.updateMessageEventMoveFrom(
origMessages: movedMessages,
newStreamId: newChannel.streamId));
checkMatchesMessages(model, [
...copyMessagesWith(movedMessages, newChannel: newChannel),
...otherMessages,
]);
});

test('move a conversation partially to a different topic, within the same channel', () async {
final messages = List.generate(10, (i) => eg.streamMessage(
stream: origChannel, topic: origTopic, sender: userX));
final movedMessages = messages.take(5).toList();
final otherMessages = messages.skip(5);
await prepare(messages);

final messageIdsByUserInStreamBefore =
model.streamSenders[origChannel.streamId]![userX.userId]!.ids;

await store.handleEvent(eg.updateMessageEventMoveFrom(
origMessages: movedMessages,
newTopicStr: newTopic));
checkMatchesMessages(model, [
...copyMessagesWith(movedMessages, newTopic: newTopic),
...otherMessages,
]);

// Check that we did not touch stream message IDs tracker
// when there wasn't a stream move.
check(messageIdsByUserInStreamBefore).identicalTo(
model.streamSenders[origChannel.streamId]![userX.userId]!.ids);
});

test('move a conversation with multiple senders', () async {
final messages = [
eg.streamMessage(stream: origChannel, topic: origTopic, sender: userX),
eg.streamMessage(stream: origChannel, topic: origTopic, sender: userX),
eg.streamMessage(stream: origChannel, topic: origTopic, sender: userY),
];
await prepare(messages);

await store.handleEvent(eg.updateMessageEventMoveFrom(
origMessages: messages,
newStreamId: newChannel.streamId));
checkMatchesMessages(model, copyMessagesWith(
messages, newChannel: newChannel));
});

test('move a converstion, but message IDs from the event are not sorted in ascending order', () async {
final messages = List.generate(10, (i) => eg.streamMessage(
id: 100-i, stream: origChannel, topic: origTopic));
await prepare(messages);

await store.handleEvent(eg.updateMessageEventMoveFrom(
origMessages: messages,
newStreamId: newChannel.streamId));
checkMatchesMessages(model,
copyMessagesWith(messages, newChannel: newChannel));
});

test('message edit update without move', () async {
final messages = List.generate(10, (i) => eg.streamMessage(
stream: origChannel, topic: origTopic));
await prepare(messages);

await store.handleEvent(eg.updateMessageEditEvent(messages[0]));
checkMatchesMessages(model, messages);
});
});

test('RecentSenders.handleDeleteMessageEvent', () {
final model = RecentSenders();
final stream = eg.stream();