Skip to content

Commit fe7503c

Browse files
Perform garbage collection per RTO10
Based on spec referenced in 133c85b; same comment re testing applies too. Have not yet implemented RTO10b2's server-specified grace period; have split this work into #32.
1 parent ea1790f commit fe7503c

File tree

4 files changed

+137
-1
lines changed

4 files changed

+137
-1
lines changed

Sources/AblyLiveObjects/Internal/InternalDefaultLiveCounter.swift

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -211,6 +211,13 @@ internal final class InternalDefaultLiveCounter: Sendable {
211211
}
212212
}
213213

214+
/// Returns the object's RTLO3e `tombstonedAt` property.
215+
internal var tombstonedAt: Date? {
216+
mutex.withLock {
217+
mutableState.liveObjectMutableState.tombstonedAt
218+
}
219+
}
220+
214221
// MARK: - Mutable state and the operations that affect it
215222

216223
private struct MutableState: InternalLiveObject {

Sources/AblyLiveObjects/Internal/InternalDefaultLiveMap.swift

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -339,6 +339,13 @@ internal final class InternalDefaultLiveMap: Sendable {
339339
}
340340
}
341341

342+
/// Releases entries that were tombstoned more than `gracePeriod` ago, per RTLM19.
343+
internal func releaseTombstonedEntries(gracePeriod: TimeInterval, clock: SimpleClock) {
344+
mutex.withLock {
345+
mutableState.releaseTombstonedEntries(gracePeriod: gracePeriod, logger: logger, clock: clock)
346+
}
347+
}
348+
342349
// MARK: - LiveObject
343350

344351
/// Returns the object's RTLO3d `isTombstone` property.
@@ -348,6 +355,13 @@ internal final class InternalDefaultLiveMap: Sendable {
348355
}
349356
}
350357

358+
/// Returns the object's RTLO3e `tombstonedAt` property.
359+
internal var tombstonedAt: Date? {
360+
mutex.withLock {
361+
mutableState.liveObjectMutableState.tombstonedAt
362+
}
363+
}
364+
351365
// MARK: - Mutable state and the operations that affect it
352366

353367
private struct MutableState: InternalLiveObject {
@@ -753,6 +767,29 @@ internal final class InternalDefaultLiveMap: Sendable {
753767
// RTLM4
754768
data = [:]
755769
}
770+
771+
/// Releases entries that were tombstoned more than `gracePeriod` ago, per RTLM19.
772+
internal mutating func releaseTombstonedEntries(
773+
gracePeriod: TimeInterval,
774+
logger: Logger,
775+
clock: SimpleClock,
776+
) {
777+
let now = clock.now
778+
779+
// RTLM19a, RTLM19a1
780+
data = data.filter { key, entry in
781+
let shouldRelease = {
782+
guard let tombstonedAt = entry.tombstonedAt else {
783+
return false
784+
}
785+
786+
return now.timeIntervalSince(tombstonedAt) >= gracePeriod
787+
}()
788+
789+
logger.log("Releasing tombstoned entry \(entry) for key \(key)", level: .debug)
790+
return !shouldRelease
791+
}
792+
}
756793
}
757794

758795
// MARK: - Helper Methods

Sources/AblyLiveObjects/Internal/InternalDefaultRealtimeObjects.swift

Lines changed: 58 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,26 @@ internal final class InternalDefaultRealtimeObjects: Sendable, LiveMapObjectPool
1818
private let receivedObjectSyncProtocolMessages: AsyncStream<[InboundObjectMessage]>
1919
private let receivedObjectSyncProtocolMessagesContinuation: AsyncStream<[InboundObjectMessage]>.Continuation
2020

21+
/// The RTO10a interval at which we will perform garbage collection.
22+
private let garbageCollectionInterval: TimeInterval
23+
/// The RTO10b grace period for which we will retain tombstoned objects and map entries.
24+
private nonisolated(unsafe) var garbageCollectionGracePeriod: TimeInterval
25+
// The task that runs the periodic garbage collection described in RTO10.
26+
private nonisolated(unsafe) var garbageCollectionTask: Task<Void, Never>!
27+
28+
/// Parameters used to control the garbage collection of tombstoned objects and map entries, as described in RTO10.
29+
internal struct GarbageCollectionOptions {
30+
/// The RTO10a interval at which we will perform garbage collection.
31+
///
32+
/// The default value comes from the suggestion in RTO10a.
33+
internal var interval: TimeInterval = 5 * 60
34+
35+
/// The initial RTO10b grace period for which we will retain tombstoned objects and map entries. This value may later get overridden by the `gcGracePeriod` of a `CONNECTED` `ProtocolMessage` from Realtime.
36+
///
37+
/// This default value comes from RTO10b3; can be overridden for testing.
38+
internal var gracePeriod: TimeInterval = 24 * 60 * 60
39+
}
40+
2141
internal var testsOnly_objectsPool: ObjectsPool {
2242
mutex.withLock {
2343
mutableState.objectsPool
@@ -71,14 +91,38 @@ internal final class InternalDefaultRealtimeObjects: Sendable, LiveMapObjectPool
7191
}
7292
}
7393

