From 4a80ca223fde8e77fc60d745b00f0932f72b2358 Mon Sep 17 00:00:00 2001
From: coldWater
Date: Fri, 25 Oct 2024 15:43:10 +0800
Subject: [PATCH] fix(query): sort spilling may hang (#16672)

* fix

Signed-off-by: coldWater

* refactor sort test

Signed-off-by: coldWater

* test

Signed-off-by: coldWater

* fix lint

Signed-off-by: coldWater

---------

Signed-off-by: coldWater
Co-authored-by: TCeason <33082201+TCeason@users.noreply.github.com>
---
 src/query/pipeline/transforms/src/lib.rs      |   1 +
 .../service/src/pipelines/processors/mod.rs   |   2 +-
 .../transforms/aggregator/aggregate_cell.rs   |   4 +
 ...transform_exchange_aggregate_serializer.rs |   4 +-
 .../transform_exchange_group_by_serializer.rs |   4 +-
 .../transforms/transform_async_function.rs    |   2 +-
 .../transforms/transform_sort_spill.rs        |   5 +-
 .../tests/it/pipelines/transforms/sort.rs     | 186 ++--------------
 .../it/pipelines/transforms/sort/k_way.rs     | 174 +++++++++++++++
 .../it/pipelines/transforms/sort/spill.rs     | 207 ++++++++++++++++++
 10 files changed, 415 insertions(+), 174 deletions(-)
 create mode 100644 src/query/service/tests/it/pipelines/transforms/sort/k_way.rs
 create mode 100644 src/query/service/tests/it/pipelines/transforms/sort/spill.rs

diff --git a/src/query/pipeline/transforms/src/lib.rs b/src/query/pipeline/transforms/src/lib.rs
index 23c89473f7db..a8ffc7007b32 100644
--- a/src/query/pipeline/transforms/src/lib.rs
+++ b/src/query/pipeline/transforms/src/lib.rs
@@ -20,3 +20,4 @@
 #![feature(iter_map_windows)]
 
 pub mod processors;
+pub use processors::*;
diff --git a/src/query/service/src/pipelines/processors/mod.rs b/src/query/service/src/pipelines/processors/mod.rs
index 1d924d948156..19c56b02d31a 100644
--- a/src/query/service/src/pipelines/processors/mod.rs
+++ b/src/query/service/src/pipelines/processors/mod.rs
@@ -13,7 +13,7 @@
 // limitations under the License.
 
 pub use databend_common_pipeline_core::processors::*;
-pub(crate) mod transforms;
+pub mod transforms;
 
 pub use transforms::HashJoinBuildState;
 pub use transforms::HashJoinDesc;
diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/aggregate_cell.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/aggregate_cell.rs
index 2b0579208fc0..8e91237cdf30 100644
--- a/src/query/service/src/pipelines/processors/transforms/aggregator/aggregate_cell.rs
+++ b/src/query/service/src/pipelines/processors/transforms/aggregator/aggregate_cell.rs
@@ -67,6 +67,10 @@ impl<T: HashtableLike, V: Send + Sync + 'static> HashTableCell<T, V> {
         self.hashtable.len()
     }
 
+    pub fn is_empty(&self) -> bool {
+        self.len() == 0
+    }
+
     pub fn allocated_bytes(&self) -> usize {
         self.hashtable.bytes_len(false)
             + self.arena.allocated_bytes()
diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_exchange_aggregate_serializer.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_exchange_aggregate_serializer.rs
index aa0e3feba5cc..b2b65acbce7a 100644
--- a/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_exchange_aggregate_serializer.rs
+++ b/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_exchange_aggregate_serializer.rs
@@ -163,7 +163,7 @@ impl<Method: HashMethodBounds> BlockMetaTransform<ExchangeShuffleMeta>
                     &self.location_prefix,
                     payload,
                 )?,
-                false => agg_spilling_aggregate_payload::<Method>(
+                false => agg_spilling_aggregate_payload(
                     self.ctx.clone(),
                     self.operator.clone(),
                     &self.location_prefix,
@@ -239,7 +239,7 @@ impl<Method: HashMethodBounds> BlockMetaTransform<ExchangeShuffleMeta>
     }
 }
 
-fn agg_spilling_aggregate_payload<Method: HashMethodBounds>(
+fn agg_spilling_aggregate_payload(
     ctx: Arc<QueryContext>,
     operator: Operator,
     location_prefix: &str,
diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_exchange_group_by_serializer.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_exchange_group_by_serializer.rs
index a5a7777ad422..f8ed4046a520 100644
--- a/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_exchange_group_by_serializer.rs
+++ b/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_exchange_group_by_serializer.rs
@@ -214,7 +214,7 @@ impl<Method: HashMethodBounds> BlockMetaTransform<ExchangeShuffleMeta>
                     &self.location_prefix,
                     payload,
                 )?,
-                false => agg_spilling_group_by_payload::<Method>(
+                false => agg_spilling_group_by_payload(
                     self.ctx.clone(),
                     self.operator.clone(),
                     &self.location_prefix,
@@ -292,7 +292,7 @@ fn get_columns(data_block: DataBlock) -> Vec<BlockEntry> {
     data_block.columns().to_vec()
 }
 
-fn agg_spilling_group_by_payload<Method: HashMethodBounds>(
+fn agg_spilling_group_by_payload(
     ctx: Arc<QueryContext>,
     operator: Operator,
     location_prefix: &str,
diff --git a/src/query/service/src/pipelines/processors/transforms/transform_async_function.rs b/src/query/service/src/pipelines/processors/transforms/transform_async_function.rs
index d5d760478350..385cd1a40ce5 100644
--- a/src/query/service/src/pipelines/processors/transforms/transform_async_function.rs
+++ b/src/query/service/src/pipelines/processors/transforms/transform_async_function.rs
@@ -40,7 +40,7 @@ pub struct TransformAsyncFunction {
 }
 
 impl TransformAsyncFunction {
-    pub fn new(
+    pub(crate) fn new(
         ctx: Arc<QueryContext>,
         async_func_descs: Vec<AsyncFunctionDesc>,
         operators: BTreeMap<usize, Arc<DictionaryOperator>>,
diff --git a/src/query/service/src/pipelines/processors/transforms/transform_sort_spill.rs b/src/query/service/src/pipelines/processors/transforms/transform_sort_spill.rs
index 60bfa69b9e24..ac149d5dd852 100644
--- a/src/query/service/src/pipelines/processors/transforms/transform_sort_spill.rs
+++ b/src/query/service/src/pipelines/processors/transforms/transform_sort_spill.rs
@@ -129,13 +129,13 @@ where
         }
 
         if let Some(block) = self.output_data.take() {
-            debug_assert!(matches!(self.state, State::MergeFinal | State::Finish));
+            assert!(matches!(self.state, State::MergeFinal | State::Finish));
             self.output_block(block);
             return Ok(Event::NeedConsume);
         }
 
         if matches!(self.state, State::Finish) {
-            debug_assert!(self.input.is_finished());
+            assert!(self.input.is_finished());
             self.output.finish();
             return Ok(Event::Finished);
         }
@@ -179,6 +179,7 @@ where
                 if meta.is_none() {
                     // It means we get the last block.
                     // We can launch external merge sort now.
+                    self.input.finish();
                     self.state = State::Merging;
                 }
                 self.input_data = Some(block);
diff --git a/src/query/service/tests/it/pipelines/transforms/sort.rs b/src/query/service/tests/it/pipelines/transforms/sort.rs
index 3cc91634ba0a..ba4e99323216 100644
--- a/src/query/service/tests/it/pipelines/transforms/sort.rs
+++ b/src/query/service/tests/it/pipelines/transforms/sort.rs
@@ -19,7 +19,6 @@ use databend_common_base::base::tokio;
 use databend_common_base::base::tokio::sync::mpsc::channel;
 use databend_common_base::base::tokio::sync::mpsc::Receiver;
 use databend_common_exception::Result;
-use databend_common_expression::block_debug::pretty_format_blocks;
 use databend_common_expression::types::Int32Type;
 use databend_common_expression::DataBlock;
 use databend_common_expression::DataField;
@@ -34,130 +33,16 @@ use databend_common_pipeline_core::PipeItem;
 use databend_common_pipeline_core::Pipeline;
 use databend_common_pipeline_sinks::SyncSenderSink;
 use databend_common_pipeline_sources::BlocksSource;
-use databend_common_pipeline_transforms::processors::add_k_way_merge_sort;
 use databend_query::pipelines::executor::ExecutorSettings;
 use databend_query::pipelines::executor::QueryPipelineExecutor;
 use databend_query::sessions::QueryContext;
 use databend_query::test_kits::TestFixture;
-use itertools::Itertools;
-use parking_lot::Mutex;
 use rand::rngs::ThreadRng;
 use rand::Rng;
 
-#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
-async fn test_k_way_merge_sort() -> Result<()> {
-    let fixture = TestFixture::setup().await?;
-    let ctx = fixture.new_query_ctx().await?;
-
-    let worker = 3;
-    let block_size = 4;
-    let limit = None;
-    let (data, expected) = basic_test_data(None);
-    let (executor, mut rx) = create_pipeline(ctx, data, worker, block_size, limit)?;
-
-    executor.execute()?;
-
-    let mut got: Vec<DataBlock> = Vec::new();
-    while !rx.is_empty() {
-        got.push(rx.recv().await.unwrap()?);
-    }
-
-    check_result(got, expected);
-
-    Ok(())
-}
-
-#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
-async fn test_k_way_merge_sort_fuzz() -> Result<()> {
-    let mut rng = rand::thread_rng();
-    let fixture = TestFixture::setup().await?;
-
-    for _ in 0..10 {
-        let ctx = fixture.new_query_ctx().await?;
-        run_fuzz(ctx, &mut rng, false).await?;
-    }
-
-    for _ in 0..10 {
-        let ctx = fixture.new_query_ctx().await?;
-        run_fuzz(ctx, &mut rng, true).await?;
-    }
-    Ok(())
-}
-
-async fn run_fuzz(ctx: Arc<QueryContext>, rng: &mut ThreadRng, with_limit: bool) -> Result<()> {
-    let worker = rng.gen_range(1..=5);
-    let block_size = rng.gen_range(1..=20);
-    let (data, expected, limit) = random_test_data(rng, with_limit);
-
-    // println!("\nwith_limit {with_limit}");
-    // for (input, blocks) in data.iter().enumerate() {
-    //     println!("intput {input}");
-    //     for b in blocks {
-    //         println!("{:?}", b.columns()[0].value);
-    //     }
-    // }
-
-    let (executor, mut rx) = create_pipeline(ctx, data, worker, block_size, limit)?;
-    executor.execute()?;
-
-    let mut got: Vec<DataBlock> = Vec::new();
-    while !rx.is_empty() {
-        got.push(rx.recv().await.unwrap()?);
-    }
-
-    check_result(got, expected);
-
-    Ok(())
-}
-
-fn create_pipeline(
-    ctx: Arc<QueryContext>,
-    data: Vec<Vec<DataBlock>>,
-    worker: usize,
-    block_size: usize,
-    limit: Option<usize>,
-) -> Result<(Arc<QueryPipelineExecutor>, Receiver<Result<DataBlock>>)> {
-    let mut pipeline = Pipeline::create();
-
-    let data_type = data[0][0].get_by_offset(0).data_type.clone();
-    let source_pipe = create_source_pipe(ctx, data)?;
-    pipeline.add_pipe(source_pipe);
-
-    let schema = DataSchemaRefExt::create(vec![DataField::new("a", data_type)]);
-    let sort_desc = Arc::new(vec![SortColumnDescription {
-        offset: 0,
-        asc: true,
-        nulls_first: true,
-        is_nullable: false,
-    }]);
-    add_k_way_merge_sort(
-        &mut pipeline,
-        schema,
-        worker,
-        block_size,
-        limit,
-        sort_desc,
-        false,
-        true,
-    )?;
-
-    let (mut rx, sink_pipe) = create_sink_pipe(1)?;
-    let rx = rx.pop().unwrap();
-    pipeline.add_pipe(sink_pipe);
-    pipeline.set_max_threads(3);
-
-    let settings = ExecutorSettings {
-        query_id: Arc::new("".to_string()),
-        max_execute_time_in_seconds: Default::default(),
-        enable_queries_executor: false,
-        max_threads: 8,
-        executor_node_id: "".to_string(),
-    };
-    let executor = QueryPipelineExecutor::create(pipeline, settings)?;
-    Ok((executor, rx))
-}
-
 fn create_source_pipe(ctx: Arc<QueryContext>, data: Vec<Vec<DataBlock>>) -> Result<Pipe> {
+    use parking_lot::Mutex;
+
     let size = data.len();
     let mut items = Vec::with_capacity(size);
@@ -179,7 +64,7 @@ fn create_source_pipe(ctx: Arc<QueryContext>, data: Vec<Vec<DataBlock>>) -> Resu
 fn create_sink_pipe(size: usize) -> Result<(Vec<Receiver<Result<DataBlock>>>, Pipe)> {
     let mut rxs = Vec::with_capacity(size);
     let mut items = Vec::with_capacity(size);
-    for _index in 0..size {
+    for _ in 0..size {
         let input = InputPort::create();
         let (tx, rx) = channel(1000);
         rxs.push(rx);
@@ -193,21 +78,11 @@ fn create_sink_pipe(size: usize) -> Result<(Vec<Receiver<Result<DataBlock>>>, Pi
     Ok((rxs, Pipe::create(size, 0, items)))
 }
 
-/// Returns (input, expected)
-pub fn basic_test_data(limit: Option<usize>) -> (Vec<Vec<DataBlock>>, DataBlock) {
-    let data = vec![
-        vec![vec![1, 2, 3, 4], vec![4, 5, 6, 7]],
-        vec![vec![1, 1, 1, 1], vec![1, 10, 100, 2000]],
-        vec![vec![0, 2, 4, 5]],
-    ];
-
-    prepare_input_and_result(data, limit)
-}
-
-fn prepare_input_and_result(
+fn prepare_multi_input_and_result(
     data: Vec<Vec<Vec<i32>>>,
     limit: Option<usize>,
 ) -> (Vec<Vec<DataBlock>>, DataBlock) {
+    use itertools::Itertools;
     let input = data
         .clone()
         .into_iter()
@@ -229,7 +104,17 @@ fn prepare_input_and_result(
     (input, result)
 }
 
+fn prepare_single_input_and_result(
+    data: Vec<Vec<i32>>,
+    limit: Option<usize>,
+) -> (Vec<DataBlock>, DataBlock) {
+    let (mut input, expected) = prepare_multi_input_and_result(vec![data], limit);
+    (input.remove(0), expected)
+}
+
 fn check_result(result: Vec<DataBlock>, expected: DataBlock) {
+    use databend_common_expression::block_debug::pretty_format_blocks;
+
     if expected.is_empty() {
         if !result.is_empty() && !DataBlock::concat(&result).unwrap().is_empty() {
             panic!(
@@ -240,46 +125,15 @@ fn check_result(result: Vec<DataBlock>, expected: DataBlock) {
         return;
     }
 
-    let result_rows: usize = result.iter().map(|v| v.num_rows()).sum();
-    let result = pretty_format_blocks(&result).unwrap();
     let expected_rows = expected.num_rows();
    let expected = pretty_format_blocks(&[expected]).unwrap();
+    let result_rows: usize = result.iter().map(|v| v.num_rows()).sum();
+    let result = pretty_format_blocks(&result).unwrap();
     assert_eq!(
         expected, result,
-        "\nexpected (num_rows = {}):\n{}\nactual (num_rows = {}):\n{}",
-        expected_rows, expected, result_rows, result
+        "\nexpected (num_rows = {expected_rows}):\n{expected}\nactual (num_rows = {result_rows}):\n{result}",
     );
 }
-
-fn random_test_data(
-    rng: &mut ThreadRng,
-    with_limit: bool,
-) -> (Vec<Vec<DataBlock>>, DataBlock, Option<usize>) {
-    let random_batch_size = rng.gen_range(1..=10);
-    let random_num_streams = rng.gen_range(5..=10);
-
-    let random_data = (0..random_num_streams)
-        .map(|_| {
-            let random_num_blocks = rng.gen_range(1..=10);
-            let mut data = (0..random_batch_size * random_num_blocks)
-                .map(|_| rng.gen_range(0..=1000))
-                .collect::<Vec<_>>();
-            data.sort();
-            data.chunks(random_batch_size)
-                .map(|v| v.to_vec())
-                .collect::<Vec<_>>()
-        })
-        .collect::<Vec<_>>();
-
-    let num_rows = random_data
-        .iter()
-        .map(|v| v.iter().map(|v| v.len()).sum::<usize>())
-        .sum::<usize>();
-    let limit = if with_limit {
-        Some(rng.gen_range(0..=num_rows))
-    } else {
-        None
-    };
-    let (input, expected) = prepare_input_and_result(random_data, limit);
-    (input, expected, limit)
-}
+
+mod k_way;
+mod spill;
diff --git a/src/query/service/tests/it/pipelines/transforms/sort/k_way.rs b/src/query/service/tests/it/pipelines/transforms/sort/k_way.rs
new file mode 100644
index 000000000000..e217b1833c67
--- /dev/null
+++ b/src/query/service/tests/it/pipelines/transforms/sort/k_way.rs
@@ -0,0 +1,174 @@
+// Copyright 2021 Datafuse Labs
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use databend_common_pipeline_transforms::processors::add_k_way_merge_sort;
+
+use super::*;
+
+fn create_pipeline(
+    ctx: Arc<QueryContext>,
+    data: Vec<Vec<DataBlock>>,
+    worker: usize,
+    block_size: usize,
+    limit: Option<usize>,
+) -> Result<(Arc<QueryPipelineExecutor>, Receiver<Result<DataBlock>>)> {
+    let mut pipeline = Pipeline::create();
+
+    let data_type = data[0][0].get_by_offset(0).data_type.clone();
+    let source_pipe = create_source_pipe(ctx, data)?;
+    pipeline.add_pipe(source_pipe);
+
+    let schema = DataSchemaRefExt::create(vec![DataField::new("a", data_type)]);
+    let sort_desc = Arc::new(vec![SortColumnDescription {
+        offset: 0,
+        asc: true,
+        nulls_first: true,
+        is_nullable: false,
+    }]);
+    add_k_way_merge_sort(
+        &mut pipeline,
+        schema,
+        worker,
+        block_size,
+        limit,
+        sort_desc,
+        false,
+        true,
+    )?;
+
+    let (mut rx, sink_pipe) = create_sink_pipe(1)?;
+    let rx = rx.pop().unwrap();
+    pipeline.add_pipe(sink_pipe);
+    pipeline.set_max_threads(3);
+
+    let settings = ExecutorSettings {
+        query_id: Arc::new("".to_string()),
+        max_execute_time_in_seconds: Default::default(),
+        enable_queries_executor: false,
+        max_threads: 8,
+        executor_node_id: "".to_string(),
+    };
+    let executor = QueryPipelineExecutor::create(pipeline, settings)?;
+    Ok((executor, rx))
+}
+
+async fn run_fuzz(ctx: Arc<QueryContext>, rng: &mut ThreadRng, with_limit: bool) -> Result<()> {
+    let worker = rng.gen_range(1..=5);
+    let block_size = rng.gen_range(1..=20);
+    let (data, expected, limit) = random_test_data(rng, with_limit);
+
+    // println!("\nwith_limit {with_limit}");
+    // for (input, blocks) in data.iter().enumerate() {
+    //     println!("intput {input}");
+    //     for b in blocks {
+    //         println!("{:?}", b.columns()[0].value);
+    //     }
+    // }
+
+    let (executor, mut rx) = create_pipeline(ctx, data, worker, block_size, limit)?;
+    executor.execute()?;
+
+    let mut got: Vec<DataBlock> = Vec::new();
+    while !rx.is_empty() {
+        got.push(rx.recv().await.unwrap()?);
+    }
+
+    check_result(got, expected);
+
+    Ok(())
+}
+
+/// Returns (input, expected)
+fn basic_test_data(limit: Option<usize>) -> (Vec<Vec<DataBlock>>, DataBlock) {
+    let data = vec![
+        vec![vec![1, 2, 3, 4], vec![4, 5, 6, 7]],
+        vec![vec![1, 1, 1, 1], vec![1, 10, 100, 2000]],
+        vec![vec![0, 2, 4, 5]],
+    ];
+
+    prepare_multi_input_and_result(data, limit)
+}
+
+fn random_test_data(
+    rng: &mut ThreadRng,
+    with_limit: bool,
+) -> (Vec<Vec<DataBlock>>, DataBlock, Option<usize>) {
+    let random_batch_size = rng.gen_range(1..=10);
+    let random_num_streams = rng.gen_range(5..=10);
+
+    let random_data = (0..random_num_streams)
+        .map(|_| {
+            let random_num_blocks = rng.gen_range(1..=10);
+            let mut data = (0..random_batch_size * random_num_blocks)
+                .map(|_| rng.gen_range(0..=1000))
+                .collect::<Vec<_>>();
+            data.sort();
+            data.chunks(random_batch_size)
+                .map(|v| v.to_vec())
+                .collect::<Vec<_>>()
+        })
+        .collect::<Vec<_>>();
+
+    let num_rows = random_data
+        .iter()
+        .map(|v| v.iter().map(|v| v.len()).sum::<usize>())
+        .sum::<usize>();
+    let limit = if with_limit {
+        Some(rng.gen_range(0..=num_rows))
+    } else {
+        None
+    };
+    let (input, expected) = prepare_multi_input_and_result(random_data, limit);
+    (input, expected, limit)
+}
+
+#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
+async fn test_k_way_merge_sort() -> Result<()> {
+    let fixture = TestFixture::setup().await?;
+    let ctx = fixture.new_query_ctx().await?;
+
+    let worker = 3;
+    let block_size = 4;
+    let limit = None;
+    let (data, expected) = basic_test_data(None);
+    let (executor, mut rx) = create_pipeline(ctx, data, worker, block_size, limit)?;
+
+    executor.execute()?;
+
+    let mut got: Vec<DataBlock> = Vec::new();
+    while !rx.is_empty() {
+        got.push(rx.recv().await.unwrap()?);
+    }
+
+    check_result(got, expected);
+
+    Ok(())
+}
+
+#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
+async fn test_k_way_merge_sort_fuzz() -> Result<()> {
+    let mut rng = rand::thread_rng();
+    let fixture = TestFixture::setup().await?;
+
+    for _ in 0..3 {
+        let ctx = fixture.new_query_ctx().await?;
+        run_fuzz(ctx, &mut rng, false).await?;
+    }
+
+    for _ in 0..3 {
+        let ctx = fixture.new_query_ctx().await?;
+        run_fuzz(ctx, &mut rng, true).await?;
+    }
+    Ok(())
+}
diff --git a/src/query/service/tests/it/pipelines/transforms/sort/spill.rs b/src/query/service/tests/it/pipelines/transforms/sort/spill.rs
new file mode 100644
index 000000000000..a717c11f108f
--- /dev/null
+++ b/src/query/service/tests/it/pipelines/transforms/sort/spill.rs
@@ -0,0 +1,207 @@
+// Copyright 2021 Datafuse Labs
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::sync::Arc;
+
+use databend_common_pipeline_transforms::sort::SimpleRowConverter;
+use databend_common_pipeline_transforms::sort::SimpleRowsAsc;
+use databend_common_pipeline_transforms::TransformPipelineHelper;
+use databend_common_pipeline_transforms::TransformSortMerge;
+use databend_common_pipeline_transforms::TransformSortMergeBase;
+use databend_common_storage::DataOperator;
+use databend_query::pipelines::processors::transforms::create_transform_sort_spill;
+use databend_query::spillers::Spiller;
+use databend_query::spillers::SpillerConfig;
+use databend_query::spillers::SpillerType;
+
+use super::*;
+
+fn create_sort_spill_pipeline(
+    ctx: Arc<QueryContext>,
+    data: Vec<DataBlock>,
+    block_size: usize,
+    limit: Option<usize>,
+) -> Result<(Arc<QueryPipelineExecutor>, Receiver<Result<DataBlock>>)> {
+    let mut pipeline = Pipeline::create();
+
+    let data_type = data[0].get_by_offset(0).data_type.clone();
+    let source_pipe = create_source_pipe(ctx.clone(), vec![data])?;
+    pipeline.add_pipe(source_pipe);
+
+    let schema = DataSchemaRefExt::create(vec![DataField::new("a", data_type)]);
+    let sort_desc = Arc::new(vec![SortColumnDescription {
+        offset: 0,
+        asc: true,
+        nulls_first: true,
+        is_nullable: false,
+    }]);
+
+    let order_col_generated = false;
+    let output_order_col = false;
+    let max_memory_usage = 100;
+    let spilling_bytes_threshold_per_core = 1;
+    let spilling_batch_bytes = 1000;
+    let enable_loser_tree = true;
+
+    pipeline.try_add_accumulating_transformer(|| {
+        TransformSortMergeBase::<
+            TransformSortMerge<SimpleRowsAsc<Int32Type>>,
+            SimpleRowsAsc<Int32Type>,
+            SimpleRowConverter<Int32Type>,
+        >::try_create(
+            schema.clone(),
+            sort_desc.clone(),
+            order_col_generated,
+            output_order_col,
+            max_memory_usage,
+            spilling_bytes_threshold_per_core,
+            spilling_batch_bytes,
+            TransformSortMerge::create(
+                schema.clone(),
+                sort_desc.clone(),
+                block_size,
+                enable_loser_tree,
+            ),
+        )
+    })?;
+
+    let spill_config = SpillerConfig {
+        spiller_type: SpillerType::OrderBy,
+        location_prefix: "_sort_spill".to_string(),
+        disk_spill: None,
+        use_parquet: true,
+    };
+    let op = DataOperator::instance().operator();
+    let spiller = Spiller::create(ctx.clone(), op, spill_config.clone())?;
+    pipeline.add_transform(|input, output| {
+        Ok(ProcessorPtr::create(create_transform_sort_spill(
+            input,
+            output,
+            schema.clone(),
+            sort_desc.clone(),
+            limit,
+            spiller.clone(),
+            true,
+            enable_loser_tree,
+        )))
+    })?;
+
+    let (mut rx, sink_pipe) = create_sink_pipe(1)?;
+    let rx = rx.pop().unwrap();
+    pipeline.add_pipe(sink_pipe);
+    pipeline.set_max_threads(3);
+
+    let settings = ExecutorSettings {
+        query_id: Arc::new("".to_string()),
+        max_execute_time_in_seconds: Default::default(),
+        enable_queries_executor: false,
+        max_threads: 8,
+        executor_node_id: "".to_string(),
+    };
+    let executor = QueryPipelineExecutor::create(pipeline, settings)?;
+    Ok((executor, rx))
+}
+
+fn basic_test_data(limit: Option<usize>) -> (Vec<DataBlock>, DataBlock) {
+    let data = vec![vec![1, 1, 1, 1], vec![1, 2, 3, 4], vec![4, 5, 6, 7]];
+
+    prepare_single_input_and_result(data, limit)
+}
+
+fn random_test_data(
+    rng: &mut ThreadRng,
+    with_limit: bool,
+) -> (Vec<DataBlock>, DataBlock, Option<usize>) {
+    let num_rows = rng.gen_range(1..=100);
+    let mut data = (0..num_rows)
+        .map(|_| rng.gen_range(0..1000))
+        .collect::<Vec<_>>();
+    data.sort();
+
+    let mut data = VecDeque::from(data);
+    let mut random_data = Vec::new();
+    while !data.is_empty() {
+        let n = rng.gen_range(1..=10).min(data.len());
+        random_data.push(data.drain(..n).collect::<Vec<_>>());
+    }
+
+    let limit = if with_limit {
+        Some(rng.gen_range(1..=num_rows))
+    } else {
+        None
+    };
+    let (input, expected) = prepare_single_input_and_result(random_data, limit);
+    (input, expected, limit)
+}
+
+async fn run_fuzz(ctx: Arc<QueryContext>, rng: &mut ThreadRng, with_limit: bool) -> Result<()> {
+    let block_size = rng.gen_range(1..=20);
+    let (data, expected, limit) = random_test_data(rng, with_limit);
+
+    // println!("\nwith_limit {with_limit}");
+    // for (input, block) in data.iter().enumerate() {
+    //     println!("intput {input}");
+    //     println!("{:?}", block.columns()[0].value);
+    // }
+
+    let (executor, mut rx) = create_sort_spill_pipeline(ctx, data, block_size, limit)?;
+    executor.execute()?;
+
+    let mut got: Vec<DataBlock> = Vec::new();
+    while !rx.is_empty() {
+        got.push(rx.recv().await.unwrap()?);
+    }
+
+    check_result(got, expected);
+
+    Ok(())
+}
+
+#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
+async fn test_sort_spill() -> Result<()> {
+    let fixture = TestFixture::setup().await?;
+    let ctx = fixture.new_query_ctx().await?;
+
+    let block_size = 4;
+    let limit = None;
+    let (data, expected) = basic_test_data(None);
+    let (executor, mut rx) = create_sort_spill_pipeline(ctx, data, block_size, limit)?;
+    executor.execute()?;
+
+    let mut got: Vec<DataBlock> = Vec::new();
+    while !rx.is_empty() {
+        got.push(rx.recv().await.unwrap()?);
+    }
+
+    check_result(got, expected);
+
+    Ok(())
+}
+
+#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
+async fn test_sort_spill_fuzz() -> Result<()> {
+    let mut rng = rand::thread_rng();
+    let fixture = TestFixture::setup().await?;
+
+    for _ in 0..3 {
+        let ctx = fixture.new_query_ctx().await?;
+        run_fuzz(ctx, &mut rng, false).await?;
+    }
+
+    for _ in 0..3 {
+        let ctx = fixture.new_query_ctx().await?;
+        run_fuzz(ctx, &mut rng, true).await?;
+    }
+    Ok(())
+}
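
A note on the core change in transform_sort_spill.rs: when the transform sees a block without spill metadata it knows it has received its final input and switches to State::Merging, but before this patch it never told its input port so. The upstream sort processors stayed scheduled, waiting to be pulled, while the spill transform waited on the external merge, and the query could hang. Below is a minimal, self-contained sketch of that shutdown handshake; Port, State, and SortSpill here are simplified stand-ins for Databend's processor framework, not its real API.

// Minimal model of the hang the one-line fix removes. All names are
// simplified stand-ins, not Databend's actual processor types.

#[derive(Debug)]
#[allow(dead_code)]
enum State {
    Collect, // still pulling blocks from upstream
    Merging, // all blocks received; external merge in progress
    Finish,
}

struct Port {
    finished: bool,
}

impl Port {
    fn finish(&mut self) {
        // Signals the scheduler that this edge is closed, letting the
        // upstream sort processors shut down instead of waiting to be
        // pulled again.
        self.finished = true;
    }
}

struct SortSpill {
    input: Port,
    state: State,
}

impl SortSpill {
    // `is_last` plays the role of "the block carries no spill meta"
    // in the real transform: it marks the final input block.
    fn on_block(&mut self, is_last: bool) {
        if is_last {
            self.input.finish(); // the fix: close the input before merging
            self.state = State::Merging;
        }
    }
}

fn main() {
    let mut t = SortSpill {
        input: Port { finished: false },
        state: State::Collect,
    };
    t.on_block(true);
    // Without the finish() call above, upstream stays alive waiting to be
    // pulled while this transform merges: a deadlock, i.e. the hang.
    assert!(t.input.finished);
    println!("state = {:?}", t.state);
}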
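The same file's debug_assert! to assert! changes read as related hardening rather than part of the fix itself: debug_assert! is compiled out in release builds, so a violated state-machine invariant there would not fail fast and could surface only as exactly this kind of stall. A two-line illustration:

// Why upgrading the checks matters for catching stalls early.
fn check_state(ok: bool) {
    debug_assert!(ok); // compiled out under --release
    assert!(ok); // always checked, fails loudly in production builds too
}

fn main() {
    check_state(true);
}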
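The refactored tests (k_way.rs and spill.rs) both verify one invariant: whatever the worker count, block size, or spill pressure, the pipeline's output must equal all input rows concatenated, sorted, and truncated to the limit. Here is a standalone model of what prepare_multi_input_and_result computes, with Vec<i32> standing in for DataBlock; the function name expected_output is mine, not the test's.

// Reduced model of the tests' expected-result computation: the correct
// output of a k-way merge (with optional limit) over pre-sorted streams.
fn expected_output(inputs: &[Vec<Vec<i32>>], limit: Option<usize>) -> Vec<i32> {
    // Flatten every stream's chunks into one list, sort, apply the limit.
    let mut all: Vec<i32> = inputs.iter().flatten().flatten().copied().collect();
    all.sort();
    if let Some(n) = limit {
        all.truncate(n);
    }
    all
}

fn main() {
    // The basic_test_data streams from k_way.rs.
    let inputs = vec![
        vec![vec![1, 2, 3, 4], vec![4, 5, 6, 7]],
        vec![vec![1, 1, 1, 1], vec![1, 10, 100, 2000]],
        vec![vec![0, 2, 4, 5]],
    ];
    assert_eq!(expected_output(&inputs, Some(5)), vec![0, 1, 1, 1, 1]);
    println!("{:?}", expected_output(&inputs, None));
}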