Skip to content

Commit 06e8ac2

Browse files
Accept a serialTimestamp in replaceData and applySyncObjectsPool
Preparation for making these methods perform tombstoning per [1] (not implemented yet). [1] ably/specification#350
1 parent 8576463 commit 06e8ac2

File tree

8 files changed

+144
-52
lines changed

8 files changed

+144
-52
lines changed

Sources/AblyLiveObjects/Internal/InternalDefaultLiveCounter.swift

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -149,9 +149,15 @@ internal final class InternalDefaultLiveCounter: Sendable {
149149
// MARK: - Data manipulation
150150

151151
/// Replaces the internal data of this counter with the provided ObjectState, per RTLC6.
152-
internal func replaceData(using state: ObjectState) -> LiveObjectUpdate<DefaultLiveCounterUpdate> {
152+
///
153+
/// - Parameters:
154+
/// - objectMessageSerialTimestamp: The `serialTimestamp` of the containing `ObjectMessage`. Used if we need to tombstone this counter.
155+
internal func replaceData(
156+
using state: ObjectState,
157+
objectMessageSerialTimestamp: Date?,
158+
) -> LiveObjectUpdate<DefaultLiveCounterUpdate> {
153159
mutex.withLock {
154-
mutableState.replaceData(using: state)
160+
mutableState.replaceData(using: state, objectMessageSerialTimestamp: objectMessageSerialTimestamp)
155161
}
156162
}
157163

@@ -181,13 +187,15 @@ internal final class InternalDefaultLiveCounter: Sendable {
181187
_ operation: ObjectOperation,
182188
objectMessageSerial: String?,
183189
objectMessageSiteCode: String?,
190+
objectMessageSerialTimestamp: Date?,
184191
objectsPool: inout ObjectsPool,
185192
) {
186193
mutex.withLock {
187194
mutableState.apply(
188195
operation,
189196
objectMessageSerial: objectMessageSerial,
190197
objectMessageSiteCode: objectMessageSiteCode,
198+
objectMessageSerialTimestamp: objectMessageSerialTimestamp,
191199
objectsPool: &objectsPool,
192200
logger: logger,
193201
userCallbackQueue: userCallbackQueue,
@@ -205,7 +213,13 @@ internal final class InternalDefaultLiveCounter: Sendable {
205213
internal var data: Double
206214

207215
/// Replaces the internal data of this counter with the provided ObjectState, per RTLC6.
208-
internal mutating func replaceData(using state: ObjectState) -> LiveObjectUpdate<DefaultLiveCounterUpdate> {
216+
///
217+
/// - Parameters:
218+
/// - objectMessageSerialTimestamp: The `serialTimestamp` of the containing `ObjectMessage`. Used if we need to tombstone this counter.
219+
internal mutating func replaceData(
220+
using state: ObjectState,
221+
objectMessageSerialTimestamp: Date?,
222+
) -> LiveObjectUpdate<DefaultLiveCounterUpdate> {
209223
// RTLC6a: Replace the private siteTimeserials with the value from ObjectState.siteTimeserials
210224
liveObjectMutableState.siteTimeserials = state.siteTimeserials
211225

@@ -249,6 +263,7 @@ internal final class InternalDefaultLiveCounter: Sendable {
249263
_ operation: ObjectOperation,
250264
objectMessageSerial: String?,
251265
objectMessageSiteCode: String?,
266+
objectMessageSerialTimestamp: Date?,
252267
objectsPool: inout ObjectsPool,
253268
logger: Logger,
254269
userCallbackQueue: DispatchQueue,

Sources/AblyLiveObjects/Internal/InternalDefaultLiveMap.swift

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -247,10 +247,16 @@ internal final class InternalDefaultLiveMap: Sendable {
247247
///
248248
/// - Parameters:
249249
/// - objectsPool: The pool into which should be inserted any objects created by a `MAP_SET` operation.
250-
internal func replaceData(using state: ObjectState, objectsPool: inout ObjectsPool) -> LiveObjectUpdate<DefaultLiveMapUpdate> {
250+
/// - objectMessageSerialTimestamp: The `serialTimestamp` of the containing `ObjectMessage`. Used if we need to tombstone this map.
251+
internal func replaceData(
252+
using state: ObjectState,
253+
objectMessageSerialTimestamp: Date?,
254+
objectsPool: inout ObjectsPool,
255+
) -> LiveObjectUpdate<DefaultLiveMapUpdate> {
251256
mutex.withLock {
252257
mutableState.replaceData(
253258
using: state,
259+
objectMessageSerialTimestamp: objectMessageSerialTimestamp,
254260
objectsPool: &objectsPool,
255261
logger: logger,
256262
clock: clock,
@@ -290,13 +296,15 @@ internal final class InternalDefaultLiveMap: Sendable {
290296
_ operation: ObjectOperation,
291297
objectMessageSerial: String?,
292298
objectMessageSiteCode: String?,
299+
objectMessageSerialTimestamp: Date?,
293300
objectsPool: inout ObjectsPool,
294301
) {
295302
mutex.withLock {
296303
mutableState.apply(
297304
operation,
298305
objectMessageSerial: objectMessageSerial,
299306
objectMessageSiteCode: objectMessageSiteCode,
307+
objectMessageSerialTimestamp: objectMessageSerialTimestamp,
300308
objectsPool: &objectsPool,
301309
logger: logger,
302310
userCallbackQueue: userCallbackQueue,
@@ -362,8 +370,10 @@ internal final class InternalDefaultLiveMap: Sendable {
362370
///
363371
/// - Parameters:
364372
/// - objectsPool: The pool into which should be inserted any objects created by a `MAP_SET` operation.
373+
/// - objectMessageSerialTimestamp: The `serialTimestamp` of the containing `ObjectMessage`. Used if we need to tombstone this map.
365374
internal mutating func replaceData(
366375
using state: ObjectState,
376+
objectMessageSerialTimestamp: Date?,
367377
objectsPool: inout ObjectsPool,
368378
logger: AblyPlugin.Logger,
369379
clock: SimpleClock,
@@ -454,6 +464,7 @@ internal final class InternalDefaultLiveMap: Sendable {
454464
_ operation: ObjectOperation,
455465
objectMessageSerial: String?,
456466
objectMessageSiteCode: String?,
467+
objectMessageSerialTimestamp: Date?,
457468
objectsPool: inout ObjectsPool,
458469
logger: Logger,
459470
userCallbackQueue: DispatchQueue,

Sources/AblyLiveObjects/Internal/InternalDefaultRealtimeObjects.swift

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -307,7 +307,7 @@ internal final class InternalDefaultRealtimeObjects: Sendable, LiveMapObjectPool
307307
// RTO5b
308308
updatedSyncSequence.syncObjectsPool.append(contentsOf: objectMessages.compactMap { objectMessage in
309309
if let object = objectMessage.object {
310-
.init(state: object)
310+
.init(state: object, objectMessageSerialTimestamp: objectMessage.serialTimestamp)
311311
} else {
312312
nil
313313
}
@@ -324,7 +324,7 @@ internal final class InternalDefaultRealtimeObjects: Sendable, LiveMapObjectPool
324324
// RTO5a5: The sync data is contained entirely within this single OBJECT_SYNC
325325
completedSyncObjectsPool = objectMessages.compactMap { objectMessage in
326326
if let object = objectMessage.object {
327-
.init(state: object)
327+
.init(state: object, objectMessageSerialTimestamp: objectMessage.serialTimestamp)
328328
} else {
329329
nil
330330
}
@@ -432,6 +432,7 @@ internal final class InternalDefaultRealtimeObjects: Sendable, LiveMapObjectPool
432432
operation,
433433
objectMessageSerial: objectMessage.serial,
434434
objectMessageSiteCode: objectMessage.siteCode,
435+
objectMessageSerialTimestamp: objectMessage.serialTimestamp,
435436
objectsPool: &objectsPool,
436437
)
437438
}

Sources/AblyLiveObjects/Internal/ObjectsPool.swift

Lines changed: 40 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ internal struct ObjectsPool {
3434
_ operation: ObjectOperation,
3535
objectMessageSerial: String?,
3636
objectMessageSiteCode: String?,
37+
objectMessageSerialTimestamp: Date?,
3738
objectsPool: inout ObjectsPool,
3839
) {
3940
switch self {
@@ -42,13 +43,15 @@ internal struct ObjectsPool {
4243
operation,
4344
objectMessageSerial: objectMessageSerial,
4445
objectMessageSiteCode: objectMessageSiteCode,
46+
objectMessageSerialTimestamp: objectMessageSerialTimestamp,
4547
objectsPool: &objectsPool,
4648
)
4749
case let .counter(counter):
4850
counter.apply(
4951
operation,
5052
objectMessageSerial: objectMessageSerial,
5153
objectMessageSiteCode: objectMessageSiteCode,
54+
objectMessageSerialTimestamp: objectMessageSerialTimestamp,
5255
objectsPool: &objectsPool,
5356
)
5457
}
@@ -73,12 +76,32 @@ internal struct ObjectsPool {
7376
/// Overrides the internal data for the object as per RTLC6, RTLM6.
7477
///
7578
/// Returns a ``DeferredUpdate`` which contains the object plus an update that should be emitted on this object once the `SyncObjectsPool` has been applied.
76-
fileprivate func replaceData(using state: ObjectState, objectsPool: inout ObjectsPool) -> DeferredUpdate {
79+
///
80+
/// - Parameters:
81+
/// - objectMessageSerialTimestamp: The `serialTimestamp` of the containing `ObjectMessage`. Used if we need to tombstone the object.
82+
fileprivate func replaceData(
83+
using state: ObjectState,
84+
objectMessageSerialTimestamp: Date?,
85+
objectsPool: inout ObjectsPool,
86+
) -> DeferredUpdate {
7787
switch self {
7888
case let .map(map):
79-
.map(map, map.replaceData(using: state, objectsPool: &objectsPool))
89+
.map(
90+
map,
91+
map.replaceData(
92+
using: state,
93+
objectMessageSerialTimestamp: objectMessageSerialTimestamp,
94+
objectsPool: &objectsPool,
95+
),
96+
)
8097
case let .counter(counter):
81-
.counter(counter, counter.replaceData(using: state))
98+
.counter(
99+
counter,
100+
counter.replaceData(
101+
using: state,
102+
objectMessageSerialTimestamp: objectMessageSerialTimestamp,
103+
),
104+
)
82105
}
83106
}
84107
}
@@ -199,7 +222,11 @@ internal struct ObjectsPool {
199222
logger.log("Updating existing object with ID: \(syncObjectsPoolEntry.state.objectId)", level: .debug)
200223

201224
// RTO5c1a1: Override the internal data for the object as per RTLC6, RTLM6
202-
let deferredUpdate = existingEntry.replaceData(using: syncObjectsPoolEntry.state, objectsPool: &self)
225+
let deferredUpdate = existingEntry.replaceData(
226+
using: syncObjectsPoolEntry.state,
227+
objectMessageSerialTimestamp: syncObjectsPoolEntry.objectMessageSerialTimestamp,
228+
objectsPool: &self,
229+
)
203230
// RTO5c1a2: Store this update to emit at end
204231
updatesToExistingObjects.append(deferredUpdate)
205232
} else {
@@ -213,14 +240,21 @@ internal struct ObjectsPool {
213240
// RTO5c1b1a: If ObjectState.counter is present, create a zero-value LiveCounter,
214241
// set its private objectId equal to ObjectState.objectId and override its internal data per RTLC6
215242
let counter = InternalDefaultLiveCounter.createZeroValued(objectID: syncObjectsPoolEntry.state.objectId, logger: logger, userCallbackQueue: userCallbackQueue, clock: clock)
216-
_ = counter.replaceData(using: syncObjectsPoolEntry.state)
243+
_ = counter.replaceData(
244+
using: syncObjectsPoolEntry.state,
245+
objectMessageSerialTimestamp: syncObjectsPoolEntry.objectMessageSerialTimestamp,
246+
)
217247
newEntry = .counter(counter)
218248
} else if let objectsMap = syncObjectsPoolEntry.state.map {
219249
// RTO5c1b1b: If ObjectState.map is present, create a zero-value LiveMap,
220250
// set its private objectId equal to ObjectState.objectId, set its private semantics
221251
// equal to ObjectState.map.semantics and override its internal data per RTLM6
222252
let map = InternalDefaultLiveMap.createZeroValued(objectID: syncObjectsPoolEntry.state.objectId, semantics: objectsMap.semantics, logger: logger, userCallbackQueue: userCallbackQueue, clock: clock)
223-
_ = map.replaceData(using: syncObjectsPoolEntry.state, objectsPool: &self)
253+
_ = map.replaceData(
254+
using: syncObjectsPoolEntry.state,
255+
objectMessageSerialTimestamp: syncObjectsPoolEntry.objectMessageSerialTimestamp,
256+
objectsPool: &self,
257+
)
224258
newEntry = .map(map)
225259
} else {
226260
// RTO5c1b1c: Otherwise, log a warning that an unsupported object state message has been received, and discard the current ObjectState without taking any action

Sources/AblyLiveObjects/Internal/SyncObjectsPoolEntry.swift

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,4 +3,13 @@ import Foundation
33
/// The contents of the spec's `SyncObjectsPool` that is built during an `OBJECT_SYNC` sync sequence.
44
internal struct SyncObjectsPoolEntry {
55
internal var state: ObjectState
6+
/// The `serialTimestamp` of the `ObjectMessage` that generated this entry.
7+
internal var objectMessageSerialTimestamp: Date?
8+
9+
// We replace the default memberwise initializer because we don't want a default argument for objectMessageSerialTimestamp (want to make sure we don't forget to set it whenever we create an entry).
10+
// swiftlint:disable:next unneeded_synthesized_initializer
11+
internal init(state: ObjectState, objectMessageSerialTimestamp: Date?) {
12+
self.state = state
13+
self.objectMessageSerialTimestamp = objectMessageSerialTimestamp
14+
}
615
}

0 commit comments

Comments
 (0)