
Commit

fix(query): sort spilling may hang (#16672)

* fix

Signed-off-by: coldWater <[email protected]>

* refactor sort test

Signed-off-by: coldWater <[email protected]>

* test

Signed-off-by: coldWater <[email protected]>

* fix lint

Signed-off-by: coldWater <[email protected]>

---------

Signed-off-by: coldWater <[email protected]>
Co-authored-by: TCeason <[email protected]>
forsaken628 and TCeason authored Oct 25, 2024
1 parent 2059d71 commit 4a80ca2
Showing 10 changed files with 415 additions and 174 deletions.
1 change: 1 addition & 0 deletions src/query/pipeline/transforms/src/lib.rs
@@ -20,3 +20,4 @@
#![feature(iter_map_windows)]

pub mod processors;
+pub use processors::*;
2 changes: 1 addition & 1 deletion src/query/service/src/pipelines/processors/mod.rs
@@ -13,7 +13,7 @@
// limitations under the License.

pub use databend_common_pipeline_core::processors::*;
-pub(crate) mod transforms;
+pub mod transforms;

pub use transforms::HashJoinBuildState;
pub use transforms::HashJoinDesc;
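These two visibility changes appear to exist so that code outside the pipeline crates (for example the new integration-test modules added later in this commit) can reach the transform processors directly. A minimal sketch of the pattern, with hypothetical crate, module, and function names:

```rust
// lib.rs of a hypothetical crate: a public module plus a glob re-export at
// the crate root, so callers can write `use my_crate::do_sort;` instead of
// `use my_crate::processors::do_sort;`.
pub mod processors {
    // Being `pub` (rather than `pub(crate)`) is what lets integration tests
    // and downstream crates reach this item at all.
    pub fn do_sort(mut values: Vec<i32>) -> Vec<i32> {
        values.sort();
        values
    }
}

// Mirrors `pub use processors::*;` from the lib.rs change above.
pub use processors::*;
```

An integration test under `tests/` could then call `my_crate::do_sort(vec![3, 1, 2])` without spelling out the module path.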
@@ -67,6 +67,10 @@ impl<T: HashMethodBounds, V: Send + Sync + 'static> HashTableCell<T, V> {
self.hashtable.len()
}

+pub fn is_empty(&self) -> bool {
+self.len() == 0
+}

pub fn allocated_bytes(&self) -> usize {
self.hashtable.bytes_len(false)
+ self.arena.allocated_bytes()
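An aside on the `is_empty` addition: providing `is_empty` next to `len` is the usual Rust convention (it is what clippy's `len_without_is_empty` lint asks for), and it lets callers test emptiness without comparing `len()` against zero. A minimal sketch of the pairing with a hypothetical container type:

```rust
// Hypothetical container, used only to illustrate the `len`/`is_empty`
// pairing that the diff adds to `HashTableCell`.
struct Cell {
    items: Vec<u64>,
}

impl Cell {
    fn len(&self) -> usize {
        self.items.len()
    }

    // Defined in terms of `len`, mirroring the added method above.
    fn is_empty(&self) -> bool {
        self.len() == 0
    }
}

fn main() {
    let cell = Cell { items: Vec::new() };
    assert!(cell.is_empty());
    assert_eq!(cell.len(), 0);
}
```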
@@ -163,7 +163,7 @@ impl<Method: HashMethodBounds> BlockMetaTransform<ExchangeShuffleMeta>
&self.location_prefix,
payload,
)?,
-false => agg_spilling_aggregate_payload::<Method>(
+false => agg_spilling_aggregate_payload(
self.ctx.clone(),
self.operator.clone(),
&self.location_prefix,
@@ -239,7 +239,7 @@ impl<Method: HashMethodBounds> BlockMetaTransform<ExchangeShuffleMeta>
}
}

-fn agg_spilling_aggregate_payload<Method: HashMethodBounds>(
+fn agg_spilling_aggregate_payload(
ctx: Arc<QueryContext>,
operator: Operator,
location_prefix: &str,
@@ -214,7 +214,7 @@ impl<Method: HashMethodBounds> BlockMetaTransform<ExchangeShuffleMeta>
&self.location_prefix,
payload,
)?,
-false => agg_spilling_group_by_payload::<Method>(
+false => agg_spilling_group_by_payload(
self.ctx.clone(),
self.operator.clone(),
&self.location_prefix,
@@ -292,7 +292,7 @@ fn get_columns(data_block: DataBlock) -> Vec<BlockEntry> {
data_block.columns().to_vec()
}

-fn agg_spilling_group_by_payload<Method: HashMethodBounds>(
+fn agg_spilling_group_by_payload(
ctx: Arc<QueryContext>,
operator: Operator,
location_prefix: &str,
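The call-site edits in this file and in the aggregate spill writer above both follow from the same signature change shown below each of them: `agg_spilling_aggregate_payload` and `agg_spilling_group_by_payload` no longer take a `Method` type parameter, so the `::<Method>` turbofish at the call sites disappears. A self-contained sketch of dropping an unused type parameter, with illustrative names that are not from the codebase:

```rust
// Before: `M` is never used in the body, so callers must spell it out with a
// turbofish (inference has nothing to infer it from).
fn spill_payload_old<M>(rows: usize) -> usize {
    rows * 2
}

// After: the unused type parameter is dropped and call sites simplify,
// which is exactly the shape of the two call-site edits above.
fn spill_payload(rows: usize) -> usize {
    rows * 2
}

fn main() {
    let a = spill_payload_old::<String>(4); // old call site needed ::<...>
    let b = spill_payload(4); // new call site
    assert_eq!(a, b);
}
```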
@@ -40,7 +40,7 @@ pub struct TransformAsyncFunction {
}

impl TransformAsyncFunction {
-pub fn new(
+pub(crate) fn new(
ctx: Arc<QueryContext>,
async_func_descs: Vec<AsyncFunctionDesc>,
operators: BTreeMap<usize, Arc<DictionaryOperator>>,
@@ -129,13 +129,13 @@ where
}

if let Some(block) = self.output_data.take() {
-debug_assert!(matches!(self.state, State::MergeFinal | State::Finish));
+assert!(matches!(self.state, State::MergeFinal | State::Finish));
self.output_block(block);
return Ok(Event::NeedConsume);
}

if matches!(self.state, State::Finish) {
-debug_assert!(self.input.is_finished());
+assert!(self.input.is_finished());
self.output.finish();
return Ok(Event::Finished);
}
@@ -179,6 +179,7 @@ where
if meta.is_none() {
// It means we get the last block.
// We can launch external merge sort now.
+self.input.finish();
self.state = State::Merging;
}
self.input_data = Some(block);
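The behavioural fix in this hunk is the added `self.input.finish()`: when the transform sees the last block and switches to its merging state, it now also tells the input port that no more data will be pulled. Without that, an upstream processor can keep waiting on a port nobody will ever drain, which matches the "sort spilling may hang" symptom in the commit title. (The `debug_assert!` → `assert!` changes simply enforce the same invariants in release builds.) A simplified, self-contained sketch of the idea — a toy model, not the real port/`Event` API:

```rust
// Toy model of a pull-based pipeline port: `finish()` tells the producer
// that the consumer will never ask for more data, so it can shut down
// instead of waiting forever.
struct InputPort {
    finished: bool,
}

impl InputPort {
    fn new() -> Self {
        Self { finished: false }
    }

    fn finish(&mut self) {
        self.finished = true;
    }

    fn is_finished(&self) -> bool {
        self.finished
    }
}

enum State {
    Collecting,
    Merging,
}

struct SortSpillTransform {
    input: InputPort,
    state: State,
}

impl SortSpillTransform {
    // Called for each incoming block; `is_last` stands in for the
    // "block carries no meta" condition in the real transform.
    fn on_block(&mut self, is_last: bool) {
        if is_last {
            // The line the diff adds: close the input *before* switching to
            // the merge phase, so upstream is released immediately.
            self.input.finish();
            self.state = State::Merging;
        }
    }
}

fn main() {
    let mut transform = SortSpillTransform {
        input: InputPort::new(),
        state: State::Collecting,
    };
    transform.on_block(true);
    assert!(transform.input.is_finished());
    assert!(matches!(transform.state, State::Merging));
}
```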
186 changes: 20 additions & 166 deletions src/query/service/tests/it/pipelines/transforms/sort.rs
@@ -19,7 +19,6 @@ use databend_common_base::base::tokio;
use databend_common_base::base::tokio::sync::mpsc::channel;
use databend_common_base::base::tokio::sync::mpsc::Receiver;
use databend_common_exception::Result;
use databend_common_expression::block_debug::pretty_format_blocks;
use databend_common_expression::types::Int32Type;
use databend_common_expression::DataBlock;
use databend_common_expression::DataField;
@@ -34,130 +33,16 @@ use databend_common_pipeline_core::PipeItem;
use databend_common_pipeline_core::Pipeline;
use databend_common_pipeline_sinks::SyncSenderSink;
use databend_common_pipeline_sources::BlocksSource;
use databend_common_pipeline_transforms::processors::add_k_way_merge_sort;
use databend_query::pipelines::executor::ExecutorSettings;
use databend_query::pipelines::executor::QueryPipelineExecutor;
use databend_query::sessions::QueryContext;
use databend_query::test_kits::TestFixture;
use itertools::Itertools;
use parking_lot::Mutex;
use rand::rngs::ThreadRng;
use rand::Rng;

#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn test_k_way_merge_sort() -> Result<()> {
let fixture = TestFixture::setup().await?;
let ctx = fixture.new_query_ctx().await?;

let worker = 3;
let block_size = 4;
let limit = None;
let (data, expected) = basic_test_data(None);
let (executor, mut rx) = create_pipeline(ctx, data, worker, block_size, limit)?;

executor.execute()?;

let mut got: Vec<DataBlock> = Vec::new();
while !rx.is_empty() {
got.push(rx.recv().await.unwrap()?);
}

check_result(got, expected);

Ok(())
}

#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn test_k_way_merge_sort_fuzz() -> Result<()> {
let mut rng = rand::thread_rng();
let fixture = TestFixture::setup().await?;

for _ in 0..10 {
let ctx = fixture.new_query_ctx().await?;
run_fuzz(ctx, &mut rng, false).await?;
}

for _ in 0..10 {
let ctx = fixture.new_query_ctx().await?;
run_fuzz(ctx, &mut rng, true).await?;
}
Ok(())
}

async fn run_fuzz(ctx: Arc<QueryContext>, rng: &mut ThreadRng, with_limit: bool) -> Result<()> {
let worker = rng.gen_range(1..=5);
let block_size = rng.gen_range(1..=20);
let (data, expected, limit) = random_test_data(rng, with_limit);

// println!("\nwith_limit {with_limit}");
// for (input, blocks) in data.iter().enumerate() {
// println!("intput {input}");
// for b in blocks {
// println!("{:?}", b.columns()[0].value);
// }
// }

let (executor, mut rx) = create_pipeline(ctx, data, worker, block_size, limit)?;
executor.execute()?;

let mut got: Vec<DataBlock> = Vec::new();
while !rx.is_empty() {
got.push(rx.recv().await.unwrap()?);
}

check_result(got, expected);

Ok(())
}

fn create_pipeline(
ctx: Arc<QueryContext>,
data: Vec<Vec<DataBlock>>,
worker: usize,
block_size: usize,
limit: Option<usize>,
) -> Result<(Arc<QueryPipelineExecutor>, Receiver<Result<DataBlock>>)> {
let mut pipeline = Pipeline::create();

let data_type = data[0][0].get_by_offset(0).data_type.clone();
let source_pipe = create_source_pipe(ctx, data)?;
pipeline.add_pipe(source_pipe);

let schema = DataSchemaRefExt::create(vec![DataField::new("a", data_type)]);
let sort_desc = Arc::new(vec![SortColumnDescription {
offset: 0,
asc: true,
nulls_first: true,
is_nullable: false,
}]);
add_k_way_merge_sort(
&mut pipeline,
schema,
worker,
block_size,
limit,
sort_desc,
false,
true,
)?;

let (mut rx, sink_pipe) = create_sink_pipe(1)?;
let rx = rx.pop().unwrap();
pipeline.add_pipe(sink_pipe);
pipeline.set_max_threads(3);

let settings = ExecutorSettings {
query_id: Arc::new("".to_string()),
max_execute_time_in_seconds: Default::default(),
enable_queries_executor: false,
max_threads: 8,
executor_node_id: "".to_string(),
};
let executor = QueryPipelineExecutor::create(pipeline, settings)?;
Ok((executor, rx))
}

fn create_source_pipe(ctx: Arc<QueryContext>, data: Vec<Vec<DataBlock>>) -> Result<Pipe> {
use parking_lot::Mutex;

let size = data.len();
let mut items = Vec::with_capacity(size);

@@ -179,7 +64,7 @@ fn create_source_pipe(ctx: Arc<QueryContext>, data: Vec<Vec<DataBlock>>) -> Result<Pipe> {
fn create_sink_pipe(size: usize) -> Result<(Vec<Receiver<Result<DataBlock>>>, Pipe)> {
let mut rxs = Vec::with_capacity(size);
let mut items = Vec::with_capacity(size);
for _index in 0..size {
for _ in 0..size {
let input = InputPort::create();
let (tx, rx) = channel(1000);
rxs.push(rx);
@@ -193,21 +78,11 @@ fn create_sink_pipe(size: usize) -> Result<(Vec<Receiver<Result<DataBlock>>>, Pipe)> {
Ok((rxs, Pipe::create(size, 0, items)))
}

/// Returns (input, expected)
pub fn basic_test_data(limit: Option<usize>) -> (Vec<Vec<DataBlock>>, DataBlock) {
let data = vec![
vec![vec![1, 2, 3, 4], vec![4, 5, 6, 7]],
vec![vec![1, 1, 1, 1], vec![1, 10, 100, 2000]],
vec![vec![0, 2, 4, 5]],
];

prepare_input_and_result(data, limit)
}

fn prepare_input_and_result(
fn prepare_multi_input_and_result(
data: Vec<Vec<Vec<i32>>>,
limit: Option<usize>,
) -> (Vec<Vec<DataBlock>>, DataBlock) {
use itertools::Itertools;
let input = data
.clone()
.into_iter()
@@ -229,7 +104,17 @@ fn prepare_input_and_result(
(input, result)
}

fn prepare_single_input_and_result(
data: Vec<Vec<i32>>,
limit: Option<usize>,
) -> (Vec<DataBlock>, DataBlock) {
let (mut input, expected) = prepare_multi_input_and_result(vec![data], limit);
(input.remove(0), expected)
}

fn check_result(result: Vec<DataBlock>, expected: DataBlock) {
use databend_common_expression::block_debug::pretty_format_blocks;

if expected.is_empty() {
if !result.is_empty() && !DataBlock::concat(&result).unwrap().is_empty() {
panic!(
@@ -240,46 +125,15 @@ fn check_result(result: Vec<DataBlock>, expected: DataBlock) {
return;
}

let result_rows: usize = result.iter().map(|v| v.num_rows()).sum();
let result = pretty_format_blocks(&result).unwrap();
let expected_rows = expected.num_rows();
let expected = pretty_format_blocks(&[expected]).unwrap();
let result_rows: usize = result.iter().map(|v| v.num_rows()).sum();
let result = pretty_format_blocks(&result).unwrap();
assert_eq!(
expected, result,
"\nexpected (num_rows = {}):\n{}\nactual (num_rows = {}):\n{}",
expected_rows, expected, result_rows, result
"\nexpected (num_rows = {expected_rows}):\n{expected}\nactual (num_rows = {result_rows}):\n{result}",
);
}

fn random_test_data(
rng: &mut ThreadRng,
with_limit: bool,
) -> (Vec<Vec<DataBlock>>, DataBlock, Option<usize>) {
let random_batch_size = rng.gen_range(1..=10);
let random_num_streams = rng.gen_range(5..=10);

let random_data = (0..random_num_streams)
.map(|_| {
let random_num_blocks = rng.gen_range(1..=10);
let mut data = (0..random_batch_size * random_num_blocks)
.map(|_| rng.gen_range(0..=1000))
.collect::<Vec<_>>();
data.sort();
data.chunks(random_batch_size)
.map(|v| v.to_vec())
.collect::<Vec<_>>()
})
.collect::<Vec<_>>();

let num_rows = random_data
.iter()
.map(|v| v.iter().map(|v| v.len()).sum::<usize>())
.sum::<usize>();
let limit = if with_limit {
Some(rng.gen_range(0..=num_rows))
} else {
None
};
let (input, expected) = prepare_input_and_result(random_data, limit);
(input, expected, limit)
}
mod k_way;
mod spill;
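Most of this file's 166 deleted lines are the k-way merge-sort tests themselves, which move into the new `k_way` submodule, with a `spill` module added for the spilling case this commit fixes; only the shared helpers (`create_source_pipe`, `create_sink_pipe`, the `prepare_*_input_and_result` functions, and `check_result`) stay here. As a reading aid, here is a plain-Rust analogue of the expected output that `prepare_multi_input_and_result` appears to compute — all streams merged, globally sorted, optionally truncated to `limit` — under the assumption that this matches the helper's semantics:

```rust
// Plain-Rust analogue of the expected output: flatten every stream's blocks,
// sort the values globally, then apply the optional row limit.
fn expected_rows(data: &[Vec<Vec<i32>>], limit: Option<usize>) -> Vec<i32> {
    let mut all: Vec<i32> = data
        .iter()
        .flat_map(|stream| stream.iter().flatten().copied())
        .collect();
    all.sort();
    match limit {
        Some(n) => all.into_iter().take(n).collect(),
        None => all,
    }
}

fn main() {
    let data = vec![
        vec![vec![1, 3], vec![5, 7]], // stream 0: two blocks, sorted per stream
        vec![vec![2, 4, 6]],          // stream 1: one block
    ];
    assert_eq!(expected_rows(&data, None), vec![1, 2, 3, 4, 5, 6, 7]);
    assert_eq!(expected_rows(&data, Some(3)), vec![1, 2, 3]);
}
```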