Skip to content

Commit 8576463

Browse files
Allow SyncObjectsPool entries to contain additional data
This is preparation for tombstoning an object per [1]; we'll need the ObjectMessage's serialTimestamp. [1] ably/specification#350
1 parent 88b2252 commit 8576463

File tree

4 files changed

+44
-26
lines changed

4 files changed

+44
-26
lines changed

Sources/AblyLiveObjects/Internal/InternalDefaultRealtimeObjects.swift

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ internal final class InternalDefaultRealtimeObjects: Sendable, LiveMapObjectPool
4545
internal var id: String
4646

4747
/// The `ObjectMessage`s gathered during this sync sequence.
48-
internal var syncObjectsPool: [ObjectState]
48+
internal var syncObjectsPool: [SyncObjectsPoolEntry]
4949

5050
/// `OBJECT` ProtocolMessages that were received during this sync sequence, to be applied once the sync sequence is complete, per RTO7a.
5151
internal var bufferedObjectOperations: [InboundObjectMessage]
@@ -276,7 +276,7 @@ internal final class InternalDefaultRealtimeObjects: Sendable, LiveMapObjectPool
276276
receivedObjectSyncProtocolMessagesContinuation.yield(objectMessages)
277277

278278
// If populated, this contains a full set of sync data for the channel, and should be applied to the ObjectsPool.
279-
let completedSyncObjectsPool: [ObjectState]?
279+
let completedSyncObjectsPool: [SyncObjectsPoolEntry]?
280280
// If populated, this contains a set of buffered inbound OBJECT messages that should be applied.
281281
let completedSyncBufferedObjectOperations: [InboundObjectMessage]?
282282

@@ -305,7 +305,13 @@ internal final class InternalDefaultRealtimeObjects: Sendable, LiveMapObjectPool
305305
}
306306

307307
// RTO5b
308-
updatedSyncSequence.syncObjectsPool.append(contentsOf: objectMessages.compactMap(\.object))
308+
updatedSyncSequence.syncObjectsPool.append(contentsOf: objectMessages.compactMap { objectMessage in
309+
if let object = objectMessage.object {
310+
.init(state: object)
311+
} else {
312+
nil
313+
}
314+
})
309315

310316
syncSequence = updatedSyncSequence
311317

@@ -316,7 +322,13 @@ internal final class InternalDefaultRealtimeObjects: Sendable, LiveMapObjectPool
316322
}
317323
} else {
318324
// RTO5a5: The sync data is contained entirely within this single OBJECT_SYNC
319-
completedSyncObjectsPool = objectMessages.compactMap(\.object)
325+
completedSyncObjectsPool = objectMessages.compactMap { objectMessage in
326+
if let object = objectMessage.object {
327+
.init(state: object)
328+
} else {
329+
nil
330+
}
331+
}
320332
completedSyncBufferedObjectOperations = nil
321333
}
322334

Sources/AblyLiveObjects/Internal/ObjectsPool.swift

