Skip to content

Commit c3f82c3

Browse files
Apply OBJECT ProtocolMessages
Based on [1] at 29276a5. I wrote the implementation, and for the tests followed the development approach described in 4494033. We have a separate issue for applying RTO8a's buffering during a sync, so I haven't done that here. [1] ably/specification#343
1 parent 8ec90e4 commit c3f82c3

File tree

8 files changed

+1034
-1
lines changed

8 files changed

+1034
-1
lines changed

Sources/AblyLiveObjects/DefaultRealtimeObjects.swift

Lines changed: 85 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -166,8 +166,17 @@ internal final class DefaultRealtimeObjects: RealtimeObjects, LiveMapObjectPoolD
166166
receivedObjectProtocolMessages
167167
}
168168

169+
/// Implements the `OBJECT` handling of RTO8.
169170
internal func handleObjectProtocolMessage(objectMessages: [InboundObjectMessage]) {
170-
receivedObjectProtocolMessagesContinuation.yield(objectMessages)
171+
mutex.withLock {
172+
mutableState.handleObjectProtocolMessage(
173+
objectMessages: objectMessages,
174+
logger: logger,
175+
receivedObjectProtocolMessagesContinuation: receivedObjectProtocolMessagesContinuation,
176+
mapDelegate: self,
177+
coreSDK: coreSDK,
178+
)
179+
}
171180
}
172181

173182
internal var testsOnly_receivedObjectSyncProtocolMessages: AsyncStream<[InboundObjectMessage]> {
@@ -320,5 +329,80 @@ internal final class DefaultRealtimeObjects: RealtimeObjects, LiveMapObjectPoolD
320329
syncStatus.signalSyncComplete()
321330
}
322331
}
332+
333+
/// Implements the `OBJECT` handling of RTO8.
334+
internal mutating func handleObjectProtocolMessage(
335+
objectMessages: [InboundObjectMessage],
336+
logger: Logger,
337+
receivedObjectProtocolMessagesContinuation: AsyncStream<[InboundObjectMessage]>.Continuation,
338+
mapDelegate: LiveMapObjectPoolDelegate,
339+
coreSDK: CoreSDK,
340+
) {
341+
receivedObjectProtocolMessagesContinuation.yield(objectMessages)
342+
343+
logger.log("handleObjectProtocolMessage(objectMessages: \(objectMessages))", level: .debug)
344+
345+
// TODO: RTO8a's buffering
346+
347+
// RTO8b
348+
for objectMessage in objectMessages {
349+
applyObjectProtocolMessageObjectMessage(
350+
objectMessage,
351+
logger: logger,
352+
mapDelegate: mapDelegate,
353+
coreSDK: coreSDK,
354+
)
355+
}
356+
}
357+
358+
/// Implements the `OBJECT` application of RTO9.
359+
private mutating func applyObjectProtocolMessageObjectMessage(
360+
_ objectMessage: InboundObjectMessage,
361+
logger: Logger,
362+
mapDelegate: LiveMapObjectPoolDelegate,
363+
coreSDK: CoreSDK,
364+
) {
365+
guard let operation = objectMessage.operation else {
366+
// RTO9a1
367+
logger.log("Unsupported OBJECT message received (no operation); \(objectMessage)", level: .warn)
368+
return
369+
}
370+
371+
// RTO9a2a1, RTO9a2a2
372+
let entry: ObjectsPool.Entry
373+
if let existingEntry = objectsPool.entries[operation.objectId] {
374+
entry = existingEntry
375+
} else {
376+
guard let newEntry = objectsPool.createZeroValueObject(
377+
forObjectID: operation.objectId,
378+
mapDelegate: mapDelegate,
379+
coreSDK: coreSDK,
380+
logger: logger,
381+
) else {
382+
logger.log("Unable to create zero-value object for \(operation.objectId) when processing OBJECT message; dropping", level: .warn)
383+
return
384+
}
385+
386+
entry = newEntry
387+
}
388+
389+
switch operation.action {
390+
case let .known(action):
391+
switch action {
392+
case .mapCreate, .mapSet, .mapRemove, .counterCreate, .counterInc, .objectDelete:
393+
// RTO9a2a3
394+
entry.apply(
395+
operation,
396+
objectMessageSerial: objectMessage.serial,
397+
objectMessageSiteCode: objectMessage.siteCode,
398+
objectsPool: &objectsPool,
399+
)
400+
}
401+
case let .unknown(rawValue):
402+
// RTO9a2b
403+
logger.log("Unsupported OBJECT operation action \(rawValue) received", level: .warn)
404+
return
405+
}
406+
}
323407
}
324408
}

