Skip to content

Commit efa88b2

Browse files
Added the ability for a persistence to auto-prune snapshot iterations and assets according to the specified retention policy
1 parent 231a9fb commit efa88b2

10 files changed

+660
-15
lines changed

README.md

+2-3
Original file line numberDiff line numberDiff line change
@@ -261,8 +261,7 @@ Note that in the example above, even though the author is persisted first, if an
261261

262262
As this project matures towards release, the project will focus on the functionality and work listed below:
263263
- Force migration methods
264-
- Composite indexes (via macros?)
265-
- Cleaning up old resources on disk
264+
- Composite indexes
266265
- Ranged deletes
267266
- Controls for the edit history
268267
- Helper types to use with SwiftUI/Observability/Combine that can make data available on the main actor and filter and stay up to date
@@ -271,7 +270,7 @@ As this project matures towards release, the project will focus on the functiona
271270
- An example app
272271
- A memory persistence useful for testing apps with
273272
- A pre-configured data store tuned to storing pure Data, useful for types like Images
274-
- Cleaning up memory leaks
273+
- Cleaning up memory and file descriptor leaks
275274

276275
The above list will be kept up to date during development and will likely see additions during that process.
277276

Sources/CodableDatastore/Persistence/Disk Persistence/Datastore/DatastoreIndex.swift

-11
Original file line numberDiff line numberDiff line change
@@ -98,17 +98,6 @@ extension DiskPersistence.Datastore.Index {
9898
case .secondary(let index, let manifest): self = .secondary(index: index, manifest: manifest)
9999
}
100100
}
101-
102-
init(_ id: DatastoreRootManifest.IndexManifestID) {
103-
switch id {
104-
case .primary(let manifest):
105-
self = .primary(manifest: manifest)
106-
case .direct(let index, let manifest):
107-
self = .direct(index: index, manifest: manifest)
108-
case .secondary(let index, let manifest):
109-
self = .secondary(index: index, manifest: manifest)
110-
}
111-
}
112101
}
113102
}
114103

Sources/CodableDatastore/Persistence/Disk Persistence/Datastore/DatastoreIndexManifest.swift

+9
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,15 @@ extension DatastoreIndexManifest {
8686
}
8787
}
8888

89+
extension DatastoreIndexManifest {
90+
func pagesToPrune(for mode: SnapshotPruneMode) -> Set<DatastorePageIdentifier> {
91+
switch mode {
92+
case .pruneRemoved: Set(removedPageIDs)
93+
case .pruneAdded: Set(addedPageIDs)
94+
}
95+
}
96+
}
97+
8998
// MARK: - Decoding
9099

91100
extension DatastoreIndexManifest {

Sources/CodableDatastore/Persistence/Disk Persistence/Datastore/DatastoreRootManifest.swift

+22
Original file line numberDiff line numberDiff line change
@@ -101,3 +101,25 @@ extension DatastoreRootManifest {
101101
}
102102
}
103103
}
104+
105+
extension DatastoreRootManifest {
106+
func indexesToPrune(for mode: SnapshotPruneMode) -> Set<IndexID> {
107+
switch mode {
108+
case .pruneRemoved: removedIndexes
109+
case .pruneAdded: addedIndexes
110+
}
111+
}
112+
113+
func indexManifestsToPrune(
114+
for mode: SnapshotPruneMode,
115+
options: SnapshotPruneOptions
116+
) -> Set<IndexManifestID> {
117+
switch (mode, options) {
118+
case (.pruneRemoved, .pruneAndDelete): removedIndexManifests
119+
case (.pruneAdded, .pruneAndDelete): addedIndexManifests
120+
/// Flip the results when we aren't deleting, but only when removing from the bottom end.
121+
case (.pruneRemoved, .pruneOnly): addedIndexManifests
122+
case (.pruneAdded, .pruneOnly): []
123+
}
124+
}
125+
}

