Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
46 commits
Select commit Hold shift + click to select a range
837bda8
resolve conflict
dqhl76 Oct 16, 2025
1543002
refactor: add settings
dqhl76 Sep 27, 2025
bac89c2
refactor: before repartition
dqhl76 Oct 8, 2025
3076c11
save
dqhl76 Oct 9, 2025
9edaccf
save
dqhl76 Oct 9, 2025
e0f0b4d
add back log
dqhl76 Oct 9, 2025
3e823f1
remove useless
dqhl76 Oct 9, 2025
7464c40
extract some from final
dqhl76 Oct 12, 2025
a86170a
save
dqhl76 Oct 12, 2025
c5087b5
save
dqhl76 Oct 12, 2025
3d554be
fix: partition id
dqhl76 Oct 12, 2025
791cc50
fix: compile
dqhl76 Oct 13, 2025
b14c2f6
save
dqhl76 Oct 13, 2025
f8f000f
save
dqhl76 Oct 16, 2025
e1e751f
refactor
dqhl76 Oct 16, 2025
4d6c28a
refactor
dqhl76 Oct 16, 2025
8ff9892
save
dqhl76 Oct 19, 2025
c9ebff3
this commit adds a Debug trait impl for AggregateMeta; should be reverted before m…
dqhl76 Oct 19, 2025
a61410e
fix: reset aggregate status
dqhl76 Oct 19, 2025
384f964
fix: dispatcher could not finish, causing a hang
dqhl76 Oct 19, 2025
1db4169
clean, start spill and restore refactor
dqhl76 Oct 19, 2025
ff20894
check memory pressure
dqhl76 Oct 19, 2025
9195046
feat: add support for recursive schedule
dqhl76 Oct 19, 2025
61ff115
feat: add support for recursive spill
dqhl76 Oct 19, 2025
66a6485
make lint
dqhl76 Oct 19, 2025
47bad4c
partition stream
dqhl76 Oct 20, 2025
979619b
Revert "this commit add Debug trait for AggregateMeta, should revert …
dqhl76 Oct 20, 2025
bff4fee
chore: clean debug
dqhl76 Oct 20, 2025
4332e9b
refactor: merge dispatcher with bucket scheduler
dqhl76 Oct 20, 2025
5691e11
add back aggregate
dqhl76 Oct 20, 2025
b22986b
add back spill restore
dqhl76 Oct 21, 2025
6ef2a8a
recursive spill
dqhl76 Oct 21, 2025
5f19c8d
make build pass
dqhl76 Oct 22, 2025
431dbcf
partition stream need finalize
dqhl76 Oct 22, 2025
dfe38e2
refine event, make it more clearly to understand
dqhl76 Oct 22, 2025
efa24a3
fix: reset flag during new round begin
dqhl76 Oct 22, 2025
6f57f21
debug
dqhl76 Oct 22, 2025
107a191
debug
dqhl76 Oct 22, 2025
29f16a5
make lint
dqhl76 Oct 22, 2025
9ed374e
fix
dqhl76 Oct 22, 2025
d4e4b3f
chore: add random_spill_percentage
dqhl76 Oct 22, 2025
d6993d8
fix: spill flag not reset
dqhl76 Oct 22, 2025
1fdd169
used for debug
dqhl76 Oct 22, 2025
5225a17
fix: skip if output datablock is empty (caused downstream processor m…
dqhl76 Oct 23, 2025
29ed0d0
fix: add blocking layer
dqhl76 Oct 23, 2025
0b26db3
disable random_spill_percentage
dqhl76 Oct 23, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -399,6 +399,7 @@ opendal = { version = "0.53.2", features = [
"layers-fastrace",
"layers-prometheus-client",
"layers-async-backtrace",
"layers-blocking",
"services-s3",
"services-fs",
"services-gcs",
Expand Down
1 change: 1 addition & 0 deletions src/query/pipeline/transforms/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ databend-storages-common-cache = { workspace = true }
jsonb = { workspace = true }
log = { workspace = true }
match-template = { workspace = true }
rand = { workspace = true }
serde = { workspace = true }
tokio = { workspace = true }
typetag = { workspace = true }
Expand Down
30 changes: 25 additions & 5 deletions src/query/pipeline/transforms/src/processors/memory_settings.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use bytesize::ByteSize;
use databend_common_base::runtime::MemStat;
use databend_common_base::runtime::ThreadTracker;
use databend_common_base::runtime::GLOBAL_MEM_STAT;
use rand::Rng;

#[derive(Clone)]
#[non_exhaustive]
Expand All @@ -35,6 +36,8 @@ pub struct MemorySettings {
pub enable_query_level_spill: bool,
pub max_query_memory_usage: usize,
pub query_memory_tracking: Option<Arc<MemStat>>,

pub random_spill_percentage: u64,
}

impl Debug for MemorySettings {
Expand Down Expand Up @@ -85,6 +88,7 @@ pub struct MemorySettingsBuilder {
query_memory_tracking: Option<Arc<MemStat>>,

spill_unit_size: Option<usize>,
random_spill_percentage: u64,
}

impl MemorySettingsBuilder {
Expand Down Expand Up @@ -115,6 +119,11 @@ impl MemorySettingsBuilder {
self
}

pub fn with_random_spill_percentage(mut self, percentage: u64) -> Self {
self.random_spill_percentage = percentage;
self
}

pub fn build(self) -> MemorySettings {
MemorySettings {
enable_group_spill: self.enable_group_spill,
Expand All @@ -125,6 +134,7 @@ impl MemorySettingsBuilder {
max_query_memory_usage: self.max_query_memory_usage.unwrap_or(usize::MAX),
query_memory_tracking: self.query_memory_tracking,
spill_unit_size: self.spill_unit_size.unwrap_or(0),
random_spill_percentage: self.random_spill_percentage,
}
}
}
Expand All @@ -142,6 +152,7 @@ impl MemorySettings {
query_memory_tracking: None,

spill_unit_size: None,
random_spill_percentage: 0,
}
}

Expand All @@ -163,12 +174,20 @@ impl MemorySettings {
}
}

let Some(query_memory_tracking) = self.query_memory_tracking.as_ref() else {
return false;
};
if let Some(query_memory_tracking) = self.query_memory_tracking.as_ref() {
if self.enable_query_level_spill
&& query_memory_tracking.get_memory_usage() >= self.max_query_memory_usage
{
return true;
}
}

if self.random_spill_percentage > 0 {
let random_value = rand::thread_rng().gen_range(0..100);
return random_value < self.random_spill_percentage;
}

self.enable_query_level_spill
&& query_memory_tracking.get_memory_usage() >= self.max_query_memory_usage
false
}

fn check_global(&self) -> Option<isize> {
Expand Down Expand Up @@ -248,6 +267,7 @@ mod tests {
query_memory_tracking: None,
enable_query_level_spill: false,
spill_unit_size: 4096,
random_spill_percentage: 0,
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,8 @@ impl IPhysicalPlan for AggregateFinal {
.get_enable_experimental_aggregate_hashtable()?;
let max_spill_io_requests = builder.settings.get_max_spill_io_requests()?;
let max_restore_worker = builder.settings.get_max_aggregate_restore_worker()?;
let experiment_aggregate_final =
builder.settings.get_enable_experiment_aggregate_final()?;

let mut is_cluster_aggregate = false;
if ExchangeSource::check_physical_plan(&self.input) {
Expand Down Expand Up @@ -199,6 +201,8 @@ impl IPhysicalPlan for AggregateFinal {
params.clone(),
max_restore_worker,
after_group_parallel,
experiment_aggregate_final,
builder.ctx.clone(),
)
}
}
Expand Down
12 changes: 12 additions & 0 deletions src/query/service/src/pipelines/memory_settings.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,9 @@ impl MemorySettingsExt for MemorySettings {
);
}

let random_spill_percentage = settings.get_random_spill_percentage()?;
builder = builder.with_random_spill_percentage(random_spill_percentage);

Ok(builder.build())
}

Expand Down Expand Up @@ -92,6 +95,9 @@ impl MemorySettingsExt for MemorySettings {
);
}

let random_spill_percentage = settings.get_random_spill_percentage()?;
builder = builder.with_random_spill_percentage(random_spill_percentage);

Ok(builder.build())
}

