Skip to content

Commit 496cecd

Browse files
wwq2333 and wangwenqi authored
[Store|TransferEngine]: use condition-variable based completion instead of busy-polling (#1053)
* (Event-driven completion) Provide an option to use condition-variable based completion instead of busy-polling

Summary
- Add USE_EVENT_DRIVEN_COMPLETION compile-time option in common.cmake (default OFF).
- Store: TransferEngineOperationState waits on BatchDesc::completion_cv with timeout; falls back to original polling when the flag is OFF.
- TransferEngine: add per-batch finished_task_count, has_failure, is_finished and completion_cv on BatchDesc; notify on last task completion; unify __atomic_* usage.
- No behavior change when USE_EVENT_DRIVEN_COMPLETION is OFF (default).

---------

Co-authored-by: wangwenqi <[email protected]>
1 parent 5f016f4 commit 496cecd

File tree

3 files changed

+157
-7
lines changed

3 files changed

+157
-7
lines changed

mooncake-common/common.cmake

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@ option(WITH_RUST_EXAMPLE "build the Rust interface and sample code for the trans
7474
option(WITH_METRICS "enable metrics and metrics reporting thread" ON)
7575
option(USE_3FS "option for using 3FS storage backend" OFF)
7676
option(WITH_NVIDIA_PEERMEM "disable to support RDMA without nvidia-peermem. If WITH_NVIDIA_PEERMEM=OFF then USE_CUDA=ON is required." ON)
77+
option(USE_EVENT_DRIVEN_COMPLETION "option for using event-driven completion (store & transfer engine)" OFF)
7778

7879
option(USE_LRU_MASTER "option for using LRU in master service" OFF)
7980
set(LRU_MAX_CAPACITY 1000)
@@ -83,6 +84,12 @@ if (USE_LRU_MASTER)
8384
add_compile_definitions(LRU_MAX_CAPACITY)
8485
endif()
8586

87+
# Propagate the USE_EVENT_DRIVEN_COMPLETION option to the compiler as a
# preprocessor definition and report the chosen mode at configure time.
if (NOT USE_EVENT_DRIVEN_COMPLETION)
  message(STATUS "Event-driven completion is disabled")
else()
  add_compile_definitions(USE_EVENT_DRIVEN_COMPLETION)
  message(STATUS "Event-driven completion is enabled")
endif()
8693

8794
if (USE_NVMEOF)
8895
set(USE_CUDA ON)

mooncake-store/src/transfer_task.cpp

Lines changed: 51 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
#include <algorithm>
66
#include <cstdlib>
77
#include "transfer_engine.h"
8+
#include "transport/transport.h"
89

910
namespace mooncake {
1011

@@ -286,19 +287,64 @@ void TransferEngineOperationState::set_result_internal(ErrorCode error_code) {
286287
VLOG(1) << "Setting transfer result for batch " << batch_id_ << " to "
287288
<< static_cast<int>(error_code);
288289
result_.emplace(error_code);
289-
290-
cv_.notify_all();
291290
}
292291

293292
void TransferEngineOperationState::wait_for_completion() {
294293
if (is_completed()) {
295294
return;
296295
}
297296

298-
VLOG(1) << "Starting transfer engine polling for batch " << batch_id_;
299297
constexpr int64_t timeout_seconds = 60;
300-
constexpr int64_t kOneSecondInNano = 1000 * 1000 * 1000;
301298

299+
#ifdef USE_EVENT_DRIVEN_COMPLETION
300+
VLOG(1) << "Waiting for transfer engine completion for batch " << batch_id_;
301+
302+
// Wait directly on BatchDesc's condition variable.
303+
auto& batch_desc = Transport::toBatchDesc(batch_id_);
304+
bool completed;
305+
bool failed = false;
306+
307+
// Fast path: if already finished, avoid taking the mutex and waiting.
308+
// Use acquire here to pair with the writer's release-store, because this
309+
// path may skip taking the mutex. It ensures all prior updates are visible.
310+
completed = batch_desc.is_finished.load(std::memory_order_acquire);
311+
if (!completed) {
312+
// Use the same mutex as the notifier when updating the predicate to
313+
// avoid missed notifications. The predicate is re-checked under the
314+
// lock. Under the mutex, relaxed is sufficient; the mutex acquire
315+
// orders prior writes.
316+
std::unique_lock<std::mutex> lock(batch_desc.completion_mutex);
317+
completed = batch_desc.completion_cv.wait_for(
318+
lock, std::chrono::seconds(timeout_seconds), [&batch_desc] {
319+
return batch_desc.is_finished.load(std::memory_order_relaxed);
320+
});
321+
} // Explicitly release completion_mutex before acquiring mutex_
322+
323+
// Once completion is observed, read failure flag.
324+
if (completed) {
325+
failed = batch_desc.has_failure.load(std::memory_order_relaxed);
326+
}
327+
328+
ErrorCode error_code =
329+
completed ? (failed ? ErrorCode::TRANSFER_FAIL : ErrorCode::OK)
330+
: ErrorCode::TRANSFER_FAIL;
331+
332+
{
333+
std::lock_guard<std::mutex> lock(mutex_);
334+
set_result_internal(error_code);
335+
}
336+
337+
if (completed) {
338+
VLOG(1) << "Transfer engine operation completed for batch " << batch_id_
339+
<< " with result: " << static_cast<int>(error_code);
340+
} else {
341+
LOG(ERROR) << "Failed to complete transfers after " << timeout_seconds
342+
<< " seconds for batch " << batch_id_;
343+
}
344+
#else
345+
VLOG(1) << "Starting transfer engine polling for batch " << batch_id_;
346+
347+
constexpr int64_t kOneSecondInNano = 1000 * 1000 * 1000;
302348
const int64_t start_ts = getCurrentTimeInNano();
303349

304350
while (true) {
@@ -322,6 +368,7 @@ void TransferEngineOperationState::wait_for_completion() {
322368
VLOG(1) << "Transfer engine operation still pending for batch "
323369
<< batch_id_;
324370
}
371+
#endif
325372
}
326373

327374
// ============================================================================

mooncake-transfer-engine/include/transport/transport.h

Lines changed: 99 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,10 @@
2626
#include <memory>
2727
#include <queue>
2828
#include <string>
29+
#include <atomic>
30+
#include <functional>
31+
#include <mutex>
32+
#include <condition_variable>
2933

3034
#include "common/base/status.h"
3135
#include "transfer_metadata.h"
@@ -76,8 +80,24 @@ class Transport {
7680
size_t transferred_bytes;
7781
};
7882

83+
struct BatchDesc;
7984
struct TransferTask;
8085

86+
// NOTE ABOUT BatchID → BatchDesc conversion:
87+
//
88+
// BatchID is an opaque 64‑bit unsigned integer that carries a
89+
// BatchDesc pointer value. For performance reasons, this helper
90+
// reinterprets the integral handle directly as a BatchDesc
91+
// reference.
92+
//
93+
// The conversion intentionally bypasses any map or lookup to
94+
// minimize overhead on hot paths. The caller must ensure that
95+
// the underlying BatchDesc object remains alive and valid for
96+
// as long as the handle is in use.
97+
// Converts an opaque BatchID handle back into the BatchDesc it encodes.
// No lookup or validation is performed: the integral value is reinterpreted
// as a BatchDesc pointer and dereferenced, so the caller is responsible for
// passing a handle whose BatchDesc is still alive.
static inline BatchDesc &toBatchDesc(BatchID id) {
    auto *const desc = reinterpret_cast<BatchDesc *>(id);
    return *desc;
}
100+
81101
// Slice must be allocated on heap, as it will delete self on markSuccess
82102
// or markFailed.
83103
struct Slice {
@@ -128,16 +148,76 @@ class Transport {
128148
public:
129149
void markSuccess() {
130150
status = Slice::SUCCESS;
131-
__sync_fetch_and_add(&task->transferred_bytes, length);
132-
__sync_fetch_and_add(&task->success_slice_count, 1);
151+
__atomic_fetch_add(&task->transferred_bytes, length,
152+
__ATOMIC_RELAXED);
153+
__atomic_fetch_add(&task->success_slice_count, 1, __ATOMIC_RELAXED);
154+
155+
check_batch_completion(false);
133156
}
134157

135158
void markFailed() {
136159
status = Slice::FAILED;
137-
__sync_fetch_and_add(&task->failed_slice_count, 1);
160+
__atomic_fetch_add(&task->failed_slice_count, 1, __ATOMIC_RELAXED);
161+
162+
check_batch_completion(true);
138163
}
139164

140165
volatile int64_t ts;
166+
167+
private:
168+
// Accounts for the completion of this slice and, when built with
// USE_EVENT_DRIVEN_COMPLETION, propagates it upwards:
//   slice done -> (last slice of task) task done -> (last task of batch)
//   batch done -> wake waiters on BatchDesc::completion_cv.
//
// @param is_failed  true when called from markFailed(); sets the batch-wide
//                   has_failure flag before the slice is counted.
//
// Memory-ordering contract: every counter increment below is acq_rel. Each
// increment therefore releases the writes the calling thread made before it
// (has_failure, and the per-task counters updated by markSuccess/markFailed
// prior to this call) and acquires the writes of all threads that incremented
// the same counter earlier. The thread that performs the final increment thus
// happens-after every other completer, and its release-store of
// batch_desc.is_finished publishes all of those writes to the waiter. With
// relaxed increments only the final thread's own writes would be guaranteed
// visible, so a failure recorded by another thread could be missed.
//
// When USE_EVENT_DRIVEN_COMPLETION is not defined this function is a no-op.
inline void check_batch_completion(bool is_failed) {
#ifdef USE_EVENT_DRIVEN_COMPLETION
    auto &batch_desc = toBatchDesc(task->batch_id);
    if (is_failed) {
        // Relaxed is sufficient: the acq_rel increment that follows orders
        // this store before the batch is ever reported as finished.
        batch_desc.has_failure.store(true, std::memory_order_relaxed);
    }

    // A single atomic counter decides which thread completed the task's
    // final slice, so exactly one thread proceeds past this point per task.
    const uint64_t prev_completed = __atomic_fetch_add(
        &task->completed_slice_count, 1, __ATOMIC_ACQ_REL);
    if (prev_completed + 1 != task->slice_count) {
        return;  // other slices of this task are still outstanding
    }

    // Release so that a reader observing the flag also observes the
    // per-task counters accumulated above.
    __atomic_store_n(&task->is_finished, true, __ATOMIC_RELEASE);

    // Count this task as finished; acq_rel chains the visibility of every
    // previously finished task to whichever thread finishes the last one.
    const auto prev_finished = batch_desc.finished_task_count.fetch_add(
        1, std::memory_order_acq_rel);
    if (prev_finished + 1 != batch_desc.batch_size) {
        return;  // other tasks of this batch are still outstanding
    }

    // Last task of the batch. Flip the predicate under the same mutex the
    // waiter uses so a waiter that has checked the predicate but not yet
    // blocked cannot miss the notification. The store stays a release
    // because the waiter also has a lock-free fast path that reads the flag
    // with acquire.
    {
        std::lock_guard<std::mutex> lock(batch_desc.completion_mutex);
        batch_desc.is_finished.store(true, std::memory_order_release);
    }
    // Notify after releasing the lock so woken threads do not immediately
    // block on the mutex.
    // NOTE(review): this touches batch_desc after is_finished is visible;
    // it assumes the waiter does not destroy the BatchDesc right after
    // observing completion -- confirm against the batch lifetime rules.
    batch_desc.completion_cv.notify_all();
#else
    (void)is_failed;  // unused when event-driven completion is compiled out
#endif
}
141221
};
142222

143223
struct ThreadLocalSliceCache {
@@ -198,6 +278,10 @@ class Transport {
198278
uint64_t total_bytes = 0;
199279
BatchID batch_id = 0;
200280

281+
#ifdef USE_EVENT_DRIVEN_COMPLETION
282+
volatile uint64_t completed_slice_count = 0;
283+
#endif
284+
201285
// record the origin request
202286
#ifdef USE_ASCEND_HETEROGENEOUS
203287
// need to modify the request's source address, changing it from an NPU
@@ -220,6 +304,18 @@ class Transport {
220304
std::vector<TransferTask> task_list;
221305
void *context; // for transport implementers.
222306
int64_t start_timestamp;
307+
308+
#ifdef USE_EVENT_DRIVEN_COMPLETION
309+
// Event-driven completion: tracks batch progress and notifies waiters
310+
std::atomic<uint64_t> finished_task_count{0};
311+
std::atomic<bool> has_failure{false};
312+
std::atomic<bool> is_finished{
313+
false}; // Completion flag for wait predicate
314+
315+
// Synchronization primitives for direct notification
316+
std::mutex completion_mutex;
317+
std::condition_variable completion_cv;
318+
#endif
223319
};
224320

225321
public:

0 commit comments

Comments
 (0)