Skip to content
Open
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/iceberg/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ set(ICEBERG_INCLUDES "$<BUILD_INTERFACE:${PROJECT_BINARY_DIR}/src>"
"$<BUILD_INTERFACE:${PROJECT_SOURCE_DIR}/src>")
set(ICEBERG_SOURCES
arrow_c_data_guard_internal.cc
base_transaction.cc
catalog/memory/in_memory_catalog.cc
expression/aggregate.cc
expression/binder.cc
Expand Down Expand Up @@ -68,6 +69,7 @@ set(ICEBERG_SOURCES
table_requirements.cc
table_scan.cc
table_update.cc
transaction_catalog.cc
transform.cc
transform_function.cc
type.cc
Expand Down
146 changes: 146 additions & 0 deletions src/iceberg/base_transaction.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

#include "iceberg/base_transaction.h"

#include <utility>

#include "iceberg/catalog.h"
#include "iceberg/pending_update.h"
#include "iceberg/table.h"
#include "iceberg/table_metadata.h"
#include "iceberg/transaction_catalog.h"
#include "iceberg/update/update_properties.h"
#include "iceberg/util/macros.h"

namespace iceberg {

/// \brief Construct a transaction bound to an existing table.
///
/// Takes ownership of the shared table handle, snapshots the table's
/// identifier and metadata into the transaction context, and wraps the
/// supplied catalog in a TransactionCatalog that refers back to this object.
///
/// \param table the table the transaction operates on; asserted non-null
/// \param catalog the underlying catalog; asserted non-null
BaseTransaction::BaseTransaction(std::shared_ptr<const Table> table,
std::shared_ptr<Catalog> catalog)
: table_(std::move(table)) {
// NOTE(review): if ICEBERG_DCHECK is a debug-only check, a null table would
// still reach table_->name() below in release builds -- confirm.
ICEBERG_DCHECK(table_ != nullptr, "table must not be null");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to add a Transaction::Make function to return Result<Transaction> and check these arguments explicitly.

ICEBERG_DCHECK(catalog != nullptr, "catalog must not be null");
// Seed the context from the table as it looks when the transaction starts.
context_.identifier = table_->name();
context_.current_metadata = table_->metadata();
// The wrapper receives a raw pointer to this transaction.
catalog_ = std::make_shared<TransactionCatalog>(std::move(catalog), this);
}

/// \brief Access the table handle currently held by this transaction.
/// \return a reference to the stored shared pointer (no copy is made)
const std::shared_ptr<const Table>& BaseTransaction::table() const {
  return table_;
}

/// \brief Start a property update for the transaction's table.
///
/// The update object is created through CheckAndCreateUpdate, which refuses
/// to create a new update while the previous operation is still uncommitted.
///
/// \return the newly created update on success. On failure the error is passed
/// to ERROR_TO_EXCEPTION (presumably throws -- confirm against the macro).
std::unique_ptr<UpdateProperties> BaseTransaction::UpdateProperties() {
// Arguments are forwarded to the ::iceberg::UpdateProperties constructor.
auto update = CheckAndCreateUpdate<::iceberg::UpdateProperties>(
table_->name(), catalog_, CurrentMetadata());
if (!update.has_value()) {
ERROR_TO_EXCEPTION(update.error());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We cannot throw. Perhaps we should also use Result wrapper for these update return types.

}

// Move the owned update out of the Result wrapper.
return std::move(update).value();
}

/// \brief Placeholder for starting an append operation.
///
/// Not implemented: the body unconditionally throws the value produced by
/// NotImplemented(...) and never returns an AppendFiles instance.
std::unique_ptr<AppendFiles> BaseTransaction::NewAppend() {
throw NotImplemented("BaseTransaction::NewAppend not implemented");
}

/// \brief Commit everything staged in this transaction to the underlying catalog.
///
/// Fails with InvalidState when the last operation has not been committed.
/// When no updates are pending the call succeeds without contacting the
/// catalog. Otherwise the pending requirements and updates are handed to the
/// wrapped catalog's UpdateTable, and the returned table (if non-null) replaces
/// the table held by this transaction.
///
/// \return an empty Status on success, or the error described above / reported
/// through ICEBERG_ASSIGN_OR_RAISE (presumably an early return -- confirm).
Status BaseTransaction::CommitTransaction() {
if (!HasLastOperationCommitted()) {
return InvalidState("Cannot commit transaction: last operation has not committed");
}

// Draining empties context_.pending_updates regardless of what happens next.
auto pending_updates = ConsumePendingUpdates();
if (pending_updates.empty()) {
// Nothing to commit; pending requirements are left untouched on this path.
return {};
}

// NOTE(review): both lists have been moved out of the context at this point,
// so they are not available again if UpdateTable below fails.
auto pending_requirements = ConsumePendingRequirements();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if we need to support retry on failed commits? In the current approach, these two lists are gone.


ICEBERG_ASSIGN_OR_RAISE(
auto updated_table,
catalog_->catalog_impl()->UpdateTable(
table_->name(), std::move(pending_requirements), std::move(pending_updates)));

// update table to the new version
if (updated_table) {
table_ = std::shared_ptr<Table>(std::move(updated_table));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need this table_ after commit has been done? It seems useless?

}

return {};
}

/// \brief Record requirements and updates in the transaction without committing.
///
/// \param identifier must equal the identifier captured by this transaction,
/// otherwise InvalidArgument is returned
/// \param requirements requirements to append to the pending list
/// \param updates updates to apply to the in-transaction metadata and append
/// to the pending list
/// \return a new Table built from a copy of the transaction's current metadata,
/// or an error when the identifier does not match, the metadata is missing, or
/// applying the updates fails
Result<std::unique_ptr<Table>> BaseTransaction::StageUpdates(
const TableIdentifier& identifier,
std::vector<std::unique_ptr<TableRequirement>> requirements,
std::vector<std::unique_ptr<TableUpdate>> updates) {
if (identifier != context_.identifier) {
return InvalidArgument("Transaction only supports table '{}'",
context_.identifier.name);
}

if (!context_.current_metadata) {
return InvalidState("Transaction metadata is not initialized");
}

// With no updates nothing is recorded (the passed requirements are dropped)
// and the returned table reflects the unchanged metadata.
if (updates.empty()) {
return std::make_unique<Table>(
context_.identifier, std::make_shared<TableMetadata>(*context_.current_metadata),
table_->location(), table_->io(), catalog_->catalog_impl());
}

// Rebuilds context_.current_metadata with the updates applied; presumably
// returns early on error -- confirm against ICEBERG_RETURN_UNEXPECTED.
ICEBERG_RETURN_UNEXPECTED(ApplyUpdates(updates));

// Only after a successful apply are the inputs queued for the final commit.
for (auto& requirement : requirements) {
context_.pending_requirements.emplace_back(std::move(requirement));
}
for (auto& update : updates) {
context_.pending_updates.emplace_back(std::move(update));
}

// The table handed back owns its own copy of the updated metadata.
return std::make_unique<Table>(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need to return a table?

context_.identifier, std::make_shared<TableMetadata>(*context_.current_metadata),
table_->location(), table_->io(), catalog_->catalog_impl());
}

/// \brief Apply a batch of updates to the transaction's in-memory metadata.
///
/// A builder is seeded from the current metadata, every non-null update is
/// applied to it, and the built result replaces context_.current_metadata.
/// The stored metadata is only replaced when the build succeeds.
///
/// \param updates updates to apply; null entries are skipped
/// \return an empty Status on success (including when `updates` is empty),
/// InvalidState when the transaction has no metadata to build from, or the
/// error produced while building the new metadata
Status BaseTransaction::ApplyUpdates(
    const std::vector<std::unique_ptr<TableUpdate>>& updates) {
  if (updates.empty()) {
    return {};
  }

  // This method is public, so guard the dereference below the same way
  // StageUpdates does instead of handing a null pointer to the builder.
  if (!context_.current_metadata) {
    return InvalidState("Transaction metadata is not initialized");
  }

  auto builder = TableMetadataBuilder::BuildFrom(context_.current_metadata.get());
  for (const auto& update : updates) {
    if (update) {
      update->ApplyTo(*builder);
    }
  }

  ICEBERG_ASSIGN_OR_RAISE(auto new_metadata, builder->Build());
  context_.current_metadata = std::shared_ptr<TableMetadata>(std::move(new_metadata));
  return {};
}

std::vector<std::unique_ptr<TableRequirement>>
BaseTransaction::ConsumePendingRequirements() {
return std::exchange(context_.pending_requirements, {});
}

std::vector<std::unique_ptr<TableUpdate>> BaseTransaction::ConsumePendingUpdates() {
return std::exchange(context_.pending_updates, {});
}

} // namespace iceberg
103 changes: 103 additions & 0 deletions src/iceberg/base_transaction.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

#pragma once

#include <memory>
#include <vector>

#include "iceberg/table_identifier.h"
#include "iceberg/table_requirement.h"
#include "iceberg/table_update.h"
#include "iceberg/transaction.h"
#include "iceberg/type_fwd.h"

namespace iceberg {

/// \brief Base class for transaction implementations
class ICEBERG_EXPORT BaseTransaction : public Transaction {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If BaseTransaction is the only subclass, we can just implement all features in Transaction directly, just like Table.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BTW, do we want to introduce TransactionType from the Java impl as well? It would be good if we can add this to make it clear.

public:
BaseTransaction(std::shared_ptr<const Table> table, std::shared_ptr<Catalog> catalog);
~BaseTransaction() override = default;

const std::shared_ptr<const Table>& table() const override;

std::unique_ptr<::iceberg::UpdateProperties> UpdateProperties() override;

std::unique_ptr<AppendFiles> NewAppend() override;
Comment on lines +41 to +43
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
std::unique_ptr<::iceberg::UpdateProperties> UpdateProperties() override;
std::unique_ptr<AppendFiles> NewAppend() override;
std::shared_ptr<UpdateProperties> NewUpdateProperties() override;
std::shared_ptr<AppendFiles> NewAppend() override;

It seems that returning shared_ptr will make it easier to collect and transform the update internally. Adding New prefix also can avoid iceberg:: prefix in the return type.


Status CommitTransaction() override;

Result<std::unique_ptr<Table>> StageUpdates(
const TableIdentifier& identifier,
std::vector<std::unique_ptr<TableRequirement>> requirements,
std::vector<std::unique_ptr<TableUpdate>> updates);

/// \brief Whether the most recently created operation has been committed.
bool HasLastOperationCommitted() const {
  return context_.last_operation_committed;
}

/// \brief Record whether the most recently created operation has been committed.
/// \param committed the new value of the flag
void SetLastOperationCommitted(bool committed) { context_.last_operation_committed = committed; }

/// \brief The metadata as seen inside the transaction.
/// \return a reference to the shared pointer stored in the transaction context
const std::shared_ptr<TableMetadata>& CurrentMetadata() const { return context_.current_metadata; }

Status ApplyUpdates(const std::vector<std::unique_ptr<TableUpdate>>& updates);

std::vector<std::unique_ptr<TableRequirement>> ConsumePendingRequirements();

std::vector<std::unique_ptr<TableUpdate>> ConsumePendingUpdates();

protected:
/// \brief Create a new update object if the previous operation has committed.
///
/// \tparam UpdateType the concrete update class to instantiate
/// \tparam Args constructor argument types, perfectly forwarded
/// \param args arguments forwarded to the UpdateType constructor
/// \return the new update, or InvalidState when the last operation is still
/// uncommitted
template <typename UpdateType, typename... Args>
Result<std::unique_ptr<UpdateType>> CheckAndCreateUpdate(Args&&... args) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is over-complicated. Let's switch to std::shared_ptr for these update types.

if (!HasLastOperationCommitted()) {
return InvalidState(
"Cannot create new update: last operation in transaction has not committed");
}
// Mark the transaction as having an open operation before handing it out.
SetLastOperationCommitted(false);
return std::make_unique<UpdateType>(std::forward<Args>(args)...);
}

private:
/// \brief Move-only bundle of the state accumulated while a transaction is open.
struct TransactionContext {
  /// Creates an empty context whose last operation counts as committed.
  TransactionContext() = default;

  /// Creates a context for the given table identifier and starting metadata.
  TransactionContext(TableIdentifier identifier, std::shared_ptr<TableMetadata> metadata)
      : identifier(std::move(identifier)), current_metadata(std::move(metadata)) {}

  // Movable ...
  TransactionContext(TransactionContext&&) noexcept = default;
  TransactionContext& operator=(TransactionContext&&) noexcept = default;
  // ... but not copyable (the pending lists hold unique_ptr elements).
  TransactionContext(const TransactionContext&) = delete;
  TransactionContext& operator=(const TransactionContext&) = delete;

  /// False while an operation created by the transaction is still open.
  bool last_operation_committed = true;
  /// Identifier of the table the transaction is bound to.
  TableIdentifier identifier;
  /// Metadata with all staged updates applied so far.
  std::shared_ptr<TableMetadata> current_metadata;
  /// Requirements staged for the final commit.
  std::vector<std::unique_ptr<TableRequirement>> pending_requirements;
  /// Updates staged for the final commit.
  std::vector<std::unique_ptr<TableUpdate>> pending_updates;
};

std::shared_ptr<const Table> table_;
std::shared_ptr<TransactionCatalog> catalog_;
TransactionContext context_;
};

} // namespace iceberg
9 changes: 7 additions & 2 deletions src/iceberg/catalog.h
Original file line number Diff line number Diff line change
Expand Up @@ -123,8 +123,8 @@ class ICEBERG_EXPORT Catalog {
/// \return a Table instance or ErrorKind::kAlreadyExists if the table already exists
virtual Result<std::unique_ptr<Table>> UpdateTable(
const TableIdentifier& identifier,
const std::vector<std::unique_ptr<TableRequirement>>& requirements,
const std::vector<std::unique_ptr<TableUpdate>>& updates) = 0;
std::vector<std::unique_ptr<TableRequirement>> requirements,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we revert this change? If later we support retry on failed commits, these requirements and updates cannot be reused since they are moved away.

std::vector<std::unique_ptr<TableUpdate>> updates) = 0;

/// \brief Start a transaction to create a table
///
Expand Down Expand Up @@ -184,6 +184,11 @@ class ICEBERG_EXPORT Catalog {
/// \return a Table instance or ErrorKind::kAlreadyExists if the table already exists
virtual Result<std::shared_ptr<Table>> RegisterTable(
const TableIdentifier& identifier, const std::string& metadata_file_location) = 0;

/// \brief Set whether the last operation in a transaction has been committed
///
/// \param committed true if the last operation has been committed, false otherwise
virtual void SetLastOperationCommitted(bool committed) = 0;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This function is only used by transaction catalog so it should not appear here.

};

} // namespace iceberg
4 changes: 2 additions & 2 deletions src/iceberg/catalog/memory/in_memory_catalog.cc
Original file line number Diff line number Diff line change
Expand Up @@ -392,8 +392,8 @@ Result<std::unique_ptr<Table>> InMemoryCatalog::CreateTable(

Result<std::unique_ptr<Table>> InMemoryCatalog::UpdateTable(
const TableIdentifier& identifier,
const std::vector<std::unique_ptr<TableRequirement>>& requirements,
const std::vector<std::unique_ptr<TableUpdate>>& updates) {
std::vector<std::unique_ptr<TableRequirement>> requirements,
std::vector<std::unique_ptr<TableUpdate>> updates) {
return NotImplemented("update table");
}

Expand Down
8 changes: 6 additions & 2 deletions src/iceberg/catalog/memory/in_memory_catalog.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
#include <mutex>

#include "iceberg/catalog.h"
#include "iceberg/table_requirement.h"
#include "iceberg/table_update.h"

namespace iceberg {

Expand Down Expand Up @@ -77,8 +79,8 @@ class ICEBERG_EXPORT InMemoryCatalog

Result<std::unique_ptr<Table>> UpdateTable(
const TableIdentifier& identifier,
const std::vector<std::unique_ptr<TableRequirement>>& requirements,
const std::vector<std::unique_ptr<TableUpdate>>& updates) override;
std::vector<std::unique_ptr<TableRequirement>> requirements,
std::vector<std::unique_ptr<TableUpdate>> updates) override;

Result<std::shared_ptr<Transaction>> StageCreateTable(
const TableIdentifier& identifier, const Schema& schema, const PartitionSpec& spec,
Expand All @@ -97,6 +99,8 @@ class ICEBERG_EXPORT InMemoryCatalog
const TableIdentifier& identifier,
const std::string& metadata_file_location) override;

/// \brief No-op: this catalog does not track transaction operation state.
/// \param committed ignored
void SetLastOperationCommitted([[maybe_unused]] bool committed) override {}

private:
std::string catalog_name_;
std::unordered_map<std::string, std::string> properties_;
Expand Down
4 changes: 2 additions & 2 deletions src/iceberg/catalog/rest/rest_catalog.cc
Original file line number Diff line number Diff line change
Expand Up @@ -155,8 +155,8 @@ Result<std::unique_ptr<Table>> RestCatalog::CreateTable(

Result<std::unique_ptr<Table>> RestCatalog::UpdateTable(
[[maybe_unused]] const TableIdentifier& identifier,
[[maybe_unused]] const std::vector<std::unique_ptr<TableRequirement>>& requirements,
[[maybe_unused]] const std::vector<std::unique_ptr<TableUpdate>>& updates) {
[[maybe_unused]] std::vector<std::unique_ptr<TableRequirement>> requirements,
[[maybe_unused]] std::vector<std::unique_ptr<TableUpdate>> updates) {
return NotImplemented("Not implemented");
}

Expand Down
8 changes: 6 additions & 2 deletions src/iceberg/catalog/rest/rest_catalog.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
#include "iceberg/catalog/rest/iceberg_rest_export.h"
#include "iceberg/catalog/rest/type_fwd.h"
#include "iceberg/result.h"
#include "iceberg/table_requirement.h"
#include "iceberg/table_update.h"

/// \file iceberg/catalog/rest/rest_catalog.h
/// RestCatalog implementation for Iceberg REST API.
Expand Down Expand Up @@ -76,8 +78,8 @@ class ICEBERG_REST_EXPORT RestCatalog : public Catalog {

Result<std::unique_ptr<Table>> UpdateTable(
const TableIdentifier& identifier,
const std::vector<std::unique_ptr<TableRequirement>>& requirements,
const std::vector<std::unique_ptr<TableUpdate>>& updates) override;
std::vector<std::unique_ptr<TableRequirement>> requirements,
std::vector<std::unique_ptr<TableUpdate>> updates) override;

Result<std::shared_ptr<Transaction>> StageCreateTable(
const TableIdentifier& identifier, const Schema& schema, const PartitionSpec& spec,
Expand All @@ -96,6 +98,8 @@ class ICEBERG_REST_EXPORT RestCatalog : public Catalog {
const TableIdentifier& identifier,
const std::string& metadata_file_location) override;

/// \brief No-op: this catalog does not track transaction operation state.
/// \param committed ignored
void SetLastOperationCommitted([[maybe_unused]] bool committed) override {}

private:
RestCatalog(std::unique_ptr<RestCatalogProperties> config,
std::unique_ptr<ResourcePaths> paths);
Expand Down
2 changes: 2 additions & 0 deletions src/iceberg/meson.build
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ configure_file(
iceberg_include_dir = include_directories('..')
iceberg_sources = files(
'arrow_c_data_guard_internal.cc',
'base_transaction.cc',
'catalog/memory/in_memory_catalog.cc',
'expression/aggregate.cc',
'expression/binder.cc',
Expand Down Expand Up @@ -90,6 +91,7 @@ iceberg_sources = files(
'table_requirements.cc',
'table_scan.cc',
'table_update.cc',
'transaction_catalog.cc',
'transform.cc',
'transform_function.cc',
'type.cc',
Expand Down
Loading
Loading