Commit 3e5548d

Use sort ordering on timestamp array (#443)
Write timestamp-sorted metadata to the Parquet files and pass the external sort information to DataFusion. With this, the SortExec node can be dropped from the execution plan for most queries that order by p_timestamp.

For example, for the query "explain select p_timestamp from {{stream_name}} order by p_timestamp asc" the physical plan shows that SortExec is eliminated, because the output_ordering is pushed down to the ParquetExec node:

"plan": "SortPreservingMergeExec: [p_timestamp@0 ASC NULLS LAST] ParquetExec: file_groups={4 groups: [.....]}, projection=[p_timestamp], output_ordering=[p_timestamp@0 ASC NULLS LAST]"

Note that this is still not the most optimised form of the query: the SortPreservingMergeExec is not really needed here, but DataFusion is not aware that the partitions / files are non-overlapping with respect to the timestamp. Also, if the target partition limit is crossed, DataFusion adds a SortExec back into the physical plan.

Fixes #430
1 parent f9fb6c5 commit 3e5548d
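The "ASC NULLS LAST" in the plan above comes directly from the sort expression the server now declares for its Parquet files (see the ListingOptions change in server/src/query.rs below). A minimal sketch of that declaration, using the literal column name in place of the server's DEFAULT_TIMESTAMP_KEY constant and assuming the DataFusion release this commit builds against:

use datafusion::prelude::{col, Expr};

// Sketch only: the per-file ordering handed to DataFusion via
// ListingOptions::file_sort_order. Expr::sort(asc, nulls_first) with
// (true, false) means ascending with nulls last, which is exactly the
// "ASC NULLS LAST" printed in the physical plan.
fn timestamp_sort_order() -> Vec<Vec<Expr>> {
    vec![vec![col("p_timestamp").sort(true, false)]]
}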

8 files changed: 66 additions, 106 deletions


server/src/event/format.rs

Lines changed: 4 additions & 16 deletions
@@ -20,9 +20,8 @@
 use std::{collections::HashMap, sync::Arc};
 
 use anyhow::{anyhow, Error as AnyError};
-use arrow_array::{RecordBatch, StringArray, TimestampMillisecondArray};
+use arrow_array::{RecordBatch, StringArray};
 use arrow_schema::{DataType, Field, Schema, TimeUnit};
-use chrono::Utc;
 
 use crate::utils::{self, arrow::get_field};
 
@@ -94,25 +93,14 @@ pub trait EventFormat: Sized {
         let tags_arr = StringArray::from_iter_values(std::iter::repeat(&tags).take(rb.num_rows()));
         let metadata_arr =
             StringArray::from_iter_values(std::iter::repeat(&metadata).take(rb.num_rows()));
-        let timestamp_array = get_timestamp_array(rb.num_rows());
-
         // modify the record batch to add fields to respective indexes
         let rb = utils::arrow::replace_columns(
             Arc::clone(&schema),
-            rb,
-            &[0, tags_index, metadata_index],
-            &[
-                Arc::new(timestamp_array),
-                Arc::new(tags_arr),
-                Arc::new(metadata_arr),
-            ],
+            &rb,
+            &[tags_index, metadata_index],
+            &[Arc::new(tags_arr), Arc::new(metadata_arr)],
         );
 
         Ok((rb, is_first))
     }
 }
-
-fn get_timestamp_array(size: usize) -> TimestampMillisecondArray {
-    let time = Utc::now();
-    TimestampMillisecondArray::from_value(time.timestamp_millis(), size)
-}

server/src/event/writer/file_writer.rs

Lines changed: 17 additions & 3 deletions
@@ -17,14 +17,17 @@
  *
  */
 
-use arrow_array::RecordBatch;
+use arrow_array::{RecordBatch, TimestampMillisecondArray};
 use arrow_ipc::writer::StreamWriter;
+use chrono::Utc;
 use derive_more::{Deref, DerefMut};
 use std::collections::HashMap;
 use std::fs::{File, OpenOptions};
 use std::path::PathBuf;
+use std::sync::Arc;
 
 use crate::storage::staging::StorageDir;
+use crate::utils;
 
 use super::errors::StreamWriterError;
 
@@ -44,17 +47,24 @@ impl FileWriter {
         schema_key: &str,
         record: &RecordBatch,
     ) -> Result<(), StreamWriterError> {
+        let record = utils::arrow::replace_columns(
+            record.schema(),
+            record,
+            &[0],
+            &[Arc::new(get_timestamp_array(record.num_rows()))],
+        );
+
         match self.get_mut(schema_key) {
             Some(writer) => {
                 writer
                     .writer
-                    .write(record)
+                    .write(&record)
                     .map_err(StreamWriterError::Writer)?;
             }
             // entry is not present thus we create it
             None => {
                 // this requires mutable borrow of the map so we drop this read lock and wait for write lock
-                let (path, writer) = init_new_stream_writer_file(stream_name, schema_key, record)?;
+                let (path, writer) = init_new_stream_writer_file(stream_name, schema_key, &record)?;
                 self.insert(
                     schema_key.to_owned(),
                     ArrowWriter {
@@ -75,6 +85,10 @@ impl FileWriter {
     }
 }
 
+fn get_timestamp_array(size: usize) -> TimestampMillisecondArray {
+    TimestampMillisecondArray::from_value(Utc::now().timestamp_millis(), size)
+}
+
 fn init_new_stream_writer_file(
     stream_name: &str,
     schema_key: &str,

server/src/query.rs

Lines changed: 26 additions & 3 deletions
@@ -22,6 +22,8 @@ use chrono::TimeZone;
 use chrono::{DateTime, Utc};
 use datafusion::arrow::datatypes::Schema;
 use datafusion::arrow::record_batch::RecordBatch;
+use datafusion::datasource::file_format::parquet::ParquetFormat;
+use datafusion::datasource::listing::{ListingOptions, ListingTable, ListingTableConfig};
 use datafusion::execution::context::SessionState;
 use datafusion::execution::disk_manager::DiskManagerConfig;
 use datafusion::execution::runtime_env::RuntimeEnv;
@@ -32,6 +34,7 @@ use std::path::Path;
 use std::sync::Arc;
 use sysinfo::{System, SystemExt};
 
+use crate::event::DEFAULT_TIMESTAMP_KEY;
 use crate::option::CONFIG;
 use crate::storage::ObjectStorageError;
 use crate::storage::{ObjectStorage, OBJECT_STORE_DATA_GRANULARITY};
@@ -121,10 +124,30 @@ impl Query {
         storage: Arc<dyn ObjectStorage + Send>,
     ) -> Result<(Vec<RecordBatch>, Vec<String>), ExecuteError> {
         let ctx = self.create_session_context();
-        let prefixes = self.get_prefixes();
-        let Some(table) = storage.query_table(prefixes, Arc::clone(&self.schema))? else { return Ok((Vec::new(), Vec::new())) };
+        let prefixes = storage.query_prefixes(self.get_prefixes());
 
-        ctx.register_table(&*self.stream_name, Arc::new(table))
+        if prefixes.is_empty() {
+            return Ok((Vec::new(), Vec::new()));
+        }
+
+        let file_format = ParquetFormat::default().with_enable_pruning(Some(true));
+        let listing_options = ListingOptions {
+            file_extension: ".parquet".to_string(),
+            file_sort_order: vec![vec![col(DEFAULT_TIMESTAMP_KEY).sort(true, false)]],
+            infinite_source: false,
+            format: Arc::new(file_format),
+            table_partition_cols: vec![],
+            collect_stat: true,
+            target_partitions: 32,
+        };
+
+        let config = ListingTableConfig::new_with_multi_paths(prefixes)
+            .with_listing_options(listing_options)
+            .with_schema(self.schema.clone());
+
+        let table = Arc::new(ListingTable::try_new(config)?);
+
+        ctx.register_table(&*self.stream_name, table)
             .map_err(ObjectStorageError::DataFusionError)?;
         // execute the query and collect results
         let df = ctx.sql(self.query.as_str()).await?;
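With the table registered this way, the pushed-down ordering can be inspected from the same SessionContext with a plain EXPLAIN, much as the commit message does. A hedged sketch, with a hypothetical stream name standing in for the registered table:

use datafusion::error::Result;
use datafusion::prelude::SessionContext;

// Sketch only: run EXPLAIN against the registered listing table and print the
// plan; the ParquetExec node should report
// output_ordering=[p_timestamp@0 ASC NULLS LAST].
async fn show_plan(ctx: &SessionContext) -> Result<()> {
    let df = ctx
        .sql("EXPLAIN SELECT p_timestamp FROM mystream ORDER BY p_timestamp ASC")
        .await?;
    let batches = df.collect().await?;
    datafusion::arrow::util::pretty::print_batches(&batches)?;
    Ok(())
}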

server/src/storage/localfs.rs

Lines changed: 4 additions & 37 deletions
@@ -24,15 +24,7 @@ use std::{
 
 use async_trait::async_trait;
 use bytes::Bytes;
-use datafusion::arrow::datatypes::Schema;
-use datafusion::{
-    datasource::{
-        file_format::parquet::ParquetFormat,
-        listing::{ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl},
-    },
-    error::DataFusionError,
-    execution::runtime_env::RuntimeConfig,
-};
+use datafusion::{datasource::listing::ListingTableUrl, execution::runtime_env::RuntimeConfig};
 use fs_extra::file::{move_file, CopyOptions};
 use futures::{stream::FuturesUnordered, TryStreamExt};
 use relative_path::RelativePath;
@@ -202,39 +194,14 @@ impl ObjectStorage for LocalFS {
         Ok(())
     }
 
-    fn query_table(
-        &self,
-        prefixes: Vec<String>,
-        schema: Arc<Schema>,
-    ) -> Result<Option<ListingTable>, DataFusionError> {
-        let prefixes: Vec<ListingTableUrl> = prefixes
+    fn query_prefixes(&self, prefixes: Vec<String>) -> Vec<ListingTableUrl> {
+        prefixes
             .into_iter()
             .filter_map(|prefix| {
                 let path = self.root.join(prefix);
                 ListingTableUrl::parse(path.to_str().unwrap()).ok()
             })
-            .collect();
-
-        if prefixes.is_empty() {
-            return Ok(None);
-        }
-
-        let file_format = ParquetFormat::default().with_enable_pruning(Some(true));
-        let listing_options = ListingOptions {
-            file_extension: ".parquet".to_string(),
-            file_sort_order: Vec::new(),
-            infinite_source: false,
-            format: Arc::new(file_format),
-            table_partition_cols: vec![],
-            collect_stat: true,
-            target_partitions: 1,
-        };
-
-        let config = ListingTableConfig::new_with_multi_paths(prefixes)
-            .with_listing_options(listing_options)
-            .with_schema(schema);
-
-        Ok(Some(ListingTable::try_new(config)?))
+            .collect()
     }
 }
 

server/src/storage/object_storage.rs

Lines changed: 3 additions & 9 deletions
@@ -20,6 +20,7 @@ use super::{
     retention::Retention, staging::convert_disk_files_to_parquet, LogStream, ObjectStorageError,
     ObjectStoreFormat, Permisssion, StorageDir, StorageMetadata,
 };
+
 use crate::{
     alerts::Alerts,
     metadata::STREAM_INFO,
@@ -32,10 +33,7 @@ use actix_web_prometheus::PrometheusMetrics;
 use arrow_schema::Schema;
 use async_trait::async_trait;
 use bytes::Bytes;
-use datafusion::{
-    datasource::listing::ListingTable, error::DataFusionError,
-    execution::runtime_env::RuntimeConfig,
-};
+use datafusion::{datasource::listing::ListingTableUrl, execution::runtime_env::RuntimeConfig};
 use relative_path::RelativePath;
 use relative_path::RelativePathBuf;
 use serde_json::Value;
@@ -69,11 +67,7 @@ pub trait ObjectStorage: Sync + 'static {
     async fn list_streams(&self) -> Result<Vec<LogStream>, ObjectStorageError>;
     async fn list_dates(&self, stream_name: &str) -> Result<Vec<String>, ObjectStorageError>;
     async fn upload_file(&self, key: &str, path: &Path) -> Result<(), ObjectStorageError>;
-    fn query_table(
-        &self,
-        prefixes: Vec<String>,
-        schema: Arc<Schema>,
-    ) -> Result<Option<ListingTable>, DataFusionError>;
+    fn query_prefixes(&self, prefixes: Vec<String>) -> Vec<ListingTableUrl>;
 
     async fn put_schema(
         &self,

server/src/storage/s3.rs

Lines changed: 4 additions & 36 deletions
@@ -18,16 +18,10 @@
 
 use async_trait::async_trait;
 use bytes::Bytes;
-use datafusion::arrow::datatypes::Schema;
-
-use datafusion::datasource::file_format::parquet::ParquetFormat;
-use datafusion::datasource::listing::{
-    ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl,
-};
+use datafusion::datasource::listing::ListingTableUrl;
 use datafusion::datasource::object_store::{
     DefaultObjectStoreRegistry, ObjectStoreRegistry, ObjectStoreUrl,
 };
-use datafusion::error::DataFusionError;
 use datafusion::execution::runtime_env::RuntimeConfig;
 use futures::stream::FuturesUnordered;
 use futures::{StreamExt, TryStreamExt};
@@ -453,40 +447,14 @@ impl ObjectStorage for S3 {
         Ok(())
     }
 
-    fn query_table(
-        &self,
-        prefixes: Vec<String>,
-        schema: Arc<Schema>,
-    ) -> Result<Option<ListingTable>, DataFusionError> {
-        // Get all prefix paths and convert them into futures which yeilds ListingTableUrl
-        let prefixes: Vec<ListingTableUrl> = prefixes
+    fn query_prefixes(&self, prefixes: Vec<String>) -> Vec<ListingTableUrl> {
+        prefixes
             .into_iter()
             .map(|prefix| {
                 let path = format!("s3://{}/{}", &self.bucket, prefix);
                 ListingTableUrl::parse(path).unwrap()
            })
-            .collect();
-
-        if prefixes.is_empty() {
-            return Ok(None);
-        }
-
-        let file_format = ParquetFormat::default().with_enable_pruning(Some(true));
-        let listing_options = ListingOptions {
-            file_extension: ".parquet".to_string(),
-            file_sort_order: Vec::default(),
-            infinite_source: false,
-            format: Arc::new(file_format),
-            table_partition_cols: vec![],
-            collect_stat: true,
-            target_partitions: 1,
-        };
-
-        let config = ListingTableConfig::new_with_multi_paths(prefixes)
-            .with_listing_options(listing_options)
-            .with_schema(schema);
-
-        Ok(Some(ListingTable::try_new(config)?))
+            .collect()
     }
 }
 

server/src/storage/staging.rs

Lines changed: 6 additions & 0 deletions
@@ -32,6 +32,7 @@ use parquet::{
     basic::Encoding,
     errors::ParquetError,
     file::properties::{WriterProperties, WriterPropertiesBuilder},
+    format::SortingColumn,
     schema::types::ColumnPath,
 };
 
@@ -234,6 +235,11 @@ fn parquet_writer_props() -> WriterPropertiesBuilder {
            ColumnPath::new(vec![DEFAULT_TIMESTAMP_KEY.to_string()]),
            Encoding::DELTA_BINARY_PACKED,
        )
+        .set_sorting_columns(Some(vec![SortingColumn {
+            column_idx: 0,
+            descending: false,
+            nulls_first: false,
+        }]))
 }
 
 #[derive(Debug, thiserror::Error)]
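For reference, the SortingColumn entries written above end up in each row group's metadata in the Parquet footer. A small sketch for inspecting them, assuming a parquet crate version that exposes RowGroupMetaData::sorting_columns() and a hypothetical file path:

use std::fs::File;

use parquet::file::reader::{FileReader, SerializedFileReader};

// Sketch only: print the sorting metadata of every row group in a staged file.
fn print_sorting_metadata(path: &str) -> Result<(), Box<dyn std::error::Error>> {
    let reader = SerializedFileReader::new(File::open(path)?)?;
    for (i, row_group) in reader.metadata().row_groups().iter().enumerate() {
        // Expected per row group: Some([SortingColumn { column_idx: 0,
        // descending: false, nulls_first: false }])
        println!("row group {i}: {:?}", row_group.sorting_columns());
    }
    Ok(())
}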

server/src/utils/arrow.rs

Lines changed: 2 additions & 2 deletions
@@ -31,7 +31,7 @@ pub use merged_reader::MergedRecordReader;
 
 pub fn replace_columns(
     schema: Arc<Schema>,
-    batch: RecordBatch,
+    batch: &RecordBatch,
     indexes: &[usize],
     arrays: &[Arc<dyn Array + 'static>],
 ) -> RecordBatch {
@@ -73,7 +73,7 @@ mod tests {
 
         let arr: Arc<dyn Array + 'static> = Arc::new(Int32Array::from_value(0, 3));
 
-        let new_rb = replace_columns(schema_ref.clone(), rb, &[2], &[arr]);
+        let new_rb = replace_columns(schema_ref.clone(), &rb, &[2], &[arr]);
 
         assert_eq!(new_rb.schema(), schema_ref);
         assert_eq!(new_rb.num_columns(), 3);