Sources/AblyLiveObjects/Internal/DefaultLiveCounter.swift

Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,38 @@ internal final class DefaultLiveCounter: LiveCounter {
130130
}
131131
}
132132

133+
/// Test-only method to apply a COUNTER_CREATE operation, per RTLC8.
134+
internal func testsOnly_applyCounterCreateOperation(_ operation: ObjectOperation) {
135+
mutex.withLock {
136+
mutableState.applyCounterCreateOperation(operation, logger: logger)
137+
}
138+
}
139+
140+
/// Test-only method to apply a COUNTER_INC operation, per RTLC9.
141+
internal func testsOnly_applyCounterIncOperation(_ operation: WireObjectsCounterOp?) {
142+
mutex.withLock {
143+
mutableState.applyCounterIncOperation(operation)
144+
}
145+
}
146+
147+
/// Attempts to apply an operation from an inbound `ObjectMessage`, per RTLC7.
148+
internal func apply(
149+
_ operation: ObjectOperation,
150+
objectMessageSerial: String?,
151+
objectMessageSiteCode: String?,
152+
objectsPool: inout ObjectsPool,
153+
) {
154+
mutex.withLock {
155+
mutableState.apply(
156+
operation,
157+
objectMessageSerial: objectMessageSerial,
158+
objectMessageSiteCode: objectMessageSiteCode,
159+
objectsPool: &objectsPool,
160+
logger: logger,
161+
)
162+
}
163+
}
164+
133165
// MARK: - Mutable state and the operations that affect it
134166

135167
private struct MutableState {
@@ -165,5 +197,63 @@ internal final class DefaultLiveCounter: LiveCounter {
165197
// RTLC10b: Set the private flag createOperationIsMerged to true
166198
liveObject.createOperationIsMerged = true
167199
}
200+
201+
/// Attempts to apply an operation from an inbound `ObjectMessage`, per RTLC7.
202+
internal mutating func apply(
203+
_ operation: ObjectOperation,
204+
objectMessageSerial: String?,
205+
objectMessageSiteCode: String?,
206+
objectsPool: inout ObjectsPool,
207+
logger: Logger,
208+
) {
209+
guard let applicableOperation = liveObject.canApplyOperation(objectMessageSerial: objectMessageSerial, objectMessageSiteCode: objectMessageSiteCode, logger: logger) else {
210+
// RTLC7b
211+
logger.log("Operation \(operation) (serial: \(String(describing: objectMessageSerial)), siteCode: \(String(describing: objectMessageSiteCode))) should not be applied; discarding", level: .debug)
212+
return
213+
}
214+
215+
// RTLC7c
216+
liveObject.siteTimeserials[applicableOperation.objectMessageSiteCode] = applicableOperation.objectMessageSerial
217+
218+
switch operation.action {
219+
case .known(.counterCreate):
220+
// RTLC7d1
221+
applyCounterCreateOperation(
222+
operation,
223+
logger: logger,
224+
)
225+
case .known(.counterInc):
226+
// RTLC7d2
227+
applyCounterIncOperation(operation.counterOp)
228+
default:
229+
// RTLC7d3
230+
logger.log("Operation \(operation) has unsupported action for LiveCounter; discarding", level: .warn)
231+
}
232+
}
233+
234+
/// Applies a `COUNTER_CREATE` operation, per RTLC8.
235+
internal mutating func applyCounterCreateOperation(
236+
_ operation: ObjectOperation,
237+
logger: Logger,
238+
) {
239+
if liveObject.createOperationIsMerged {
240+
// RTLC8b
241+
logger.log("Not applying COUNTER_CREATE because a COUNTER_CREATE has already been applied", level: .warn)
242+
return
243+
}
244+
245+
// RTLC8c
246+
mergeInitialValue(from: operation)
247+
}
248+
249+
/// Applies a `COUNTER_INC` operation, per RTLC9.
250+
internal mutating func applyCounterIncOperation(_ operation: WireObjectsCounterOp?) {
251+
guard let operation else {
252+
return
253+
}
254+
255+
// RTLC9b
256+
data += operation.amount.doubleValue
257+
}
168258
}
169259
}

