Skip to content

Commit

Permalink
update TrySetOptimizing
Browse files Browse the repository at this point in the history
  • Loading branch information
yangzq50 committed Jan 2, 2025
1 parent 85ec171 commit 3c7ddfc
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 11 deletions.
19 changes: 11 additions & 8 deletions src/storage/meta/entry/segment_index_entry.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -865,7 +865,8 @@ void SegmentIndexEntry::ReplaceChunkIndexEntries(TxnTableStore *txn_table_store,
ChunkIndexEntry *SegmentIndexEntry::RebuildChunkIndexEntries(TxnTableStore *txn_table_store, SegmentEntry *segment_entry) {
const auto &index_name = *table_index_entry_->GetIndexName();
Txn *txn = txn_table_store->GetTxn();
if (!TrySetOptimizing(txn)) {
const auto [result_b, add_segment_optimizing] = TrySetOptimizing(txn);
if (!result_b) {
LOG_INFO(fmt::format("Index {} segment {} is optimizing, skip optimize.", index_name, segment_id_));
return nullptr;
}
Expand Down Expand Up @@ -899,6 +900,7 @@ ChunkIndexEntry *SegmentIndexEntry::RebuildChunkIndexEntries(TxnTableStore *txn_
return nullptr;
}
}
add_segment_optimizing();
RowID base_rowid(segment_id_, 0);
SharedPtr<ChunkIndexEntry> merged_chunk_index_entry = nullptr;
switch (index_base->index_type_) {
Expand Down Expand Up @@ -1225,17 +1227,18 @@ UniquePtr<SegmentIndexEntry> SegmentIndexEntry::Deserialize(const nlohmann::json
return segment_index_entry;
}

bool SegmentIndexEntry::TrySetOptimizing(Txn *txn) {
Pair<bool, std::function<void()>> SegmentIndexEntry::TrySetOptimizing(Txn *txn) {
bool expected = false;
bool success = optimizing_.compare_exchange_strong(expected, true);
if (!success) {
return false;
return {false, nullptr};
}
TableEntry *table_entry = table_index_entry_->table_index_meta()->GetTableEntry();
TxnTableStore *txn_table_store = txn->txn_store()->GetTxnTableStore(table_entry);
TxnIndexStore *txn_index_store = txn_table_store->GetIndexStore(table_index_entry_);
txn_index_store->AddSegmentOptimizing(this);
return true;
return {true, [this, txn] {
TableEntry *table_entry = table_index_entry_->table_index_meta()->GetTableEntry();
TxnTableStore *txn_table_store = txn->txn_store()->GetTxnTableStore(table_entry);
TxnIndexStore *txn_index_store = txn_table_store->GetIndexStore(table_index_entry_);
txn_index_store->AddSegmentOptimizing(this);
}};
}

void SegmentIndexEntry::ResetOptimizing() { optimizing_.store(false); }
Expand Down
2 changes: 1 addition & 1 deletion src/storage/meta/entry/segment_index_entry.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,7 @@ private:
Atomic<TxnTimeStamp> deprecate_ts_ = UNCOMMIT_TS;

public:
bool TrySetOptimizing(Txn *txn);
Pair<bool, std::function<void()>> TrySetOptimizing(Txn *txn);

void ResetOptimizing();

Expand Down
5 changes: 3 additions & 2 deletions src/storage/meta/entry/table_entry.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -918,7 +918,8 @@ void TableEntry::OptimizeIndex(Txn *txn) {
const IndexFullText *index_fulltext = static_cast<const IndexFullText *>(index_base);
Map<SegmentID, SharedPtr<SegmentIndexEntry>> index_by_segment = table_index_entry->GetIndexBySegmentSnapshot(this, txn);
for (auto &[segment_id, segment_index_entry] : index_by_segment) {
if (!segment_index_entry->TrySetOptimizing(txn)) {
const auto [result_b, add_segment_optimizing] = segment_index_entry->TrySetOptimizing(txn);
if (!result_b) {
LOG_INFO(fmt::format("Index {} segment {} is optimizing, skip optimize.", index_name, segment_id));
continue;
}
Expand All @@ -937,7 +938,7 @@ void TableEntry::OptimizeIndex(Txn *txn) {
opt_success = true;
continue;
}

add_segment_optimizing();
String msg = fmt::format("merging {}", index_name);
Vector<String> base_names;
Vector<RowID> base_rowids;
Expand Down

0 comments on commit 3c7ddfc

Please sign in to comment.