Skip to content

Commit e139e76

Browse files
Perform garbage collection per RTO10
Have not yet implemented RTO10b2's server-specified grace period; have split this work into #32. TODO note based on spec at 488e932
1 parent 0bba3a7 commit e139e76

File tree

4 files changed

+137
-1
lines changed

4 files changed

+137
-1
lines changed

Sources/AblyLiveObjects/Internal/InternalDefaultLiveCounter.swift

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -242,6 +242,13 @@ internal final class InternalDefaultLiveCounter: Sendable {
242242
}
243243
}
244244

245+
/// Returns the object's RTLO3e `tombstonedAt` property.
246+
internal var tombstonedAt: Date? {
247+
mutex.withLock {
248+
mutableState.liveObjectMutableState.tombstonedAt
249+
}
250+
}
251+
245252
// MARK: - Mutable state and the operations that affect it
246253

247254
private struct MutableState: InternalLiveObject {

Sources/AblyLiveObjects/Internal/InternalDefaultLiveMap.swift

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -392,6 +392,13 @@ internal final class InternalDefaultLiveMap: Sendable {
392392
}
393393
}
394394

395+
/// Releases entries that were tombstoned more than `gracePeriod` ago, per RTLM19.
396+
internal func releaseTombstonedEntries(gracePeriod: TimeInterval, clock: SimpleClock) {
397+
mutex.withLock {
398+
mutableState.releaseTombstonedEntries(gracePeriod: gracePeriod, logger: logger, clock: clock)
399+
}
400+
}
401+
395402
// MARK: - LiveObject
396403

397404
/// Returns the object's RTLO3d `isTombstone` property.
@@ -401,6 +408,13 @@ internal final class InternalDefaultLiveMap: Sendable {
401408
}
402409
}
403410

411+
/// Returns the object's RTLO3e `tombstonedAt` property.
412+
internal var tombstonedAt: Date? {
413+
mutex.withLock {
414+
mutableState.liveObjectMutableState.tombstonedAt
415+
}
416+
}
417+
404418
// MARK: - Mutable state and the operations that affect it
405419

406420
private struct MutableState: InternalLiveObject {
@@ -806,6 +820,29 @@ internal final class InternalDefaultLiveMap: Sendable {
806820
// RTLM4
807821
data = [:]
808822
}
823+
824+
/// Releases entries that were tombstoned more than `gracePeriod` ago, per RTLM19.
825+
internal mutating func releaseTombstonedEntries(
826+
gracePeriod: TimeInterval,
827+
logger: Logger,
828+
clock: SimpleClock,
829+
) {
830+
let now = clock.now
831+
832+
// RTLM19a, RTLM19a1
833+
data = data.filter { key, entry in
834+
let shouldRelease = {
835+
guard let tombstonedAt = entry.tombstonedAt else {
836+
return false
837+
}
838+
839+
return now.timeIntervalSince(tombstonedAt) >= gracePeriod
840+
}()
841+
842+
logger.log("Releasing tombstoned entry \(entry) for key \(key)", level: .debug)
843+
return !shouldRelease
844+
}
845+
}
809846
}
810847

811848
// MARK: - Helper Methods

Sources/AblyLiveObjects/Internal/InternalDefaultRealtimeObjects.swift

Lines changed: 58 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,26 @@ internal final class InternalDefaultRealtimeObjects: Sendable, LiveMapObjectPool
1818
private let receivedObjectSyncProtocolMessages: AsyncStream<[InboundObjectMessage]>
1919
private let receivedObjectSyncProtocolMessagesContinuation: AsyncStream<[InboundObjectMessage]>.Continuation
2020

21+
/// The RTO10a interval at which we will perform garbage collection.
22+
private let garbageCollectionInterval: TimeInterval
23+
/// The RTO10b grace period for which we will retain tombstoned objects and map entries.
24+
private nonisolated(unsafe) var garbageCollectionGracePeriod: TimeInterval
25+
// The task that runs the periodic garbage collection described in RTO10.
26+
private nonisolated(unsafe) var garbageCollectionTask: Task<Void, Never>!
27+
28+
/// Parameters used to control the garbage collection of tombstoned objects and map entries, as described in RTO10.
29+
internal struct GarbageCollectionOptions {
30+
/// The RTO10a interval at which we will perform garbage collection.
31+
///
32+
/// The default value comes from the suggestion in RTO10a.
33+
internal var interval: TimeInterval = 5 * 60
34+
35+
/// The initial RTO10b grace period for which we will retain tombstoned objects and map entries. This value may later get overridden by the `gcGracePeriod` of a `CONNECTED` `ProtocolMessage` from Realtime.
36+
///
37+
/// This default value comes from RTO10b3; can be overridden for testing.
38+
internal var gracePeriod: TimeInterval = 24 * 60 * 60
39+
}
40+
2141
internal var testsOnly_objectsPool: ObjectsPool {
2242
mutex.withLock {
2343
mutableState.objectsPool
@@ -71,14 +91,38 @@ internal final class InternalDefaultRealtimeObjects: Sendable, LiveMapObjectPool
7191
}
7292
}
7393

