Skip to content

Commit e1f179b

Browse files
committed
feat: add ExpireSnapshots following 3-goal update pattern
Add ExpireSnapshots as a concrete implementation in the iceberg/update/ directory, following the 3-goal architecture for table updates. Goals accomplished: 1. TableMetadataBuilder::RemoveSnapshots() methods exist; 2. table::RemoveSnapshots TableUpdate class exists; 3. Expose via Table::NewExpireSnapshots() and Transaction::NewExpireSnapshots(). Implementation details: - ExpireSnapshots extends PendingUpdateTyped<vector<shared_ptr<Snapshot>>> - Fluent API with builder methods: ExpireSnapshotId, ExpireOlderThan, RetainLast, DeleteWith, SetCleanupLevel - Table::NewExpireSnapshots() factory method creates instances - Transaction::NewExpireSnapshots() interface method added - Organized in the src/iceberg/update/ subdirectory structure. Builder methods: - ExpireSnapshotId: Mark specific snapshots for removal by ID - ExpireOlderThan: Expire snapshots older than a timestamp - RetainLast: Keep the N most recent snapshots - DeleteWith: Custom file deletion callback - SetCleanupLevel: Control cleanup scope (None/MetadataOnly/All). Apply() and Commit() have placeholder implementations that return NotImplemented; they will be completed when the snapshot expiration logic is fully implemented.
1 parent 9805fae commit e1f179b

File tree

12 files changed

+461
-0
lines changed

12 files changed

+461
-0
lines changed

