Skip to content

Commit

Permalink
Refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
trueleo committed Jun 25, 2023
1 parent f7726a4 commit 122d135
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 91 deletions.
29 changes: 26 additions & 3 deletions server/src/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ use chrono::TimeZone;
use chrono::{DateTime, Utc};
use datafusion::arrow::datatypes::Schema;
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::datasource::file_format::parquet::ParquetFormat;
use datafusion::datasource::listing::{ListingOptions, ListingTable, ListingTableConfig};
use datafusion::execution::context::SessionState;
use datafusion::execution::disk_manager::DiskManagerConfig;
use datafusion::execution::runtime_env::RuntimeEnv;
Expand All @@ -32,6 +34,7 @@ use std::path::Path;
use std::sync::Arc;
use sysinfo::{System, SystemExt};

use crate::event::DEFAULT_TIMESTAMP_KEY;
use crate::option::CONFIG;
use crate::storage::ObjectStorageError;
use crate::storage::{ObjectStorage, OBJECT_STORE_DATA_GRANULARITY};
Expand Down Expand Up @@ -121,10 +124,30 @@ impl Query {
storage: Arc<dyn ObjectStorage + Send>,
) -> Result<(Vec<RecordBatch>, Vec<String>), ExecuteError> {
let ctx = self.create_session_context();
let prefixes = self.get_prefixes();
let Some(table) = storage.query_table(prefixes, Arc::clone(&self.schema))? else { return Ok((Vec::new(), Vec::new())) };
let prefixes = storage.query_prefixes(self.get_prefixes());

ctx.register_table(&*self.stream_name, Arc::new(table))
if prefixes.is_empty() {
return Ok((Vec::new(), Vec::new()));
}

let file_format = ParquetFormat::default().with_enable_pruning(Some(true));
let listing_options = ListingOptions {
file_extension: ".parquet".to_string(),
file_sort_order: vec![vec![col(DEFAULT_TIMESTAMP_KEY).sort(true, false)]],
infinite_source: false,
format: Arc::new(file_format),
table_partition_cols: vec![],
collect_stat: true,
target_partitions: 32,
};

let config = ListingTableConfig::new_with_multi_paths(prefixes)
.with_listing_options(listing_options)
.with_schema(self.schema.clone());

let table = Arc::new(ListingTable::try_new(config)?);

ctx.register_table(&*self.stream_name, table)
.map_err(ObjectStorageError::DataFusionError)?;
// execute the query and collect results
let df = ctx.sql(self.query.as_str()).await?;
Expand Down
46 changes: 5 additions & 41 deletions server/src/storage/localfs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,25 +24,14 @@ use std::{

use async_trait::async_trait;
use bytes::Bytes;
use datafusion::{arrow::datatypes::Schema, prelude::col};
use datafusion::{
datasource::{
file_format::parquet::ParquetFormat,
listing::{ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl},
},
error::DataFusionError,
execution::runtime_env::RuntimeConfig,
};
use datafusion::{datasource::listing::ListingTableUrl, execution::runtime_env::RuntimeConfig};
use fs_extra::file::{move_file, CopyOptions};
use futures::{stream::FuturesUnordered, TryStreamExt};
use relative_path::RelativePath;
use tokio::fs::{self, DirEntry};
use tokio_stream::wrappers::ReadDirStream;

use crate::{
event::DEFAULT_TIMESTAMP_KEY,
metrics::storage::{localfs::REQUEST_RESPONSE_TIME, StorageMetrics},
};
use crate::metrics::storage::{localfs::REQUEST_RESPONSE_TIME, StorageMetrics};
use crate::{option::validation, utils::validate_path_is_writeable};

use super::{object_storage, LogStream, ObjectStorage, ObjectStorageError, ObjectStorageProvider};
Expand Down Expand Up @@ -205,39 +194,14 @@ impl ObjectStorage for LocalFS {
Ok(())
}

fn query_table(
&self,
prefixes: Vec<String>,
schema: Arc<Schema>,
) -> Result<Option<ListingTable>, DataFusionError> {
let prefixes: Vec<ListingTableUrl> = prefixes
fn query_prefixes(&self, prefixes: Vec<String>) -> Vec<ListingTableUrl> {
prefixes
.into_iter()
.filter_map(|prefix| {
let path = self.root.join(prefix);
ListingTableUrl::parse(path.to_str().unwrap()).ok()
})
.collect();

if prefixes.is_empty() {
return Ok(None);
}

let file_format = ParquetFormat::default().with_enable_pruning(Some(true));
let listing_options = ListingOptions {
file_extension: ".parquet".to_string(),
file_sort_order: vec![vec![col(DEFAULT_TIMESTAMP_KEY).sort(true, false)]],
infinite_source: false,
format: Arc::new(file_format),
table_partition_cols: vec![],
collect_stat: true,
target_partitions: 32,
};

let config = ListingTableConfig::new_with_multi_paths(prefixes)
.with_listing_options(listing_options)
.with_schema(schema);

Ok(Some(ListingTable::try_new(config)?))
.collect()
}
}

Expand Down
12 changes: 3 additions & 9 deletions server/src/storage/object_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use super::{
retention::Retention, staging::convert_disk_files_to_parquet, LogStream, ObjectStorageError,
ObjectStoreFormat, Permisssion, StorageDir, StorageMetadata,
};

use crate::{
alerts::Alerts,
metadata::STREAM_INFO,
Expand All @@ -32,10 +33,7 @@ use actix_web_prometheus::PrometheusMetrics;
use arrow_schema::Schema;
use async_trait::async_trait;
use bytes::Bytes;
use datafusion::{
datasource::listing::ListingTable, error::DataFusionError,
execution::runtime_env::RuntimeConfig,
};
use datafusion::{datasource::listing::ListingTableUrl, execution::runtime_env::RuntimeConfig};
use relative_path::RelativePath;
use relative_path::RelativePathBuf;
use serde_json::Value;
Expand Down Expand Up @@ -69,11 +67,7 @@ pub trait ObjectStorage: Sync + 'static {
async fn list_streams(&self) -> Result<Vec<LogStream>, ObjectStorageError>;
async fn list_dates(&self, stream_name: &str) -> Result<Vec<String>, ObjectStorageError>;
async fn upload_file(&self, key: &str, path: &Path) -> Result<(), ObjectStorageError>;
fn query_table(
&self,
prefixes: Vec<String>,
schema: Arc<Schema>,
) -> Result<Option<ListingTable>, DataFusionError>;
fn query_prefixes(&self, prefixes: Vec<String>) -> Vec<ListingTableUrl>;

async fn put_schema(
&self,
Expand Down
42 changes: 4 additions & 38 deletions server/src/storage/s3.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,11 @@

use async_trait::async_trait;
use bytes::Bytes;
use datafusion::arrow::datatypes::Schema;

use datafusion::datasource::file_format::parquet::ParquetFormat;
use datafusion::datasource::listing::{
ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl,
};
use datafusion::datasource::listing::ListingTableUrl;
use datafusion::datasource::object_store::{
DefaultObjectStoreRegistry, ObjectStoreRegistry, ObjectStoreUrl,
};
use datafusion::error::DataFusionError;
use datafusion::execution::runtime_env::RuntimeConfig;
use datafusion::prelude::col;
use futures::stream::FuturesUnordered;
use futures::{StreamExt, TryStreamExt};
use object_store::aws::{AmazonS3, AmazonS3Builder, Checksum};
Expand All @@ -45,7 +38,6 @@ use std::path::Path as StdPath;
use std::sync::Arc;
use std::time::{Duration, Instant};

use crate::event::DEFAULT_TIMESTAMP_KEY;
use crate::metrics::storage::{s3::REQUEST_RESPONSE_TIME, StorageMetrics};
use crate::storage::{LogStream, ObjectStorage, ObjectStorageError};

Expand Down Expand Up @@ -455,40 +447,14 @@ impl ObjectStorage for S3 {
Ok(())
}

fn query_table(
&self,
prefixes: Vec<String>,
schema: Arc<Schema>,
) -> Result<Option<ListingTable>, DataFusionError> {
// Get all prefix paths and convert them into ListingTableUrl values
let prefixes: Vec<ListingTableUrl> = prefixes
fn query_prefixes(&self, prefixes: Vec<String>) -> Vec<ListingTableUrl> {
prefixes
.into_iter()
.map(|prefix| {
let path = format!("s3://{}/{}", &self.bucket, prefix);
ListingTableUrl::parse(path).unwrap()
})
.collect();

if prefixes.is_empty() {
return Ok(None);
}

let file_format = ParquetFormat::default().with_enable_pruning(Some(true));
let listing_options = ListingOptions {
file_extension: ".parquet".to_string(),
file_sort_order: vec![vec![col(DEFAULT_TIMESTAMP_KEY).sort(true, false)]],
infinite_source: false,
format: Arc::new(file_format),
table_partition_cols: vec![],
collect_stat: true,
target_partitions: 32,
};

let config = ListingTableConfig::new_with_multi_paths(prefixes)
.with_listing_options(listing_options)
.with_schema(schema);

Ok(Some(ListingTable::try_new(config)?))
.collect()
}
}

Expand Down

0 comments on commit 122d135

Please sign in to comment.