Skip to content
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,12 @@ internal final class InternalDefaultLiveCounter: Sendable {
objectMessageSerialTimestamp: Date?,
) -> LiveObjectUpdate<DefaultLiveCounterUpdate> {
mutex.withLock {
mutableState.replaceData(using: state, objectMessageSerialTimestamp: objectMessageSerialTimestamp)
mutableState.replaceData(
using: state,
objectMessageSerialTimestamp: objectMessageSerialTimestamp,
logger: logger,
clock: clock,
)
}
}

Expand Down Expand Up @@ -191,11 +196,28 @@ internal final class InternalDefaultLiveCounter: Sendable {
objectMessageSerialTimestamp: objectMessageSerialTimestamp,
objectsPool: &objectsPool,
logger: logger,
clock: clock,
userCallbackQueue: userCallbackQueue,
)
}
}

// MARK: - LiveObject

/// Returns the object's RTLO3d `isTombstone` property.
internal var isTombstone: Bool {
mutex.withLock {
mutableState.liveObjectMutableState.isTombstone
}
}

/// Returns the object's RTLO3e `tombstonedAt` property.
internal var tombstonedAt: Date? {
mutex.withLock {
mutableState.liveObjectMutableState.tombstonedAt
}
}

// MARK: - Mutable state and the operations that affect it

private struct MutableState: InternalLiveObject {
Expand All @@ -212,10 +234,31 @@ internal final class InternalDefaultLiveCounter: Sendable {
internal mutating func replaceData(
using state: ObjectState,
objectMessageSerialTimestamp: Date?,
logger: Logger,
clock: SimpleClock,
) -> LiveObjectUpdate<DefaultLiveCounterUpdate> {
// RTLC6a: Replace the private siteTimeserials with the value from ObjectState.siteTimeserials
liveObjectMutableState.siteTimeserials = state.siteTimeserials

// RTLC6e, RTLC6e1: No-op if we're already tombstone
if liveObjectMutableState.isTombstone {
return .noop
}

// RTLC6f: Tombstone if state indicates tombstoned
if state.tombstone {
let dataBeforeTombstoning = data

tombstone(
objectMessageSerialTimestamp: objectMessageSerialTimestamp,
logger: logger,
clock: clock,
)

// RTLC6f1
return .update(.init(amount: -dataBeforeTombstoning))
}

// RTLC6b: Set the private flag createOperationIsMerged to false
liveObjectMutableState.createOperationIsMerged = false

Expand Down Expand Up @@ -259,6 +302,7 @@ internal final class InternalDefaultLiveCounter: Sendable {
objectMessageSerialTimestamp: Date?,
objectsPool: inout ObjectsPool,
logger: Logger,
clock: SimpleClock,
userCallbackQueue: DispatchQueue,
) {
guard let applicableOperation = liveObjectMutableState.canApplyOperation(objectMessageSerial: objectMessageSerial, objectMessageSiteCode: objectMessageSiteCode, logger: logger) else {
Expand All @@ -270,6 +314,12 @@ internal final class InternalDefaultLiveCounter: Sendable {
// RTLC7c
liveObjectMutableState.siteTimeserials[applicableOperation.objectMessageSiteCode] = applicableOperation.objectMessageSerial

// RTLC7e
// TODO: are we still meant to update siteTimeserials? https://github.com/ably/specification/pull/350/files#r2218718854
if liveObjectMutableState.isTombstone {
return
}

switch operation.action {
case .known(.counterCreate):
// RTLC7d1
Expand All @@ -284,6 +334,18 @@ internal final class InternalDefaultLiveCounter: Sendable {
let update = applyCounterIncOperation(operation.counterOp)
// RTLC7d2a
liveObjectMutableState.emit(update, on: userCallbackQueue)
case .known(.objectDelete):
let dataBeforeApplyingOperation = data

// RTLC7d4
applyObjectDeleteOperation(
objectMessageSerialTimestamp: objectMessageSerialTimestamp,
logger: logger,
clock: clock,
)

// RTLC7d4a
liveObjectMutableState.emit(.update(.init(amount: -dataBeforeApplyingOperation)), on: userCallbackQueue)
default:
// RTLC7d3
logger.log("Operation \(operation) has unsupported action for LiveCounter; discarding", level: .warn)
Expand Down Expand Up @@ -317,5 +379,11 @@ internal final class InternalDefaultLiveCounter: Sendable {
data += amount
return .update(DefaultLiveCounterUpdate(amount: amount))
}

/// Needed for ``InternalLiveObject`` conformance.
mutating func resetDataToZeroValued() {
// RTLC4
data = 0
}
}
}
Loading