Skip to content

Commit

Permalink
update error message
Browse files Browse the repository at this point in the history
  • Loading branch information
yangzq50 committed Jan 2, 2025
1 parent e295d00 commit 85ec171
Show file tree
Hide file tree
Showing 6 changed files with 61 additions and 27 deletions.
8 changes: 4 additions & 4 deletions src/storage/txn/txn.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -568,11 +568,11 @@ TxnTimeStamp Txn::Commit() {
txn_store_.PrepareCommit1(); // Only for import and compact, pre-commit segment
// LOG_INFO(fmt::format("Txn {} commit ts: {}", txn_id_, commit_ts));

if (txn_mgr_->CheckTxnConflict(this)) {
LOG_ERROR(fmt::format("Txn: {} is rolled back. rollback ts: {}", txn_id_, commit_ts));
if (const auto conflict_reason = txn_mgr_->CheckTxnConflict(this); conflict_reason) {
LOG_ERROR(fmt::format("Txn: {} is rolled back. rollback ts: {}. Txn conflict reason: {}.", txn_id_, commit_ts, *conflict_reason));
wal_entry_ = nullptr;
txn_mgr_->SendToWAL(this);
RecoverableError(Status::TxnConflict(txn_id_, "Txn conflict reason."));
RecoverableError(Status::TxnConflict(txn_id_, fmt::format("Txn conflict reason: {}.", *conflict_reason)));
}

// Put wal entry to the manager in the same order as commit_ts.
Expand All @@ -595,7 +595,7 @@ TxnTimeStamp Txn::Commit() {

bool Txn::CheckConflict() { return txn_store_.CheckConflict(catalog_); }

bool Txn::CheckConflict(Txn *other_txn) {
Optional<String> Txn::CheckConflict(Txn *other_txn) {
LOG_TRACE(fmt::format("Txn {} check conflict with {}.", txn_id_, other_txn->txn_id_));

return txn_store_.CheckConflict(other_txn->txn_store_);
Expand Down
2 changes: 1 addition & 1 deletion src/storage/txn/txn.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ public:

bool CheckConflict();

bool CheckConflict(Txn *txn);
Optional<String> CheckConflict(Txn *txn);

void CommitBottom();

Expand Down
12 changes: 6 additions & 6 deletions src/storage/txn/txn_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ TxnTimeStamp TxnManager::GetWriteCommitTS(Txn *txn) {
return commit_ts;
}

bool TxnManager::CheckTxnConflict(Txn *txn) {
Optional<String> TxnManager::CheckTxnConflict(Txn *txn) {
TxnTimeStamp commit_ts = txn->CommitTS();
Vector<SharedPtr<Txn>> candidate_txns;
TxnTimeStamp min_checking_ts = UNCOMMIT_TS;
Expand All @@ -160,19 +160,19 @@ bool TxnManager::CheckTxnConflict(Txn *txn) {
}
});
if (txn->CheckConflict()) {
return true;
return "Conflict in txn->CheckConflict()";
}
for (SharedPtr<Txn> &candidate_txn : candidate_txns) {
for (const auto &candidate_txn : candidate_txns) {
// LOG_INFO(fmt::format("Txn {}(commit_ts: {}) check conflict with txn {}(commit_ts: {})",
// txn->TxnID(),
// txn->CommitTS(),
// candidate_txn->TxnID(),
// candidate_txn->CommitTS()));
if (txn->CheckConflict(candidate_txn.get())) {
return true;
if (const auto conflict_reason = txn->CheckConflict(candidate_txn.get()); conflict_reason) {
return fmt::format("Conflict with candidate_txn {}: {}", candidate_txn->TxnID(), *conflict_reason);
}
}
return false;
return None;
}

void TxnManager::SendToWAL(Txn *txn) {
Expand Down
2 changes: 1 addition & 1 deletion src/storage/txn/txn_manager.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public:

TxnTimeStamp GetWriteCommitTS(Txn *txn);

bool CheckTxnConflict(Txn *txn);
Optional<String> CheckTxnConflict(Txn *txn);

void SendToWAL(Txn *txn);

Expand Down
60 changes: 47 additions & 13 deletions src/storage/txn/txn_store.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

module;

#include <ranges>
#include <string>
#include <vector>

Expand Down Expand Up @@ -357,19 +358,50 @@ bool TxnTableStore::CheckConflict(Catalog *catalog, Txn *txn) const {
return false;
}

bool TxnTableStore::CheckConflict(const TxnTableStore *other_table_store) const {
for (const auto &[index_name, _] : txn_indexes_store_) {
for (const auto [index_entry, _] : other_table_store->txn_indexes_) {
if (index_name == *index_entry->GetIndexName()) {
return true;
Optional<String> TxnTableStore::CheckConflict(const TxnTableStore *other_table_store) const {
{
Set<std::string_view> other_txn_indexes_set;
Map<std::string_view, Set<SegmentID>> other_txn_indexes_store_map;
for (const auto &index_entry : std::views::keys(other_table_store->txn_indexes_)) {
other_txn_indexes_set.insert(*index_entry->GetIndexName());
}
for (const auto &[index_name, index_store] : other_table_store->txn_indexes_store_) {
auto &segment_set = other_txn_indexes_store_map[index_name];
for (const auto segment_id : std::views::keys(index_store->index_entry_map_)) {
segment_set.insert(segment_id);
}
}
for (const auto &index_entry : std::views::keys(txn_indexes_)) {
if (const auto &index_name = *index_entry->GetIndexName(); other_txn_indexes_set.contains(index_name)) {
return fmt::format("{}: txn_indexes_ containing Index {} conflict with other_table_store->txn_indexes_",
__func__,
index_name);
}
}
for (const auto &[index_name, index_store] : txn_indexes_store_) {
if (other_txn_indexes_set.contains(index_name)) {
return fmt::format("{}: txn_indexes_store_ containing Index {} conflict with other_table_store->txn_indexes_",
__func__,
index_name);
}
if (const auto iter = other_txn_indexes_store_map.find(index_name); iter != other_txn_indexes_store_map.end()) {
for (const auto &other_segment_set = iter->second; const auto segment_id : std::views::keys(index_store->index_entry_map_)) {
if (other_segment_set.contains(segment_id)) {
return fmt::format(
"{}: txn_indexes_store_ containing Index {} Segment {} conflict with other_table_store->txn_indexes_store_",
__func__,
index_name,
segment_id);
}
}
}
}
}

const auto &delete_state = delete_state_;
const auto &other_delete_state = other_table_store->delete_state_;
if (delete_state.rows_.empty() || other_delete_state.rows_.empty()) {
return false;
return None;
}
for (const auto &[segment_id, block_map] : delete_state.rows_) {
auto other_iter = other_delete_state.rows_.find(segment_id);
Expand All @@ -391,12 +423,12 @@ bool TxnTableStore::CheckConflict(const TxnTableStore *other_table_store) const
break;
}
if (other_block_offsets[j] == block_offset) {
return true;
return fmt::format("Delete conflict: segment_id: {}, block_id: {}, block_offset: {}", segment_id, block_id, block_offset);
}
}
}
}
return false;
return None;
}

void TxnTableStore::PrepareCommit1(const Vector<WalSegmentInfo *> &segment_infos) const {
Expand Down Expand Up @@ -635,11 +667,11 @@ bool TxnStore::CheckConflict(Catalog *catalog) {
return false;
}

bool TxnStore::CheckConflict(const TxnStore &other_txn_store) {
Optional<String> TxnStore::CheckConflict(const TxnStore &other_txn_store) {
for (const auto &[table_name, table_store] : txn_tables_store_) {
for (const auto [table_entry, _] : other_txn_store.txn_tables_) {
if (table_name == *table_entry->GetTableName()) {
return true;
return fmt::format("txn_tables_store_ containing table_name {} conflict with other_txn_store.txn_tables_", table_name);
}
}

Expand All @@ -649,11 +681,13 @@ bool TxnStore::CheckConflict(const TxnStore &other_txn_store) {
}

const TxnTableStore *other_table_store = other_iter->second.get();
if (table_store->CheckConflict(other_table_store)) {
return true;
if (const auto conflict_reason = table_store->CheckConflict(other_table_store); conflict_reason) {
return fmt::format("txn_tables_store_ containing table_name {} conflict with other_txn_store.txn_tables_store_: {}",
table_name,
*conflict_reason);
}
}
return false;
return None;
}

void TxnStore::PrepareCommit1() {
Expand Down
4 changes: 2 additions & 2 deletions src/storage/txn/txn_store.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ public:

bool CheckConflict(Catalog *catalog, Txn *txn) const;

bool CheckConflict(const TxnTableStore *txn_table_store) const;
Optional<String> CheckConflict(const TxnTableStore *txn_table_store) const;

void PrepareCommit1(const Vector<WalSegmentInfo *> &segment_infos) const;

Expand Down Expand Up @@ -241,7 +241,7 @@ public:

bool CheckConflict(Catalog *catalog);

bool CheckConflict(const TxnStore &txn_store);
Optional<String> CheckConflict(const TxnStore &txn_store);

void PrepareCommit1();

Expand Down

0 comments on commit 85ec171

Please sign in to comment.