Skip to content

Commit 48e1458

Browse files
Buffer OBJECT ProtocolMessages during a sync
Based on [1] at 29276a5. I wrote the implementation, and for the tests followed the development approach described in cb427d8. [1] ably/specification#343
1 parent 6430358 commit 48e1458

File tree

2 files changed

+145
-21
lines changed

2 files changed

+145
-21
lines changed

Sources/AblyLiveObjects/DefaultRealtimeObjects.swift

Lines changed: 47 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ internal final class DefaultRealtimeObjects: RealtimeObjects, LiveMapObjectPoolD
2323
}
2424
}
2525

26-
/// If this returns false, it means that there is currently no stored sync sequence ID or SyncObjectsPool
26+
/// If this returns false, it means that there is currently no stored sync sequence ID, SyncObjectsPool, or BufferedObjectOperations.
2727
internal var testsOnly_hasSyncSequence: Bool {
2828
mutex.withLock {
2929
mutableState.syncSequence != nil
@@ -45,6 +45,11 @@ internal final class DefaultRealtimeObjects: RealtimeObjects, LiveMapObjectPoolD
4545

4646
/// The `ObjectMessage`s gathered during this sync sequence.
4747
internal var syncObjectsPool: [ObjectState]
48+
49+
/// `OBJECT` ProtocolMessages that were received during this sync sequence, to be applied once the sync sequence is complete, per RTO7a.
50+
///
51+
/// Note that we only ever populate this during a multi-`ProtocolMessage` sync sequence. It is not used in the RTO4b or RTO5a5 cases where the sync data is entirely contained within a single ProtocolMessage, because an individual ProtocolMessage is processed atomically and hence there's no way for an OBJECT to be processed in the middle of processing the sync data in those cases.
52+
internal var bufferedObjectOperations: [InboundObjectMessage]
4853
}
4954

5055
/// Tracks whether an object sync sequence has happened yet. This allows us to wait for a sync before returning from `getRoot()`, per RTO1c.
@@ -255,7 +260,7 @@ internal final class DefaultRealtimeObjects: RealtimeObjects, LiveMapObjectPoolD
255260

256261
// I have, for now, not directly implemented the "perform the actions for object sync completion" of RTO4b4 since my implementation doesn't quite match the model given there; here you only have a SyncObjectsPool if you have an OBJECT_SYNC in progress, which you might not have upon receiving an ATTACHED. Instead I've just implemented what seem like the relevant side effects. Can revisit this if "the actions for object sync completion" get more complex.
257262

258-
// RTO4b3, RTO4b4, RTO5c3, RTO5c4
263+
// RTO4b3, RTO4b4, RTO4b5, RTO5c3, RTO5c4, RTO5c5
259264
syncSequence = nil
260265
syncStatus.signalSyncComplete()
261266
}
@@ -275,6 +280,8 @@ internal final class DefaultRealtimeObjects: RealtimeObjects, LiveMapObjectPoolD
275280

276281
// If populated, this contains a full set of sync data for the channel, and should be applied to the ObjectsPool.
277282
let completedSyncObjectsPool: [ObjectState]?
283+
// If populated, this contains a set of buffered inbound OBJECT messages that should be applied.
284+
let completedSyncBufferedObjectOperations: [InboundObjectMessage]?
278285

279286
if let protocolMessageChannelSerial {
280287
let syncCursor: SyncCursor
@@ -292,27 +299,28 @@ internal final class DefaultRealtimeObjects: RealtimeObjects, LiveMapObjectPoolD
292299
// RTO5a3: Continue existing sync sequence
293300
syncSequence
294301
} else {
295-
// RTO5a2: new sequence started, discard previous
296-
.init(id: syncCursor.sequenceID, syncObjectsPool: [])
302+
// RTO5a2a, RTO5a2b: new sequence started, discard previous
303+
.init(id: syncCursor.sequenceID, syncObjectsPool: [], bufferedObjectOperations: [])
297304
}
298305
} else {
299306
// There's no current sync sequence; start one
300-
.init(id: syncCursor.sequenceID, syncObjectsPool: [])
307+
.init(id: syncCursor.sequenceID, syncObjectsPool: [], bufferedObjectOperations: [])
301308
}
302309

