Use sort ordering on timestamp array
trueleo committed Jun 24, 2023
1 parent f9fb6c5 commit f7726a4
Showing 6 changed files with 40 additions and 27 deletions.
server/src/event/format.rs: 4 additions & 16 deletions
@@ -20,9 +20,8 @@
 use std::{collections::HashMap, sync::Arc};

 use anyhow::{anyhow, Error as AnyError};
-use arrow_array::{RecordBatch, StringArray, TimestampMillisecondArray};
+use arrow_array::{RecordBatch, StringArray};
 use arrow_schema::{DataType, Field, Schema, TimeUnit};
-use chrono::Utc;

 use crate::utils::{self, arrow::get_field};
@@ -94,25 +93,14 @@ pub trait EventFormat: Sized {
         let tags_arr = StringArray::from_iter_values(std::iter::repeat(&tags).take(rb.num_rows()));
         let metadata_arr =
             StringArray::from_iter_values(std::iter::repeat(&metadata).take(rb.num_rows()));
-        let timestamp_array = get_timestamp_array(rb.num_rows());

         // modify the record batch to add fields to respective indexes
         let rb = utils::arrow::replace_columns(
             Arc::clone(&schema),
-            rb,
-            &[0, tags_index, metadata_index],
-            &[
-                Arc::new(timestamp_array),
-                Arc::new(tags_arr),
-                Arc::new(metadata_arr),
-            ],
+            &rb,
+            &[tags_index, metadata_index],
+            &[Arc::new(tags_arr), Arc::new(metadata_arr)],
         );

         Ok((rb, is_first))
     }
 }

-fn get_timestamp_array(size: usize) -> TimestampMillisecondArray {
-    let time = Utc::now();
-    TimestampMillisecondArray::from_value(time.timestamp_millis(), size)
-}
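With this change the event formatter no longer fills the timestamp column while shaping the incoming batch; the file writer below stamps ingestion time into column 0 immediately before each batch is written (see that hunk and the sketch following it). The apparent intent is that rows reach disk in non-decreasing order of the DEFAULT_TIMESTAMP_KEY column, which is the property that the sort-order declarations added to the storage backends and to the parquet writer properties further down rely on.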
server/src/event/writer/file_writer.rs: 17 additions & 3 deletions
@@ -17,14 +17,17 @@
  *
  */

-use arrow_array::RecordBatch;
+use arrow_array::{RecordBatch, TimestampMillisecondArray};
 use arrow_ipc::writer::StreamWriter;
+use chrono::Utc;
 use derive_more::{Deref, DerefMut};
 use std::collections::HashMap;
 use std::fs::{File, OpenOptions};
 use std::path::PathBuf;
+use std::sync::Arc;

 use crate::storage::staging::StorageDir;
+use crate::utils;

 use super::errors::StreamWriterError;

@@ -44,17 +47,24 @@ impl FileWriter {
         schema_key: &str,
         record: &RecordBatch,
     ) -> Result<(), StreamWriterError> {
+        let record = utils::arrow::replace_columns(
+            record.schema(),
+            record,
+            &[0],
+            &[Arc::new(get_timestamp_array(record.num_rows()))],
+        );
+
         match self.get_mut(schema_key) {
             Some(writer) => {
                 writer
                     .writer
-                    .write(record)
+                    .write(&record)
                     .map_err(StreamWriterError::Writer)?;
             }
             // entry is not present thus we create it
             None => {
                 // this requires mutable borrow of the map so we drop this read lock and wait for write lock
-                let (path, writer) = init_new_stream_writer_file(stream_name, schema_key, record)?;
+                let (path, writer) = init_new_stream_writer_file(stream_name, schema_key, &record)?;
                 self.insert(
                     schema_key.to_owned(),
                     ArrowWriter {
@@ -75,6 +85,10 @@ impl FileWriter {
     }
 }

+fn get_timestamp_array(size: usize) -> TimestampMillisecondArray {
+    TimestampMillisecondArray::from_value(Utc::now().timestamp_millis(), size)
+}
+
 fn init_new_stream_writer_file(
     stream_name: &str,
     schema_key: &str,
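To make the write-time stamping concrete, here is a small self-contained sketch of the same technique using plain arrow APIs. The schema, the column names "p_timestamp" and "message", and the data are illustrative assumptions rather than Parseable's actual event schema, and the real code goes through utils::arrow::replace_columns instead of rebuilding the batch by hand.

    use std::sync::Arc;

    use arrow_array::{RecordBatch, StringArray, TimestampMillisecondArray};
    use arrow_schema::{DataType, Field, Schema, TimeUnit};
    use chrono::Utc;

    fn main() {
        // Hypothetical two-column schema: a timestamp placeholder at index 0 plus a message field.
        let schema = Arc::new(Schema::new(vec![
            Field::new(
                "p_timestamp",
                DataType::Timestamp(TimeUnit::Millisecond, None),
                true,
            ),
            Field::new("message", DataType::Utf8, false),
        ]));
        let placeholder = TimestampMillisecondArray::from(vec![None::<i64>, None]);
        let messages = StringArray::from(vec!["first event", "second event"]);
        let batch = RecordBatch::try_new(
            schema.clone(),
            vec![Arc::new(placeholder), Arc::new(messages)],
        )
        .unwrap();

        // Stamp ingestion time into column 0 just before writing, mirroring get_timestamp_array above.
        let now =
            TimestampMillisecondArray::from_value(Utc::now().timestamp_millis(), batch.num_rows());
        let mut columns = batch.columns().to_vec();
        columns[0] = Arc::new(now);
        let stamped = RecordBatch::try_new(schema, columns).unwrap();

        assert_eq!(stamped.num_rows(), 2);
    }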
server/src/storage/localfs.rs: 7 additions & 4 deletions
@@ -24,7 +24,7 @@ use std::{

 use async_trait::async_trait;
 use bytes::Bytes;
-use datafusion::arrow::datatypes::Schema;
+use datafusion::{arrow::datatypes::Schema, prelude::col};
 use datafusion::{
     datasource::{
         file_format::parquet::ParquetFormat,
@@ -39,7 +39,10 @@ use relative_path::RelativePath;
 use tokio::fs::{self, DirEntry};
 use tokio_stream::wrappers::ReadDirStream;

-use crate::metrics::storage::{localfs::REQUEST_RESPONSE_TIME, StorageMetrics};
+use crate::{
+    event::DEFAULT_TIMESTAMP_KEY,
+    metrics::storage::{localfs::REQUEST_RESPONSE_TIME, StorageMetrics},
+};
 use crate::{option::validation, utils::validate_path_is_writeable};

 use super::{object_storage, LogStream, ObjectStorage, ObjectStorageError, ObjectStorageProvider};
@@ -222,12 +225,12 @@ impl ObjectStorage for LocalFS {
         let file_format = ParquetFormat::default().with_enable_pruning(Some(true));
         let listing_options = ListingOptions {
             file_extension: ".parquet".to_string(),
-            file_sort_order: Vec::new(),
+            file_sort_order: vec![vec![col(DEFAULT_TIMESTAMP_KEY).sort(true, false)]],
             infinite_source: false,
             format: Arc::new(file_format),
             table_partition_cols: vec![],
             collect_stat: true,
-            target_partitions: 1,
+            target_partitions: 32,
         };

         let config = ListingTableConfig::new_with_multi_paths(prefixes)
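A note on the two values that changed here (the same edits are applied to the S3 backend below). In the DataFusion version used here, file_sort_order is a Vec<Vec<Expr>>: each inner vector describes one lexicographic ordering the parquet files are promised to satisfy, and Expr::sort(asc, nulls_first) builds a sort expression, so sort(true, false) reads as ascending with nulls last. The planner trusts this declaration to elide sorts for timestamp-ordered queries; nothing verifies it, which is why the writer-side changes above must actually emit rows in that order. Raising target_partitions from 1 to 32 simply allows the scan to be split across more partitions so files can be read in parallel. A minimal sketch of the ordering expression, with a literal column name standing in for DEFAULT_TIMESTAMP_KEY:

    use datafusion::prelude::{col, Expr};

    // Hedged sketch: the literal "p_timestamp" stands in for DEFAULT_TIMESTAMP_KEY.
    // `sort(true, false)` means ascending order with nulls last, matching the hunk above.
    fn timestamp_ordering() -> Vec<Vec<Expr>> {
        vec![vec![col("p_timestamp").sort(true, false)]]
    }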
server/src/storage/s3.rs: 4 additions & 2 deletions
@@ -29,6 +29,7 @@ use datafusion::datasource::object_store::{
 };
 use datafusion::error::DataFusionError;
 use datafusion::execution::runtime_env::RuntimeConfig;
+use datafusion::prelude::col;
 use futures::stream::FuturesUnordered;
 use futures::{StreamExt, TryStreamExt};
 use object_store::aws::{AmazonS3, AmazonS3Builder, Checksum};
@@ -44,6 +45,7 @@ use std::path::Path as StdPath;
 use std::sync::Arc;
 use std::time::{Duration, Instant};

+use crate::event::DEFAULT_TIMESTAMP_KEY;
 use crate::metrics::storage::{s3::REQUEST_RESPONSE_TIME, StorageMetrics};
 use crate::storage::{LogStream, ObjectStorage, ObjectStorageError};

@@ -474,12 +476,12 @@ impl ObjectStorage for S3 {
         let file_format = ParquetFormat::default().with_enable_pruning(Some(true));
         let listing_options = ListingOptions {
             file_extension: ".parquet".to_string(),
-            file_sort_order: Vec::default(),
+            file_sort_order: vec![vec![col(DEFAULT_TIMESTAMP_KEY).sort(true, false)]],
             infinite_source: false,
             format: Arc::new(file_format),
             table_partition_cols: vec![],
             collect_stat: true,
-            target_partitions: 1,
+            target_partitions: 32,
         };

         let config = ListingTableConfig::new_with_multi_paths(prefixes)
server/src/storage/staging.rs: 6 additions & 0 deletions
@@ -32,6 +32,7 @@ use parquet::{
     basic::Encoding,
     errors::ParquetError,
     file::properties::{WriterProperties, WriterPropertiesBuilder},
+    format::SortingColumn,
     schema::types::ColumnPath,
 };

@@ -234,6 +235,11 @@ fn parquet_writer_props() -> WriterPropertiesBuilder {
         ColumnPath::new(vec![DEFAULT_TIMESTAMP_KEY.to_string()]),
         Encoding::DELTA_BINARY_PACKED,
     )
+    .set_sorting_columns(Some(vec![SortingColumn {
+        column_idx: 0,
+        descending: false,
+        nulls_first: false,
+    }]))
 }

 #[derive(Debug, thiserror::Error)]
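One caveat worth spelling out: set_sorting_columns only records sort metadata in the parquet footer, it does not sort anything. The writer is trusted to already receive rows in ascending timestamp order with nulls last, which the write-time stamping above is meant to guarantee. A minimal sketch of standalone writer properties carrying the same annotation, assuming (as the hunk does) that the timestamp is leaf column index 0 and using a literal column name in place of DEFAULT_TIMESTAMP_KEY:

    use parquet::{
        basic::Encoding,
        file::properties::WriterProperties,
        format::SortingColumn,
        schema::types::ColumnPath,
    };

    // Hedged sketch mirroring parquet_writer_props(): footer metadata only,
    // the row groups must already arrive sorted by the timestamp column.
    fn example_writer_props() -> WriterProperties {
        WriterProperties::builder()
            .set_column_encoding(
                ColumnPath::new(vec!["p_timestamp".to_string()]),
                Encoding::DELTA_BINARY_PACKED,
            )
            .set_sorting_columns(Some(vec![SortingColumn {
                column_idx: 0,     // leaf index of the timestamp column
                descending: false, // ascending
                nulls_first: false,
            }]))
            .build()
    }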
server/src/utils/arrow.rs: 2 additions & 2 deletions
@@ -31,7 +31,7 @@ pub use merged_reader::MergedRecordReader;

 pub fn replace_columns(
     schema: Arc<Schema>,
-    batch: RecordBatch,
+    batch: &RecordBatch,
     indexes: &[usize],
     arrays: &[Arc<dyn Array + 'static>],
 ) -> RecordBatch {
@@ -73,7 +73,7 @@ mod tests {

         let arr: Arc<dyn Array + 'static> = Arc::new(Int32Array::from_value(0, 3));

-        let new_rb = replace_columns(schema_ref.clone(), rb, &[2], &[arr]);
+        let new_rb = replace_columns(schema_ref.clone(), &rb, &[2], &[arr]);

         assert_eq!(new_rb.schema(), schema_ref);
         assert_eq!(new_rb.num_columns(), 3);
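The body of replace_columns is collapsed in this diff, so here is a hedged sketch of what a helper with the new borrowed signature plausibly does: copy the batch's column list, overwrite the arrays at the given indexes, and rebuild a RecordBatch against the supplied schema. This is an illustration of the technique, not necessarily Parseable's exact implementation.

    use std::sync::Arc;

    use arrow_array::{Array, RecordBatch};
    use arrow_schema::Schema;

    // Sketch of a column-replacement helper taking the batch by reference.
    // Panics if the replacement arrays do not match the schema or row count.
    pub fn replace_columns(
        schema: Arc<Schema>,
        batch: &RecordBatch,
        indexes: &[usize],
        arrays: &[Arc<dyn Array + 'static>],
    ) -> RecordBatch {
        let mut columns = batch.columns().to_vec();
        for (&index, array) in indexes.iter().zip(arrays.iter()) {
            columns[index] = Arc::clone(array);
        }
        RecordBatch::try_new(schema, columns).expect("replaced columns must match the schema")
    }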
