Skip to content

Commit

Permalink
refactor more
Browse files Browse the repository at this point in the history
  • Loading branch information
XiangpengHao committed Jan 18, 2025
1 parent d7e49b5 commit de06029
Show file tree
Hide file tree
Showing 6 changed files with 113 additions and 171 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ leptos_router = { version = "0.7.0", features = ["nightly"] }
object_store = "0.11.1"
async-trait = "0.1.83"
url = "2.5.4"
anyhow = "1.0"

[profile.release]
strip = true
Expand Down
137 changes: 57 additions & 80 deletions src/main.rs
Original file line number Diff line number Diff line change
@@ -1,41 +1,35 @@
mod schema;
use anyhow::Result;
use arrow::datatypes::SchemaRef;
use datafusion::{
execution::object_store::ObjectStoreUrl,
physical_plan::ExecutionPlan,
prelude::{SessionConfig, SessionContext},
};
use leptos::{logging, prelude::*};
use leptos_router::components::Router;
use object_store::path::Path;
use parquet_reader::{ParquetInfo, ParquetReader, INMEMORY_STORE};

use query_results::{QueryResult, QueryResultView};
use schema::SchemaSection;

mod parquet_reader;
mod query_results;
mod row_group_column;

mod metadata;
mod object_store_cache;
use metadata::MetadataSection;

use std::{sync::Arc, sync::LazyLock};

use arrow::datatypes::SchemaRef;
use leptos::{logging, prelude::*};
use parquet::{
arrow::{
async_reader::{AsyncFileReader, ParquetObjectReader},
parquet_to_arrow_schema,
},
errors::ParquetError,
file::metadata::ParquetMetaData,
};
use std::{sync::Arc, sync::LazyLock};

mod metadata;
mod object_store_cache;
mod parquet_reader;
mod query_input;
use query_input::{execute_query_inner, QueryInput};

mod query_results;
mod row_group_column;
mod schema;
mod settings;

use metadata::MetadataSection;
use parquet_reader::{ParquetInfo, ParquetReader, INMEMORY_STORE};
use query_input::{execute_query_inner, QueryInput};
use query_results::{QueryResult, QueryResultView};
use schema::SchemaSection;
use settings::Settings;