303310
// RTO5b
304311
updatedSyncSequence.syncObjectsPool.append(contentsOf: objectMessages.compactMap(\.object))
305312

306313
syncSequence = updatedSyncSequence
307314

308-
completedSyncObjectsPool = if syncCursor.isEndOfSequence {
309-
updatedSyncSequence.syncObjectsPool
315+
(completedSyncObjectsPool, completedSyncBufferedObjectOperations) = if syncCursor.isEndOfSequence {
316+
(updatedSyncSequence.syncObjectsPool, updatedSyncSequence.bufferedObjectOperations)
310317
} else {
311-
nil
318+
(nil, nil)
312319
}
313320
} else {
314321
// RTO5a5: The sync data is contained entirely within this single OBJECT_SYNC
315322
completedSyncObjectsPool = objectMessages.compactMap(\.object)
323+
completedSyncBufferedObjectOperations = nil
316324
}
317325

318326
if let completedSyncObjectsPool {
@@ -323,7 +331,21 @@ internal final class DefaultRealtimeObjects: RealtimeObjects, LiveMapObjectPoolD
323331
coreSDK: coreSDK,
324332
logger: logger,
325333
)
326-
// RTO5c3, RTO5c4
334+
335+
// RTO5c6
336+
if let completedSyncBufferedObjectOperations, !completedSyncBufferedObjectOperations.isEmpty {
337+
logger.log("Applying \(completedSyncBufferedObjectOperations.count) buffered OBJECT ObjectMessages", level: .debug)
338+
for objectMessage in completedSyncBufferedObjectOperations {
339+
applyObjectProtocolMessageObjectMessage(
340+
objectMessage,
341+
logger: logger,
342+
mapDelegate: mapDelegate,
343+
coreSDK: coreSDK,
344+
)
345+
}
346+
}
347+
348+
// RTO5c3, RTO5c4, RTO5c5
327349
syncSequence = nil
328350

329351
syncStatus.signalSyncComplete()
@@ -342,16 +364,22 @@ internal final class DefaultRealtimeObjects: RealtimeObjects, LiveMapObjectPoolD
342364

343365
logger.log("handleObjectProtocolMessage(objectMessages: \(objectMessages))", level: .debug)
344366

345-
// TODO: RTO8a's buffering
346-
347-
// RTO8b
348-
for objectMessage in objectMessages {
349-
applyObjectProtocolMessageObjectMessage(
350-
objectMessage,
351-
logger: logger,
352-
mapDelegate: mapDelegate,
353-
coreSDK: coreSDK,
354-
)
367+
if let existingSyncSequence = syncSequence {
368+
// RTO8a: Buffer the OBJECT message, to be handled once the sync completes
369+
logger.log("Buffering OBJECT message due to in-progress sync", level: .debug)
370+
var newSyncSequence = existingSyncSequence
371+
newSyncSequence.bufferedObjectOperations.append(contentsOf: objectMessages)
372+
syncSequence = newSyncSequence
373+
} else {
374+
// RTO8b: Handle the OBJECT message immediately
375+
for objectMessage in objectMessages {
376+
applyObjectProtocolMessageObjectMessage(
377+
objectMessage,
378+
logger: logger,
379+
mapDelegate: mapDelegate,
380+
coreSDK: coreSDK,
381+
)
382+
}
355383
}
356384
}
357385

Tests/AblyLiveObjectsTests/DefaultRealtimeObjectsTests.swift