src/iceberg/CMakeLists.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ set(ICEBERG_SOURCES
6666
transform.cc
6767
transform_function.cc
6868
type.cc
69+
update/expire_snapshots.cc
6970
util/bucket_util.cc
7071
util/conversions.cc
7172
util/decimal.cc
@@ -132,6 +133,7 @@ iceberg_install_all_headers(iceberg)
132133
add_subdirectory(catalog)
133134
add_subdirectory(expression)
134135
add_subdirectory(row)
136+
add_subdirectory(update)
135137
add_subdirectory(util)
136138

137139
if(ICEBERG_BUILD_BUNDLE)

src/iceberg/meson.build

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,7 @@ iceberg_sources = files(
8888
'transform.cc',
8989
'transform_function.cc',
9090
'type.cc',
91+
'update/expire_snapshots.cc',
9192
'util/bucket_util.cc',
9293
'util/conversions.cc',
9394
'util/decimal.cc',
@@ -201,6 +202,7 @@ install_headers(
201202
subdir('catalog')
202203
subdir('expression')
203204
subdir('row')
205+
subdir('update')
204206
subdir('util')
205207

206208
if get_option('tests').enabled()

src/iceberg/table.cc

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
#include "iceberg/table_metadata.h"
2929
#include "iceberg/table_properties.h"
3030
#include "iceberg/table_scan.h"
31+
#include "iceberg/update/expire_snapshots.h"
3132
#include "iceberg/util/macros.h"
3233

3334
namespace iceberg {
@@ -114,6 +115,10 @@ std::unique_ptr<Transaction> Table::NewTransaction() const {
114115
throw NotImplemented("Table::NewTransaction is not implemented");
115116
}
116117

118+
std::shared_ptr<iceberg::ExpireSnapshots> Table::NewExpireSnapshots() {
119+
return std::make_shared<iceberg::ExpireSnapshots>(this);
120+
}
121+
117122
const std::shared_ptr<FileIO>& Table::io() const { return io_; }
118123

119124
std::unique_ptr<TableScanBuilder> Table::NewScan() const {

src/iceberg/table.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,11 @@ class ICEBERG_EXPORT Table {
115115
/// \return a pointer to the new Transaction
116116
virtual std::unique_ptr<Transaction> NewTransaction() const;
117117

118+
/// \brief Create a new expire snapshots operation for this table
119+
///
120+
/// \return a shared pointer to the new ExpireSnapshots operation
121+
virtual std::shared_ptr<ExpireSnapshots> NewExpireSnapshots();
122+
118123
/// \brief Returns a FileIO to read and write table data and metadata files
119124
const std::shared_ptr<FileIO>& io() const;
120125

src/iceberg/test/CMakeLists.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,8 @@ add_iceberg_test(expression_test
9797
literal_test.cc
9898
predicate_test.cc)
9999

100+
add_iceberg_test(update_test SOURCES ../update/test/expire_snapshots_test.cc)
101+
100102
add_iceberg_test(json_serde_test
101103
SOURCES
102104
test_common.cc

src/iceberg/transaction.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,11 @@ class ICEBERG_EXPORT Transaction {
4444
/// \return a new AppendFiles
4545
virtual std::shared_ptr<AppendFiles> NewAppend() = 0;
4646

47+
/// \brief Create a new expire snapshots operation for this transaction
48+
///
49+
/// \return a shared pointer to the new ExpireSnapshots operation
50+
virtual std::shared_ptr<ExpireSnapshots> NewExpireSnapshots() = 0;
51+
4752
/// \brief Apply the pending changes from all actions and commit
4853
///
4954
/// This method applies all pending data operations and metadata updates in the

src/iceberg/type_fwd.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -161,6 +161,9 @@ class PendingUpdate;
161161
template <typename T>
162162
class PendingUpdateTyped;
163163

164+
enum class CleanupLevel;
165+
class ExpireSnapshots;
166+
164167
/// ----------------------------------------------------------------------------
165168
/// TODO: Forward declarations below are not added yet.
166169
/// ----------------------------------------------------------------------------

src/iceberg/update/CMakeLists.txt

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one
2+
# or more contributor license agreements. See the NOTICE file
3+
# distributed with this work for additional information
4+
# regarding copyright ownership. The ASF licenses this file
5+
# to you under the Apache License, Version 2.0 (the
6+
# "License"); you may not use this file except in compliance
7+
# with the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing,
12+
# software distributed under the License is distributed on an
13+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
# KIND, either express or implied. See the License for the
15+
# specific language governing permissions and limitations
16+
# under the License.
17+
18+
iceberg_install_all_headers(iceberg/update)
Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#include "iceberg/update/expire_snapshots.h"
21+
22+
#include "iceberg/result.h"
23+
#include "iceberg/snapshot.h"
24+
#include "iceberg/table.h"
25+
#include "iceberg/table_metadata.h"
26+
27+
namespace iceberg {
28+
29+
ExpireSnapshots::ExpireSnapshots(Table* table) : table_(table) {}
30+
31+
// Appends `snapshot_id` to the list of snapshots explicitly marked for
// expiration. Repeated calls accumulate ids; no de-duplication is done here.
ExpireSnapshots& ExpireSnapshots::ExpireSnapshotId(int64_t snapshot_id) {
  snapshot_ids_to_expire_.emplace_back(snapshot_id);
  return *this;
}
35+
36+
// Stores the expiration threshold (milliseconds); a later call replaces the
// previously stored value.
ExpireSnapshots& ExpireSnapshots::ExpireOlderThan(int64_t timestamp_millis) {
  expire_older_than_ms_.emplace(timestamp_millis);
  return *this;
}
40+
41+
// Stores the number of most recent snapshots to retain; a later call replaces
// the previously stored value. The value is not validated here.
ExpireSnapshots& ExpireSnapshots::RetainLast(int num_snapshots) {
  retain_last_.emplace(num_snapshots);
  return *this;
}
45+
46+
// Takes ownership of the caller-supplied deletion callback and stores it for
// later use; a later call replaces the previously stored callback.
ExpireSnapshots& ExpireSnapshots::DeleteWith(
    std::function<void(std::string_view)> delete_func) {
  ExpireSnapshots& self = *this;
  self.delete_func_ = std::move(delete_func);
  return self;
}
51+
52+
// Overrides the cleanup level that was previously configured.
ExpireSnapshots& ExpireSnapshots::SetCleanupLevel(CleanupLevel level) {
  ExpireSnapshots& self = *this;
  self.cleanup_level_ = level;
  return self;
}
56+
57+
// Stub: none of the configured options are consulted yet. Always reports
// NotImplemented until the snapshot expiration logic is written.
Result<std::vector<std::shared_ptr<Snapshot>>> ExpireSnapshots::Apply() {
  return NotImplemented("ExpireSnapshots::Apply() is not yet implemented");
}
61+
62+
// Stub: nothing is committed to the table yet. Always reports NotImplemented
// until the commit logic is written.
Status ExpireSnapshots::Commit() {
  return NotImplemented("ExpireSnapshots::Commit() is not yet implemented");
}
66+
67+
} // namespace iceberg
Lines changed: 176 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,176 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#pragma once
21+
22+
/// \file iceberg/update/expire_snapshots.h
23+
/// API for removing old snapshots from a table
24+
25+
#include <cstdint>
26+
#include <functional>
27+
#include <memory>
28+
#include <optional>
29+
#include <string_view>
30+
#include <vector>
31+
32+
#include "iceberg/iceberg_export.h"
33+
#include "iceberg/pending_update.h"
34+
#include "iceberg/type_fwd.h"
35+
36+
namespace iceberg {
37+
38+
/// \brief Cleanup level for snapshot expiration
///
/// Selects which kinds of files are deleted when snapshots are expired.
enum class CleanupLevel {
  /// Delete no files at all; only snapshot metadata is removed
  kNone = 0,
  /// Delete metadata files only (manifests, manifest lists, statistics);
  /// data files are kept
  kMetadataOnly = 1,
  /// Delete metadata files and data files (default)
  kAll = 2,
};
50+
51+
/// \brief API for removing old snapshots from a table
52+
///
53+
/// ExpireSnapshots accumulates snapshot deletions and commits the new snapshot
54+
/// list to the table. This API does not allow deleting the current snapshot.
55+
///
56+
/// When committing, changes are applied to the latest table metadata. Commit
57+
/// conflicts are resolved by applying the changes to the new latest metadata
58+
/// and reattempting the commit.
59+
///
60+
/// Manifest files that are no longer used by valid snapshots will be deleted.
61+
/// Data files that were deleted by snapshots that are expired will be deleted.
62+
/// DeleteWith() can be used to pass an alternative deletion method.
63+
///
64+
/// Apply() returns a list of the snapshots that will be removed (preview mode).
65+
///
66+
/// Example usage:
67+
/// \code
68+
/// table.ExpireSnapshots()
69+
/// .ExpireOlderThan(timestampMillis)
70+
/// .RetainLast(5)
71+
/// .Commit();
72+
/// \endcode
73+
class ICEBERG_EXPORT ExpireSnapshots
74+
: public PendingUpdateTyped<std::vector<std::shared_ptr<Snapshot>>> {
75+
public:
76+
/// \brief Constructor for ExpireSnapshots operation
77+
///
78+
/// \param table The table to expire snapshots from
79+
explicit ExpireSnapshots(Table* table);
80+
~ExpireSnapshots() override = default;
81+
82+
/// \brief Expire a specific snapshot identified by id
83+
///
84+
/// Marks a specific snapshot for removal. This method can be called multiple
85+
/// times to expire multiple snapshots. Snapshots marked by this method will
86+
/// be expired even if they would be retained by RetainLast().
87+
///
88+
/// \param snapshot_id ID of the snapshot to expire
89+
/// \return Reference to this for method chaining
90+
ExpireSnapshots& ExpireSnapshotId(int64_t snapshot_id);
91+
92+
/// \brief Expire all snapshots older than the given timestamp
93+
///
94+
/// Sets a timestamp threshold - all snapshots created before this time will
95+
/// be expired (unless retained by RetainLast()).
96+
///
97+
/// \param timestamp_millis Timestamp in milliseconds since epoch
98+
/// \return Reference to this for method chaining
99+
ExpireSnapshots& ExpireOlderThan(int64_t timestamp_millis);
100+
101+
/// \brief Retain the most recent ancestors of the current snapshot
102+
///
103+
/// If a snapshot would be expired because it is older than the expiration
104+
/// timestamp, but is one of the num_snapshots most recent ancestors of the
105+
/// current state, it will be retained. This will not prevent snapshots
106+
/// explicitly identified by ExpireSnapshotId() from expiring.
107+
///
108+
/// This may keep more than num_snapshots ancestors if snapshots are added
109+
/// concurrently. This may keep less than num_snapshots ancestors if the
110+
/// current table state does not have that many.
111+
///
112+
/// \param num_snapshots The number of snapshots to retain
113+
/// \return Reference to this for method chaining
114+
ExpireSnapshots& RetainLast(int num_snapshots);
115+
116+
/// \brief Set a custom file deletion callback
117+
///
118+
/// Passes an alternative delete implementation that will be used for
119+
/// manifests and data files. If this method is not called, unnecessary
120+
/// manifests and data files will still be deleted using the default method.
121+
///
122+
/// Manifest files that are no longer used by valid snapshots will be deleted.
123+
/// Data files that were deleted by snapshots that are expired will be deleted.
124+
///
125+
/// \param delete_func Callback function that will be called for each file to delete
126+
/// \return Reference to this for method chaining
127+
ExpireSnapshots& DeleteWith(std::function<void(std::string_view)> delete_func);
128+
129+
/// \brief Configure the cleanup level for expired files
130+
///
131+
/// This method provides fine-grained control over which files are cleaned up
132+
/// during snapshot expiration.
133+
///
134+
/// Use CleanupLevel::kMetadataOnly when data files are shared across tables or
135+
/// when using procedures like add-files that may reference the same data files.
136+
///
137+
/// Use CleanupLevel::kNone when data and metadata files may be more efficiently
138+
/// removed using a distributed framework through the actions API.
139+
///
140+
/// \param level The cleanup level to use for expired snapshots
141+
/// \return Reference to this for method chaining
142+
ExpireSnapshots& SetCleanupLevel(CleanupLevel level);
143+
144+
/// \brief Apply the pending changes and return the uncommitted result
145+
///
146+
/// This does not result in a permanent update.
147+
///
148+
/// \return the list of snapshots that would be expired, or an error:
149+
/// - ValidationFailed: if pending changes cannot be applied
150+
Result<std::vector<std::shared_ptr<Snapshot>>> Apply() override;
151+
152+
/// \brief Apply and commit the pending changes to the table
153+
///
154+
/// Changes are committed by calling the underlying table's commit operation.
155+
///
156+
/// Once the commit is successful, the updated table will be refreshed.
157+
///
158+
/// \return Status::OK if the commit was successful, or an error:
159+
/// - ValidationFailed: if update cannot be applied to current metadata
160+
/// - CommitFailed: if update cannot be committed due to conflicts
161+
Status Commit() override;
162+
163+
// Non-copyable, movable (inherited from PendingUpdate)
164+
ExpireSnapshots(const ExpireSnapshots&) = delete;
165+
ExpireSnapshots& operator=(const ExpireSnapshots&) = delete;
166+
167+
private:
168+
Table* table_;
169+
std::vector<int64_t> snapshot_ids_to_expire_;
170+
std::optional<int64_t> expire_older_than_ms_;
171+
std::optional<int> retain_last_;
172+
std::optional<std::function<void(std::string_view)>> delete_func_;
173+
CleanupLevel cleanup_level_ = CleanupLevel::kAll;
174+
};
175+
176+
} // namespace iceberg

0 commit comments

Comments
 (0)