@@ -11,45 +11,43 @@ internal final class InternalDefaultLiveCounter: Sendable {
1111
1212 internal var testsOnly_siteTimeserials : [ String : String ] {
1313 mutex. withLock {
14- mutableState. liveObject . siteTimeserials
14+ mutableState. liveObjectMutableState . siteTimeserials
1515 }
1616 }
1717
1818 internal var testsOnly_createOperationIsMerged : Bool {
1919 mutex. withLock {
20- mutableState. liveObject. createOperationIsMerged
21- }
22- }
23-
24- internal var testsOnly_objectID : String {
25- mutex. withLock {
26- mutableState. liveObject. objectID
20+ mutableState. liveObjectMutableState. createOperationIsMerged
2721 }
2822 }
2923
3024 private let logger : AblyPlugin . Logger
3125 private let userCallbackQueue : DispatchQueue
26+ private let clock : SimpleClock
3227
3328 // MARK: - Initialization
3429
3530 internal convenience init (
3631 testsOnly_data data: Double ,
3732 objectID: String ,
3833 logger: AblyPlugin . Logger ,
39- userCallbackQueue: DispatchQueue
34+ userCallbackQueue: DispatchQueue ,
35+ clock: SimpleClock
4036 ) {
41- self . init ( data: data, objectID: objectID, logger: logger, userCallbackQueue: userCallbackQueue)
37+ self . init ( data: data, objectID: objectID, logger: logger, userCallbackQueue: userCallbackQueue, clock : clock )
4238 }
4339
4440 private init (
4541 data: Double ,
4642 objectID: String ,
4743 logger: AblyPlugin . Logger ,
48- userCallbackQueue: DispatchQueue
44+ userCallbackQueue: DispatchQueue ,
45+ clock: SimpleClock
4946 ) {
50- mutableState = . init( liveObject : . init( objectID: objectID) , data: data)
47+ mutableState = . init( liveObjectMutableState : . init( objectID: objectID) , data: data)
5148 self . logger = logger
5249 self . userCallbackQueue = userCallbackQueue
50+ self . clock = clock
5351 }
5452
5553 /// Creates a "zero-value LiveCounter", per RTLC4.
@@ -60,27 +58,30 @@ internal final class InternalDefaultLiveCounter: Sendable {
6058 objectID: String ,
6159 logger: AblyPlugin . Logger ,
6260 userCallbackQueue: DispatchQueue ,
61+ clock: SimpleClock ,
6362 ) -> Self {
6463 . init(
6564 data: 0 ,
6665 objectID: objectID,
6766 logger: logger,
6867 userCallbackQueue: userCallbackQueue,
68+ clock: clock,
6969 )
7070 }
7171
72+ // MARK: - Data access
73+
74+ internal var objectID : String {
75+ mutex. withLock {
76+ mutableState. liveObjectMutableState. objectID
77+ }
78+ }
79+
7280 // MARK: - Internal methods that back LiveCounter conformance
7381
7482 internal func value( coreSDK: CoreSDK ) throws ( ARTErrorInfo) -> Double {
7583 // RTLC5b: If the channel is in the DETACHED or FAILED state, the library should indicate an error with code 90001
76- let currentChannelState = coreSDK. channelState
77- if currentChannelState == . detached || currentChannelState == . failed {
78- throw LiveObjectsError . objectsOperationFailedInvalidChannelState (
79- operationDescription: " LiveCounter.value " ,
80- channelState: currentChannelState,
81- )
82- . toARTErrorInfo ( )
83- }
84+ try coreSDK. validateChannelState ( notIn: [ . detached, . failed] , operationDescription: " LiveCounter.value " )
8485
8586 return mutex. withLock {
8687 // RTLC5c
@@ -100,21 +101,21 @@ internal final class InternalDefaultLiveCounter: Sendable {
100101 internal func subscribe( listener: @escaping LiveObjectUpdateCallback < DefaultLiveCounterUpdate > , coreSDK: CoreSDK ) throws ( ARTErrorInfo) -> any SubscribeResponse {
101102 try mutex. ablyLiveObjects_withLockWithTypedThrow { ( ) throws ( ARTErrorInfo) in
102103 // swiftlint:disable:next trailing_closure
103- try mutableState. liveObject . subscribe ( listener: listener, coreSDK: coreSDK, updateSelfLater: { [ weak self] action in
104+ try mutableState. liveObjectMutableState . subscribe ( listener: listener, coreSDK: coreSDK, updateSelfLater: { [ weak self] action in
104105 guard let self else {
105106 return
106107 }
107108
108109 mutex. withLock {
109- action ( & mutableState. liveObject )
110+ action ( & mutableState. liveObjectMutableState )
110111 }
111112 } )
112113 }
113114 }
114115
115116 internal func unsubscribeAll( ) {
116117 mutex. withLock {
117- mutableState. liveObject . unsubscribeAll ( )
118+ mutableState. liveObjectMutableState . unsubscribeAll ( )
118119 }
119120 }
120121
@@ -134,21 +135,27 @@ internal final class InternalDefaultLiveCounter: Sendable {
134135 /// This is used to instruct this counter to emit updates during an `OBJECT_SYNC`.
135136 internal func emit( _ update: LiveObjectUpdate < DefaultLiveCounterUpdate > ) {
136137 mutex. withLock {
137- mutableState. liveObject . emit ( update, on: userCallbackQueue)
138+ mutableState. liveObjectMutableState . emit ( update, on: userCallbackQueue)
138139 }
139140 }
140141
141142 // MARK: - Data manipulation
142143
143144 /// Replaces the internal data of this counter with the provided ObjectState, per RTLC6.
144- internal func replaceData( using state: ObjectState ) -> LiveObjectUpdate < DefaultLiveCounterUpdate > {
145+ ///
146+ /// - Parameters:
147+ /// - objectMessageSerialTimestamp: The `serialTimestamp` of the containing `ObjectMessage`. Used if we need to tombstone this counter.
148+ internal func replaceData(
149+ using state: ObjectState ,
150+ objectMessageSerialTimestamp: Date ? ,
151+ ) -> LiveObjectUpdate < DefaultLiveCounterUpdate > {
145152 mutex. withLock {
146- mutableState. replaceData ( using: state)
153+ mutableState. replaceData ( using: state, objectMessageSerialTimestamp : objectMessageSerialTimestamp )
147154 }
148155 }
149156
150- /// Test-only method to merge initial value from an ObjectOperation, per RTLC10.
151- internal func testsOnly_mergeInitialValue ( from operation: ObjectOperation ) -> LiveObjectUpdate < DefaultLiveCounterUpdate > {
157+ /// Merges the initial value from an ObjectOperation into this LiveCounter , per RTLC10.
158+ internal func mergeInitialValue ( from operation: ObjectOperation ) -> LiveObjectUpdate < DefaultLiveCounterUpdate > {
152159 mutex. withLock {
153160 mutableState. mergeInitialValue ( from: operation)
154161 }
@@ -173,13 +180,15 @@ internal final class InternalDefaultLiveCounter: Sendable {
173180 _ operation: ObjectOperation ,
174181 objectMessageSerial: String ? ,
175182 objectMessageSiteCode: String ? ,
183+ objectMessageSerialTimestamp: Date ? ,
176184 objectsPool: inout ObjectsPool ,
177185 ) {
178186 mutex. withLock {
179187 mutableState. apply (
180188 operation,
181189 objectMessageSerial: objectMessageSerial,
182190 objectMessageSiteCode: objectMessageSiteCode,
191+ objectMessageSerialTimestamp: objectMessageSerialTimestamp,
183192 objectsPool: & objectsPool,
184193 logger: logger,
185194 userCallbackQueue: userCallbackQueue,
@@ -189,20 +198,26 @@ internal final class InternalDefaultLiveCounter: Sendable {
189198
190199 // MARK: - Mutable state and the operations that affect it
191200
192- private struct MutableState {
201+ private struct MutableState : InternalLiveObject {
193202 /// The mutable state common to all LiveObjects.
194- internal var liveObject : LiveObjectMutableState < DefaultLiveCounterUpdate >
203+ internal var liveObjectMutableState : LiveObjectMutableState < DefaultLiveCounterUpdate >
195204
196205 /// The internal data that this map holds, per RTLC3.
197206 internal var data : Double
198207
199208 /// Replaces the internal data of this counter with the provided ObjectState, per RTLC6.
200- internal mutating func replaceData( using state: ObjectState ) -> LiveObjectUpdate < DefaultLiveCounterUpdate > {
209+ ///
210+ /// - Parameters:
211+ /// - objectMessageSerialTimestamp: The `serialTimestamp` of the containing `ObjectMessage`. Used if we need to tombstone this counter.
212+ internal mutating func replaceData(
213+ using state: ObjectState ,
214+ objectMessageSerialTimestamp: Date ? ,
215+ ) -> LiveObjectUpdate < DefaultLiveCounterUpdate > {
201216 // RTLC6a: Replace the private siteTimeserials with the value from ObjectState.siteTimeserials
202- liveObject . siteTimeserials = state. siteTimeserials
217+ liveObjectMutableState . siteTimeserials = state. siteTimeserials
203218
204219 // RTLC6b: Set the private flag createOperationIsMerged to false
205- liveObject . createOperationIsMerged = false
220+ liveObjectMutableState . createOperationIsMerged = false
206221
207222 // RTLC6c: Set data to the value of ObjectState.counter.count, or to 0 if it does not exist
208223 data = state. counter? . count? . doubleValue ?? 0
@@ -231,7 +246,7 @@ internal final class InternalDefaultLiveCounter: Sendable {
231246 }
232247
233248 // RTLC10b: Set the private flag createOperationIsMerged to true
234- liveObject . createOperationIsMerged = true
249+ liveObjectMutableState . createOperationIsMerged = true
235250
236251 return update
237252 }
@@ -241,18 +256,19 @@ internal final class InternalDefaultLiveCounter: Sendable {
241256 _ operation: ObjectOperation ,
242257 objectMessageSerial: String ? ,
243258 objectMessageSiteCode: String ? ,
259+ objectMessageSerialTimestamp: Date ? ,
244260 objectsPool: inout ObjectsPool ,
245261 logger: Logger ,
246262 userCallbackQueue: DispatchQueue ,
247263 ) {
248- guard let applicableOperation = liveObject . canApplyOperation ( objectMessageSerial: objectMessageSerial, objectMessageSiteCode: objectMessageSiteCode, logger: logger) else {
264+ guard let applicableOperation = liveObjectMutableState . canApplyOperation ( objectMessageSerial: objectMessageSerial, objectMessageSiteCode: objectMessageSiteCode, logger: logger) else {
249265 // RTLC7b
250266 logger. log ( " Operation \( operation) (serial: \( String ( describing: objectMessageSerial) ) , siteCode: \( String ( describing: objectMessageSiteCode) ) ) should not be applied; discarding " , level: . debug)
251267 return
252268 }
253269
254270 // RTLC7c
255- liveObject . siteTimeserials [ applicableOperation. objectMessageSiteCode] = applicableOperation. objectMessageSerial
271+ liveObjectMutableState . siteTimeserials [ applicableOperation. objectMessageSiteCode] = applicableOperation. objectMessageSerial
256272
257273 switch operation. action {
258274 case . known( . counterCreate) :
@@ -262,12 +278,12 @@ internal final class InternalDefaultLiveCounter: Sendable {
262278 logger: logger,
263279 )
264280 // RTLC7d1a
265- liveObject . emit ( update, on: userCallbackQueue)
281+ liveObjectMutableState . emit ( update, on: userCallbackQueue)
266282 case . known( . counterInc) :
267283 // RTLC7d2
268284 let update = applyCounterIncOperation ( operation. counterOp)
269285 // RTLC7d2a
270- liveObject . emit ( update, on: userCallbackQueue)
286+ liveObjectMutableState . emit ( update, on: userCallbackQueue)
271287 default :
272288 // RTLC7d3
273289 logger. log ( " Operation \( operation) has unsupported action for LiveCounter; discarding " , level: . warn)
@@ -279,7 +295,7 @@ internal final class InternalDefaultLiveCounter: Sendable {
279295 _ operation: ObjectOperation ,
280296 logger: Logger ,
281297 ) -> LiveObjectUpdate < DefaultLiveCounterUpdate > {
282- if liveObject . createOperationIsMerged {
298+ if liveObjectMutableState . createOperationIsMerged {
283299 // RTLC8b
284300 logger. log ( " Not applying COUNTER_CREATE because a COUNTER_CREATE has already been applied " , level: . warn)
285301 return . noop
0 commit comments