Lines changed: 98 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ struct DefaultRealtimeObjectsTests {
5353
// @spec RTO5b
5454
// @spec RTO5c3
5555
// @spec RTO5c4
56+
// @spec RTO5c5
5657
@Test
5758
func handlesMultiProtocolMessageSync() async throws {
5859
let realtimeObjects = DefaultRealtimeObjectsTests.createDefaultRealtimeObjects()
@@ -94,7 +95,7 @@ struct DefaultRealtimeObjectsTests {
9495
protocolMessageChannelSerial: "\(sequenceId):", // Empty cursor indicates end
9596
)
9697

97-
// Verify sync sequence is cleared and there is no SyncObjectsPool (RTO5c3, RTO5c4)
98+
// Verify sync sequence is cleared and there is no SyncObjectsPool or BufferedObjectOperations (RTO5c3, RTO5c4, RTO5c5)
9899
#expect(!realtimeObjects.testsOnly_hasSyncSequence)
99100

100101
// Verify all objects were applied to pool (side effect of applySyncObjectsPool per RTO5c1b1b)
@@ -108,6 +109,7 @@ struct DefaultRealtimeObjectsTests {
108109

109110
// @spec RTO5a2
110111
// @spec RTO5a2a
112+
// @spec RTO5a2b
111113
@Test
112114
func newSequenceIdDiscardsInFlightSync() async throws {
113115
let realtimeObjects = DefaultRealtimeObjectsTests.createDefaultRealtimeObjects()
@@ -123,6 +125,11 @@ struct DefaultRealtimeObjectsTests {
123125

124126
#expect(realtimeObjects.testsOnly_hasSyncSequence)
125127

128+
// Inject an OBJECT; it will get buffered per RTO8a and subsequently discarded per RTO5a2b
129+
realtimeObjects.handleObjectProtocolMessage(objectMessages: [
130+
TestFactories.mapCreateOperationMessage(objectId: "map:3@789"),
131+
])
132+
126133
// Start new sequence with different ID (RTO5a2)
127134
let secondMessages = [TestFactories.simpleMapMessage(objectId: "map:2@456")]
128135
realtimeObjects.handleObjectSyncProtocolMessage(
@@ -142,6 +149,7 @@ struct DefaultRealtimeObjectsTests {
142149
// Verify only the second sequence's objects were applied (RTO5a2a - previous cleared)
143150
let pool = realtimeObjects.testsOnly_objectsPool
144151
#expect(pool.entries["map:1@123"] == nil) // From discarded first sequence
152+
#expect(pool.entries["map:3@789"] == nil) // Check we discarded the OBJECT that was buffered during discarded first sequence (RTO5a2b)
145153
#expect(pool.entries["map:2@456"] != nil) // From completed second sequence
146154
#expect(!realtimeObjects.testsOnly_hasSyncSequence)
147155
}
@@ -336,6 +344,7 @@ struct DefaultRealtimeObjectsTests {
336344
// @spec RTO4b2
337345
// @spec RTO4b3
338346
// @spec RTO4b4
347+
// @spec RTO4b5
339348
@Test
340349
func handlesHasObjectsFalse() {
341350
let realtimeObjects = DefaultRealtimeObjectsTests.createDefaultRealtimeObjects()
@@ -381,7 +390,7 @@ struct DefaultRealtimeObjectsTests {
381390
#expect(newRoot as AnyObject !== originalPool.root as AnyObject) // Should be a new instance
382391
#expect(newRoot.testsOnly_data.isEmpty) // Should be zero-valued (empty)
383392

384-
// RTO4b3, RTO4b4: SyncObjectsPool must be cleared, sync sequence cleared
393+
// RTO4b3, RTO4b4, RTO4b5: SyncObjectsPool must be cleared, sync sequence cleared, BufferedObjectOperations cleared
385394
#expect(!realtimeObjects.testsOnly_hasSyncSequence)
386395
}
387396

@@ -922,5 +931,92 @@ struct DefaultRealtimeObjectsTests {
922931
#expect(counter.testsOnly_siteTimeserials["site1"] == "ts2")
923932
}
924933
}
934+
935+
// Tests that when an OBJECT ProtocolMessage is received during a sync sequence, its operations are buffered per RTO8a and applied after sync completion per RTO5c6.
936+
struct BufferOperationTests {
937+
// @spec RTO8a
938+
// @spec RTO5c6
939+
@Test
940+
func buffersObjectOperationsDuringSyncAndAppliesAfterCompletion() async throws {
941+
let realtimeObjects = DefaultRealtimeObjectsTests.createDefaultRealtimeObjects()
942+
let sequenceId = "seq123"
943+
944+
// Start sync sequence with first OBJECT_SYNC message
945+
let (entryKey, entry) = TestFactories.stringMapEntry(key: "existingKey", value: "existingValue")
946+
let firstSyncMessages = [
947+
TestFactories.mapObjectMessage(
948+
objectId: "map:1@123",
949+
siteTimeserials: ["site1": "ts1"], // Explicit sync data siteCode and serial
950+
entries: [entryKey: entry],
951+
),
952+
]
953+
realtimeObjects.handleObjectSyncProtocolMessage(
954+
objectMessages: firstSyncMessages,
955+
protocolMessageChannelSerial: "\(sequenceId):cursor1",
956+
)
957+
958+
// Verify sync sequence is active
959+
#expect(realtimeObjects.testsOnly_hasSyncSequence)
960+
961+
// Inject first OBJECT ProtocolMessage during sync (RTO8a)
962+
let firstObjectMessage = TestFactories.mapSetOperationMessage(
963+
objectId: "map:1@123",
964+
key: "key1",
965+
value: "value1",
966+
serial: "ts3", // Higher than sync data "ts1"
967+
siteCode: "site1",
968+
)
969+
realtimeObjects.handleObjectProtocolMessage(objectMessages: [firstObjectMessage])
970+
971+
// Verify the operation was buffered and not applied yet
972+
let poolAfterFirstObject = realtimeObjects.testsOnly_objectsPool
973+
#expect(poolAfterFirstObject.entries["map:1@123"] == nil) // Object not yet created from sync
974+
975+
// Inject second OBJECT ProtocolMessage during sync (RTO8a)
976+
let secondObjectMessage = TestFactories.counterIncOperationMessage(
977+
objectId: "counter:1@456",
978+
amount: 10,
979+
serial: "ts4", // Higher than sync data "ts2"
980+
siteCode: "site1",
981+
)
982+
realtimeObjects.handleObjectProtocolMessage(objectMessages: [secondObjectMessage])
983+
984+
// Verify the second operation was also buffered and not applied yet
985+
let poolAfterSecondObject = realtimeObjects.testsOnly_objectsPool
986+
#expect(poolAfterSecondObject.entries["counter:1@456"] == nil) // Object not yet created from sync
987+
988+
// Complete sync sequence with final OBJECT_SYNC message
989+
let finalSyncMessages = [
990+
TestFactories.counterObjectMessage(
991+
objectId: "counter:1@456",
992+
siteTimeserials: ["site1": "ts2"],
993+
count: 5,
994+
),
995+
]
996+
realtimeObjects.handleObjectSyncProtocolMessage(
997+
objectMessages: finalSyncMessages,
998+
protocolMessageChannelSerial: "\(sequenceId):", // Empty cursor indicates end
999+
)
1000+
1001+
// Verify sync sequence is cleared
1002+
#expect(!realtimeObjects.testsOnly_hasSyncSequence)
1003+
1004+
// Verify all objects were applied to pool from sync
1005+
let finalPool = realtimeObjects.testsOnly_objectsPool
1006+
let map = try #require(finalPool.entries["map:1@123"]?.mapValue)
1007+
let counter = try #require(finalPool.entries["counter:1@456"]?.counterValue)
1008+
1009+
// Verify the buffered operations were applied after sync completion (RTO5c6)
1010+
// Check that MAP_SET operation was applied to the map
1011+
let mapValue = try #require(map.get(key: "key1")?.stringValue)
1012+
#expect(mapValue == "value1")
1013+
#expect(map.testsOnly_siteTimeserials["site1"] == "ts3")
1014+
1015+
// Check that COUNTER_INC operation was applied to the counter
1016+
let counterValue = try counter.value
1017+
#expect(counterValue == 15) // 5 (from sync) + 10 (from buffered operation)
1018+
#expect(counter.testsOnly_siteTimeserials["site1"] == "ts4")
1019+
}
1020+
}
9251021
}
9261022
}

0 commit comments

Comments
 (0)