Sources/AblyLiveObjects/Internal/DefaultLiveMap.swift

Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -257,6 +257,39 @@ internal final class DefaultLiveMap: LiveMap {
257257
}
258258
}
259259

260+
/// Test-only method to apply a MAP_CREATE operation, per RTLM16.
261+
internal func testsOnly_applyMapCreateOperation(_ operation: ObjectOperation, objectsPool: inout ObjectsPool) {
262+
mutex.withLock {
263+
mutableState.applyMapCreateOperation(
264+
operation,
265+
objectsPool: &objectsPool,
266+
mapDelegate: delegate.referenced,
267+
coreSDK: coreSDK,
268+
logger: logger,
269+
)
270+
}
271+
}
272+
273+
/// Attempts to apply an operation from an inbound `ObjectMessage`, per RTLM15.
274+
internal func apply(
275+
_ operation: ObjectOperation,
276+
objectMessageSerial: String?,
277+
objectMessageSiteCode: String?,
278+
objectsPool: inout ObjectsPool,
279+
) {
280+
mutex.withLock {
281+
mutableState.apply(
282+
operation,
283+
objectMessageSerial: objectMessageSerial,
284+
objectMessageSiteCode: objectMessageSiteCode,
285+
objectsPool: &objectsPool,
286+
mapDelegate: delegate.referenced,
287+
coreSDK: coreSDK,
288+
logger: logger,
289+
)
290+
}
291+
}
292+
260293
/// Applies a `MAP_SET` operation to a key, per RTLM7.
261294
///
262295
/// This is currently exposed just so that the tests can test RTLM7 without having to go through a convoluted replaceData(…) call, but I _think_ that it's going to be used in further contexts when we introduce the handling of incoming object operations in a future spec PR.
@@ -372,6 +405,71 @@ internal final class DefaultLiveMap: LiveMap {
372405
liveObject.createOperationIsMerged = true
373406
}
374407

