Skip to content

Commit ca962a5

Browse files
[Enhancement] add spilling capability for multi_cast_local_exchange operator (StarRocks#47982)
Signed-off-by: silverbullet233 <[email protected]>
1 parent 8242295 commit ca962a5

16 files changed

+1633
-1
lines changed

be/src/common/config.h

+1
Original file line numberDiff line numberDiff line change
@@ -1067,6 +1067,7 @@ CONF_Int64(spill_max_log_block_container_bytes, "10737418240"); // 10GB
10671067
CONF_mDouble(spill_max_dir_bytes_ratio, "0.8"); // 80%
10681068
// min bytes size of spill read buffer. if the buffer size is less than this value, we will disable buffer read
10691069
CONF_Int64(spill_read_buffer_min_bytes, "1048576");
1070+
CONF_mInt64(mem_limited_chunk_queue_block_size, "8388608");
10701071

10711072
CONF_Int32(internal_service_query_rpc_thread_num, "-1");
10721073

be/src/exec/CMakeLists.txt

+2
Original file line numberDiff line numberDiff line change
@@ -176,6 +176,8 @@ set(EXEC_FILES
176176
pipeline/exchange/local_exchange_sink_operator.cpp
177177
pipeline/exchange/local_exchange_source_operator.cpp
178178
pipeline/exchange/multi_cast_local_exchange.cpp
179+
pipeline/exchange/mem_limited_chunk_queue.cpp
180+
pipeline/exchange/spillable_multi_cast_local_exchange.cpp
179181
pipeline/exchange/multi_cast_local_exchange_sink_operator.cpp
180182
pipeline/exchange/multi_cast_local_exchange_source_operator.cpp
181183
pipeline/exchange/sink_buffer.cpp

be/src/exec/data_sink.cpp

+7-1
Original file line numberDiff line numberDiff line change
@@ -315,7 +315,13 @@ Status DataSink::decompose_data_sink_to_pipeline(pipeline::PipelineBuilderContex
315315
auto* upstream_source = context->source_operator(prev_operators);
316316
size_t upstream_plan_node_id = upstream->plan_node_id();
317317
// === create exchange ===
318-
auto mcast_local_exchanger = std::make_shared<InMemoryMultiCastLocalExchanger>(runtime_state, sinks.size());
318+
std::shared_ptr<MultiCastLocalExchanger> mcast_local_exchanger;
319+
if (runtime_state->enable_spill() && runtime_state->enable_multi_cast_local_exchange_spill()) {
320+
mcast_local_exchanger = std::make_shared<SpillableMultiCastLocalExchanger>(runtime_state, sinks.size(),
321+
upstream_plan_node_id);
322+
} else {
323+
mcast_local_exchanger = std::make_shared<InMemoryMultiCastLocalExchanger>(runtime_state, sinks.size());
324+
}
319325

320326
// === create sink op ====
321327
OpFactoryPtr sink_op = std::make_shared<MultiCastLocalExchangeSinkOperatorFactory>(

0 commit comments

Comments
 (0)