Skip to content

Commit 480687c

Browse files
Perform garbage collection per RTO10
Have not yet implemented RTO10b2's server-specified grace period; have split this work into #32. TODO note based on spec at 488e932
1 parent 2b13132 commit 480687c

File tree

4 files changed

+137
-1
lines changed

4 files changed

+137
-1
lines changed

Sources/AblyLiveObjects/Internal/InternalDefaultLiveCounter.swift

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -216,6 +216,13 @@ internal final class InternalDefaultLiveCounter: Sendable {
216216
}
217217
}
218218

219+
/// Returns the object's RTLO3e `tombstonedAt` property.
220+
internal var tombstonedAt: Date? {
221+
mutex.withLock {
222+
mutableState.liveObjectMutableState.tombstonedAt
223+
}
224+
}
225+
219226
// MARK: - Mutable state and the operations that affect it
220227

221228
private struct MutableState: InternalLiveObject {

Sources/AblyLiveObjects/Internal/InternalDefaultLiveMap.swift

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -358,6 +358,13 @@ internal final class InternalDefaultLiveMap: Sendable {
358358
}
359359
}
360360

361+
/// Releases entries that were tombstoned more than `gracePeriod` ago, per RTLM19.
362+
internal func releaseTombstonedEntries(gracePeriod: TimeInterval, clock: SimpleClock) {
363+
mutex.withLock {
364+
mutableState.releaseTombstonedEntries(gracePeriod: gracePeriod, logger: logger, clock: clock)
365+
}
366+
}
367+
361368
// MARK: - LiveObject
362369

363370
/// Returns the object's RTLO3d `isTombstone` property.
@@ -367,6 +374,13 @@ internal final class InternalDefaultLiveMap: Sendable {
367374
}
368375
}
369376

377+
/// Returns the object's RTLO3e `tombstonedAt` property.
378+
internal var tombstonedAt: Date? {
379+
mutex.withLock {
380+
mutableState.liveObjectMutableState.tombstonedAt
381+
}
382+
}
383+
370384
// MARK: - Mutable state and the operations that affect it
371385

372386
private struct MutableState: InternalLiveObject {
@@ -772,6 +786,29 @@ internal final class InternalDefaultLiveMap: Sendable {
772786
// RTLM4
773787
data = [:]
774788
}
789+
790+
/// Releases entries that were tombstoned more than `gracePeriod` ago, per RTLM19.
791+
internal mutating func releaseTombstonedEntries(
792+
gracePeriod: TimeInterval,
793+
logger: Logger,
794+
clock: SimpleClock,
795+
) {
796+
let now = clock.now
797+
798+
// RTLM19a, RTLM19a1
799+
data = data.filter { key, entry in
800+
let shouldRelease = {
801+
guard let tombstonedAt = entry.tombstonedAt else {
802+
return false
803+
}
804+
805+
return now.timeIntervalSince(tombstonedAt) >= gracePeriod
806+
}()
807+
808+
logger.log("Releasing tombstoned entry \(entry) for key \(key)", level: .debug)
809+
return !shouldRelease
810+
}
811+
}
775812
}
776813

777814
// MARK: - Helper Methods

Sources/AblyLiveObjects/Internal/InternalDefaultRealtimeObjects.swift

Lines changed: 58 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,26 @@ internal final class InternalDefaultRealtimeObjects: Sendable, LiveMapObjectPool
1818
private let receivedObjectSyncProtocolMessages: AsyncStream<[InboundObjectMessage]>
1919
private let receivedObjectSyncProtocolMessagesContinuation: AsyncStream<[InboundObjectMessage]>.Continuation
2020

21+
/// The RTO10a interval at which we will perform garbage collection.
22+
private let garbageCollectionInterval: TimeInterval
23+
/// The RTO10b grace period for which we will retain tombstoned objects and map entries.
24+
private nonisolated(unsafe) var garbageCollectionGracePeriod: TimeInterval
25+
// The task that runs the periodic garbage collection described in RTO10.
26+
private nonisolated(unsafe) var garbageCollectionTask: Task<Void, Never>!
27+
28+
/// Parameters used to control the garbage collection of tombstoned objects and map entries, as described in RTO10.
29+
internal struct GarbageCollectionOptions {
30+
/// The RTO10a interval at which we will perform garbage collection.
31+
///
32+
/// The default value comes from the suggestion in RTO10a.
33+
internal var interval: TimeInterval = 5 * 60
34+
35+
/// The initial RTO10b grace period for which we will retain tombstoned objects and map entries. This value may later get overridden by the `gcGracePeriod` of a `CONNECTED` `ProtocolMessage` from Realtime.
36+
///
37+
/// This default value comes from RTO10b3; can be overridden for testing.
38+
internal var gracePeriod: TimeInterval = 24 * 60 * 60
39+
}
40+
2141
internal var testsOnly_objectsPool: ObjectsPool {
2242
mutex.withLock {
2343
mutableState.objectsPool
@@ -71,14 +91,38 @@ internal final class InternalDefaultRealtimeObjects: Sendable, LiveMapObjectPool
7191
}
7292
}
7393

