diff --git a/src/storage/txn/txn.cpp b/src/storage/txn/txn.cpp index ecac6ff998..798181c1df 100644 --- a/src/storage/txn/txn.cpp +++ b/src/storage/txn/txn.cpp @@ -568,11 +568,11 @@ TxnTimeStamp Txn::Commit() { txn_store_.PrepareCommit1(); // Only for import and compact, pre-commit segment // LOG_INFO(fmt::format("Txn {} commit ts: {}", txn_id_, commit_ts)); - if (txn_mgr_->CheckTxnConflict(this)) { - LOG_ERROR(fmt::format("Txn: {} is rolled back. rollback ts: {}", txn_id_, commit_ts)); + if (const auto conflict_reason = txn_mgr_->CheckTxnConflict(this); conflict_reason) { + LOG_ERROR(fmt::format("Txn: {} is rolled back. rollback ts: {}. Txn conflict reason: {}.", txn_id_, commit_ts, *conflict_reason)); wal_entry_ = nullptr; txn_mgr_->SendToWAL(this); - RecoverableError(Status::TxnConflict(txn_id_, "Txn conflict reason.")); + RecoverableError(Status::TxnConflict(txn_id_, fmt::format("Txn conflict reason: {}.", *conflict_reason))); } // Put wal entry to the manager in the same order as commit_ts. @@ -595,7 +595,7 @@ TxnTimeStamp Txn::Commit() { bool Txn::CheckConflict() { return txn_store_.CheckConflict(catalog_); } -bool Txn::CheckConflict(Txn *other_txn) { +Optional Txn::CheckConflict(Txn *other_txn) { LOG_TRACE(fmt::format("Txn {} check conflict with {}.", txn_id_, other_txn->txn_id_)); return txn_store_.CheckConflict(other_txn->txn_store_); diff --git a/src/storage/txn/txn.cppm b/src/storage/txn/txn.cppm index ea035da3e7..6179517531 100644 --- a/src/storage/txn/txn.cppm +++ b/src/storage/txn/txn.cppm @@ -92,7 +92,7 @@ public: bool CheckConflict(); - bool CheckConflict(Txn *txn); + Optional CheckConflict(Txn *txn); void CommitBottom(); diff --git a/src/storage/txn/txn_manager.cpp b/src/storage/txn/txn_manager.cpp index e81863ac3d..a2bad7f40c 100644 --- a/src/storage/txn/txn_manager.cpp +++ b/src/storage/txn/txn_manager.cpp @@ -136,7 +136,7 @@ TxnTimeStamp TxnManager::GetWriteCommitTS(Txn *txn) { return commit_ts; } -bool TxnManager::CheckTxnConflict(Txn *txn) { +Optional TxnManager::CheckTxnConflict(Txn *txn) { TxnTimeStamp commit_ts = txn->CommitTS(); Vector> candidate_txns; TxnTimeStamp min_checking_ts = UNCOMMIT_TS; @@ -160,19 +160,19 @@ bool TxnManager::CheckTxnConflict(Txn *txn) { } }); if (txn->CheckConflict()) { - return true; + return "Conflict in txn->CheckConflict()"; } - for (SharedPtr &candidate_txn : candidate_txns) { + for (const auto &candidate_txn : candidate_txns) { // LOG_INFO(fmt::format("Txn {}(commit_ts: {}) check conflict with txn {}(commit_ts: {})", // txn->TxnID(), // txn->CommitTS(), // candidate_txn->TxnID(), // candidate_txn->CommitTS())); - if (txn->CheckConflict(candidate_txn.get())) { - return true; + if (const auto conflict_reason = txn->CheckConflict(candidate_txn.get()); conflict_reason) { + return fmt::format("Conflict with candidate_txn {}: {}", candidate_txn->TxnID(), *conflict_reason); } } - return false; + return None; } void TxnManager::SendToWAL(Txn *txn) { diff --git a/src/storage/txn/txn_manager.cppm b/src/storage/txn/txn_manager.cppm index 45469902e3..4acf0ee236 100644 --- a/src/storage/txn/txn_manager.cppm +++ b/src/storage/txn/txn_manager.cppm @@ -58,7 +58,7 @@ public: TxnTimeStamp GetWriteCommitTS(Txn *txn); - bool CheckTxnConflict(Txn *txn); + Optional CheckTxnConflict(Txn *txn); void SendToWAL(Txn *txn); diff --git a/src/storage/txn/txn_store.cpp b/src/storage/txn/txn_store.cpp index 4ef4fdb462..f32c784053 100644 --- a/src/storage/txn/txn_store.cpp +++ b/src/storage/txn/txn_store.cpp @@ -14,6 +14,7 @@ module; +#include #include #include @@ -357,11 +358,42 @@ bool TxnTableStore::CheckConflict(Catalog *catalog, Txn *txn) const { return false; } -bool TxnTableStore::CheckConflict(const TxnTableStore *other_table_store) const { - for (const auto &[index_name, _] : txn_indexes_store_) { - for (const auto [index_entry, _] : other_table_store->txn_indexes_) { - if (index_name == *index_entry->GetIndexName()) { - return true; +Optional TxnTableStore::CheckConflict(const TxnTableStore *other_table_store) const { + { + Set other_txn_indexes_set; + Map> other_txn_indexes_store_map; + for (const auto &index_entry : std::views::keys(other_table_store->txn_indexes_)) { + other_txn_indexes_set.insert(*index_entry->GetIndexName()); + } + for (const auto &[index_name, index_store] : other_table_store->txn_indexes_store_) { + auto &segment_set = other_txn_indexes_store_map[index_name]; + for (const auto segment_id : std::views::keys(index_store->index_entry_map_)) { + segment_set.insert(segment_id); + } + } + for (const auto &index_entry : std::views::keys(txn_indexes_)) { + if (const auto &index_name = *index_entry->GetIndexName(); other_txn_indexes_set.contains(index_name)) { + return fmt::format("{}: txn_indexes_ containing Index {} conflict with other_table_store->txn_indexes_", + __func__, + index_name); + } + } + for (const auto &[index_name, index_store] : txn_indexes_store_) { + if (other_txn_indexes_set.contains(index_name)) { + return fmt::format("{}: txn_indexes_store_ containing Index {} conflict with other_table_store->txn_indexes_", + __func__, + index_name); + } + if (const auto iter = other_txn_indexes_store_map.find(index_name); iter != other_txn_indexes_store_map.end()) { + for (const auto &other_segment_set = iter->second; const auto segment_id : std::views::keys(index_store->index_entry_map_)) { + if (other_segment_set.contains(segment_id)) { + return fmt::format( + "{}: txn_indexes_store_ containing Index {} Segment {} conflict with other_table_store->txn_indexes_store_", + __func__, + index_name, + segment_id); + } + } } } } @@ -369,7 +401,7 @@ bool TxnTableStore::CheckConflict(const TxnTableStore *other_table_store) const const auto &delete_state = delete_state_; const auto &other_delete_state = other_table_store->delete_state_; if (delete_state.rows_.empty() || other_delete_state.rows_.empty()) { - return false; + return None; } for (const auto &[segment_id, block_map] : delete_state.rows_) { auto other_iter = other_delete_state.rows_.find(segment_id); @@ -391,12 +423,12 @@ bool TxnTableStore::CheckConflict(const TxnTableStore *other_table_store) const break; } if (other_block_offsets[j] == block_offset) { - return true; + return fmt::format("Delete conflict: segment_id: {}, block_id: {}, block_offset: {}", segment_id, block_id, block_offset); } } } } - return false; + return None; } void TxnTableStore::PrepareCommit1(const Vector &segment_infos) const { @@ -635,11 +667,11 @@ bool TxnStore::CheckConflict(Catalog *catalog) { return false; } -bool TxnStore::CheckConflict(const TxnStore &other_txn_store) { +Optional TxnStore::CheckConflict(const TxnStore &other_txn_store) { for (const auto &[table_name, table_store] : txn_tables_store_) { for (const auto [table_entry, _] : other_txn_store.txn_tables_) { if (table_name == *table_entry->GetTableName()) { - return true; + return fmt::format("txn_tables_store_ containing table_name {} conflict with other_txn_store.txn_tables_", table_name); } } @@ -649,11 +681,13 @@ bool TxnStore::CheckConflict(const TxnStore &other_txn_store) { } const TxnTableStore *other_table_store = other_iter->second.get(); - if (table_store->CheckConflict(other_table_store)) { - return true; + if (const auto conflict_reason = table_store->CheckConflict(other_table_store); conflict_reason) { + return fmt::format("txn_tables_store_ containing table_name {} conflict with other_txn_store.txn_tables_store_: {}", + table_name, + *conflict_reason); } } - return false; + return None; } void TxnStore::PrepareCommit1() { diff --git a/src/storage/txn/txn_store.cppm b/src/storage/txn/txn_store.cppm index 3b6d860e5c..c06acd4e22 100644 --- a/src/storage/txn/txn_store.cppm +++ b/src/storage/txn/txn_store.cppm @@ -140,7 +140,7 @@ public: bool CheckConflict(Catalog *catalog, Txn *txn) const; - bool CheckConflict(const TxnTableStore *txn_table_store) const; + Optional CheckConflict(const TxnTableStore *txn_table_store) const; void PrepareCommit1(const Vector &segment_infos) const; @@ -241,7 +241,7 @@ public: bool CheckConflict(Catalog *catalog); - bool CheckConflict(const TxnStore &txn_store); + Optional CheckConflict(const TxnStore &txn_store); void PrepareCommit1();