Skip to content

Commit 965eb05

Browse files
Implement subscriptions spec
Based on [1] at 2963300. Have not implemented RTL04b1's channel mode checking for same reason as mentioned in 8d881e2. Have not currently tested `replaceData`'s return value; will do once [2] clarified. [1] ably/specification#346 [2] https://github.com/ably/specification/pull/346/files#r2201363446
1 parent 2c3bd97 commit 965eb05

17 files changed

+874
-132
lines changed

CONTRIBUTING.md

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,9 @@ To check formatting and code quality, run `swift run BuildTool lint`. Run with `
4444
### Throwing errors
4545

4646
- The public API of the SDK should use typed throws, and the thrown errors should be of type `ARTErrorInfo`.
47-
- `Dictionary.mapValues` does not support typed throws. We have our own extension `ablyLiveObjects_mapValuesWithTypedThrow` which does; use this.
47+
- Some platform methods do not support typed throws. In these cases, we have our own extension which does; use this instead. They are:
48+
- `Dictionary.mapValues`; use `ablyLiveObjects_mapValuesWithTypedThrow`.
49+
- `NSLock.withLock`; use `ablyLiveObjects_mapValuesWithTypedThrow`.
4850

4951
### Testing guidelines
5052

Sources/AblyLiveObjects/DefaultRealtimeObjects.swift

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,7 @@ internal final class DefaultRealtimeObjects: Sendable, LiveMapObjectPoolDelegate
138138
notYetImplemented()
139139
}
140140

141+
@discardableResult
141142
internal func on(event _: ObjectsEvent, callback _: ObjectsEventCallback) -> any OnObjectsEventResponse {
142143
notYetImplemented()
143144
}

Sources/AblyLiveObjects/Internal/DefaultLiveCounter.swift

Lines changed: 69 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -96,14 +96,29 @@ internal final class DefaultLiveCounter: Sendable {
9696
notYetImplemented()
9797
}
9898

99-
internal func subscribe(listener _: @escaping LiveObjectUpdateCallback<LiveCounterUpdate>) -> any SubscribeResponse {
100-
notYetImplemented()
99+
@discardableResult
100+
internal func subscribe(listener: @escaping LiveObjectUpdateCallback<DefaultLiveCounterUpdate>, coreSDK: CoreSDK) throws(ARTErrorInfo) -> any SubscribeResponse {
101+
try mutex.ablyLiveObjects_withLockWithTypedThrow { () throws(ARTErrorInfo) in
102+
// swiftlint:disable:next trailing_closure
103+
try mutableState.liveObject.subscribe(listener: listener, coreSDK: coreSDK, updateSelfLater: { [weak self] updater in
104+
guard let self else {
105+
return
106+
}
107+
108+
mutex.withLock {
109+
updater(&mutableState.liveObject)
110+
}
111+
})
112+
}
101113
}
102114

103115
internal func unsubscribeAll() {
104-
notYetImplemented()
116+
mutex.withLock {
117+
mutableState.liveObject.unsubscribeAll()
118+
}
105119
}
106120

121+
@discardableResult
107122
internal func on(event _: LiveObjectLifecycleEvent, callback _: @escaping LiveObjectLifecycleEventCallback) -> any OnLiveObjectLifecycleEventResponse {
108123
notYetImplemented()
109124
}
@@ -112,31 +127,42 @@ internal final class DefaultLiveCounter: Sendable {
112127
notYetImplemented()
113128
}
114129

130+
// MARK: - Emitting update from external sources
131+
132+
/// Emit an event from this `LiveCounter`.
133+
///
134+
/// This is used to instruct this counter to emit updates during an `OBJECT_SYNC`.
135+
internal func emit(_ update: LiveObjectUpdate<DefaultLiveCounterUpdate>) {
136+
mutex.withLock {
137+
mutableState.liveObject.emit(update, on: userCallbackQueue)
138+
}
139+
}
140+
115141
// MARK: - Data manipulation
116142

117143
/// Replaces the internal data of this counter with the provided ObjectState, per RTLC6.
118-
internal func replaceData(using state: ObjectState) {
144+
internal func replaceData(using state: ObjectState) -> LiveObjectUpdate<DefaultLiveCounterUpdate> {
119145
mutex.withLock {
120146
mutableState.replaceData(using: state)
121147
}
122148
}
123149

124150
/// Test-only method to merge initial value from an ObjectOperation, per RTLC10.
125-
internal func testsOnly_mergeInitialValue(from operation: ObjectOperation) {
151+
internal func testsOnly_mergeInitialValue(from operation: ObjectOperation) -> LiveObjectUpdate<DefaultLiveCounterUpdate> {
126152
mutex.withLock {
127153
mutableState.mergeInitialValue(from: operation)
128154
}
129155
}
130156

131157
/// Test-only method to apply a COUNTER_CREATE operation, per RTLC8.
132-
internal func testsOnly_applyCounterCreateOperation(_ operation: ObjectOperation) {
158+
internal func testsOnly_applyCounterCreateOperation(_ operation: ObjectOperation) -> LiveObjectUpdate<DefaultLiveCounterUpdate> {
133159
mutex.withLock {
134160
mutableState.applyCounterCreateOperation(operation, logger: logger)
135161
}
136162
}
137163

138164
/// Test-only method to apply a COUNTER_INC operation, per RTLC9.
139-
internal func testsOnly_applyCounterIncOperation(_ operation: WireObjectsCounterOp?) {
165+
internal func testsOnly_applyCounterIncOperation(_ operation: WireObjectsCounterOp?) -> LiveObjectUpdate<DefaultLiveCounterUpdate> {
140166
mutex.withLock {
141167
mutableState.applyCounterIncOperation(operation)
142168
}
@@ -156,6 +182,7 @@ internal final class DefaultLiveCounter: Sendable {
156182
objectMessageSiteCode: objectMessageSiteCode,
157183
objectsPool: &objectsPool,
158184
logger: logger,
185+
userCallbackQueue: userCallbackQueue,
159186
)
160187
}
161188
}
@@ -164,13 +191,13 @@ internal final class DefaultLiveCounter: Sendable {
164191

165192
private struct MutableState {
166193
/// The mutable state common to all LiveObjects.
167-
internal var liveObject: LiveObjectMutableState
194+
internal var liveObject: LiveObjectMutableState<DefaultLiveCounterUpdate>
168195

169196
/// The internal data that this map holds, per RTLC3.
170197
internal var data: Double
171198

172199
/// Replaces the internal data of this counter with the provided ObjectState, per RTLC6.
173-
internal mutating func replaceData(using state: ObjectState) {
200+
internal mutating func replaceData(using state: ObjectState) -> LiveObjectUpdate<DefaultLiveCounterUpdate> {
174201
// RTLC6a: Replace the private siteTimeserials with the value from ObjectState.siteTimeserials
175202
liveObject.siteTimeserials = state.siteTimeserials
176203

@@ -181,19 +208,32 @@ internal final class DefaultLiveCounter: Sendable {
181208
data = state.counter?.count?.doubleValue ?? 0
182209

183210
// RTLC6d: If ObjectState.createOp is present, merge the initial value into the LiveCounter as described in RTLC10
184-
if let createOp = state.createOp {
211+
return if let createOp = state.createOp {
185212
mergeInitialValue(from: createOp)
213+
} else {
214+
// TODO: I assume this is what to do, clarify in https://github.com/ably/specification/pull/346/files#r2201363446
215+
.noop
186216
}
187217
}
188218

189219
/// Merges the initial value from an ObjectOperation into this LiveCounter, per RTLC10.
190-
internal mutating func mergeInitialValue(from operation: ObjectOperation) {
220+
internal mutating func mergeInitialValue(from operation: ObjectOperation) -> LiveObjectUpdate<DefaultLiveCounterUpdate> {
221+
let update: LiveObjectUpdate<DefaultLiveCounterUpdate>
222+
191223
// RTLC10a: Add ObjectOperation.counter.count to data, if it exists
192224
if let operationCount = operation.counter?.count?.doubleValue {
193225
data += operationCount
226+
// RTLC10c
227+
update = .update(DefaultLiveCounterUpdate(amount: operationCount))
228+
} else {
229+
// RTLC10d
230+
update = .noop
194231
}
232+
195233
// RTLC10b: Set the private flag createOperationIsMerged to true
196234
liveObject.createOperationIsMerged = true
235+
236+
return update
197237
}
198238

199239
/// Attempts to apply an operation from an inbound `ObjectMessage`, per RTLC7.
@@ -203,6 +243,7 @@ internal final class DefaultLiveCounter: Sendable {
203243
objectMessageSiteCode: String?,
204244
objectsPool: inout ObjectsPool,
205245
logger: Logger,
246+
userCallbackQueue: DispatchQueue,
206247
) {
207248
guard let applicableOperation = liveObject.canApplyOperation(objectMessageSerial: objectMessageSerial, objectMessageSiteCode: objectMessageSiteCode, logger: logger) else {
208249
// RTLC7b
@@ -216,13 +257,17 @@ internal final class DefaultLiveCounter: Sendable {
216257
switch operation.action {
217258
case .known(.counterCreate):
218259
// RTLC7d1
219-
applyCounterCreateOperation(
260+
let update = applyCounterCreateOperation(
220261
operation,
221262
logger: logger,
222263
)
264+
// RTLC7d1a
265+
liveObject.emit(update, on: userCallbackQueue)
223266
case .known(.counterInc):
224267
// RTLC7d2
225-
applyCounterIncOperation(operation.counterOp)
268+
let update = applyCounterIncOperation(operation.counterOp)
269+
// RTLC7d2a
270+
liveObject.emit(update, on: userCallbackQueue)
226271
default:
227272
// RTLC7d3
228273
logger.log("Operation \(operation) has unsupported action for LiveCounter; discarding", level: .warn)
@@ -233,25 +278,28 @@ internal final class DefaultLiveCounter: Sendable {
233278
internal mutating func applyCounterCreateOperation(
234279
_ operation: ObjectOperation,
235280
logger: Logger,
236-
) {
281+
) -> LiveObjectUpdate<DefaultLiveCounterUpdate> {
237282
if liveObject.createOperationIsMerged {
238283
// RTLC8b
239284
logger.log("Not applying COUNTER_CREATE because a COUNTER_CREATE has already been applied", level: .warn)
240-
return
285+
return .noop
241286
}
242287

243-
// RTLC8c
244-
mergeInitialValue(from: operation)
288+
// RTLC8c, RTLC8e
289+
return mergeInitialValue(from: operation)
245290
}
246291

247292
/// Applies a `COUNTER_INC` operation, per RTLC9.
248-
internal mutating func applyCounterIncOperation(_ operation: WireObjectsCounterOp?) {
293+
internal mutating func applyCounterIncOperation(_ operation: WireObjectsCounterOp?) -> LiveObjectUpdate<DefaultLiveCounterUpdate> {
249294
guard let operation else {
250-
return
295+
// RTL9e
296+
return .noop
251297
}
252298

253-
// RTLC9b
254-
data += operation.amount.doubleValue
299+
// RTLC9b, RTLC9d
300+
let amount = operation.amount.doubleValue
301+
data += amount
302+
return .update(DefaultLiveCounterUpdate(amount: amount))
255303
}
256304
}
257305
}
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
internal struct DefaultLiveCounterUpdate: LiveCounterUpdate, Equatable {
2+
internal var amount: Double
3+
}

0 commit comments

Comments
 (0)