74-
internal init(logger: AblyPlugin.Logger, userCallbackQueue: DispatchQueue, clock: SimpleClock) {
94+
internal init(logger: AblyPlugin.Logger, userCallbackQueue: DispatchQueue, clock: SimpleClock, garbageCollectionOptions: GarbageCollectionOptions = .init()) {
7595
self.logger = logger
7696
self.userCallbackQueue = userCallbackQueue
7797
self.clock = clock
7898
(receivedObjectProtocolMessages, receivedObjectProtocolMessagesContinuation) = AsyncStream.makeStream()
7999
(receivedObjectSyncProtocolMessages, receivedObjectSyncProtocolMessagesContinuation) = AsyncStream.makeStream()
80100
(waitingForSyncEvents, waitingForSyncEventsContinuation) = AsyncStream.makeStream()
81101
mutableState = .init(objectsPool: .init(logger: logger, userCallbackQueue: userCallbackQueue, clock: clock))
102+
garbageCollectionInterval = garbageCollectionOptions.interval
103+
garbageCollectionGracePeriod = garbageCollectionOptions.gracePeriod
104+
105+
garbageCollectionTask = Task { [weak self, garbageCollectionInterval] in
106+
do {
107+
while true {
108+
logger.log("Will perform garbage collection in \(garbageCollectionInterval)s", level: .debug)
109+
try await Task.sleep(nanoseconds: UInt64(garbageCollectionInterval) * NSEC_PER_SEC)
110+
111+
guard let self else {
112+
return
113+
}
114+
115+
performGarbageCollection()
116+
}
117+
} catch {
118+
precondition(error is CancellationError)
119+
logger.log("Garbage collection task terminated due to cancellation", level: .debug)
120+
}
121+
}
122+
}
123+
124+
deinit {
125+
garbageCollectionTask.cancel()
82126
}
83127

84128
// MARK: - LiveMapObjectPoolDelegate
@@ -297,6 +341,19 @@ internal final class InternalDefaultRealtimeObjects: Sendable, LiveMapObjectPool
297341
try await coreSDK.publish(objectMessages: objectMessages)
298342
}
299343

344+
// MARK: - Garbage collection of deleted objects and map entries
345+
346+
/// Performs garbage collection of tombstoned objects and map entries, per RTO10c.
347+
internal func performGarbageCollection() {
348+
mutex.withLock {
349+
mutableState.objectsPool.performGarbageCollection(
350+
gracePeriod: garbageCollectionGracePeriod,
351+
clock: clock,
352+
logger: logger,
353+
)
354+
}
355+
}
356+
300357
// MARK: - Testing
301358

302359
/// Finishes the following streams, to allow a test to perform assertions about which elements the streams have emitted to this moment:

Sources/AblyLiveObjects/Internal/ObjectsPool.swift

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,15 @@ internal struct ObjectsPool {
114114
map.isTombstone
115115
}
116116
}
117+
118+
internal var tombstonedAt: Date? {
119+
switch self {
120+
case let .counter(counter):
121+
counter.tombstonedAt
122+
case let .map(map):
123+
map.tombstonedAt
124+
}
125+
}
117126
}
118127

119128
/// Keyed by `objectId`.
@@ -399,4 +408,30 @@ internal struct ObjectsPool {
399408
// TODO: this one is unclear (are we meant to replace the root or just clear its data?) https://github.com/ably/specification/pull/333/files#r2183493458. I believe that the answer is that we should just clear its data but the spec point needs to be clearer, see https://github.com/ably/specification/pull/346/files#r2201434895.
400409
root.resetData()
401410
}
411+
412+
/// Performs garbage collection of tombstoned objects and map entries, per RTO10c.
413+
internal mutating func performGarbageCollection(gracePeriod: TimeInterval, clock: SimpleClock, logger: Logger) {
414+
logger.log("Performing garbage collection, grace period \(gracePeriod)s", level: .debug)
415+
416+
let now = clock.now
417+
418+
entries = entries.filter { key, entry in
419+
if case let .map(map) = entry {
420+
// RTO10c1a
421+
map.releaseTombstonedEntries(gracePeriod: gracePeriod, clock: clock)
422+
}
423+
424+
// RTO10c1b
425+
let shouldRelease = {
426+
guard let tombstonedAt = entry.tombstonedAt else {
427+
return false
428+
}
429+
430+
return now.timeIntervalSince(tombstonedAt) >= gracePeriod
431+
}()
432+
433+
logger.log("Releasing tombstoned entry \(entry) for key \(key)", level: .debug)
434+
return !shouldRelease
435+
}
436+
}
402437
}

0 commit comments

Comments
 (0)