74-
internal init(logger: AblyPlugin.Logger, userCallbackQueue: DispatchQueue, clock: SimpleClock) {
94+
internal init(logger: AblyPlugin.Logger, userCallbackQueue: DispatchQueue, clock: SimpleClock, garbageCollectionOptions: GarbageCollectionOptions = .init()) {
7595
self.logger = logger
7696
self.userCallbackQueue = userCallbackQueue
7797
self.clock = clock
7898
(receivedObjectProtocolMessages, receivedObjectProtocolMessagesContinuation) = AsyncStream.makeStream()
7999
(receivedObjectSyncProtocolMessages, receivedObjectSyncProtocolMessagesContinuation) = AsyncStream.makeStream()
80100
(waitingForSyncEvents, waitingForSyncEventsContinuation) = AsyncStream.makeStream()
81101
mutableState = .init(objectsPool: .init(logger: logger, userCallbackQueue: userCallbackQueue, clock: clock))
102+
garbageCollectionInterval = garbageCollectionOptions.interval
103+
garbageCollectionGracePeriod = garbageCollectionOptions.gracePeriod
104+
105+
garbageCollectionTask = Task { [weak self, garbageCollectionInterval] in
106+
do {
107+
while true {
108+
logger.log("Will perform garbage collection in \(garbageCollectionInterval)s", level: .debug)
109+
try await Task.sleep(nanoseconds: UInt64(garbageCollectionInterval) * NSEC_PER_SEC)
110+
111+
guard let self else {
112+
return
113+
}
114+
115+
performGarbageCollection()
116+
}
117+
} catch {
118+
precondition(error is CancellationError)
119+
logger.log("Garbage collection task terminated due to cancellation", level: .debug)
120+
}
121+
}
122+
}
123+
124+
deinit {
125+
garbageCollectionTask.cancel()
82126
}
83127

84128
// MARK: - LiveMapObjectPoolDelegate
@@ -210,6 +254,19 @@ internal final class InternalDefaultRealtimeObjects: Sendable, LiveMapObjectPool
210254
try await coreSDK.publish(objectMessages: objectMessages)
211255
}
212256

257+
// MARK: - Garbage collection of deleted objects and map entries
258+
259+
/// Performs garbage collection of tombstoned objects and map entries, per RTO10c.
260+
internal func performGarbageCollection() {
261+
mutex.withLock {
262+
mutableState.objectsPool.performGarbageCollection(
263+
gracePeriod: garbageCollectionGracePeriod,
264+
clock: clock,
265+
logger: logger,
266+
)
267+
}
268+
}
269+
213270
// MARK: - Testing
214271

215272
/// Finishes the following streams, to allow a test to perform assertions about which elements the streams have emitted to this moment:

Sources/AblyLiveObjects/Internal/ObjectsPool.swift

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,15 @@ internal struct ObjectsPool {
114114
map.isTombstone
115115
}
116116
}
117+
118+
internal var tombstonedAt: Date? {
119+
switch self {
120+
case let .counter(counter):
121+
counter.tombstonedAt
122+
case let .map(map):
123+
map.tombstonedAt
124+
}
125+
}
117126
}
118127

119128
/// Keyed by `objectId`.
@@ -308,4 +317,30 @@ internal struct ObjectsPool {
308317
// TODO: this one is unclear (are we meant to replace the root or just clear its data?) https://github.com/ably/specification/pull/333/files#r2183493458. I believe that the answer is that we should just clear its data but the spec point needs to be clearer, see https://github.com/ably/specification/pull/346/files#r2201434895.
309318
root.resetData()
310319
}
320+
321+
/// Performs garbage collection of tombstoned objects and map entries, per RTO10c.
322+
internal mutating func performGarbageCollection(gracePeriod: TimeInterval, clock: SimpleClock, logger: Logger) {
323+
logger.log("Performing garbage collection, grace period \(gracePeriod)s", level: .debug)
324+
325+
let now = clock.now
326+
327+
entries = entries.filter { key, entry in
328+
if case let .map(map) = entry {
329+
// RTO10c1a
330+
map.releaseTombstonedEntries(gracePeriod: gracePeriod, clock: clock)
331+
}
332+
333+
// RTO10c1b
334+
let shouldRelease = {
335+
guard let tombstonedAt = entry.tombstonedAt else {
336+
return false
337+
}
338+
339+
return now.timeIntervalSince(tombstonedAt) >= gracePeriod
340+
}()
341+
342+
logger.log("Releasing tombstoned entry \(entry) for key \(key)", level: .debug)
343+
return !shouldRelease
344+
}
345+
}
311346
}

0 commit comments

Comments
 (0)