diff --git a/src/iceberg/CMakeLists.txt b/src/iceberg/CMakeLists.txt index 6e9eb0baf..038b64ff2 100644 --- a/src/iceberg/CMakeLists.txt +++ b/src/iceberg/CMakeLists.txt @@ -72,6 +72,8 @@ set(ICEBERG_SOURCES table_requirements.cc table_scan.cc table_update.cc + transaction.cc + transaction_catalog.cc transform.cc transform_function.cc type.cc diff --git a/src/iceberg/catalog.h b/src/iceberg/catalog.h index 6c4957ade..32b763347 100644 --- a/src/iceberg/catalog.h +++ b/src/iceberg/catalog.h @@ -123,8 +123,8 @@ class ICEBERG_EXPORT Catalog { /// \return a Table instance or ErrorKind::kAlreadyExists if the table already exists virtual Result> UpdateTable( const TableIdentifier& identifier, - const std::vector>& requirements, - const std::vector>& updates) = 0; + const std::vector>& requirements, + const std::vector>& updates) = 0; /// \brief Start a transaction to create a table /// @@ -184,6 +184,11 @@ class ICEBERG_EXPORT Catalog { /// \return a Table instance or ErrorKind::kAlreadyExists if the table already exists virtual Result> RegisterTable( const TableIdentifier& identifier, const std::string& metadata_file_location) = 0; + + /// \brief Set whether the last operation in a transaction has been committed + /// + /// \param committed true if the last operation has been committed, false otherwise + virtual void SetLastOperationCommitted(bool committed) = 0; }; } // namespace iceberg diff --git a/src/iceberg/catalog/memory/in_memory_catalog.cc b/src/iceberg/catalog/memory/in_memory_catalog.cc index 9e4a485a0..645ee43eb 100644 --- a/src/iceberg/catalog/memory/in_memory_catalog.cc +++ b/src/iceberg/catalog/memory/in_memory_catalog.cc @@ -392,9 +392,8 @@ Result> InMemoryCatalog::CreateTable( Result> InMemoryCatalog::UpdateTable( const TableIdentifier& identifier, - const std::vector>& requirements, - const std::vector>& updates) { - std::unique_lock lock(mutex_); + const std::vector>& requirements, + const std::vector>& updates) { return NotImplemented("update table"); 
} diff --git a/src/iceberg/catalog/memory/in_memory_catalog.h b/src/iceberg/catalog/memory/in_memory_catalog.h index e6a9acbce..c15756cda 100644 --- a/src/iceberg/catalog/memory/in_memory_catalog.h +++ b/src/iceberg/catalog/memory/in_memory_catalog.h @@ -77,8 +77,8 @@ class ICEBERG_EXPORT InMemoryCatalog Result> UpdateTable( const TableIdentifier& identifier, - const std::vector>& requirements, - const std::vector>& updates) override; + const std::vector>& requirements, + const std::vector>& updates) override; Result> StageCreateTable( const TableIdentifier& identifier, const Schema& schema, const PartitionSpec& spec, @@ -97,6 +97,8 @@ class ICEBERG_EXPORT InMemoryCatalog const TableIdentifier& identifier, const std::string& metadata_file_location) override; + void SetLastOperationCommitted(bool committed) override {} + private: std::string catalog_name_; std::unordered_map properties_; diff --git a/src/iceberg/catalog/rest/rest_catalog.cc b/src/iceberg/catalog/rest/rest_catalog.cc index 4a77f6585..c1ef11e55 100644 --- a/src/iceberg/catalog/rest/rest_catalog.cc +++ b/src/iceberg/catalog/rest/rest_catalog.cc @@ -31,7 +31,6 @@ #include "iceberg/catalog/rest/http_client.h" #include "iceberg/catalog/rest/json_internal.h" #include "iceberg/catalog/rest/resource_paths.h" -#include "iceberg/catalog/rest/rest_catalog.h" #include "iceberg/catalog/rest/rest_util.h" #include "iceberg/json_internal.h" #include "iceberg/partition_spec.h" @@ -197,8 +196,9 @@ Result> RestCatalog::CreateTable( Result> RestCatalog::UpdateTable( [[maybe_unused]] const TableIdentifier& identifier, - [[maybe_unused]] const std::vector>& requirements, - [[maybe_unused]] const std::vector>& updates) { + [[maybe_unused]] const std::vector>& + requirements, + [[maybe_unused]] const std::vector>& updates) { return NotImplemented("Not implemented"); } diff --git a/src/iceberg/catalog/rest/rest_catalog.h b/src/iceberg/catalog/rest/rest_catalog.h index 4e191e86f..e3ae95df9 100644 --- 
a/src/iceberg/catalog/rest/rest_catalog.h +++ b/src/iceberg/catalog/rest/rest_catalog.h @@ -76,8 +76,8 @@ class ICEBERG_REST_EXPORT RestCatalog : public Catalog { Result> UpdateTable( const TableIdentifier& identifier, - const std::vector>& requirements, - const std::vector>& updates) override; + const std::vector>& requirements, + const std::vector>& updates) override; Result> StageCreateTable( const TableIdentifier& identifier, const Schema& schema, const PartitionSpec& spec, @@ -96,6 +96,8 @@ class ICEBERG_REST_EXPORT RestCatalog : public Catalog { const TableIdentifier& identifier, const std::string& metadata_file_location) override; + void SetLastOperationCommitted(bool committed) override {} + private: RestCatalog(std::unique_ptr config, std::unique_ptr paths); diff --git a/src/iceberg/meson.build b/src/iceberg/meson.build index d473d72e1..7eac3f57f 100644 --- a/src/iceberg/meson.build +++ b/src/iceberg/meson.build @@ -94,6 +94,8 @@ iceberg_sources = files( 'table_requirements.cc', 'table_scan.cc', 'table_update.cc', + 'transaction.cc', + 'transaction_catalog.cc', 'transform.cc', 'transform_function.cc', 'type.cc', diff --git a/src/iceberg/result.h b/src/iceberg/result.h index ddc428a23..6f93dc606 100644 --- a/src/iceberg/result.h +++ b/src/iceberg/result.h @@ -43,6 +43,7 @@ enum class ErrorKind { kInvalidManifest, kInvalidManifestList, kInvalidSchema, + kInvalidState, kIOError, kJsonParseError, kNamespaceNotEmpty, @@ -104,6 +105,7 @@ DEFINE_ERROR_FUNCTION(InvalidExpression) DEFINE_ERROR_FUNCTION(InvalidManifest) DEFINE_ERROR_FUNCTION(InvalidManifestList) DEFINE_ERROR_FUNCTION(InvalidSchema) +DEFINE_ERROR_FUNCTION(InvalidState) DEFINE_ERROR_FUNCTION(IOError) DEFINE_ERROR_FUNCTION(JsonParseError) DEFINE_ERROR_FUNCTION(NamespaceNotEmpty) diff --git a/src/iceberg/table.cc b/src/iceberg/table.cc index 458711255..871394133 100644 --- a/src/iceberg/table.cc +++ b/src/iceberg/table.cc @@ -26,6 +26,7 @@ #include "iceberg/table_metadata.h" #include 
"iceberg/table_properties.h" #include "iceberg/table_scan.h" +#include "iceberg/transaction.h" #include "iceberg/update/update_properties.h" #include "iceberg/util/macros.h" @@ -113,8 +114,8 @@ std::unique_ptr Table::UpdateProperties() const { return std::make_unique(identifier_, catalog_, metadata_); } -std::unique_ptr Table::NewTransaction() const { - throw NotImplemented("Table::NewTransaction is not implemented"); +Result> Table::NewTransaction() const { + return Transaction::Make(weak_from_this().lock(), catalog_); /* lock() yields null instead of throwing std::bad_weak_ptr when this Table is not shared_ptr-owned; Make then reports InvalidArgument */ } const std::shared_ptr& Table::io() const { return io_; } diff --git a/src/iceberg/table.h b/src/iceberg/table.h index df3a0c32e..38483584a 100644 --- a/src/iceberg/table.h +++ b/src/iceberg/table.h @@ -26,6 +26,7 @@ #include #include "iceberg/iceberg_export.h" +#include "iceberg/result.h" #include "iceberg/snapshot.h" #include "iceberg/table_identifier.h" #include "iceberg/type_fwd.h" @@ -33,7 +34,7 @@ namespace iceberg { /// \brief Represents an Iceberg table -class ICEBERG_EXPORT Table { +class ICEBERG_EXPORT Table : public std::enable_shared_from_this { public: ~Table(); @@ -87,6 +88,9 @@ class ICEBERG_EXPORT Table { /// \brief Return the table's base location const std::string& location() const; + /// \brief Return the table's metadata file location + const std::string& metadata_location() const { return metadata_location_; } + /// \brief Return the table's current snapshot, return NotFoundError if not found Result> current_snapshot() const; @@ -118,12 +122,15 @@ class ICEBERG_EXPORT Table { /// \brief Create a new transaction for this table /// - /// \return a pointer to the new Transaction - virtual std::unique_ptr NewTransaction() const; + /// \return a new Transaction or an error if the transaction cannot be created + virtual Result> NewTransaction() const; /// \brief Returns a FileIO to read and write table data and metadata files const std::shared_ptr& io() const; + /// \brief Return the underlying table metadata + const std::shared_ptr& 
metadata() const { return metadata_; } + private: const TableIdentifier identifier_; std::shared_ptr metadata_; diff --git a/src/iceberg/test/CMakeLists.txt b/src/iceberg/test/CMakeLists.txt index a48567132..d4957ce35 100644 --- a/src/iceberg/test/CMakeLists.txt +++ b/src/iceberg/test/CMakeLists.txt @@ -69,6 +69,7 @@ add_iceberg_test(schema_test add_iceberg_test(table_test SOURCES + base_transaction_test.cc json_internal_test.cc metrics_config_test.cc schema_json_test.cc diff --git a/src/iceberg/test/base_transaction_test.cc b/src/iceberg/test/base_transaction_test.cc new file mode 100644 index 000000000..0c1becb95 --- /dev/null +++ b/src/iceberg/test/base_transaction_test.cc @@ -0,0 +1,174 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +#include + +#include + +#include "iceberg/partition_spec.h" +#include "iceberg/result.h" +#include "iceberg/table.h" +#include "iceberg/table_metadata.h" +#include "iceberg/table_update.h" +#include "iceberg/test/matchers.h" +#include "iceberg/test/mock_catalog.h" +#include "iceberg/transaction.h" +#include "iceberg/update/update_properties.h" + +namespace iceberg { + +class BaseTransactionTest : public ::testing::Test { + protected: + void SetUp() override { + // Create catalog and table identifier + catalog_ = std::make_shared<::testing::NiceMock>(); + + identifier_ = TableIdentifier(Namespace({"test"}), "test_table"); + auto metadata = std::make_shared(); + table_ = + std::make_shared
(identifier_, std::move(metadata), + "s3://bucket/table/metadata.json", nullptr, catalog_); + } + + std::unique_ptr NewTransaction() { + auto transaction_result = BaseTransaction::Make(table_, catalog_); + if (!transaction_result.has_value()) { + ADD_FAILURE() << "Failed to create transaction: " + << transaction_result.error().message; + } + return std::move(transaction_result).value(); + } + + TableIdentifier identifier_; + std::shared_ptr catalog_; + std::shared_ptr
table_; +}; + +TEST_F(BaseTransactionTest, CommitSetPropertiesUsesCatalog) { + auto transaction = NewTransaction(); + auto update_properties = transaction->NewUpdateProperties(); + EXPECT_TRUE(update_properties.has_value()); + update_properties.value()->Set("new-key", "new-value"); + EXPECT_THAT(update_properties.value()->Commit(), IsOk()); + + EXPECT_CALL(*catalog_, + UpdateTable(::testing::Eq(identifier_), ::testing::_, ::testing::_)) + .WillOnce( + [](const TableIdentifier& id, + const std::vector>& /*requirements*/, + const std::vector>& updates) + -> Result> { + EXPECT_EQ("test_table", id.name); + EXPECT_EQ(1u, updates.size()); + const auto* set_update = + dynamic_cast(updates.front().get()); + EXPECT_NE(set_update, nullptr); + const auto& updated = set_update->updated(); + auto it = updated.find("new-key"); + EXPECT_NE(it, updated.end()); + EXPECT_EQ("new-value", it->second); + return {std::unique_ptr
()}; + }); + + EXPECT_THAT(transaction->CommitTransaction(), IsOk()); +} + +TEST_F(BaseTransactionTest, RemovePropertiesSkipsMissingKeys) { + auto transaction = NewTransaction(); + auto update_properties = transaction->NewUpdateProperties(); + EXPECT_TRUE(update_properties.has_value()); + update_properties.value()->Remove("missing").Remove("existing"); + EXPECT_THAT(update_properties.value()->Commit(), IsOk()); + + EXPECT_CALL(*catalog_, + UpdateTable(::testing::Eq(identifier_), ::testing::_, ::testing::_)) + .WillOnce( + [](const TableIdentifier&, + const std::vector>& /*requirements*/, + const std::vector>& updates) + -> Result> { + EXPECT_EQ(1u, updates.size()); + const auto* remove_update = + dynamic_cast(updates.front().get()); + EXPECT_NE(remove_update, nullptr); + EXPECT_THAT(remove_update->removed(), + ::testing::UnorderedElementsAre("missing", "existing")); + return {std::unique_ptr
()}; + }); + + EXPECT_THAT(transaction->CommitTransaction(), IsOk()); +} + +TEST_F(BaseTransactionTest, AggregatesMultiplePendingUpdates) { + auto transaction = NewTransaction(); + auto update_properties = transaction->NewUpdateProperties(); + EXPECT_TRUE(update_properties.has_value()); + update_properties.value()->Set("new-key", "new-value"); + EXPECT_THAT(update_properties.value()->Commit(), IsOk()); + auto remove_properties = transaction->NewUpdateProperties(); + EXPECT_TRUE(remove_properties.has_value()); + remove_properties.value()->Remove("existing"); + EXPECT_THAT(remove_properties.value()->Commit(), IsOk()); + + EXPECT_CALL(*catalog_, + UpdateTable(::testing::Eq(identifier_), ::testing::_, ::testing::_)) + .WillOnce( + [](const TableIdentifier&, + const std::vector>& /*requirements*/, + const std::vector>& updates) + -> Result> { + EXPECT_EQ(2u, updates.size()); + + const auto* set_update = + dynamic_cast(updates[0].get()); + EXPECT_NE(set_update, nullptr); + const auto& updated = set_update->updated(); + auto it = updated.find("new-key"); + EXPECT_NE(it, updated.end()); + EXPECT_EQ("new-value", it->second); + + const auto* remove_update = + dynamic_cast(updates[1].get()); + EXPECT_NE(remove_update, nullptr); + EXPECT_THAT(remove_update->removed(), ::testing::ElementsAre("existing")); + + return {std::unique_ptr
()}; + }); + + EXPECT_THAT(transaction->CommitTransaction(), IsOk()); +} + +TEST_F(BaseTransactionTest, FailsIfUpdateNotCommitted) { + auto transaction = NewTransaction(); + auto update_properties = transaction->NewUpdateProperties(); + EXPECT_TRUE(update_properties.has_value()); + update_properties.value()->Set("new-key", "new-value"); + EXPECT_THAT(transaction->CommitTransaction(), IsError(ErrorKind::kInvalidState)); +} + +TEST_F(BaseTransactionTest, NewTransactionFailsWithoutCatalog) { + auto metadata = std::make_shared(); + auto table_without_catalog = + std::make_shared
(identifier_, std::move(metadata), + "s3://bucket/table/metadata.json", nullptr, nullptr); + EXPECT_THAT(table_without_catalog->NewTransaction(), + IsError(ErrorKind::kInvalidArgument)); +} + +} // namespace iceberg diff --git a/src/iceberg/test/mock_catalog.h b/src/iceberg/test/mock_catalog.h index 46f01c8db..ae876a792 100644 --- a/src/iceberg/test/mock_catalog.h +++ b/src/iceberg/test/mock_catalog.h @@ -62,8 +62,8 @@ class MockCatalog : public Catalog { MOCK_METHOD((Result>), UpdateTable, (const TableIdentifier&, - (const std::vector>&), - (const std::vector>&)), + const std::vector>&, + const std::vector>&), (override)); MOCK_METHOD((Result>), StageCreateTable, @@ -83,6 +83,8 @@ class MockCatalog : public Catalog { MOCK_METHOD((Result>), RegisterTable, (const TableIdentifier&, const std::string&), (override)); + + MOCK_METHOD(void, SetLastOperationCommitted, (bool), (override)); }; } // namespace iceberg diff --git a/src/iceberg/test/update_properties_test.cc b/src/iceberg/test/update_properties_test.cc index 13cfec831..4f40d478b 100644 --- a/src/iceberg/test/update_properties_test.cc +++ b/src/iceberg/test/update_properties_test.cc @@ -50,7 +50,14 @@ class UpdatePropertiesTest : public ::testing::Test { metadata_->schemas.push_back(schema_); // Create catalog and table identifier - catalog_ = std::make_shared(); + catalog_ = std::make_shared<::testing::NiceMock>(); + ON_CALL(*catalog_, LoadTable(::testing::_)) + .WillByDefault([this](const TableIdentifier&) -> Result> { + return std::make_unique
(identifier_, metadata_, + "s3://bucket/table/metadata.json", nullptr, + catalog_); + }); + identifier_ = TableIdentifier(Namespace({"test"}), "table"); } @@ -159,8 +166,11 @@ TEST_F(UpdatePropertiesTest, InvalidTable) { { // metadata is null - UpdateProperties update(identifier_, catalog_, nullptr); + auto catalog = std::make_shared<::testing::NiceMock>(); + EXPECT_CALL(*catalog, LoadTable(::testing::_)) + .WillOnce(::testing::Return(InvalidArgument("Base table metadata is required"))); + UpdateProperties update(identifier_, catalog, nullptr); auto result = update.Apply(); EXPECT_THAT(result, IsError(ErrorKind::kInvalidArgument)); EXPECT_THAT(result, HasErrorMessage("Base table metadata is required")); @@ -173,7 +183,9 @@ TEST_F(UpdatePropertiesTest, Commit) { UpdateProperties update(identifier_, catalog_, metadata_); update.Set("key1", "value1"); - EXPECT_CALL(*catalog_, UpdateTable).Times(1).WillOnce(::testing::Return(nullptr)); + EXPECT_CALL(*catalog_, UpdateTable(::testing::_, ::testing::_, ::testing::_)) + .Times(1) + .WillOnce(::testing::Return(nullptr)); auto result = update.Commit(); EXPECT_THAT(result, IsOk()); @@ -184,7 +196,7 @@ TEST_F(UpdatePropertiesTest, Commit) { UpdateProperties update(identifier_, catalog_, metadata_); update.Set("key1", "value1"); - EXPECT_CALL(*catalog_, UpdateTable) + EXPECT_CALL(*catalog_, UpdateTable(::testing::_, ::testing::_, ::testing::_)) .WillOnce(::testing::Return(CommitFailed("Commit update failed"))); auto result = update.Commit(); EXPECT_THAT(result, IsError(ErrorKind::kCommitFailed)); diff --git a/src/iceberg/transaction.cc b/src/iceberg/transaction.cc new file mode 100644 index 000000000..c58cdb367 --- /dev/null +++ b/src/iceberg/transaction.cc @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "iceberg/transaction.h" + +#include "iceberg/catalog.h" +#include "iceberg/pending_update.h" +#include "iceberg/table.h" +#include "iceberg/table_metadata.h" +#include "iceberg/transaction_catalog.h" +#include "iceberg/update/update_properties.h" +#include "iceberg/util/macros.h" + +namespace iceberg { + +Result> Transaction::Make(std::shared_ptr table, + std::shared_ptr catalog) { + return BaseTransaction::Make(std::move(table), std::move(catalog)); +} + +Result> BaseTransaction::Make( + std::shared_ptr table, std::shared_ptr catalog) { + if (!table) { + return InvalidArgument("Transaction::Make requires a table"); + } + if (!catalog) { + return InvalidArgument("Transaction::Make requires a catalog"); + } + + return std::unique_ptr( + new BaseTransaction(std::move(table), std::move(catalog))); +} + +BaseTransaction::BaseTransaction(std::shared_ptr table, + std::shared_ptr catalog) + : table_(std::move(table)) { + context_.identifier = table_->name(); + context_.current_metadata = table_->metadata(); + catalog_ = std::make_shared(std::move(catalog), this); +} + +const std::shared_ptr& BaseTransaction::table() const { return table_; } + +Result> BaseTransaction::NewUpdateProperties() { + if (!HasLastOperationCommitted()) { + return InvalidState( + "Cannot create new update: last operation in transaction has not committed"); + } + SetLastOperationCommitted(false); + + 
auto metadata = std::make_shared(*context_.current_metadata); + return std::make_unique(table_->name(), catalog_, + std::move(metadata)); +} + +Result> BaseTransaction::NewAppend() { + throw NotImplemented("BaseTransaction::NewAppend not implemented"); +} + +Status BaseTransaction::CommitTransaction() { + if (!HasLastOperationCommitted()) { + return InvalidState("Cannot commit transaction: last operation has not committed"); + } + + if (context_.pending_updates.empty()) { + return {}; + } + + ICEBERG_ASSIGN_OR_RAISE( + auto updated_table, + catalog_->catalog_impl()->UpdateTable( + context_.identifier, context_.pending_requirements, context_.pending_updates)); + + context_.pending_requirements.clear(); + context_.pending_updates.clear(); + + return {}; +} + +Result> BaseTransaction::StageUpdates( + const TableIdentifier& identifier, + const std::vector>& requirements, + const std::vector>& updates) { + if (identifier != context_.identifier) { + return InvalidArgument("Transaction only supports table '{}'", + context_.identifier.name); + } + + if (!context_.current_metadata) { + return InvalidState("Transaction metadata is not initialized"); + } + + if (updates.empty()) { + return std::make_unique
( + context_.identifier, std::make_shared(*context_.current_metadata), + table_->metadata_location(), table_->io(), catalog_->catalog_impl()); + } + + ICEBERG_RETURN_UNEXPECTED(ApplyUpdates(updates)); + context_.pending_requirements.insert(context_.pending_requirements.end(), + requirements.begin(), requirements.end()); + context_.pending_updates.insert(context_.pending_updates.end(), updates.begin(), + updates.end()); + + return std::make_unique
( + context_.identifier, std::make_shared(*context_.current_metadata), + table_->metadata_location(), table_->io(), catalog_->catalog_impl()); +} + +Status BaseTransaction::ApplyUpdates( + const std::vector>& updates) { + if (updates.empty()) { + return {}; + } + + auto builder = TableMetadataBuilder::BuildFrom(context_.current_metadata.get()); + for (const auto& update : updates) { + if (!update) { + continue; + } + update->ApplyTo(*builder); + } + + ICEBERG_ASSIGN_OR_RAISE(auto new_metadata, builder->Build()); + context_.current_metadata = std::shared_ptr(std::move(new_metadata)); + return {}; +} + +} // namespace iceberg diff --git a/src/iceberg/transaction.h b/src/iceberg/transaction.h index 72ba5182c..6a48d7ff3 100644 --- a/src/iceberg/transaction.h +++ b/src/iceberg/transaction.h @@ -21,9 +21,13 @@ #pragma once #include +#include #include "iceberg/iceberg_export.h" #include "iceberg/result.h" +#include "iceberg/table_identifier.h" +#include "iceberg/table_requirement.h" +#include "iceberg/table_update.h" #include "iceberg/type_fwd.h" namespace iceberg { @@ -33,15 +37,28 @@ class ICEBERG_EXPORT Transaction { public: virtual ~Transaction() = default; + /// \brief Construct a transaction with validation + /// + /// \param table the table to update + /// \param catalog the catalog backing the table + /// \return the constructed transaction or an error if arguments are invalid + static Result> Make(std::shared_ptr table, + std::shared_ptr catalog); + /// \brief Return the Table that this transaction will update /// /// \return this transaction's table - virtual const std::shared_ptr
& table() const = 0; + virtual const std::shared_ptr& table() const = 0; + + /// \brief Create a new update properties operation + /// + /// \return a new UpdateProperties + virtual Result> NewUpdateProperties() = 0; /// \brief Create a new append API to add files to this table /// /// \return a new AppendFiles - virtual std::shared_ptr NewAppend() = 0; + virtual Result> NewAppend() = 0; /// \brief Apply the pending changes from all actions and commit /// @@ -53,4 +70,87 @@ class ICEBERG_EXPORT Transaction { virtual Status CommitTransaction() = 0; }; +/// \brief Base implementation shared by table transactions +class ICEBERG_EXPORT BaseTransaction : public Transaction { + public: + ~BaseTransaction() override = default; + + /// \brief Construct a BaseTransaction with validation + /// + /// \param table the table to update + /// \param catalog the catalog backing the table + /// \return the constructed transaction or an error if arguments are invalid + static Result> Make(std::shared_ptr table, + std::shared_ptr catalog); + + const std::shared_ptr& table() const override; + + Result> NewUpdateProperties() override; + + Result> NewAppend() override; + + Status CommitTransaction() override; + + /// \brief Stage updates to be applied upon commit + /// + /// \param identifier the table identifier + /// \param requirements the list of table requirements to validate + /// \param updates the list of table updates to apply + /// \return a new Table instance with staged updates applied + Result> StageUpdates( + const TableIdentifier& identifier, + const std::vector>& requirements, + const std::vector>& updates); + + /// \brief Whether the last operation has been committed + /// + /// \return true if the last operation was committed, false otherwise + bool HasLastOperationCommitted() const { return context_.last_operation_committed; } + + /// \brief Mark the last operation as committed or not + /// + /// \param committed true if the last operation was committed, false 
otherwise + void SetLastOperationCommitted(bool committed) { + context_.last_operation_committed = committed; + } + + protected: + BaseTransaction(std::shared_ptr table, std::shared_ptr catalog); + + /// \brief Apply a list of table updates to the current metadata + /// + /// \param updates the list of table updates to apply + /// \return Status::OK if the updates were applied successfully, or an error status + Status ApplyUpdates(const std::vector>& updates); + + private: + /// \brief Context for transaction + struct TransactionContext { + TransactionContext() = default; + TransactionContext(TableIdentifier identifier, + std::shared_ptr metadata) + : identifier(std::move(identifier)), current_metadata(std::move(metadata)) {} + + // Non-copyable, movable + TransactionContext(const TransactionContext&) = delete; + TransactionContext& operator=(const TransactionContext&) = delete; + TransactionContext(TransactionContext&&) noexcept = default; + TransactionContext& operator=(TransactionContext&&) noexcept = default; + + bool last_operation_committed = true; + TableIdentifier identifier; + std::shared_ptr current_metadata; + std::vector> pending_requirements; + std::vector> pending_updates; + }; + + std::shared_ptr table_; + std::shared_ptr catalog_; + TransactionContext context_; + + friend Result> Transaction::Make( + std::shared_ptr, std::shared_ptr); + friend class TransactionCatalog; +}; + } // namespace iceberg diff --git a/src/iceberg/transaction_catalog.cc b/src/iceberg/transaction_catalog.cc new file mode 100644 index 000000000..62929793f --- /dev/null +++ b/src/iceberg/transaction_catalog.cc @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "iceberg/transaction_catalog.h" + +#include "iceberg/table_metadata.h" +#include "iceberg/transaction.h" + +namespace iceberg { + +Result> TransactionCatalog::UpdateTable( + const TableIdentifier& identifier, + const std::vector>& requirements, + const std::vector>& updates) { + if (!owner_) { + return InvalidState("Transaction state is unavailable"); + } + + return owner_->StageUpdates(identifier, requirements, updates); +} + +Result> TransactionCatalog::LoadTable( + const TableIdentifier& identifier) { + if (!owner_) { + return InvalidState("Transaction state is unavailable"); + } + + auto metadata = std::make_shared(*owner_->context_.current_metadata); + return std::make_unique
(identifier, std::move(metadata), + owner_->table()->metadata_location(), + owner_->table()->io(), catalog_impl_); +} + +void TransactionCatalog::SetLastOperationCommitted(bool committed) { + if (owner_) { + owner_->SetLastOperationCommitted(committed); + } +} + +} // namespace iceberg diff --git a/src/iceberg/transaction_catalog.h b/src/iceberg/transaction_catalog.h new file mode 100644 index 000000000..58cb21684 --- /dev/null +++ b/src/iceberg/transaction_catalog.h @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +#include + +#include "iceberg/catalog.h" +#include "iceberg/result.h" +#include "iceberg/table.h" +#include "iceberg/type_fwd.h" + +namespace iceberg { + +/** + * @brief Lightweight catalog wrapper for BaseTransaction. + * + * For read-only operations, TransactionCatalog simply forwards to the wrapped catalog. + * For mutating calls such as UpdateTable or SetLastOperationCommitted, it delegates back + * to the owning BaseTransaction so staged updates remain private until commit. 
+ */ +class ICEBERG_EXPORT TransactionCatalog : public Catalog { + public: + TransactionCatalog(std::shared_ptr catalog, BaseTransaction* owner) + : catalog_impl_(std::move(catalog)), owner_(owner) {} + ~TransactionCatalog() override = default; + + std::string_view name() const override { return catalog_impl_->name(); }; + + Status CreateNamespace( + const Namespace& ns, + const std::unordered_map& properties) override { + return catalog_impl_->CreateNamespace(ns, properties); + } + + Result> ListNamespaces(const Namespace& ns) const override { + return catalog_impl_->ListNamespaces(ns); + } + + Result> GetNamespaceProperties( + const Namespace& ns) const override { + return catalog_impl_->GetNamespaceProperties(ns); + } + + Status DropNamespace(const Namespace& ns) override { + // Will do nothing for directly dropping namespaces. + return NotSupported("DropNamespace is not supported in TransactionCatalog."); + } + + Result NamespaceExists(const Namespace& ns) const override { + return catalog_impl_->NamespaceExists(ns); + } + + Status UpdateNamespaceProperties( + const Namespace& ns, const std::unordered_map& updates, + const std::unordered_set& removals) override { + // Will do nothing for directly updating namespace properties. 
+ return NotSupported( + "UpdateNamespaceProperties is not supported in TransactionCatalog."); + } + + Result> ListTables(const Namespace& ns) const override { + return catalog_impl_->ListTables(ns); + } + + Result> CreateTable( + const TableIdentifier& identifier, const Schema& schema, const PartitionSpec& spec, + const std::string& location, + const std::unordered_map& properties) override { + return NotImplemented("CreateTable is not implemented in TransactionCatalog."); + } + + Result> UpdateTable( + const TableIdentifier& identifier, + const std::vector>& requirements, + const std::vector>& updates) override; + + Result> StageCreateTable( + const TableIdentifier& identifier, const Schema& schema, const PartitionSpec& spec, + const std::string& location, + const std::unordered_map& properties) override { + return NotImplemented("StageCreateTable is not implemented in TransactionCatalog."); + } + + Result TableExists(const TableIdentifier& identifier) const override { + return catalog_impl_->TableExists(identifier); + } + + Status DropTable(const TableIdentifier& identifier, bool purge) override { + return NotSupported("DropTable is not supported in TransactionCatalog."); + } + + Status RenameTable(const TableIdentifier& from, const TableIdentifier& to) override { + return NotImplemented("rename table"); + } + + Result> LoadTable(const TableIdentifier& identifier) override; + + Result> RegisterTable( + const TableIdentifier& identifier, + const std::string& metadata_file_location) override { + return NotImplemented("register table"); + } + + void SetLastOperationCommitted(bool committed) override; + + /// \brief Get the underlying catalog implementation + /// + /// \return the shared pointer to the underlying catalog + const std::shared_ptr& catalog_impl() const { return catalog_impl_; } + + private: + std::shared_ptr catalog_impl_; + BaseTransaction* owner_; +}; + +} // namespace iceberg diff --git a/src/iceberg/type_fwd.h b/src/iceberg/type_fwd.h index 
0e1867f60..f89378626 100644 --- a/src/iceberg/type_fwd.h +++ b/src/iceberg/type_fwd.h @@ -87,6 +87,7 @@ class UuidType; struct Namespace; struct TableIdentifier; +class BaseTransaction; class Catalog; class FileIO; class LocationProvider; @@ -95,6 +96,7 @@ class SortOrder; class Table; class TableProperties; class Transaction; +class TransactionCatalog; class Transform; class TransformFunction; @@ -154,14 +156,31 @@ class MapLike; class StructLike; class StructLikeAccessor; +class AddPartitionSpec; +class AddSchema; +class AddSnapshot; +class AddSortOrder; +class AssignUUID; +class RemovePartitionSpecs; +class RemoveProperties; +class RemoveSchemas; +class RemoveSnapshots; +class RemoveSnapshotRef; +class RemoveSortOrders; +class SetCurrentSchema; +class SetDefaultPartitionSpec; +class SetDefaultSortOrder; +class SetLocation; +class SetProperties; +class SetSnapshotRef; class TableUpdate; +class UpgradeFormatVersion; + class TableRequirement; class TableMetadataBuilder; class TableUpdateContext; class PendingUpdate; -template -class PendingUpdateTyped; class UpdateProperties; /// ---------------------------------------------------------------------------- diff --git a/src/iceberg/update/update_properties.cc b/src/iceberg/update/update_properties.cc index a4dcd1548..da56bb980 100644 --- a/src/iceberg/update/update_properties.cc +++ b/src/iceberg/update/update_properties.cc @@ -74,6 +74,10 @@ Status UpdateProperties::Apply() { if (!catalog_) { return InvalidArgument("Catalog is required to apply property updates"); } + + ICEBERG_ASSIGN_OR_RAISE(auto reloaded_table, catalog_->LoadTable(identifier_)); + base_metadata_ = reloaded_table->metadata(); + if (!base_metadata_) { return InvalidArgument("Base table metadata is required to apply property updates"); } @@ -126,8 +130,22 @@ Status UpdateProperties::Commit() { if (!updates.empty()) { ICEBERG_ASSIGN_OR_RAISE(auto requirements, TableRequirements::ForUpdateTable(*base_metadata_, updates)); - 
ICEBERG_RETURN_UNEXPECTED(catalog_->UpdateTable(identifier_, requirements, updates)); + auto shared_updates = std::vector>{}; + shared_updates.reserve(updates.size()); + for (auto& update : updates) { + shared_updates.push_back(std::move(update)); + } + auto shared_requirements = std::vector>{}; + shared_requirements.reserve(requirements.size()); + for (auto& requirement : requirements) { + shared_requirements.push_back(std::move(requirement)); + } + + ICEBERG_RETURN_UNEXPECTED( + catalog_->UpdateTable(identifier_, shared_requirements, shared_updates)); } + + catalog_->SetLastOperationCommitted(true); return {}; } diff --git a/src/iceberg/update/update_properties.h b/src/iceberg/update/update_properties.h index 0f1adf76a..14eec8ac7 100644 --- a/src/iceberg/update/update_properties.h +++ b/src/iceberg/update/update_properties.h @@ -39,7 +39,7 @@ class ICEBERG_EXPORT UpdateProperties : public PendingUpdate { /// /// \param identifier The table identifier /// \param catalog The catalog containing the table - /// \param metadata The current table metadata + /// \param base The current table metadata UpdateProperties(TableIdentifier identifier, std::shared_ptr catalog, std::shared_ptr base);