Lines changed: 15 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -177,7 +177,7 @@ internal struct ObjectsPool {
177177

178178
/// Applies the objects gathered during an `OBJECT_SYNC` to this `ObjectsPool`, per RTO5c1 and RTO5c2.
179179
internal mutating func applySyncObjectsPool(
180-
_ syncObjectsPool: [ObjectState],
180+
_ syncObjectsPool: [SyncObjectsPoolEntry],
181181
logger: AblyPlugin.Logger,
182182
userCallbackQueue: DispatchQueue,
183183
clock: SimpleClock,
@@ -191,46 +191,46 @@ internal struct ObjectsPool {
191191
var updatesToExistingObjects: [ObjectsPool.Entry.DeferredUpdate] = []
192192

193193
// RTO5c1: For each ObjectState member in the SyncObjectsPool list
194-
for objectState in syncObjectsPool {
195-
receivedObjectIds.insert(objectState.objectId)
194+
for syncObjectsPoolEntry in syncObjectsPool {
195+
receivedObjectIds.insert(syncObjectsPoolEntry.state.objectId)
196196

197197
// RTO5c1a: If an object with ObjectState.objectId exists in the internal ObjectsPool
198-
if let existingEntry = entries[objectState.objectId] {
199-
logger.log("Updating existing object with ID: \(objectState.objectId)", level: .debug)
198+
if let existingEntry = entries[syncObjectsPoolEntry.state.objectId] {
199+
logger.log("Updating existing object with ID: \(syncObjectsPoolEntry.state.objectId)", level: .debug)
200200

201201
// RTO5c1a1: Override the internal data for the object as per RTLC6, RTLM6
202-
let deferredUpdate = existingEntry.replaceData(using: objectState, objectsPool: &self)
202+
let deferredUpdate = existingEntry.replaceData(using: syncObjectsPoolEntry.state, objectsPool: &self)
203203
// RTO5c1a2: Store this update to emit at end
204204
updatesToExistingObjects.append(deferredUpdate)
205205
} else {
206206
// RTO5c1b: If an object with ObjectState.objectId does not exist in the internal ObjectsPool
207-
logger.log("Creating new object with ID: \(objectState.objectId)", level: .debug)
207+
logger.log("Creating new object with ID: \(syncObjectsPoolEntry.state.objectId)", level: .debug)
208208

209209
// RTO5c1b1: Create a new LiveObject using the data from ObjectState and add it to the internal ObjectsPool:
210210
let newEntry: Entry?
211211

212-
if objectState.counter != nil {
212+
if syncObjectsPoolEntry.state.counter != nil {
213213
// RTO5c1b1a: If ObjectState.counter is present, create a zero-value LiveCounter,
214214
// set its private objectId equal to ObjectState.objectId and override its internal data per RTLC6
215-
let counter = InternalDefaultLiveCounter.createZeroValued(objectID: objectState.objectId, logger: logger, userCallbackQueue: userCallbackQueue, clock: clock)
216-
_ = counter.replaceData(using: objectState)
215+
let counter = InternalDefaultLiveCounter.createZeroValued(objectID: syncObjectsPoolEntry.state.objectId, logger: logger, userCallbackQueue: userCallbackQueue, clock: clock)
216+
_ = counter.replaceData(using: syncObjectsPoolEntry.state)
217217
newEntry = .counter(counter)
218-
} else if let objectsMap = objectState.map {
218+
} else if let objectsMap = syncObjectsPoolEntry.state.map {
219219
// RTO5c1b1b: If ObjectState.map is present, create a zero-value LiveMap,
220220
// set its private objectId equal to ObjectState.objectId, set its private semantics
221221
// equal to ObjectState.map.semantics and override its internal data per RTLM6
222-
let map = InternalDefaultLiveMap.createZeroValued(objectID: objectState.objectId, semantics: objectsMap.semantics, logger: logger, userCallbackQueue: userCallbackQueue, clock: clock)
223-
_ = map.replaceData(using: objectState, objectsPool: &self)
222+
let map = InternalDefaultLiveMap.createZeroValued(objectID: syncObjectsPoolEntry.state.objectId, semantics: objectsMap.semantics, logger: logger, userCallbackQueue: userCallbackQueue, clock: clock)
223+
_ = map.replaceData(using: syncObjectsPoolEntry.state, objectsPool: &self)
224224
newEntry = .map(map)
225225
} else {
226226
// RTO5c1b1c: Otherwise, log a warning that an unsupported object state message has been received, and discard the current ObjectState without taking any action
227-
logger.log("Unsupported object state message received for objectId: \(objectState.objectId)", level: .warn)
227+
logger.log("Unsupported object state message received for objectId: \(syncObjectsPoolEntry.state.objectId)", level: .warn)
228228
newEntry = nil
229229
}
230230

231231
if let newEntry {
232232
// Note that we will never replace the root object here, and thus never break the RTO3b invariant that the root object is always a map. This is because the pool always contains a root object and thus we always go through the RTO5c1a branch of the `if` above.
233-
entries[objectState.objectId] = newEntry
233+
entries[syncObjectsPoolEntry.state.objectId] = newEntry
234234
}
235235
}
236236
}
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
import Foundation
2+
3+
/// The contents of the spec's `SyncObjectsPool` that is built during an `OBJECT_SYNC` sync sequence.
4+
internal struct SyncObjectsPoolEntry {
5+
internal var state: ObjectState
6+
}

Tests/AblyLiveObjectsTests/ObjectsPoolTests.swift

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,7 @@ struct ObjectsPoolTests {
100100
entries: [key: entry],
101101
)
102102

103-
pool.applySyncObjectsPool([objectState], logger: logger, userCallbackQueue: .main, clock: MockSimpleClock())
103+
pool.applySyncObjectsPool([.init(state: objectState)], logger: logger, userCallbackQueue: .main, clock: MockSimpleClock())
104104

105105
// Verify the existing map was updated by checking side effects of InternalDefaultLiveMap.replaceData(using:)
106106
let updatedMap = try #require(pool.entries["map:hash@123"]?.mapValue)
@@ -135,7 +135,7 @@ struct ObjectsPoolTests {
135135
count: 10,
136136
)
137137

138-
pool.applySyncObjectsPool([objectState], logger: logger, userCallbackQueue: .main, clock: MockSimpleClock())
138+
pool.applySyncObjectsPool([.init(state: objectState)], logger: logger, userCallbackQueue: .main, clock: MockSimpleClock())
139139

140140
// Verify the existing counter was updated by checking side effects of InternalDefaultLiveCounter.replaceData(using:)
141141
let updatedCounter = try #require(pool.entries["counter:hash@123"]?.counterValue)
@@ -163,7 +163,7 @@ struct ObjectsPoolTests {
163163
count: 100,
164164
)
165165

166-
pool.applySyncObjectsPool([objectState], logger: logger, userCallbackQueue: .main, clock: MockSimpleClock())
166+
pool.applySyncObjectsPool([.init(state: objectState)], logger: logger, userCallbackQueue: .main, clock: MockSimpleClock())
167167

168168
// Verify a new counter was created and data was set by checking side effects of InternalDefaultLiveCounter.replaceData(using:)
169169
let newCounter = try #require(pool.entries["counter:hash@456"]?.counterValue)
@@ -191,7 +191,7 @@ struct ObjectsPoolTests {
191191
entries: [key: entry],
192192
)
193193

194-
pool.applySyncObjectsPool([objectState], logger: logger, userCallbackQueue: .main, clock: MockSimpleClock())
194+
pool.applySyncObjectsPool([.init(state: objectState)], logger: logger, userCallbackQueue: .main, clock: MockSimpleClock())
195195

196196
// Verify a new map was created and data was set by checking side effects of InternalDefaultLiveMap.replaceData(using:)
197197
let newMap = try #require(pool.entries["map:hash@789"]?.mapValue)
@@ -218,7 +218,7 @@ struct ObjectsPoolTests {
218218

219219
let invalidObjectState = TestFactories.objectState(objectId: "invalid")
220220

221-
pool.applySyncObjectsPool([invalidObjectState, validObjectState], logger: logger, userCallbackQueue: .main, clock: MockSimpleClock())
221+
pool.applySyncObjectsPool([invalidObjectState, validObjectState].map { .init(state: $0) }, logger: logger, userCallbackQueue: .main, clock: MockSimpleClock())
222222

223223
// Check that there's no entry for the key that we don't know how to handle, and that it didn't interfere with the insertion of the we one that we do know how to handle
224224
#expect(Set(pool.entries.keys) == ["root", "counter:hash@456"])
@@ -243,7 +243,7 @@ struct ObjectsPoolTests {
243243
// Only sync one of the existing objects
244244
let objectState = TestFactories.mapObjectState(objectId: "map:hash@1")
245245

246-
pool.applySyncObjectsPool([objectState], logger: logger, userCallbackQueue: .main, clock: MockSimpleClock())
246+
pool.applySyncObjectsPool([.init(state: objectState)], logger: logger, userCallbackQueue: .main, clock: MockSimpleClock())
247247

248248
// Verify only synced object and root remain
249249
#expect(pool.entries.count == 2) // root + map:hash@1
@@ -324,7 +324,7 @@ struct ObjectsPoolTests {
324324
// Note: "map:toremove@1" is not in sync, so it should be removed
325325
]
326326

327-
pool.applySyncObjectsPool(syncObjects, logger: logger, userCallbackQueue: .main, clock: MockSimpleClock())
327+
pool.applySyncObjectsPool(syncObjects.map { .init(state: $0) }, logger: logger, userCallbackQueue: .main, clock: MockSimpleClock())
328328

329329
// Verify final state
330330
#expect(pool.entries.count == 5) // root + 4 synced objects

0 commit comments

Comments
 (0)