diff --git a/lib/model/recent_senders.dart b/lib/model/recent_senders.dart index a5c4bca778..a2b07cf9d8 100644 --- a/lib/model/recent_senders.dart +++ b/lib/model/recent_senders.dart @@ -1,4 +1,4 @@ -import 'package:collection/collection.dart'; +import 'package:collection/collection.dart' hide binarySearch; import 'package:flutter/foundation.dart'; import '../api/model/events.dart'; @@ -68,21 +68,73 @@ class RecentSenders { [senderId] ??= MessageIdTracker()).add(messageId); } + /// Handles channel/topic updates when messages are moved. + /// + /// [cachedMessages] should just be a map of messages we know about, i.e. + /// [MessageStore.messages]. It doesn't matter whether the same + /// [UpdateMessageEvent] has been handled by the [MessageStore], + /// since only the sender IDs, which do not change, are looked at. + /// + /// This is a no-op if no message move happened. + void handleUpdateMessageEvent(UpdateMessageEvent event, Map cachedMessages) { + if (event.moveData == null) { + return; + } + final UpdateMessageMoveData( + :origStreamId, :newStreamId, :origTopic, :newTopic) = event.moveData!; + + final messagesBySender = _groupStreamMessageIdsBySender(event.messageIds, cachedMessages); + final sendersInStream = streamSenders[origStreamId]; + final topicsInStream = topicSenders[origStreamId]; + final sendersInTopic = topicsInStream?[origTopic]; + for (final MapEntry(key: senderId, value: messages) in messagesBySender.entries) { + // The later `popAll` calls require the message IDs to be sorted in + // ascending order. Only sort as many as we need: the message IDs + // with the same sender, instead of all of them in `event.messageIds`. + // TOOD(server) make this an API guarantee. CZO discussion: + // https://chat.zulip.org/#narrow/channel/412-api-documentation/topic/Make.20message_ids.20from.20message.20update.20event.20sorted/near/2143785 + messages.sort(); + + if (newStreamId != origStreamId) { + final streamTracker = sendersInStream?[senderId]; + // All messages from both `messages` and `streamTracker` are from the + // same sender and the same channel. `messages` contain only messages + // known to `store.messages`; all of them should have made there way + // to the recent senders data structure as well. + assert(messages.every((id) => streamTracker!.ids.contains(id))); + streamTracker?.removeAll(messages); + if (streamTracker?.maxId == null) sendersInStream?.remove(senderId); + if (messages.isNotEmpty) { + ((streamSenders[newStreamId] ??= {}) + [senderId] ??= MessageIdTracker()).addAll(messages); + } + } + + // This does not need a check like the stream trackers one above, + // because the conversation is guaranteed to have moved. This is an + // invariant [UpdateMessageMoveData] offers. + final topicTracker = sendersInTopic?[senderId]; + final movedMessagesInTopicTracker = topicTracker?.popAll(messages); + if (topicTracker?.maxId == null) sendersInTopic?.remove(senderId); + if (movedMessagesInTopicTracker != null) { + (((topicSenders[newStreamId] ??= {})[newTopic] ??= {}) + [senderId] ??= MessageIdTracker()).addAll(movedMessagesInTopicTracker); + } + } + if (sendersInStream?.isEmpty ?? false) streamSenders.remove(origStreamId); + if (sendersInTopic?.isEmpty ?? false) topicsInStream?.remove(origTopic); + if (topicsInStream?.isEmpty ?? false) topicSenders.remove(origStreamId); + } + void handleDeleteMessageEvent(DeleteMessageEvent event, Map cachedMessages) { if (event.messageType != MessageType.stream) return; - final messagesByUser = >{}; - for (final id in event.messageIds) { - final message = cachedMessages[id] as StreamMessage?; - if (message == null) continue; - (messagesByUser[message.senderId] ??= []).add(id); - } - + final messagesBySender = _groupStreamMessageIdsBySender(event.messageIds, cachedMessages); final DeleteMessageEvent(:streamId!, :topic!) = event; final sendersInStream = streamSenders[streamId]; final topicsInStream = topicSenders[streamId]; final sendersInTopic = topicsInStream?[topic]; - for (final entry in messagesByUser.entries) { + for (final entry in messagesBySender.entries) { final MapEntry(key: senderId, value: messages) = entry; final streamTracker = sendersInStream?[senderId]; @@ -97,6 +149,19 @@ class RecentSenders { if (sendersInTopic?.isEmpty ?? false) topicsInStream?.remove(topic); if (topicsInStream?.isEmpty ?? false) topicSenders.remove(streamId); } + + Map> _groupStreamMessageIdsBySender( + Iterable messageIds, + Map cachedMessages, + ) { + final messagesBySender = >{}; + for (final id in messageIds) { + final message = cachedMessages[id] as StreamMessage?; + if (message == null) continue; + (messagesBySender[message.senderId] ??= QueueList()).add(id); + } + return messagesBySender; + } } @visibleForTesting @@ -130,6 +195,7 @@ class MessageIdTracker { /// /// [newIds] should be sorted ascending. void addAll(QueueList newIds) { + assert(isSortedWithoutDuplicates(newIds)); if (ids.isEmpty) { ids = newIds; return; @@ -137,11 +203,45 @@ class MessageIdTracker { ids = setUnion(ids, newIds); } + /// Remove message IDs found in [idsToRemove] from the tracker list. + /// + /// [idsToRemove] should be sorted ascending. void removeAll(List idsToRemove) { - ids.removeWhere((id) { - final i = lowerBound(idsToRemove, id); - return i < idsToRemove.length && idsToRemove[i] == id; - }); + assert(isSortedWithoutDuplicates(idsToRemove)); + ids.removeWhere((id) => binarySearch(idsToRemove, id) != -1); + } + + /// Remove message IDs found in [idsToRemove] from the tracker list. + /// + /// Returns the removed message IDs sorted in ascending order, or `null` if + /// nothing is removed. + /// + /// [idsToRemove] should be sorted ascending. + /// + /// Consider using [removeAll] if the returned message IDs are not needed. + // Part of this is adapted from [ListBase.removeWhere]. + QueueList? popAll(List idsToRemove) { + assert(isSortedWithoutDuplicates(idsToRemove)); + final retainedMessageIds = + ids.where((id) => binarySearch(idsToRemove, id) == -1).toList(); + + if (retainedMessageIds.isEmpty) { + // All message IDs in this tracker are removed; this is an optimization + // to clear all ids and return the removed ones without making a new copy. + final result = ids; + ids = QueueList(); + return result; + } + + QueueList? poppedMessageIds; + if (retainedMessageIds.length != ids.length) { + poppedMessageIds = QueueList.from( + ids.where((id) => binarySearch(idsToRemove, id) != -1)); + ids.setRange(0, retainedMessageIds.length, retainedMessageIds); + ids.length = retainedMessageIds.length; + assert(isSortedWithoutDuplicates(poppedMessageIds)); + } + return poppedMessageIds; } @override diff --git a/lib/model/store.dart b/lib/model/store.dart index c3aaa501db..5bab3eba3c 100644 --- a/lib/model/store.dart +++ b/lib/model/store.dart @@ -767,6 +767,7 @@ class PerAccountStore extends ChangeNotifier with EmojiStore, UserStore, Channel case UpdateMessageEvent(): assert(debugLog("server event: update_message ${event.messageId}")); + recentSenders.handleUpdateMessageEvent(event, messages); _messages.handleUpdateMessageEvent(event); unreads.handleUpdateMessageEvent(event); diff --git a/test/model/recent_senders_test.dart b/test/model/recent_senders_test.dart index 602362cbcc..9ddba5cebe 100644 --- a/test/model/recent_senders_test.dart +++ b/test/model/recent_senders_test.dart @@ -2,12 +2,15 @@ import 'package:checks/checks.dart'; import 'package:flutter_test/flutter_test.dart'; import 'package:zulip/api/model/model.dart'; import 'package:zulip/model/recent_senders.dart'; +import 'package:zulip/model/store.dart'; import '../example_data.dart' as eg; +import 'test_store.dart'; /// [messages] should be sorted by [id] ascending. void checkMatchesMessages(RecentSenders model, List messages) { final Map>> messagesByUserInStream = {}; final Map>>> messagesByUserInTopic = {}; + messages.sort((a, b) => a.id - b.id); for (final message in messages) { if (message is! StreamMessage) { throw UnsupportedError('Message of type ${message.runtimeType} is not expected.'); @@ -142,6 +145,156 @@ void main() { }); }); + group('RecentSenders.handleUpdateMessageEvent', () { + late PerAccountStore store; + late RecentSenders model; + + final origChannel = eg.stream(); final newChannel = eg.stream(); + final origTopic = 'origTopic'; final newTopic = 'newTopic'; + final userX = eg.user(); final userY = eg.user(); + + Future prepare(List messages) async { + store = eg.store(); + await store.addMessages(messages); + await store.addStreams([origChannel, newChannel]); + await store.addUsers([userX, userY]); + model = store.recentSenders; + } + + List copyMessagesWith(Iterable messages, { + ZulipStream? newChannel, + String? newTopic, + }) { + assert(newChannel != null || newTopic != null); + return messages.map((message) => StreamMessage.fromJson( + message.toJson() + ..['stream_id'] = newChannel?.streamId ?? message.streamId + // See [StreamMessage.displayRecipient] for why this is needed. + ..['display_recipient'] = newChannel?.name ?? message.displayRecipient! + + ..['subject'] = newTopic ?? message.topic + )).toList(); + } + + test('move a conversation entirely, with additional unknown messages', () async { + final messages = List.generate(10, (i) => eg.streamMessage( + stream: origChannel, topic: origTopic, sender: userX)); + await prepare(messages); + final unknownMessages = List.generate(10, (i) => eg.streamMessage( + stream: origChannel, topic: origTopic, sender: userX)); + checkMatchesMessages(model, messages); + + final messageIdsByUserInTopicBefore = + model.topicSenders[origChannel.streamId]![eg.t(origTopic)]![userX.userId]!.ids; + + await store.handleEvent(eg.updateMessageEventMoveFrom( + origMessages: messages + unknownMessages, + newStreamId: newChannel.streamId)); + checkMatchesMessages(model, copyMessagesWith( + messages, newChannel: newChannel)); + + // Check we avoided creating a new list for the moved message IDs. + check(messageIdsByUserInTopicBefore).identicalTo( + model.topicSenders[newChannel.streamId]![eg.t(origTopic)]![userX.userId]!.ids); + }); + + test('move a conversation exactly', () async { + final messages = List.generate(10, (i) => eg.streamMessage( + stream: origChannel, topic: origTopic, sender: userX)); + await prepare(messages); + + final messageIdsByUserInTopicBefore = + model.topicSenders[origChannel.streamId]![eg.t(origTopic)]![userX.userId]!.ids; + + await store.handleEvent(eg.updateMessageEventMoveFrom( + origMessages: messages, + newStreamId: newChannel.streamId, + newTopicStr: newTopic)); + checkMatchesMessages(model, copyMessagesWith( + messages, newChannel: newChannel, newTopic: newTopic)); + + // Check we avoided creating a new list for the moved message IDs. + check(messageIdsByUserInTopicBefore).identicalTo( + model.topicSenders[newChannel.streamId]![eg.t(newTopic)]![userX.userId]!.ids); + }); + + test('move a conversation partially to a different channel', () async { + final messages = List.generate(10, (i) => eg.streamMessage( + stream: origChannel, topic: origTopic)); + final movedMessages = messages.take(5).toList(); + final otherMessages = messages.skip(5); + await prepare(messages); + + await store.handleEvent(eg.updateMessageEventMoveFrom( + origMessages: movedMessages, + newStreamId: newChannel.streamId)); + checkMatchesMessages(model, [ + ...copyMessagesWith(movedMessages, newChannel: newChannel), + ...otherMessages, + ]); + }); + + test('move a conversation partially to a different topic, within the same channel', () async { + final messages = List.generate(10, (i) => eg.streamMessage( + stream: origChannel, topic: origTopic, sender: userX)); + final movedMessages = messages.take(5).toList(); + final otherMessages = messages.skip(5); + await prepare(messages); + + final messageIdsByUserInStreamBefore = + model.streamSenders[origChannel.streamId]![userX.userId]!.ids; + + await store.handleEvent(eg.updateMessageEventMoveFrom( + origMessages: movedMessages, + newTopicStr: newTopic)); + checkMatchesMessages(model, [ + ...copyMessagesWith(movedMessages, newTopic: newTopic), + ...otherMessages, + ]); + + // Check that we did not touch stream message IDs tracker + // when there wasn't a stream move. + check(messageIdsByUserInStreamBefore).identicalTo( + model.streamSenders[origChannel.streamId]![userX.userId]!.ids); + }); + + test('move a conversation with multiple senders', () async { + final messages = [ + eg.streamMessage(stream: origChannel, topic: origTopic, sender: userX), + eg.streamMessage(stream: origChannel, topic: origTopic, sender: userX), + eg.streamMessage(stream: origChannel, topic: origTopic, sender: userY), + ]; + await prepare(messages); + + await store.handleEvent(eg.updateMessageEventMoveFrom( + origMessages: messages, + newStreamId: newChannel.streamId)); + checkMatchesMessages(model, copyMessagesWith( + messages, newChannel: newChannel)); + }); + + test('move a converstion, but message IDs from the event are not sorted in ascending order', () async { + final messages = List.generate(10, (i) => eg.streamMessage( + id: 100-i, stream: origChannel, topic: origTopic)); + await prepare(messages); + + await store.handleEvent(eg.updateMessageEventMoveFrom( + origMessages: messages, + newStreamId: newChannel.streamId)); + checkMatchesMessages(model, + copyMessagesWith(messages, newChannel: newChannel)); + }); + + test('message edit update without move', () async { + final messages = List.generate(10, (i) => eg.streamMessage( + stream: origChannel, topic: origTopic)); + await prepare(messages); + + await store.handleEvent(eg.updateMessageEditEvent(messages[0])); + checkMatchesMessages(model, messages); + }); + }); + test('RecentSenders.handleDeleteMessageEvent', () { final model = RecentSenders(); final stream = eg.stream();