Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion be/src/vec/sink/writer/vtablet_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -509,6 +509,7 @@ VNodeChannel::VNodeChannel(VTabletWriter* parent, IndexChannel* index_channel, i
_node_channel_tracker = std::make_shared<MemTracker>(
fmt::format("NodeChannel:indexID={}:threadId={}",
std::to_string(_index_channel->_index_id), ThreadContext::get_thread_id()));
_load_mem_limit = MemInfo::mem_limit() * config::load_process_max_memory_limit_percent / 100;
}

VNodeChannel::~VNodeChannel() = default;
Expand Down Expand Up @@ -747,8 +748,13 @@ Status VNodeChannel::add_block(vectorized::Block* block, const Payload* payload)
// But there is still some unfinished things, we do mem limit here temporarily.
// _cancelled may be set by rpc callback, and it's possible that _cancelled might be set in any of the steps below.
// It's fine to do a fake add_block() and return OK, because we will check _cancelled in next add_block() or mark_close().
bool is_exceed_soft_mem_limit = GlobalMemoryArbitrator::is_exceed_soft_mem_limit();
auto current_load_mem_value = MemoryProfile::load_current_usage();
bool mem_limit_exceeded = is_exceed_soft_mem_limit ||
current_load_mem_value > _load_mem_limit ||
_pending_batches_bytes > _max_pending_batches_bytes;
while (!_cancelled && !_state->is_cancelled() && _pending_batches_num > 0 &&
_pending_batches_bytes > _max_pending_batches_bytes) {
mem_limit_exceeded) {
SCOPED_RAW_TIMER(&_stat.mem_exceeded_block_ns);
std::this_thread::sleep_for(std::chrono::milliseconds(10));
}
Expand Down
1 change: 1 addition & 0 deletions be/src/vec/sink/writer/vtablet_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,7 @@ class VNodeChannel {
std::string _name;

std::shared_ptr<MemTracker> _node_channel_tracker;
int64_t _load_mem_limit = -1;

TupleDescriptor* _tuple_desc = nullptr;
NodeInfo _node_info;
Expand Down
Loading