Sources/CodableDatastore/Persistence/Disk Persistence/Datastore/PersistenceDatastore.swift

+104
Original file line numberDiff line numberDiff line change
@@ -156,6 +156,110 @@ extension DiskPersistence.Datastore {
156156
}
157157
}
158158

159+
func pruneRootObject(with identifier: RootObject.ID, mode: SnapshotPruneMode, shouldDelete: Bool) async throws {
160+
let fileManager = FileManager()
161+
let rootObject = try loadRootObject(for: identifier, shouldCache: false)
162+
// let rootObject = trackedRootObjects[identifier]?.value ?? RootObject(datastore: self, id: identifier, rootObject: rootObjectManifest) // TODO: Investigate why this is commented out
163+
164+
/// Collect the indexes and related manifests we'll be deleting.
165+
/// - For indexes, only collect the ones we'll be deleting since the ones we are keeping won't be making references to other deletable assets.
166+
/// - For the manifests, we'll be deleting the entries that are being removed (relative to the direction we are removing from, so the removed ones from the oldest edge, and the added ones from the newest edge, as determined by the caller), while we'll be checking for pages to remove from entries that have just been added, but only when removing from the oldest edge. We only do this for the oldest edge because pages that have been "removed" from the newest edge are actually being _restored_ and not replaced, which maintains symmetry in a non-obvious way.
167+
let indexesToPruneAndDelete = rootObject.indexesToPrune(for: mode)
168+
let indexManifestsToPruneAndDelete = rootObject.indexManifestsToPrune(for: mode, options: .pruneAndDelete)
169+
let indexManifestsToPrune = rootObject.indexManifestsToPrune(for: mode, options: .pruneOnly)
170+
171+
/// Delete the index manifests and pages we know to be removed.
172+
for indexManifestID in indexManifestsToPruneAndDelete {
173+
let indexID = Index.ID(indexManifestID)
174+
defer {
175+
trackedIndexes.removeValue(forKey: indexID)
176+
loadedIndexes.remove(indexID)
177+
}
178+
/// Skip any manifests for indexes being deleted, since we'll just unlink the whole directory in that case.
179+
guard !indexesToPruneAndDelete.contains(indexID.indexID) else { continue }
180+
181+
let manifestURL = manifestURL(for: indexID)
182+
let manifest: DatastoreIndexManifest?
183+
do {
184+
manifest = try await DatastoreIndexManifest(contentsOf: manifestURL, id: indexID.manifestID)
185+
} catch URLError.fileDoesNotExist, CocoaError.fileReadNoSuchFile, CocoaError.fileNoSuchFile, POSIXError.ENOENT {
186+
manifest = nil
187+
} catch {
188+
print("Uncaught Manifest Error: \(error)")
189+
throw error
190+
}
191+
192+
guard let manifest else { continue }
193+
194+
/// Only delete the pages we know to be removed
195+
let pagesToPruneAndDelete = manifest.pagesToPrune(for: mode)
196+
for pageID in pagesToPruneAndDelete {
197+
let indexedPageID = Page.ID(index: indexID, page: pageID)
198+
defer {
199+
trackedPages.removeValue(forKey: indexedPageID.withoutManifest)
200+
loadedPages.remove(indexedPageID.withoutManifest)
201+
}
202+
203+
let pageURL = pageURL(for: indexedPageID)
204+
205+
try? fileManager.removeItem(at: pageURL)
206+
try? fileManager.removeDirectoryIfEmpty(url: pageURL.deletingLastPathComponent(), recursivelyRemoveParents: true)
207+
}
208+
209+
try? fileManager.removeItem(at: manifestURL)
210+
}
211+
212+
/// Prune the index manifests that were just added, as they themselves refer to other deleted pages.
213+
for indexManifestID in indexManifestsToPrune {
214+
let indexID = Index.ID(indexManifestID)
215+
/// Skip any manifests for indexes being deleted, since we'll just unlink the whole directory in that case.
216+
guard !indexesToPruneAndDelete.contains(indexID.indexID) else { continue }
217+
218+
let manifestURL = manifestURL(for: indexID)
219+
let manifest: DatastoreIndexManifest?
220+
do {
221+
manifest = try await DatastoreIndexManifest(contentsOf: manifestURL, id: indexID.manifestID)
222+
} catch URLError.fileDoesNotExist, CocoaError.fileReadNoSuchFile, CocoaError.fileNoSuchFile, POSIXError.ENOENT {
223+
manifest = nil
224+
} catch {
225+
print("Uncaught Manifest Error: \(error)")
226+
throw error
227+
}
228+
229+
guard let manifest else { continue }
230+
231+
/// Only delete the pages we know to be removed
232+
let pagesToPruneAndDelete = manifest.pagesToPrune(for: mode)
233+
for pageID in pagesToPruneAndDelete {
234+
let indexedPageID = Page.ID(index: indexID, page: pageID)
235+
defer {
236+
trackedPages.removeValue(forKey: indexedPageID.withoutManifest)
237+
loadedPages.remove(indexedPageID.withoutManifest)
238+
}
239+
240+
let pageURL = pageURL(for: indexedPageID)
241+
242+
try? fileManager.removeItem(at: pageURL)
243+
try? fileManager.removeDirectoryIfEmpty(url: pageURL.deletingLastPathComponent(), recursivelyRemoveParents: true)
244+
}
245+
}
246+
247+
/// Delete any indexes in their entirety.
248+
for indexID in indexesToPruneAndDelete {
249+
try? fileManager.removeItem(at: indexURL(for: indexID))
250+
}
251+
252+
/// If we are deleting the root object itself, do so at the very end as everything else would have been cleaned up.
253+
if shouldDelete {
254+
trackedRootObjects.removeValue(forKey: identifier)
255+
loadedRootObjects.remove(identifier)
256+
257+
let rootURL = rootURL(for: rootObject.id)
258+
try? fileManager.removeItem(at: rootURL)
259+
try? fileManager.removeDirectoryIfEmpty(url: rootURL.deletingLastPathComponent(), recursivelyRemoveParents: true)
260+
}
261+
}
262+
159263
func index(for identifier: Index.ID) -> Index {
160264
if let index = trackedIndexes[identifier]?.value {
161265
return index

Sources/CodableDatastore/Persistence/Disk Persistence/DiskPersistence.swift

+161-1
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,11 @@ public actor DiskPersistence<AccessMode: _AccessMode>: Persistence {
2929

3030
var lastTransaction: Transaction?
3131

32+
var _transactionRetentionPolicy: SnapshotRetentionPolicy = .indefinite
33+
34+
var nextSnapshotIterationCandidateToEnforce: (snapshot: Snapshot<ReadWrite>, iteration: SnapshotIteration)?
35+
var snapshotIterationPruningTask: Task<Void, Never>?
36+
3237
/// Shared caches across all snapshots and datastores.
3338
var rollingRootObjectCacheIndex = 0
3439
var rollingRootObjectCache: [Datastore.RootObject] = []
@@ -59,6 +64,10 @@ public actor DiskPersistence<AccessMode: _AccessMode>: Persistence {
5964
storeURL = readOnlyURL
6065
}
6166

67+
deinit {
68+
snapshotIterationPruningTask?.cancel()
69+
}
70+
6271
/// The default URL to use for disk persistences.
6372
static var defaultURL: URL {
6473
// TODO: Make non-throwing: https://github.com/mochidev/CodableDatastore/issues/15
@@ -517,7 +526,7 @@ extension DiskPersistence {
517526
else { throw DiskPersistenceError.cannotWrite }
518527

519528
/// If we are read-write, apply the updated root objects to the snapshot.
520-
try await self.updatingCurrentSnapshot { snapshot in
529+
let (currentSnapshot, persistedIteration) = try await self.updatingCurrentSnapshot { snapshot in
521530
try await snapshot.updatingManifest { manifest, iteration in
522531
iteration.actionName = actionName
523532
iteration.addedDatastoreRoots = addedDatastoreRoots
@@ -529,8 +538,159 @@ extension DiskPersistence {
529538
root: root.id
530539
)
531540
}
541+
return (snapshot, iteration)
542+
}
543+
}
544+
545+
enforceRetentionPolicy(snapshot: currentSnapshot, fromIteration: persistedIteration)
546+
}
547+
}
548+
549+
// MARK: - Retention Policy
550+
551+
extension DiskPersistence where AccessMode == ReadWrite {
552+
/// The current transaction retention policy for snapshot iterations written to disk.
553+
public var transactionRetentionPolicy: SnapshotRetentionPolicy {
554+
get async {
555+
_transactionRetentionPolicy
556+
}
557+
}
558+
559+
/// Update the transaction retention policy for snapshot iterations written to disk.
560+
///
561+
/// - Parameter policy: The new policy to enforce on write.
562+
///
563+
/// - SeeAlso: ``SnapshotRetentionPolicy``.
564+
public func setTransactionRetentionPolicy(_ policy: SnapshotRetentionPolicy) async {
565+
_transactionRetentionPolicy = policy
566+
for (_, snapshot) in snapshots {
567+
await snapshot.setExtendedIterationCacheEnabled(!_transactionRetentionPolicy.isIndefinite)
568+
}
569+
}
570+
571+
/// Enforce the retention policy on the persistence immediately.
572+
///
573+
/// - Note: Transaction retention policies are enforced after every write transaction, so calling this method directly is often unnecessary. However, it can be useful if the user requires disk resources immediately.
574+
public func enforceRetentionPolicy() async {
575+
// TODO: Don't create any snapshots if they don't exist yet
576+
let info = try? await self.readingCurrentSnapshot { snapshot in
577+
try await snapshot.readingManifest { manifest, iteration in
578+
(snapshot: snapshot, iteration: iteration)
579+
}
580+
}
581+
582+
if let (snapshot, iteration) = info {
583+
enforceRetentionPolicy(snapshot: snapshot, fromIteration: iteration)
584+
}
585+
586+
await finishTransactionCleanup()
587+
}
588+
}
589+
590+
extension DiskPersistence {
591+
/// Internal method to enforce the retention policy after a transaction is written.
592+
private func enforceRetentionPolicy(snapshot: Snapshot<ReadWrite>, fromIteration iteration: SnapshotIteration) {
593+
nextSnapshotIterationCandidateToEnforce = (snapshot, iteration)
594+
595+
if let snapshotIterationPruningTask {
596+
/// Update the next snapshot iteration we should be checking, and cancel the existing task so we can move on to checking this iteration.
597+
snapshotIterationPruningTask.cancel()
598+
return
599+
}
600+
601+
/// Update the next snapshot iteration we should be checking, and enqueue a task since we know one isn't currently running.
602+
checkNextSnapshotIterationCandidateForPruning()
603+
}
604+
605+
/// Private method to check the next candidate for pruning.
606+
///
607+
/// First, this method walks down the linked list defining the iteration chain, from newest to oldest, and collects the iterations that should be pruned. Then, it iterates that list in reverse (from oldest to newest) actually removing the iterations as they are encountered.
608+
/// - Note: This method should only ever be called when it is known that no `snapshotIterationPruningTask` is ongoing (it is nil), or when one just finishes.
609+
@discardableResult
610+
private func checkNextSnapshotIterationCandidateForPruning() -> Task<Void, Never>? {
611+
let transactionRetentionPolicy = _transactionRetentionPolicy
612+
let iterationCandidate = nextSnapshotIterationCandidateToEnforce
613+
614+
snapshotIterationPruningTask = nil
615+
nextSnapshotIterationCandidateToEnforce = nil
616+
617+
guard let (snapshot, iteration) = iterationCandidate, !transactionRetentionPolicy.isIndefinite
618+
else { return nil }
619+
620+
snapshotIterationPruningTask = Task.detached(priority: .background) {
621+
await snapshot.setExtendedIterationCacheEnabled(true)
622+
do {
623+
var iterations: [SnapshotIteration] = []
624+
var distance = 1
625+
var mainlineSuccessorIteration = iteration
626+
var currentIteration = iteration
627+
628+
/// First, walk the preceding iteration chain to the oldest iteration we can open, collecting the ones that should be pruned.
629+
while let precedingIterationID = currentIteration.precedingIteration, let precedingIteration = try? await snapshot.loadIteration(for: precedingIterationID) {
630+
try Task.checkCancellation()
631+
632+
if !iterations.isEmpty || transactionRetentionPolicy.shouldIterationBePruned(iteration: precedingIteration, distance: distance) {
633+
iterations.append(precedingIteration)
634+
} else {
635+
mainlineSuccessorIteration = precedingIteration
636+
}
637+
currentIteration = precedingIteration
638+
639+
distance += 1
640+
await Task.yield()
641+
}
642+
643+
/// Prune iterations from oldest to newest.
644+
for (index, iteration) in iterations.enumerated().reversed() {
645+
let mainlineSuccessorIteration = index > 0 ? iterations[index-1] : mainlineSuccessorIteration
646+
647+
var iterationsToPrune: [SnapshotIteration] = []
648+
var successorCandidatesToCheck = iteration.successiveIterations
649+
successorCandidatesToCheck.removeAll { $0 == mainlineSuccessorIteration.id }
650+
651+
/// Walk the successor candidates all the way back up so newer iterations are pruned before the ones that reference them. We pull items off from the end, and add new ones to the beginning to make sure they stay in graph order.
652+
while let successorCandidateID = successorCandidatesToCheck.popLast() {
653+
try Task.checkCancellation()
654+
guard let successorIteration = try? await snapshot.loadIteration(for: successorCandidateID)
655+
else { continue }
656+
657+
iterationsToPrune.append(successorIteration)
658+
successorCandidatesToCheck.insert(contentsOf: successorIteration.successiveIterations, at: 0)
659+
await Task.yield()
660+
}
661+
662+
/// First, remove the branch of iterations based on the one we are removing, but representing a history that was previously reverted.
663+
/// Prune the iterations in atomic tasks so they don't get cancelled mid-way, and instead check for cancellation in between iterations.
664+
for iteration in iterationsToPrune.reversed() {
665+
try Task.checkCancellation()
666+
try await Task { try await snapshot.pruneIteration(iteration, mode: .pruneAdded, shouldDelete: true) }.value
667+
await Task.yield()
668+
}
669+
670+
/// Finally, prune the iteration itself.
671+
try Task.checkCancellation()
672+
try await Task { try await snapshot.pruneIteration(iteration, mode: .pruneRemoved, shouldDelete: true) }.value
673+
await Task.yield()
674+
}
675+
676+
try Task.checkCancellation()
677+
try await Task { try await snapshot.pruneIteration(mainlineSuccessorIteration, mode: .pruneRemoved, shouldDelete: false) }.value
678+
await Task.yield()
679+
} catch {
680+
print("Pruning stopped: \(error)")
532681
}
682+
683+
await self.checkNextSnapshotIterationCandidateForPruning()?.value
533684
}
685+
686+
return snapshotIterationPruningTask
687+
}
688+
689+
/// Await any cleanup since the last complete write transaction to the persistence.
690+
///
691+
/// - Note: An application is not required to await cleanup, as it'll be eventually completed on future runs. It is however useful in cases when disk resources must be cleared before progressing to another step.
692+
public func finishTransactionCleanup() async {
693+
await snapshotIterationPruningTask?.value
534694
}
535695
}
536696

0 commit comments

Comments
 (0)