From 47783b5a98fced67301b411c1e04ade6c757865a Mon Sep 17 00:00:00 2001 From: hui lai Date: Thu, 4 Dec 2025 14:44:10 +0800 Subject: [PATCH] [enhance](memory) back pressure writing when memory usage is high in sink operation (#58530) ### What problem does this PR solve? Now, each VNodeChannel limit `_pending_batches_bytes` by config `nodechannel_pending_queue_max_bytes`, but it will still cause excessive memory usage on too many BE nodes or high concurrency. This pr introduce back pressure writing when memory usage is high in `VTabletWriter` to solve the issue. --- be/src/vec/sink/writer/vtablet_writer.cpp | 8 +++++++- be/src/vec/sink/writer/vtablet_writer.h | 1 + 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/be/src/vec/sink/writer/vtablet_writer.cpp b/be/src/vec/sink/writer/vtablet_writer.cpp index 62531151f82209..b4a76da9f9b725 100644 --- a/be/src/vec/sink/writer/vtablet_writer.cpp +++ b/be/src/vec/sink/writer/vtablet_writer.cpp @@ -509,6 +509,7 @@ VNodeChannel::VNodeChannel(VTabletWriter* parent, IndexChannel* index_channel, i _node_channel_tracker = std::make_shared( fmt::format("NodeChannel:indexID={}:threadId={}", std::to_string(_index_channel->_index_id), ThreadContext::get_thread_id())); + _load_mem_limit = MemInfo::mem_limit() * config::load_process_max_memory_limit_percent / 100; } VNodeChannel::~VNodeChannel() = default; @@ -747,8 +748,13 @@ Status VNodeChannel::add_block(vectorized::Block* block, const Payload* payload) // But there is still some unfinished things, we do mem limit here temporarily. // _cancelled may be set by rpc callback, and it's possible that _cancelled might be set in any of the steps below. // It's fine to do a fake add_block() and return OK, because we will check _cancelled in next add_block() or mark_close(). + bool is_exceed_soft_mem_limit = GlobalMemoryArbitrator::is_exceed_soft_mem_limit(); + auto current_load_mem_value = MemoryProfile::load_current_usage(); + bool mem_limit_exceeded = is_exceed_soft_mem_limit || + current_load_mem_value > _load_mem_limit || + _pending_batches_bytes > _max_pending_batches_bytes; while (!_cancelled && !_state->is_cancelled() && _pending_batches_num > 0 && - _pending_batches_bytes > _max_pending_batches_bytes) { + mem_limit_exceeded) { SCOPED_RAW_TIMER(&_stat.mem_exceeded_block_ns); std::this_thread::sleep_for(std::chrono::milliseconds(10)); } diff --git a/be/src/vec/sink/writer/vtablet_writer.h b/be/src/vec/sink/writer/vtablet_writer.h index ba1c0c505d6230..039dd10a2c7551 100644 --- a/be/src/vec/sink/writer/vtablet_writer.h +++ b/be/src/vec/sink/writer/vtablet_writer.h @@ -357,6 +357,7 @@ class VNodeChannel { std::string _name; std::shared_ptr _node_channel_tracker; + int64_t _load_mem_limit = -1; TupleDescriptor* _tuple_desc = nullptr; NodeInfo _node_info;