74-
internal init(logger: AblyPlugin.Logger, userCallbackQueue: DispatchQueue, clock: SimpleClock) {
94+
internal init(logger: AblyPlugin.Logger, userCallbackQueue: DispatchQueue, clock: SimpleClock, garbageCollectionOptions: GarbageCollectionOptions = .init()) {
7595
self.logger = logger
7696
self.userCallbackQueue = userCallbackQueue
7797
self.clock = clock
7898
(receivedObjectProtocolMessages, receivedObjectProtocolMessagesContinuation) = AsyncStream.makeStream()
7999
(receivedObjectSyncProtocolMessages, receivedObjectSyncProtocolMessagesContinuation) = AsyncStream.makeStream()
80100
(waitingForSyncEvents, waitingForSyncEventsContinuation) = AsyncStream.makeStream()
81101
mutableState = .init(objectsPool: .init(logger: logger, userCallbackQueue: userCallbackQueue, clock: clock))
102+
garbageCollectionInterval = garbageCollectionOptions.interval
103+
garbageCollectionGracePeriod = garbageCollectionOptions.gracePeriod
104+
105+
garbageCollectionTask = Task { [weak self, garbageCollectionInterval] in
106+
do {
107+
while true {
108+
logger.log("Will perform garbage collection in \(garbageCollectionInterval)s", level: .debug)
109+
try await Task.sleep(nanoseconds: UInt64(garbageCollectionInterval) * NSEC_PER_SEC)
110+
111+
guard let self else {
112+
return
113+
}
114+
115+
performGarbageCollection()
116+
}
117+
} catch {
118+
precondition(error is CancellationError)
119+
logger.log("Garbage collection task terminated due to cancellation", level: .debug)
120+
}
121+
}
122+
}
123+
124+
deinit {
125+
garbageCollectionTask.cancel()
82126
}
83127

84128
// MARK: - LiveMapObjectPoolDelegate
@@ -217,6 +261,19 @@ internal final class InternalDefaultRealtimeObjects: Sendable, LiveMapObjectPool
217261
try await coreSDK.sendObject(objectMessages: objectMessages)
218262
}
219263

264+
// MARK: - Garbage collection of deleted objects and map entries
265+
266+
/// Performs garbage collection of tombstoned objects and map entries, per RTO10c.
267+
internal func performGarbageCollection() {
268+
mutex.withLock {
269+
mutableState.objectsPool.performGarbageCollection(
270+
gracePeriod: garbageCollectionGracePeriod,
271+
clock: clock,
272+
logger: logger,
273+
)
274+
}
275+
}
276+
220277
// MARK: - Testing
221278

222279
/// Finishes the following streams, to allow a test to perform assertions about which elements the streams have emitted to this moment:

Sources/AblyLiveObjects/Internal/ObjectsPool.swift

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,15 @@ internal struct ObjectsPool {
114114
map.isTombstone
115115
}
116116
}
117+
118+
internal var tombstonedAt: Date? {
119+
switch self {
120+
case let .counter(counter):
121+
counter.tombstonedAt
122+
case let .map(map):
123+
map.tombstonedAt
124+
}
125+
}
117126
}
118127

119128
/// Keyed by `objectId`.
@@ -308,4 +317,30 @@ internal struct ObjectsPool {
308317
// TODO: this one is unclear (are we meant to replace the root or just clear its data?) https://github.com/ably/specification/pull/333/files#r2183493458. I believe that the answer is that we should just clear its data but the spec point needs to be clearer, see https://github.com/ably/specification/pull/346/files#r2201434895.
309318
root.resetData()
310319
}
320+
321+
/// Performs garbage collection of tombstoned objects and map entries, per RTO10c.
322+
internal mutating func performGarbageCollection(gracePeriod: TimeInterval, clock: SimpleClock, logger: Logger) {
323+
logger.log("Performing garbage collection, grace period \(gracePeriod)s", level: .debug)
324+
325+
let now = clock.now
326+
327+
entries = entries.filter { key, entry in
328+
if case let .map(map) = entry {
329+
// RTO10c1a
330+
map.releaseTombstonedEntries(gracePeriod: gracePeriod, clock: clock)
331+
}
332+
333+
// RTO10c1b
334+
let shouldRelease = {
335+
guard let tombstonedAt = entry.tombstonedAt else {
336+
return false
337+
}
338+
339+
return now.timeIntervalSince(tombstonedAt) >= gracePeriod
340+
}()
341+
342+
logger.log("Releasing tombstoned entry \(entry) for key \(key)", level: .debug)
343+
return !shouldRelease
344+
}
345+
}
311346
}

0 commit comments

Comments
 (0)