Skip to content

Commit e7593a9

Browse files
CoalesceBatchesExec and RepartitionExec
1 parent 84ae3e8 commit e7593a9

File tree

2 files changed

+71
-107
lines changed

2 files changed

+71
-107
lines changed

src/handlers/http/query.rs

-1
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,6 @@ pub struct Query {
7070

7171
pub async fn query(req: HttpRequest, query_request: Query) -> Result<HttpResponse, QueryError> {
7272
let _ = run_benchmark().await;
73-
println!("benchmarking complete");
7473
let session_state = QUERY_SESSION.state();
7574
let raw_logical_plan = match session_state
7675
.create_logical_plan(&query_request.query)

src/query/mod.rs

+71-106
Original file line numberDiff line numberDiff line change
@@ -33,18 +33,16 @@ use datafusion::common::tree_node::{Transformed, TreeNode, TreeNodeRecursion, Tr
3333
// use datafusion::config::ConfigFileType;
3434
use datafusion::error::{DataFusionError, Result};
3535
use datafusion::execution::disk_manager::DiskManagerConfig;
36-
use datafusion::execution::runtime_env::{RuntimeEnv, RuntimeEnvBuilder};
36+
use datafusion::execution::runtime_env::RuntimeEnvBuilder;
3737
// use datafusion::execution::runtime_env::RuntimeEnvBuilder;
3838
use datafusion::execution::SessionStateBuilder;
3939
use datafusion::logical_expr::expr::Alias;
4040
use datafusion::logical_expr::{
4141
Aggregate, Explain, Filter, LogicalPlan, PlanType, Projection, ToStringifiedPlan,
4242
};
43-
use datafusion::physical_expr::create_physical_expr;
4443
use datafusion::physical_plan::coalesce_batches::CoalesceBatchesExec;
4544
use datafusion::physical_plan::repartition::RepartitionExec;
46-
use datafusion::physical_plan::{collect as PhysicalPlanCollect, ExecutionPlan, Partitioning};
47-
use datafusion::physical_plan::filter::FilterExec;
45+
use datafusion::physical_plan::{collect, ExecutionPlan, Partitioning};
4846
// use datafusion::physical_plan::execution_plan::EmissionType;
4947
// use datafusion::physical_plan::{collect, execute_stream, ExecutionPlanProperties};
5048
use datafusion::prelude::*;
@@ -56,14 +54,12 @@ use serde::{Deserialize, Serialize};
5654
use serde_json::{json, Value};
5755
use std::ops::Bound;
5856
// use std::path::{Path, PathBuf};
59-
use std::process::Command;
6057
use std::sync::Arc;
6158
use std::time::Instant;
6259
use stream_schema_provider::collect_manifest_files;
6360
use sysinfo::System;
6461

6562
use std::fs;
66-
use std::io;
6763

6864
use self::error::ExecuteError;
6965
use self::stream_schema_provider::GlobalSchemaProvider;
@@ -758,31 +754,28 @@ pub fn flatten_objects_for_count(objects: Vec<Value>) -> Vec<Value> {
758754
// Ok(())
759755
// }
760756

761-
#[derive(Debug, Serialize)]
762-
struct BenchmarkResult {
763-
query_num: usize,
764-
iteration: usize,
765-
elapsed_seconds: f64,
766-
}
767-
768-
769757
pub async fn run_benchmark() {
770758
const TRIES: usize = 1;
771-
772-
let mut results = Vec::new();
773759
let mut query_num = 1;
774-
775-
// 1. Configure Runtime Environment with parallelism
776760
let runtime_config = RuntimeEnvBuilder::new() // Number of partitions for parallel processing
777761
.with_disk_manager(DiskManagerConfig::NewOs);
778762

779-
let runtime = RuntimeEnv::new(runtime_config).unwrap();
763+
764+
let runtime = runtime_config.build().unwrap();
780765

781766

782767
// Create session context
783768
let mut config = SessionConfig::new().with_coalesce_batches(true)
784-
.with_target_partitions(8)
785-
.with_batch_size(50000);
769+
.with_collect_statistics(true)
770+
.with_parquet_bloom_filter_pruning(true)
771+
.with_parquet_page_index_pruning(true)
772+
.with_parquet_pruning(true)
773+
.with_prefer_existing_sort(true)
774+
.with_repartition_file_scans(true)
775+
.with_round_robin_repartition(true)
776+
.with_repartition_sorts(true)
777+
.with_batch_size(50000)
778+
.with_target_partitions(8);
786779
config.options_mut().execution.parquet.binary_as_string = true;
787780
config.options_mut().execution.use_row_number_estimates_to_optimize_partitioning = true;
788781
config.options_mut().execution.parquet.pushdown_filters = true;
@@ -800,112 +793,84 @@ pub async fn run_benchmark() {
800793
// Read queries from file
801794
let queries = fs::read_to_string("/home/ubuntu/queries.sql").unwrap();
802795

803-
796+
let mut total_elapsed = 0.0;
804797
for query in queries.lines() {
805798
fs::write("/tmp/query.sql", &query).unwrap();
806799

807-
for iteration in 1..=TRIES {
808-
clear_caches().unwrap();
809-
800+
for iteration in 1..=TRIES {
810801

811802
// Create the query plan
812803
let df = ctx.sql(&query).await.unwrap();
813-
let logical_plan = df.logical_plan().clone();
804+
//let logical_plan = df.logical_plan().clone();
814805
let physical_plan = df.create_physical_plan().await.unwrap();
815806

816807
// Add coalesce
817-
let mut exec_plan: Arc<dyn ExecutionPlan> = Arc::new(CoalesceBatchesExec::new(physical_plan, 50000));
818-
819-
// Check if plan contains filter and add FilterExec
820-
fn has_filter(plan: &LogicalPlan) -> bool {
821-
match plan {
822-
LogicalPlan::Filter(_) => true,
823-
LogicalPlan::Projection(proj) => has_filter(proj.input.as_ref()),
824-
LogicalPlan::Aggregate(agg) => has_filter(agg.input.as_ref()),
825-
LogicalPlan::Join(join) => {
826-
has_filter(join.left.as_ref()) || has_filter(join.right.as_ref())
827-
},
828-
LogicalPlan::Window(window) => has_filter(window.input.as_ref()),
829-
LogicalPlan::Sort(sort) => has_filter(sort.input.as_ref()),
830-
LogicalPlan::Limit(limit) => has_filter(limit.input.as_ref()),
831-
_ => false,
832-
}
833-
}
834-
835-
// Extract filter expressions from logical plan
836-
fn extract_filters(plan: &LogicalPlan) -> Vec<Expr> {
837-
match plan {
838-
LogicalPlan::Filter(filter) => vec![filter.predicate.clone()],
839-
LogicalPlan::Projection(proj) => extract_filters(proj.input.as_ref()),
840-
LogicalPlan::Aggregate(agg) => extract_filters(agg.input.as_ref()),
841-
LogicalPlan::Join(join) => {
842-
let mut filters = extract_filters(join.left.as_ref());
843-
filters.extend(extract_filters(join.right.as_ref()));
844-
filters
845-
},
846-
_ => vec![],
847-
}
848-
}
849-
850-
if has_filter(&logical_plan) {
851-
let filters = extract_filters(&logical_plan);
852-
for filter_expr in filters {
853-
854-
855-
if let Ok(physical_filter_expr) = create_physical_expr(
856-
&filter_expr,
857-
&logical_plan.schema(),
858-
&ctx.state().execution_props(),
859-
860-
) {
861-
exec_plan = Arc::new(FilterExec::try_new(
862-
physical_filter_expr,
863-
exec_plan,
864-
).unwrap());
865-
}
866-
867-
868-
}
869-
}
870-
871-
// Execute the plan
808+
let exec_plan: Arc<dyn ExecutionPlan> = Arc::new(CoalesceBatchesExec::new(physical_plan, 8192));
872809
let task_ctx = ctx.task_ctx();
873-
let start = Instant::now();
874-
875-
//let _ = execute_parallel(exec_plan.clone(), ctx.task_ctx()).await.unwrap();
876-
// Add repartitioning for better parallelism
877-
let repartitioned = Arc::new(RepartitionExec::try_new(
878-
exec_plan,
879-
Partitioning::RoundRobinBatch(8),
880-
).unwrap());
881-
let _ = PhysicalPlanCollect(repartitioned, task_ctx).await.unwrap();
810+
let repartitioned = Arc::new(RepartitionExec::try_new(
811+
exec_plan,
812+
Partitioning::RoundRobinBatch(8),
813+
).unwrap());
814+
let start = Instant::now();
815+
let _query_response = collect(repartitioned, task_ctx).await.unwrap();
882816

883817
let elapsed = start.elapsed().as_secs_f64();
884-
let benchmark_result = BenchmarkResult {
885-
query_num,
886-
iteration,
887-
elapsed_seconds: elapsed,
888-
};
818+
total_elapsed += elapsed;
889819
println!("Query {query_num} iteration {iteration} took {elapsed} seconds");
890-
results.push(benchmark_result);
820+
891821
}
892822
query_num += 1;
893823
}
824+
println!("Total time: {total_elapsed} seconds");
894825

895826
}
896827

897-
fn clear_caches() -> io::Result<()> {
898-
// Sync filesystems
899-
Command::new("sync").status()?;
828+
// // Check if plan contains filter and add FilterExec
829+
// fn has_filter(plan: &LogicalPlan) -> bool {
830+
// println!("Plan: {plan}");
831+
// match plan {
832+
// LogicalPlan::Filter(_) => true,
833+
// LogicalPlan::Projection(proj) => has_filter(proj.input.as_ref()),
834+
// LogicalPlan::Aggregate(agg) => has_filter(agg.input.as_ref()),
835+
// LogicalPlan::Join(join) => {
836+
// has_filter(join.left.as_ref()) || has_filter(join.right.as_ref())
837+
// },
838+
// LogicalPlan::Window(window) => has_filter(window.input.as_ref()),
839+
// LogicalPlan::Sort(sort) => has_filter(sort.input.as_ref()),
840+
// LogicalPlan::Limit(limit) => has_filter(limit.input.as_ref()),
841+
// _ => false,
842+
// }
843+
// }
844+
845+
// Extract filter expressions from logical plan
846+
// fn extract_filters(plan: &LogicalPlan) -> Vec<Expr> {
847+
// match plan {
848+
// LogicalPlan::Filter(filter) => vec![filter.predicate.clone()],
849+
// LogicalPlan::Projection(proj) => extract_filters(proj.input.as_ref()),
850+
// LogicalPlan::Aggregate(agg) => extract_filters(agg.input.as_ref()),
851+
// LogicalPlan::Join(join) => {
852+
// let mut filters = extract_filters(join.left.as_ref());
853+
// filters.extend(extract_filters(join.right.as_ref()));
854+
// filters
855+
// },
856+
// LogicalPlan::Limit(limit) => extract_filters(limit.input.as_ref()),
857+
// LogicalPlan::Sort(sort) => extract_filters(sort.input.as_ref()),
858+
// _ => vec![],
859+
// }
860+
// }
861+
862+
// fn clear_caches() -> io::Result<()> {
863+
// // Sync filesystems
864+
// Command::new("sync").status()?;
900865

901-
// Clear caches using sudo
902-
Command::new("sudo")
903-
.args(&["tee", "/proc/sys/vm/drop_caches"])
904-
.arg("3")
905-
.output()?;
866+
// // Clear caches using sudo
867+
// Command::new("sudo")
868+
// .args(&["tee", "/proc/sys/vm/drop_caches"])
869+
// .arg("3")
870+
// .output()?;
906871

907-
Ok(())
908-
}
872+
// Ok(())
873+
// }
909874
pub mod error {
910875
use crate::{metadata::error::stream_info::MetadataError, storage::ObjectStorageError};
911876
use datafusion::error::DataFusionError;

0 commit comments

Comments
 (0)