408+
/// Attempts to apply an operation from an inbound `ObjectMessage`, per RTLM15.
409+
internal mutating func apply(
410+
_ operation: ObjectOperation,
411+
objectMessageSerial: String?,
412+
objectMessageSiteCode: String?,
413+
objectsPool: inout ObjectsPool,
414+
mapDelegate: LiveMapObjectPoolDelegate?,
415+
coreSDK: CoreSDK,
416+
logger: Logger,
417+
) {
418+
guard let applicableOperation = liveObject.canApplyOperation(objectMessageSerial: objectMessageSerial, objectMessageSiteCode: objectMessageSiteCode, logger: logger) else {
419+
// RTLM15b
420+
logger.log("Operation \(operation) (serial: \(String(describing: objectMessageSerial)), siteCode: \(String(describing: objectMessageSiteCode))) should not be applied; discarding", level: .debug)
421+
return
422+
}
423+
424+
// RTLM15c
425+
liveObject.siteTimeserials[applicableOperation.objectMessageSiteCode] = applicableOperation.objectMessageSerial
426+
427+
switch operation.action {
428+
case .known(.mapCreate):
429+
// RTLM15d1
430+
applyMapCreateOperation(
431+
operation,
432+
objectsPool: &objectsPool,
433+
mapDelegate: mapDelegate,
434+
coreSDK: coreSDK,
435+
logger: logger,
436+
)
437+
case .known(.mapSet):
438+
guard let mapOp = operation.mapOp else {
439+
logger.log("Could not apply MAP_SET since operation.mapOp is missing", level: .warn)
440+
return
441+
}
442+
guard let data = mapOp.data else {
443+
logger.log("Could not apply MAP_SET since operation.data is missing", level: .warn)
444+
return
445+
}
446+
447+
// RTLM15d2
448+
applyMapSetOperation(
449+
key: mapOp.key,
450+
operationTimeserial: applicableOperation.objectMessageSerial,
451+
operationData: data,
452+
objectsPool: &objectsPool,
453+
mapDelegate: mapDelegate,
454+
coreSDK: coreSDK,
455+
logger: logger,
456+
)
457+
case .known(.mapRemove):
458+
guard let mapOp = operation.mapOp else {
459+
return
460+
}
461+
462+
// RTLM15d3
463+
applyMapRemoveOperation(
464+
key: mapOp.key,
465+
operationTimeserial: applicableOperation.objectMessageSerial,
466+
)
467+
default:
468+
// RTLM15d4
469+
logger.log("Operation \(operation) has unsupported action for LiveMap; discarding", level: .warn)
470+
}
471+
}
472+
375473
/// Applies a `MAP_SET` operation to a key, per RTLM7.
376474
internal mutating func applyMapSetOperation(
377475
key: String,
@@ -477,6 +575,32 @@ internal final class DefaultLiveMap: LiveMap {
477575
false
478576
}
479577
}
578+
579+
/// Applies a `MAP_CREATE` operation, per RTLM16.
580+
internal mutating func applyMapCreateOperation(
581+
_ operation: ObjectOperation,
582+
objectsPool: inout ObjectsPool,
583+
mapDelegate: LiveMapObjectPoolDelegate?,
584+
coreSDK: CoreSDK,
585+
logger: AblyPlugin.Logger,
586+
) {
587+
if liveObject.createOperationIsMerged {
588+
// RTLM16b
589+
logger.log("Not applying MAP_CREATE because a MAP_CREATE has already been applied", level: .warn)
590+
return
591+
}
592+
593+
// TODO: RTLM16c `semantics` comparison; outstanding question in https://github.com/ably/specification/pull/343/files#r2192784482
594+
595+
// RTLM16d
596+
mergeInitialValue(
597+
from: operation,
598+
objectsPool: &objectsPool,
599+
mapDelegate: mapDelegate,
600+
coreSDK: coreSDK,
601+
logger: logger,
602+
)
603+
}
480604
}
481605

482606
// MARK: - Helper Methods

Sources/AblyLiveObjects/Internal/ObjectsPool.swift

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,31 @@ internal struct ObjectsPool {
2828
counter
2929
}
3030
}
31+
32+
/// Applies an operation to a LiveObject, per RTO9a2a3.
33+
internal func apply(
34+
_ operation: ObjectOperation,
35+
objectMessageSerial: String?,
36+
objectMessageSiteCode: String?,
37+
objectsPool: inout ObjectsPool,
38+
) {
39+
switch self {
40+
case let .map(map):
41+
map.apply(
42+
operation,
43+
objectMessageSerial: objectMessageSerial,
44+
objectMessageSiteCode: objectMessageSiteCode,
45+
objectsPool: &objectsPool,
46+
)
47+
case let .counter(counter):
48+
counter.apply(
49+
operation,
50+
objectMessageSerial: objectMessageSerial,
51+
objectMessageSiteCode: objectMessageSiteCode,
52+
objectsPool: &objectsPool,
53+
)
54+
}
55+
}
3156
}
3257

3358
/// Keyed by `objectId`.

0 commit comments

Comments
 (0)