Expand Down Expand Up @@ -123,6 +129,9 @@ impl MemorySettingsExt for MemorySettings {
);
}

let random_spill_percentage = settings.get_random_spill_percentage()?;
builder = builder.with_random_spill_percentage(random_spill_percentage);

Ok(builder.build())
}

Expand Down Expand Up @@ -153,6 +162,9 @@ impl MemorySettingsExt for MemorySettings {
);
}

let random_spill_percentage = settings.get_random_spill_percentage()?;
builder = builder.with_random_spill_percentage(random_spill_percentage);

Ok(builder.build())
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ impl SerializedPayload {
}
}

#[derive(Debug)]
pub struct BucketSpilledPayload {
pub bucket: isize,
pub location: String,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
// Copyright 2021 Datafuse Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use databend_common_exception::Result;
use databend_common_pipeline_core::processors::InputPort;
use databend_common_pipeline_core::processors::OutputPort;
use databend_common_pipeline_core::processors::ProcessorPtr;
use databend_common_pipeline_core::Pipe;
use databend_common_pipeline_core::PipeItem;
use databend_common_pipeline_core::Pipeline;
use databend_common_pipeline_core::TransformPipeBuilder;
use databend_common_storage::DataOperator;
use parking_lot::Mutex;
use tokio::sync::Barrier;
use tokio::sync::Semaphore;

use crate::pipelines::processors::transforms::aggregator::new_final_aggregate::FinalAggregateSharedState;
use crate::pipelines::processors::transforms::aggregator::new_final_aggregate::FinalAggregateSpiller;
use crate::pipelines::processors::transforms::aggregator::new_final_aggregate::NewFinalAggregateTransform;
use crate::pipelines::processors::transforms::aggregator::new_final_aggregate::TransformPartitionBucketScatter;
use crate::pipelines::processors::transforms::aggregator::transform_partition_bucket::TransformPartitionBucket;
use crate::pipelines::processors::transforms::aggregator::AggregatorParams;
use crate::pipelines::processors::transforms::aggregator::TransformAggregateSpillReader;
use crate::pipelines::processors::transforms::aggregator::TransformFinalAggregate;
use crate::sessions::QueryContext;

/// Build the final-stage aggregation pipeline that regroups partial results
/// by partition bucket.
///
/// Two layouts are supported, selected by `experiment_aggregate_final`:
/// * experimental — scatter the input into a power-of-two number of
///   partitions and run one `NewFinalAggregateTransform` worker per
///   partition, all synchronizing on a shared barrier and shared state;
/// * legacy — funnel everything through a single
///   `TransformPartitionBucket`, restore spilled payloads with bounded I/O
///   concurrency, then run the final aggregation.
///
/// In both cases the pipeline is resized back to `after_worker` outputs at
/// the end, so downstream stages see the same parallelism either way.
pub fn build_partition_bucket(
    pipeline: &mut Pipeline,
    params: Arc<AggregatorParams>,
    max_restore_worker: u64,
    after_worker: usize,
    experiment_aggregate_final: bool,
    ctx: Arc<QueryContext>,
) -> Result<()> {
    let operator = DataOperator::instance().spill_operator();

    if experiment_aggregate_final {
        // PartitionedPayload only accepts a power-of-two partition count;
        // cap it so small pipelines are not over-partitioned.
        const MAX_PARTITION_COUNT: usize = 128;
        let partition_count = after_worker.next_power_of_two().min(MAX_PARTITION_COUNT);

        let upstream_outputs = pipeline.output_len();
        let scatter = TransformPartitionBucketScatter::create(
            upstream_outputs,
            partition_count,
            params.clone(),
        )?;
        let scatter_inputs = scatter.get_inputs();
        let scatter_outputs = scatter.get_outputs();

        pipeline.add_pipe(Pipe::create(
            scatter_inputs.len(),
            scatter_outputs.len(),
            vec![PipeItem::create(
                ProcessorPtr::create(Box::new(scatter)),
                scatter_inputs,
                scatter_outputs,
            )],
        ));

        // One worker per partition; every worker waits on the same barrier
        // and publishes into the shared state.
        let barrier = Arc::new(Barrier::new(partition_count));
        let shared_state = Arc::new(Mutex::new(FinalAggregateSharedState::new(partition_count)));
        let mut builder = TransformPipeBuilder::create();

        for worker_id in 0..partition_count {
            let spiller = FinalAggregateSpiller::try_create(ctx.clone(), operator.clone())?;
            let input = InputPort::create();
            let output = OutputPort::create();
            let transform = NewFinalAggregateTransform::try_create(
                input.clone(),
                output.clone(),
                worker_id,
                params.clone(),
                partition_count,
                barrier.clone(),
                shared_state.clone(),
                spiller,
                ctx.clone(),
            )?;
            builder.add_transform(input, output, ProcessorPtr::create(transform));
        }

        pipeline.add_pipe(builder.finalize());
        pipeline.try_resize(after_worker)?;
        return Ok(());
    }

    // Legacy path: collapse all upstream outputs into a single bucket
    // partitioner, then fan back out for restore + final aggregation.
    let upstream_outputs = pipeline.output_len();
    let transform = TransformPartitionBucket::create(upstream_outputs, params.clone())?;
    let bucket_output = transform.get_output();
    let bucket_inputs = transform.get_inputs();

    pipeline.add_pipe(Pipe::create(bucket_inputs.len(), 1, vec![PipeItem::create(
        ProcessorPtr::create(Box::new(transform)),
        bucket_inputs,
        vec![bucket_output],
    )]));

    // Restore workers are bounded both by upstream width and the setting.
    pipeline.try_resize(upstream_outputs.min(max_restore_worker as usize))?;

    // Throttle concurrent spill reads with a semaphore.
    let semaphore = Arc::new(Semaphore::new(params.max_spill_io_requests));
    pipeline.add_transform(|input, output| {
        TransformAggregateSpillReader::create(input, output, operator.clone(), semaphore.clone())
    })?;
    pipeline.add_transform(|input, output| {
        Ok(ProcessorPtr::create(TransformFinalAggregate::try_create(
            input,
            output,
            params.clone(),
        )?))
    })?;
    pipeline.try_resize(after_worker)?;

    Ok(())
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,20 @@
mod aggregate_exchange_injector;
mod aggregate_meta;
mod aggregator_params;
mod new_transform_partition_bucket;
mod build_partition_bucket;
mod new_final_aggregate;
mod serde;
mod transform_aggregate_expand;
mod transform_aggregate_final;
mod transform_aggregate_partial;
mod transform_partition_bucket;
mod transform_single_key;
mod udaf_script;

pub use aggregate_exchange_injector::AggregateInjector;
pub use aggregate_meta::*;
pub use aggregator_params::AggregatorParams;
pub use new_transform_partition_bucket::build_partition_bucket;
pub use build_partition_bucket::build_partition_bucket;
pub use transform_aggregate_expand::TransformExpandGroupingSets;
pub use transform_aggregate_final::TransformFinalAggregate;
pub use transform_aggregate_partial::TransformPartialAggregate;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
// Copyright 2021 Datafuse Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use databend_common_expression::DataBlock;

use crate::pipelines::processors::transforms::aggregator::AggregateMeta;

/// Split partitioned metadata evenly into DataBlock chunks.
///
/// The `data` entries are distributed across `outputs_len` blocks as evenly
/// as possible: when the count does not divide exactly, the first
/// `data.len() % outputs_len` blocks each receive one extra entry. Returns an
/// empty vector when `outputs_len` is zero.
pub fn split_partitioned_meta_into_datablocks(
    bucket: isize,
    data: Vec<AggregateMeta>,
    outputs_len: usize,
) -> Vec<DataBlock> {
    if outputs_len == 0 {
        return vec![];
    }

    let per_output = data.len() / outputs_len;
    let extras = data.len() % outputs_len;
    let mut remaining = data.into_iter();

    (0..outputs_len)
        .map(|index| {
            // Front-load the remainder: the first `extras` chunks get one more.
            let take = per_output + usize::from(index < extras);
            let chunk: Vec<AggregateMeta> = remaining.by_ref().take(take).collect();
            DataBlock::empty_with_meta(AggregateMeta::create_partitioned(bucket, chunk))
        })
        .collect()
}
Loading
Loading