Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix UnrecoverableError: Invalid MergeFlag: 1 #2416

Merged
merged 2 commits into from
Jan 2, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 11 additions & 8 deletions src/storage/meta/entry/segment_index_entry.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -865,7 +865,8 @@ void SegmentIndexEntry::ReplaceChunkIndexEntries(TxnTableStore *txn_table_store,
ChunkIndexEntry *SegmentIndexEntry::RebuildChunkIndexEntries(TxnTableStore *txn_table_store, SegmentEntry *segment_entry) {
const auto &index_name = *table_index_entry_->GetIndexName();
Txn *txn = txn_table_store->GetTxn();
if (!TrySetOptimizing(txn)) {
const auto [result_b, add_segment_optimizing] = TrySetOptimizing(txn);
if (!result_b) {
LOG_INFO(fmt::format("Index {} segment {} is optimizing, skip optimize.", index_name, segment_id_));
return nullptr;
}
Expand Down Expand Up @@ -899,6 +900,7 @@ ChunkIndexEntry *SegmentIndexEntry::RebuildChunkIndexEntries(TxnTableStore *txn_
return nullptr;
}
}
add_segment_optimizing();
RowID base_rowid(segment_id_, 0);
SharedPtr<ChunkIndexEntry> merged_chunk_index_entry = nullptr;
switch (index_base->index_type_) {
Expand Down Expand Up @@ -1225,17 +1227,18 @@ UniquePtr<SegmentIndexEntry> SegmentIndexEntry::Deserialize(const nlohmann::json
return segment_index_entry;
}

bool SegmentIndexEntry::TrySetOptimizing(Txn *txn) {
Pair<bool, std::function<void()>> SegmentIndexEntry::TrySetOptimizing(Txn *txn) {
bool expected = false;
bool success = optimizing_.compare_exchange_strong(expected, true);
if (!success) {
return false;
return {false, nullptr};
}
TableEntry *table_entry = table_index_entry_->table_index_meta()->GetTableEntry();
TxnTableStore *txn_table_store = txn->txn_store()->GetTxnTableStore(table_entry);
TxnIndexStore *txn_index_store = txn_table_store->GetIndexStore(table_index_entry_);
txn_index_store->AddSegmentOptimizing(this);
return true;
return {true, [this, txn] {
TableEntry *table_entry = table_index_entry_->table_index_meta()->GetTableEntry();
TxnTableStore *txn_table_store = txn->txn_store()->GetTxnTableStore(table_entry);
TxnIndexStore *txn_index_store = txn_table_store->GetIndexStore(table_index_entry_);
txn_index_store->AddSegmentOptimizing(this);
}};
}

void SegmentIndexEntry::ResetOptimizing() { optimizing_.store(false); }
Expand Down
2 changes: 1 addition & 1 deletion src/storage/meta/entry/segment_index_entry.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,7 @@ private:
Atomic<TxnTimeStamp> deprecate_ts_ = UNCOMMIT_TS;

public:
bool TrySetOptimizing(Txn *txn);
Pair<bool, std::function<void()>> TrySetOptimizing(Txn *txn);

void ResetOptimizing();

Expand Down
5 changes: 3 additions & 2 deletions src/storage/meta/entry/table_entry.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -918,7 +918,8 @@ void TableEntry::OptimizeIndex(Txn *txn) {
const IndexFullText *index_fulltext = static_cast<const IndexFullText *>(index_base);
Map<SegmentID, SharedPtr<SegmentIndexEntry>> index_by_segment = table_index_entry->GetIndexBySegmentSnapshot(this, txn);
for (auto &[segment_id, segment_index_entry] : index_by_segment) {
if (!segment_index_entry->TrySetOptimizing(txn)) {
const auto [result_b, add_segment_optimizing] = segment_index_entry->TrySetOptimizing(txn);
if (!result_b) {
LOG_INFO(fmt::format("Index {} segment {} is optimizing, skip optimize.", index_name, segment_id));
continue;
}
Expand All @@ -937,7 +938,7 @@ void TableEntry::OptimizeIndex(Txn *txn) {
opt_success = true;
continue;
}

add_segment_optimizing();
String msg = fmt::format("merging {}", index_name);
Vector<String> base_names;
Vector<RowID> base_rowids;
Expand Down
8 changes: 4 additions & 4 deletions src/storage/txn/txn.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -568,11 +568,11 @@ TxnTimeStamp Txn::Commit() {
txn_store_.PrepareCommit1(); // Only for import and compact, pre-commit segment
// LOG_INFO(fmt::format("Txn {} commit ts: {}", txn_id_, commit_ts));

if (txn_mgr_->CheckTxnConflict(this)) {
LOG_ERROR(fmt::format("Txn: {} is rolled back. rollback ts: {}", txn_id_, commit_ts));
if (const auto conflict_reason = txn_mgr_->CheckTxnConflict(this); conflict_reason) {
LOG_ERROR(fmt::format("Txn: {} is rolled back. rollback ts: {}. Txn conflict reason: {}.", txn_id_, commit_ts, *conflict_reason));
wal_entry_ = nullptr;
txn_mgr_->SendToWAL(this);
RecoverableError(Status::TxnConflict(txn_id_, "Txn conflict reason."));
RecoverableError(Status::TxnConflict(txn_id_, fmt::format("Txn conflict reason: {}.", *conflict_reason)));
}

// Put wal entry to the manager in the same order as commit_ts.
Expand All @@ -595,7 +595,7 @@ TxnTimeStamp Txn::Commit() {

bool Txn::CheckConflict() { return txn_store_.CheckConflict(catalog_); }

bool Txn::CheckConflict(Txn *other_txn) {
Optional<String> Txn::CheckConflict(Txn *other_txn) {
LOG_TRACE(fmt::format("Txn {} check conflict with {}.", txn_id_, other_txn->txn_id_));

return txn_store_.CheckConflict(other_txn->txn_store_);
Expand Down
2 changes: 1 addition & 1 deletion src/storage/txn/txn.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ public:

bool CheckConflict();

bool CheckConflict(Txn *txn);
Optional<String> CheckConflict(Txn *txn);

void CommitBottom();

Expand Down
12 changes: 6 additions & 6 deletions src/storage/txn/txn_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ TxnTimeStamp TxnManager::GetWriteCommitTS(Txn *txn) {
return commit_ts;
}

bool TxnManager::CheckTxnConflict(Txn *txn) {
Optional<String> TxnManager::CheckTxnConflict(Txn *txn) {
TxnTimeStamp commit_ts = txn->CommitTS();
Vector<SharedPtr<Txn>> candidate_txns;
TxnTimeStamp min_checking_ts = UNCOMMIT_TS;
Expand All @@ -160,19 +160,19 @@ bool TxnManager::CheckTxnConflict(Txn *txn) {
}
});
if (txn->CheckConflict()) {
return true;
return "Conflict in txn->CheckConflict()";
}
for (SharedPtr<Txn> &candidate_txn : candidate_txns) {
for (const auto &candidate_txn : candidate_txns) {
// LOG_INFO(fmt::format("Txn {}(commit_ts: {}) check conflict with txn {}(commit_ts: {})",
// txn->TxnID(),
// txn->CommitTS(),
// candidate_txn->TxnID(),
// candidate_txn->CommitTS()));
if (txn->CheckConflict(candidate_txn.get())) {
return true;
if (const auto conflict_reason = txn->CheckConflict(candidate_txn.get()); conflict_reason) {
return fmt::format("Conflict with candidate_txn {}: {}", candidate_txn->TxnID(), *conflict_reason);
}
}
return false;
return None;
}

void TxnManager::SendToWAL(Txn *txn) {
Expand Down
2 changes: 1 addition & 1 deletion src/storage/txn/txn_manager.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public:

TxnTimeStamp GetWriteCommitTS(Txn *txn);

bool CheckTxnConflict(Txn *txn);
Optional<String> CheckTxnConflict(Txn *txn);

void SendToWAL(Txn *txn);

Expand Down
60 changes: 47 additions & 13 deletions src/storage/txn/txn_store.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

module;

#include <ranges>
#include <string>
#include <vector>

Expand Down Expand Up @@ -357,19 +358,50 @@ bool TxnTableStore::CheckConflict(Catalog *catalog, Txn *txn) const {
return false;
}

bool TxnTableStore::CheckConflict(const TxnTableStore *other_table_store) const {
for (const auto &[index_name, _] : txn_indexes_store_) {
for (const auto [index_entry, _] : other_table_store->txn_indexes_) {
if (index_name == *index_entry->GetIndexName()) {
return true;
Optional<String> TxnTableStore::CheckConflict(const TxnTableStore *other_table_store) const {
{
Set<std::string_view> other_txn_indexes_set;
Map<std::string_view, Set<SegmentID>> other_txn_indexes_store_map;
for (const auto &index_entry : std::views::keys(other_table_store->txn_indexes_)) {
other_txn_indexes_set.insert(*index_entry->GetIndexName());
}
for (const auto &[index_name, index_store] : other_table_store->txn_indexes_store_) {
auto &segment_set = other_txn_indexes_store_map[index_name];
for (const auto segment_id : std::views::keys(index_store->index_entry_map_)) {
segment_set.insert(segment_id);
}
}
for (const auto &index_entry : std::views::keys(txn_indexes_)) {
if (const auto &index_name = *index_entry->GetIndexName(); other_txn_indexes_set.contains(index_name)) {
return fmt::format("{}: txn_indexes_ containing Index {} conflict with other_table_store->txn_indexes_",
__func__,
index_name);
}
}
for (const auto &[index_name, index_store] : txn_indexes_store_) {
if (other_txn_indexes_set.contains(index_name)) {
return fmt::format("{}: txn_indexes_store_ containing Index {} conflict with other_table_store->txn_indexes_",
__func__,
index_name);
}
if (const auto iter = other_txn_indexes_store_map.find(index_name); iter != other_txn_indexes_store_map.end()) {
for (const auto &other_segment_set = iter->second; const auto segment_id : std::views::keys(index_store->index_entry_map_)) {
if (other_segment_set.contains(segment_id)) {
return fmt::format(
"{}: txn_indexes_store_ containing Index {} Segment {} conflict with other_table_store->txn_indexes_store_",
__func__,
index_name,
segment_id);
}
}
}
}
}

const auto &delete_state = delete_state_;
const auto &other_delete_state = other_table_store->delete_state_;
if (delete_state.rows_.empty() || other_delete_state.rows_.empty()) {
return false;
return None;
}
for (const auto &[segment_id, block_map] : delete_state.rows_) {
auto other_iter = other_delete_state.rows_.find(segment_id);
Expand All @@ -391,12 +423,12 @@ bool TxnTableStore::CheckConflict(const TxnTableStore *other_table_store) const
break;
}
if (other_block_offsets[j] == block_offset) {
return true;
return fmt::format("Delete conflict: segment_id: {}, block_id: {}, block_offset: {}", segment_id, block_id, block_offset);
}
}
}
}
return false;
return None;
}

void TxnTableStore::PrepareCommit1(const Vector<WalSegmentInfo *> &segment_infos) const {
Expand Down Expand Up @@ -635,11 +667,11 @@ bool TxnStore::CheckConflict(Catalog *catalog) {
return false;
}

bool TxnStore::CheckConflict(const TxnStore &other_txn_store) {
Optional<String> TxnStore::CheckConflict(const TxnStore &other_txn_store) {
for (const auto &[table_name, table_store] : txn_tables_store_) {
for (const auto [table_entry, _] : other_txn_store.txn_tables_) {
if (table_name == *table_entry->GetTableName()) {
return true;
return fmt::format("txn_tables_store_ containing table_name {} conflict with other_txn_store.txn_tables_", table_name);
}
}

Expand All @@ -649,11 +681,13 @@ bool TxnStore::CheckConflict(const TxnStore &other_txn_store) {
}

const TxnTableStore *other_table_store = other_iter->second.get();
if (table_store->CheckConflict(other_table_store)) {
return true;
if (const auto conflict_reason = table_store->CheckConflict(other_table_store); conflict_reason) {
return fmt::format("txn_tables_store_ containing table_name {} conflict with other_txn_store.txn_tables_store_: {}",
table_name,
*conflict_reason);
}
}
return false;
return None;
}

void TxnStore::PrepareCommit1() {
Expand Down
4 changes: 2 additions & 2 deletions src/storage/txn/txn_store.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ public:

bool CheckConflict(Catalog *catalog, Txn *txn) const;

bool CheckConflict(const TxnTableStore *txn_table_store) const;
Optional<String> CheckConflict(const TxnTableStore *txn_table_store) const;

void PrepareCommit1(const Vector<WalSegmentInfo *> &segment_infos) const;

Expand Down Expand Up @@ -241,7 +241,7 @@ public:

bool CheckConflict(Catalog *catalog);

bool CheckConflict(const TxnStore &txn_store);
Optional<String> CheckConflict(const TxnStore &txn_store);

void PrepareCommit1();

Expand Down