pub(crate) static SESSION_CTX: LazyLock<Arc<SessionContext>> = LazyLock::new(|| {
Expand Down Expand Up @@ -67,10 +61,7 @@ struct DisplayInfo {
}

impl DisplayInfo {
fn from_metadata(
metadata: Arc<ParquetMetaData>,
metadata_len: u64,
) -> Result<Self, ParquetError> {
fn from_metadata(metadata: Arc<ParquetMetaData>, metadata_len: u64) -> Result<Self> {
let compressed_size = metadata
.row_groups()
.iter()
Expand Down Expand Up @@ -163,16 +154,6 @@ impl std::fmt::Display for DisplayInfo {
}
}

async fn execute_query_async(
query: &str,
) -> Result<(Vec<arrow::array::RecordBatch>, Arc<dyn ExecutionPlan>), String> {
let (results, physical_plan) = execute_query_inner(query)
.await
.map_err(|e| format!("Failed to execute query: {}", e))?;

Ok((results, physical_plan))
}

#[derive(Debug, Clone)]
struct ParquetTable {
reader: ParquetObjectReader,
Expand All @@ -190,16 +171,12 @@ impl PartialEq for ParquetTable {
}
}

async fn get_parquet_table(parquet_info: ParquetInfo) -> ParquetTable {
let meta = parquet_info
.object_store
.head(&Path::parse(&parquet_info.path).unwrap())
.await
.unwrap();
async fn get_parquet_table(parquet_info: ParquetInfo) -> Result<ParquetTable> {
let meta = parquet_info.object_store.head(&parquet_info.path).await?;
let mut reader = ParquetObjectReader::new(parquet_info.object_store.clone(), meta)
.with_preload_column_index(true)
.with_preload_offset_index(true);
let metadata = reader.get_metadata().await.unwrap();
let metadata = reader.get_metadata().await?;

let table_path = parquet_info.table_path();

Expand All @@ -224,16 +201,15 @@ async fn get_parquet_table(parquet_info: ParquetInfo) -> ParquetTable {
);
}
ctx.register_parquet(&parquet_info.table_name, &table_path, Default::default())
.await
.unwrap();
.await?;
let size = metadata.memory_size();
ParquetTable {
Ok(ParquetTable {
reader,
table_name: parquet_info.table_name,
path: parquet_info.path,
object_store_url: parquet_info.object_store_url,
display_info: DisplayInfo::from_metadata(metadata, size as u64).unwrap(),
}
display_info: DisplayInfo::from_metadata(metadata, size as u64)?,
})
}

#[component]
Expand Down Expand Up @@ -261,33 +237,45 @@ fn App() -> impl IntoView {
let Some(table) = parquet_table.get() else {
return;
};
let sql = query_input::user_input_to_sql(&query, &table)
.await
.unwrap();
match execute_query_async(&sql).await {
let sql = match query_input::user_input_to_sql(&query, &table).await {
Ok(sql) => sql,
Err(e) => {
set_error_message.set(Some(format!("{:#?}", e)));
return;
}
};

match execute_query_inner(&sql).await {
Ok((results, execution_plan)) => {
set_error_message.set(None);
set_query_results.update(|r| {
let id = r.len();
r.push(QueryResult::new(id, sql, Arc::new(results), execution_plan));
});
}
Err(e) => set_error_message.set(Some(e)),
Err(e) => set_error_message.set(Some(format!("{:#?}", e))),
}
});
};

let on_parquet_read_call_back =
move |parquet_info: Result<ParquetInfo, String>| match parquet_info {
Ok(parquet_info) => {
leptos::task::spawn_local(async move {
let table = get_parquet_table(parquet_info).await;
let default_query = format!("select * from \"{}\" limit 10", table.table_name);
set_parquet_table.set(Some(Arc::new(table)));
on_user_submit_query_call_back(default_query);
});
}
Err(e) => set_error_message.set(Some(e)),
};
let on_parquet_read_call_back = move |parquet_info: Result<ParquetInfo>| match parquet_info {
Ok(parquet_info) => {
leptos::task::spawn_local(async move {
match get_parquet_table(parquet_info).await {
Ok(table) => {
let default_query =
format!("select * from \"{}\" limit 10", table.table_name);
set_parquet_table.set(Some(Arc::new(table)));
on_user_submit_query_call_back(default_query);
}
Err(e) => {
set_error_message.set(Some(format!("{:#?}", e)));
}
}
});
}
Err(e) => set_error_message.set(Some(format!("{:#?}", e))),
};

view! {
<div class="container mx-auto px-4 py-8 max-w-6xl">
Expand Down Expand Up @@ -329,28 +317,17 @@ fn App() -> impl IntoView {
</div>
</h1>
<div class="space-y-6">
<ParquetReader
read_call_back=on_parquet_read_call_back
/>
<ParquetReader read_call_back=on_parquet_read_call_back />

{move || {
error_message
.get()
.map(|msg| {
view! {
<div class="bg-red-50 border-l-4 border-red-500 p-4 my-4">
<div class="text-red-700">{msg}</div>
<div class="mt-2 text-sm text-gray-600">
"Tips:" <ul class="list-disc ml-6 mt-2 space-y-1">
<li>"Make sure the URL has CORS enabled."</li>
<li>
"If query with natural language, make sure to set the Claude API key."
</li>
<li>
"I usually download the file and use the file picker above."
</li>
</ul>
</div>
<pre class="text-red-700 whitespace-pre-wrap break-words">
{msg}
</pre>
</div>
}
})
Expand Down
6 changes: 5 additions & 1 deletion src/object_store_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,11 @@ impl ObjectStore for ObjectStoreCache {
let mut cache = self.cache.lock().await;
let bytes = match cache.entry(key) {
Entry::Occupied(o) => {
log!("hit memory cache, range: {:?}", o.key().1);
log!(
"Request hit cache, path {}, range: {:?}",
location,
o.key().1
);
o.get().clone()
}
Entry::Vacant(v) => {
Expand Down
Loading

0 comments on commit de06029

Please sign in to comment.