Skip to content

Commit

Permalink
minor adjustment
Browse files Browse the repository at this point in the history
  • Loading branch information
XiangpengHao committed Jan 12, 2025
1 parent 687a53c commit 6de41ce
Show file tree
Hide file tree
Showing 4 changed files with 125 additions and 3 deletions.
5 changes: 3 additions & 2 deletions src/file_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use parquet::arrow::async_reader::{AsyncFileReader, ParquetObjectReader};
use url::Url;
use web_sys::js_sys;

use crate::object_store_cache::ObjectStoreCache;
use crate::{ParquetTable, SESSION_CTX};

pub(crate) static INMEMORY_STORE: LazyLock<Arc<InMemory>> =
Expand Down Expand Up @@ -175,7 +176,7 @@ pub fn FileReader(
return;
};
let op = op.finish();
let object_store = Arc::new(OpendalStore::new(op));
let object_store = Arc::new(ObjectStoreCache::new(OpendalStore::new(op)));
let meta = object_store
.head(&Path::parse(&path).unwrap())
.await
Expand Down Expand Up @@ -233,7 +234,7 @@ pub fn FileReader(
let table_path = format!("{}/{}", path, file_name);

let op = Operator::new(cfg).unwrap().finish();
let object_store = Arc::new(OpendalStore::new(op));
let object_store = Arc::new(ObjectStoreCache::new(OpendalStore::new(op)));
let meta = object_store
.head(&Path::parse(&file_name).unwrap())
.await
Expand Down
1 change: 1 addition & 0 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ mod query_results;
mod row_group_column;

mod metadata;
mod object_store_cache;
use metadata::MetadataSection;

use std::{sync::Arc, sync::LazyLock};
Expand Down
120 changes: 120 additions & 0 deletions src/object_store_cache.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
use std::{
collections::{hash_map::Entry, HashMap},
fmt::{Display, Formatter},
ops::Range,
};

use async_trait::async_trait;
use bytes::Bytes;
use futures::{lock::Mutex, stream::BoxStream};
use leptos::logging::log;
use object_store::{
path::Path, GetOptions, GetResult, ListResult, MultipartUpload, ObjectMeta, ObjectStore,
PutMultipartOpts, PutOptions, PutPayload, PutResult,
};
use object_store_opendal::OpendalStore;

#[derive(Debug)]
pub(crate) struct ObjectStoreCache {
inner: OpendalStore,
cache: Mutex<HashMap<(Path, Range<usize>), Bytes>>,
}

impl ObjectStoreCache {
pub(crate) fn new(inner: OpendalStore) -> Self {
Self {
inner,
cache: Mutex::new(HashMap::new()),
}
}
}

impl Display for ObjectStoreCache {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "ObjectStoreCache")
}
}

#[async_trait]
impl ObjectStore for ObjectStoreCache {
async fn put_opts(
&self,
location: &Path,
payload: PutPayload,
opts: PutOptions,
) -> Result<PutResult, object_store::Error> {
self.inner.put_opts(location, payload, opts).await
}

async fn put_multipart_opts(
&self,
location: &Path,
opts: PutMultipartOpts,
) -> Result<Box<dyn MultipartUpload>, object_store::Error> {
self.inner.put_multipart_opts(location, opts).await
}

async fn get(&self, location: &Path) -> Result<GetResult, object_store::Error> {
self.inner.get(location).await
}

async fn head(&self, location: &Path) -> Result<ObjectMeta, object_store::Error> {
self.inner.head(location).await
}

async fn get_opts(
&self,
location: &Path,
options: GetOptions,
) -> Result<GetResult, object_store::Error> {
return self.inner.get_opts(location, options).await;
}

async fn get_range(
&self,
location: &Path,
range: Range<usize>,
) -> Result<Bytes, object_store::Error> {
let key = (location.clone(), range);
let mut cache = self.cache.lock().await;
let bytes = match cache.entry(key) {
Entry::Occupied(o) => {
log!("hit cache");
o.get().clone()
}
Entry::Vacant(v) => {
let k = v.key();
let bs = self.inner.get_range(location, k.1.clone()).await?;
v.insert(bs.clone());
bs
}
};
Ok(bytes)
}

async fn delete(&self, location: &Path) -> Result<(), object_store::Error> {
self.inner.delete(location).await
}

fn list(
&self,
prefix: Option<&Path>,
) -> BoxStream<'_, Result<ObjectMeta, object_store::Error>> {
self.inner.list(prefix)
}

async fn list_with_delimiter(
&self,
prefix: Option<&Path>,
) -> Result<ListResult, object_store::Error> {
self.inner.list_with_delimiter(prefix).await
}

async fn copy(&self, from: &Path, to: &Path) -> Result<(), object_store::Error> {
self.inner.copy(from, to).await
}

async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> Result<(), object_store::Error> {
self.inner.copy_if_not_exists(from, to).await
}
}
2 changes: 1 addition & 1 deletion src/query_input.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ pub(crate) async fn user_input_to_sql(
logging::log!("Processing user input: {}", input);

let prompt = format!(
"Generate a SQL query to answer the following question: {}. You should generate PostgreSQL SQL dialect, all field names and table names should be double quoted, and the output SQL should be executable, be careful about the available columns. The table name is: {}, the schema of the table is: {}. ",
"Generate a SQL query to answer the following question: {}. You should generate PostgreSQL SQL dialect, all field names and table names should be double quoted, and the output SQL should be executable, be careful about the available columns. The table name is: \"{}\" (without quotes), the schema of the table is: {}. ",
input, file_name, schema_str
);
logging::log!("{}", prompt);
Expand Down

0 comments on commit 6de41ce

Please sign in to comment.