@@ -96,14 +96,29 @@ internal final class InternalDefaultLiveCounter: Sendable {
9696 notYetImplemented ( )
9797 }
9898
99- internal func subscribe( listener _: @escaping LiveObjectUpdateCallback < LiveCounterUpdate > ) -> any SubscribeResponse {
100- notYetImplemented ( )
99+ @discardableResult
100+ internal func subscribe( listener: @escaping LiveObjectUpdateCallback < DefaultLiveCounterUpdate > , coreSDK: CoreSDK ) throws ( ARTErrorInfo) -> any SubscribeResponse {
101+ try mutex. ablyLiveObjects_withLockWithTypedThrow { ( ) throws ( ARTErrorInfo) in
102+ // swiftlint:disable:next trailing_closure
103+ try mutableState. liveObject. subscribe ( listener: listener, coreSDK: coreSDK, updateSelfLater: { [ weak self] action in
104+ guard let self else {
105+ return
106+ }
107+
108+ mutex. withLock {
109+ action ( & mutableState. liveObject)
110+ }
111+ } )
112+ }
101113 }
102114
103115 internal func unsubscribeAll( ) {
104- notYetImplemented ( )
116+ mutex. withLock {
117+ mutableState. liveObject. unsubscribeAll ( )
118+ }
105119 }
106120
121+ @discardableResult
107122 internal func on( event _: LiveObjectLifecycleEvent , callback _: @escaping LiveObjectLifecycleEventCallback ) -> any OnLiveObjectLifecycleEventResponse {
108123 notYetImplemented ( )
109124 }
@@ -112,31 +127,42 @@ internal final class InternalDefaultLiveCounter: Sendable {
112127 notYetImplemented ( )
113128 }
114129
130+ // MARK: - Emitting update from external sources
131+
132+ /// Emit an event from this `LiveCounter`.
133+ ///
134+ /// This is used to instruct this counter to emit updates during an `OBJECT_SYNC`.
135+ internal func emit( _ update: LiveObjectUpdate < DefaultLiveCounterUpdate > ) {
136+ mutex. withLock {
137+ mutableState. liveObject. emit ( update, on: userCallbackQueue)
138+ }
139+ }
140+
115141 // MARK: - Data manipulation
116142
117143 /// Replaces the internal data of this counter with the provided ObjectState, per RTLC6.
118- internal func replaceData( using state: ObjectState ) {
144+ internal func replaceData( using state: ObjectState ) -> LiveObjectUpdate < DefaultLiveCounterUpdate > {
119145 mutex. withLock {
120146 mutableState. replaceData ( using: state)
121147 }
122148 }
123149
124150 /// Test-only method to merge initial value from an ObjectOperation, per RTLC10.
125- internal func testsOnly_mergeInitialValue( from operation: ObjectOperation ) {
151+ internal func testsOnly_mergeInitialValue( from operation: ObjectOperation ) -> LiveObjectUpdate < DefaultLiveCounterUpdate > {
126152 mutex. withLock {
127153 mutableState. mergeInitialValue ( from: operation)
128154 }
129155 }
130156
131157 /// Test-only method to apply a COUNTER_CREATE operation, per RTLC8.
132- internal func testsOnly_applyCounterCreateOperation( _ operation: ObjectOperation ) {
158+ internal func testsOnly_applyCounterCreateOperation( _ operation: ObjectOperation ) -> LiveObjectUpdate < DefaultLiveCounterUpdate > {
133159 mutex. withLock {
134160 mutableState. applyCounterCreateOperation ( operation, logger: logger)
135161 }
136162 }
137163
138164 /// Test-only method to apply a COUNTER_INC operation, per RTLC9.
139- internal func testsOnly_applyCounterIncOperation( _ operation: WireObjectsCounterOp ? ) {
165+ internal func testsOnly_applyCounterIncOperation( _ operation: WireObjectsCounterOp ? ) -> LiveObjectUpdate < DefaultLiveCounterUpdate > {
140166 mutex. withLock {
141167 mutableState. applyCounterIncOperation ( operation)
142168 }
@@ -156,6 +182,7 @@ internal final class InternalDefaultLiveCounter: Sendable {
156182 objectMessageSiteCode: objectMessageSiteCode,
157183 objectsPool: & objectsPool,
158184 logger: logger,
185+ userCallbackQueue: userCallbackQueue,
159186 )
160187 }
161188 }
@@ -164,13 +191,13 @@ internal final class InternalDefaultLiveCounter: Sendable {
164191
165192 private struct MutableState {
166193 /// The mutable state common to all LiveObjects.
167- internal var liveObject : LiveObjectMutableState
194+ internal var liveObject : LiveObjectMutableState < DefaultLiveCounterUpdate >
168195
169196 /// The internal data that this map holds, per RTLC3.
170197 internal var data : Double
171198
172199 /// Replaces the internal data of this counter with the provided ObjectState, per RTLC6.
173- internal mutating func replaceData( using state: ObjectState ) {
200+ internal mutating func replaceData( using state: ObjectState ) -> LiveObjectUpdate < DefaultLiveCounterUpdate > {
174201 // RTLC6a: Replace the private siteTimeserials with the value from ObjectState.siteTimeserials
175202 liveObject. siteTimeserials = state. siteTimeserials
176203
@@ -181,19 +208,32 @@ internal final class InternalDefaultLiveCounter: Sendable {
181208 data = state. counter? . count? . doubleValue ?? 0
182209
183210 // RTLC6d: If ObjectState.createOp is present, merge the initial value into the LiveCounter as described in RTLC10
184- if let createOp = state. createOp {
211+ return if let createOp = state. createOp {
185212 mergeInitialValue ( from: createOp)
213+ } else {
214+ // TODO: I assume this is what to do, clarify in https://github.com/ably/specification/pull/346/files#r2201363446
215+ . noop
186216 }
187217 }
188218
189219 /// Merges the initial value from an ObjectOperation into this LiveCounter, per RTLC10.
190- internal mutating func mergeInitialValue( from operation: ObjectOperation ) {
220+ internal mutating func mergeInitialValue( from operation: ObjectOperation ) -> LiveObjectUpdate < DefaultLiveCounterUpdate > {
221+ let update : LiveObjectUpdate < DefaultLiveCounterUpdate >
222+
191223 // RTLC10a: Add ObjectOperation.counter.count to data, if it exists
192224 if let operationCount = operation. counter? . count? . doubleValue {
193225 data += operationCount
226+ // RTLC10c
227+ update = . update( DefaultLiveCounterUpdate ( amount: operationCount) )
228+ } else {
229+ // RTLC10d
230+ update = . noop
194231 }
232+
195233 // RTLC10b: Set the private flag createOperationIsMerged to true
196234 liveObject. createOperationIsMerged = true
235+
236+ return update
197237 }
198238
199239 /// Attempts to apply an operation from an inbound `ObjectMessage`, per RTLC7.
@@ -203,6 +243,7 @@ internal final class InternalDefaultLiveCounter: Sendable {
203243 objectMessageSiteCode: String ? ,
204244 objectsPool: inout ObjectsPool ,
205245 logger: Logger ,
246+ userCallbackQueue: DispatchQueue ,
206247 ) {
207248 guard let applicableOperation = liveObject. canApplyOperation ( objectMessageSerial: objectMessageSerial, objectMessageSiteCode: objectMessageSiteCode, logger: logger) else {
208249 // RTLC7b
@@ -216,13 +257,17 @@ internal final class InternalDefaultLiveCounter: Sendable {
216257 switch operation. action {
217258 case . known( . counterCreate) :
218259 // RTLC7d1
219- applyCounterCreateOperation (
260+ let update = applyCounterCreateOperation (
220261 operation,
221262 logger: logger,
222263 )
264+ // RTLC7d1a
265+ liveObject. emit ( update, on: userCallbackQueue)
223266 case . known( . counterInc) :
224267 // RTLC7d2
225- applyCounterIncOperation ( operation. counterOp)
268+ let update = applyCounterIncOperation ( operation. counterOp)
269+ // RTLC7d2a
270+ liveObject. emit ( update, on: userCallbackQueue)
226271 default :
227272 // RTLC7d3
228273 logger. log ( " Operation \( operation) has unsupported action for LiveCounter; discarding " , level: . warn)
@@ -233,25 +278,28 @@ internal final class InternalDefaultLiveCounter: Sendable {
233278 internal mutating func applyCounterCreateOperation(
234279 _ operation: ObjectOperation ,
235280 logger: Logger ,
236- ) {
281+ ) -> LiveObjectUpdate < DefaultLiveCounterUpdate > {
237282 if liveObject. createOperationIsMerged {
238283 // RTLC8b
239284 logger. log ( " Not applying COUNTER_CREATE because a COUNTER_CREATE has already been applied " , level: . warn)
240- return
285+ return . noop
241286 }
242287
243- // RTLC8c
244- mergeInitialValue ( from: operation)
288+ // RTLC8c, RTLC8e
289+ return mergeInitialValue ( from: operation)
245290 }
246291
247292 /// Applies a `COUNTER_INC` operation, per RTLC9.
248- internal mutating func applyCounterIncOperation( _ operation: WireObjectsCounterOp ? ) {
293+ internal mutating func applyCounterIncOperation( _ operation: WireObjectsCounterOp ? ) -> LiveObjectUpdate < DefaultLiveCounterUpdate > {
249294 guard let operation else {
250- return
295+ // RTL9e
296+ return . noop
251297 }
252298
253- // RTLC9b
254- data += operation. amount. doubleValue
299+ // RTLC9b, RTLC9d
300+ let amount = operation. amount. doubleValue
301+ data += amount
302+ return . update( DefaultLiveCounterUpdate ( amount: amount) )
255303 }
256304 }
257305}
0 commit comments