From 94182d9731f2683df8716d95acfbcea44c229096 Mon Sep 17 00:00:00 2001
From: vishalkrishnads <321vishalds@gmail.com>
Date: Tue, 20 Aug 2024 13:16:53 +0530
Subject: [PATCH 01/14] implemented query wrapper

---
 server/src/handlers/http/query.rs | 52 ++++++++++++++++++++++++++++---
 1 file changed, 48 insertions(+), 4 deletions(-)

diff --git a/server/src/handlers/http/query.rs b/server/src/handlers/http/query.rs
index 576f838fe..10bd87dbe 100644
--- a/server/src/handlers/http/query.rs
+++ b/server/src/handlers/http/query.rs
@@ -16,20 +16,25 @@
  *
  */
 
+use actix_web::body::MessageBody;
 use actix_web::http::header::ContentType;
-use actix_web::web::{self, Json};
-use actix_web::{FromRequest, HttpRequest, Responder};
+use actix_web::web::{self, Bytes, Json};
+use actix_web::{Error, FromRequest, HttpRequest, HttpResponse, Responder, rt};
 use anyhow::anyhow;
 use chrono::{DateTime, Utc};
 use datafusion::common::tree_node::TreeNode;
 use datafusion::error::DataFusionError;
 use datafusion::execution::context::SessionState;
+use futures::SinkExt;
+use futures::{channel::mpsc, StreamExt};
 use futures_util::Future;
 use http::StatusCode;
 use std::collections::HashMap;
 use std::pin::Pin;
 use std::sync::Arc;
-use std::time::Instant;
+use std::time::{Duration, Instant};
+use tokio::time::interval;
+use tokio_stream::wrappers::IntervalStream;
 
 use crate::event::error::EventError;
 use crate::handlers::http::fetch_schema;
@@ -66,7 +71,46 @@ pub struct Query {
     pub filter_tags: Option<Vec<String>>,
 }
 
-pub async fn query(req: HttpRequest, query_request: Query) -> Result<impl Responder, QueryError> {
+pub async fn query_keep_alive(
+    req: HttpRequest,
+    query_request: web::Json<Query>,
+) -> Result<HttpResponse, Error> {
+    let (mut tx, rx) = mpsc::channel::<Result<Bytes, Error>>(10);
+
+    // Spawn a task to handle the query and keepalive messages
+    rt::spawn(async move {
+        let heartbeat_interval = 30;
+        // Create a stream that sends keepalive messages every 30 seconds
+        let interval = interval(Duration::from_secs(heartbeat_interval));
+        let mut keepalive = IntervalStream::new(interval).map(|_| Ok(Bytes::from("keepalive\n")));
+
+        // Send keepalive messages
+        while let Some(item) = keepalive.next().await {
+            if tx.send(item).await.is_err() {
+                return;
+            }
+        }
+
+        // Execute the query
+        match query(req.clone(), query_request.into_inner()).await {
+            Ok(response) => {
+                if let Ok(res_body) = response.respond_to(&req).into_body().try_into_bytes() {
+                    let _ = tx.send(Ok(res_body)).await;
+                }
+            }
+            Err(e) => {
+                let _ = tx.send(Ok(Bytes::from(format!("Error: {}", e)))).await;
+            }
+        }
+    });
+
+    // Return the streaming response
+    Ok(HttpResponse::Ok()
+        .content_type("application/octet-stream")
+        .streaming(rx))
+}
+
+async fn query(req: HttpRequest, query_request: Query) -> Result<impl Responder, QueryError> {
     let session_state = QUERY_SESSION.state();
 
     // get the logical plan and extract the table name

From be7a8c1f572b46ec374c282b2c9bbcd0a5fddb22 Mon Sep 17 00:00:00 2001
From: vishalkrishnads <321vishalds@gmail.com>
Date: Tue, 20 Aug 2024 13:17:02 +0530
Subject: [PATCH 02/14] configured keep alive

---
 server/src/handlers/http/modal/query_server.rs | 5 ++++-
 server/src/handlers/http/modal/server.rs       | 2 +-
 2 files changed, 5 insertions(+), 2 deletions(-)

diff --git a/server/src/handlers/http/modal/query_server.rs b/server/src/handlers/http/modal/query_server.rs
index 9d6f3fbdf..d3ada33eb 100644
--- a/server/src/handlers/http/modal/query_server.rs
+++ b/server/src/handlers/http/modal/query_server.rs
@@ -32,6 +32,7 @@
 use actix_web::web::ServiceConfig;
 use actix_web::{App, HttpServer};
 use async_trait::async_trait;
 use std::sync::Arc;
+use std::time::Duration;
 
 use crate::option::CONFIG;
@@ -74,8 +75,10 @@ impl ParseableServer for QueryServer {
                 .wrap(cross_origin_config())
         };
 
+        let keep_alive_timeout = 120;
+
         // concurrent workers equal to number of cores on the cpu
-        let http_server = HttpServer::new(create_app_fn).workers(num_cpus::get());
+        let http_server = HttpServer::new(create_app_fn).workers(num_cpus::get()).keep_alive(Duration::from_secs(keep_alive_timeout));
         if let Some(config) = ssl {
             http_server
                 .bind_rustls_0_22(&CONFIG.parseable.address, config)?
diff --git a/server/src/handlers/http/modal/server.rs b/server/src/handlers/http/modal/server.rs
index e17080e31..b8da7c8c6 100644
--- a/server/src/handlers/http/modal/server.rs
+++ b/server/src/handlers/http/modal/server.rs
@@ -228,7 +228,7 @@ impl Server {
 
     // get the query factory
     pub fn get_query_factory() -> Resource {
-        web::resource("/query").route(web::post().to(query::query).authorize(Action::Query))
+        web::resource("/query").route(web::post().to(query::query_keep_alive).authorize(Action::Query))
     }
 
     pub fn get_cache_webscope() -> Scope {

From cac1daa48977ef6f6b4ca408d65ef3abbb0e566b Mon Sep 17 00:00:00 2001
From: vishalkrishnads <321vishalds@gmail.com>
Date: Tue, 20 Aug 2024 14:05:04 +0530
Subject: [PATCH 03/14] match method signatures

---
 server/src/handlers/http/query.rs | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/server/src/handlers/http/query.rs b/server/src/handlers/http/query.rs
index 10bd87dbe..5f1c08414 100644
--- a/server/src/handlers/http/query.rs
+++ b/server/src/handlers/http/query.rs
@@ -73,7 +73,7 @@ pub struct Query {
 
 pub async fn query_keep_alive(
     req: HttpRequest,
-    query_request: web::Json<Query>,
+    query_request: Query,
 ) -> Result<HttpResponse, Error> {
     let (mut tx, rx) = mpsc::channel::<Result<Bytes, Error>>(10);
 
@@ -92,7 +92,7 @@ pub async fn query_keep_alive(
         }
 
         // Execute the query
-        match query(req.clone(), query_request.into_inner()).await {
+        match query(req.clone(), query_request).await {
             Ok(response) => {
                 if let Ok(res_body) = response.respond_to(&req).into_body().try_into_bytes() {
                     let _ = tx.send(Ok(res_body)).await;

From 2b5fd8a74f7ae601cb16d36288ccd46b474fdd92 Mon Sep 17 00:00:00 2001
From: vishalkrishnads <321vishalds@gmail.com>
Date: Tue, 20 Aug 2024 16:23:18 +0530
Subject: [PATCH 04/14] dropped wrapper method

---
 server/src/handlers/http/modal/server.rs |  2 +-
 server/src/handlers/http/query.rs        | 55 ++++++++++++++----------
 2 files changed, 34 insertions(+), 23 deletions(-)

diff --git a/server/src/handlers/http/modal/server.rs b/server/src/handlers/http/modal/server.rs
index b8da7c8c6..e17080e31 100644
--- a/server/src/handlers/http/modal/server.rs
+++ b/server/src/handlers/http/modal/server.rs
@@ -228,7 +228,7 @@ impl Server {
 
     // get the query factory
     pub fn get_query_factory() -> Resource {
-        web::resource("/query").route(web::post().to(query::query_keep_alive).authorize(Action::Query))
+        web::resource("/query").route(web::post().to(query::query).authorize(Action::Query))
     }
 
     pub fn get_cache_webscope() -> Scope {
diff --git a/server/src/handlers/http/query.rs b/server/src/handlers/http/query.rs
index 5f1c08414..a1a16ecaa 100644
--- a/server/src/handlers/http/query.rs
+++ b/server/src/handlers/http/query.rs
@@ -26,7 +26,7 @@ use datafusion::common::tree_node::TreeNode;
 use datafusion::error::DataFusionError;
 use datafusion::execution::context::SessionState;
 use futures::SinkExt;
-use futures::{channel::mpsc, StreamExt};
+use futures::channel::mpsc;
 use futures_util::Future;
 use http::StatusCode;
 use std::collections::HashMap;
@@ -34,7 +34,6 @@ use std::pin::Pin;
 use std::sync::Arc;
 use std::time::{Duration, Instant};
 use tokio::time::interval;
-use tokio_stream::wrappers::IntervalStream;
 
 use crate::event::error::EventError;
 use crate::handlers::http::fetch_schema;
@@ -80,28 +79,40 @@ pub async fn query_keep_alive(
     // Spawn a task to handle the query and keepalive messages
     rt::spawn(async move {
         let heartbeat_interval = 30;
-        // Create a stream that sends keepalive messages every 30 seconds
-        let interval = interval(Duration::from_secs(heartbeat_interval));
-        let mut keepalive = IntervalStream::new(interval).map(|_| Ok(Bytes::from("keepalive\n")));
-
-        // Send keepalive messages
-        while let Some(item) = keepalive.next().await {
-            if tx.send(item).await.is_err() {
-                return;
-            }
-        }
-
-        // Execute the query
-        match query(req.clone(), query_request).await {
-            Ok(response) => {
-                if let Ok(res_body) = response.respond_to(&req).into_body().try_into_bytes() {
-                    let _ = tx.send(Ok(res_body)).await;
+        let mut interval = interval(Duration::from_secs(heartbeat_interval));
+
+        let query_future = query(req.clone(), query_request);
+        let mut query_result = Box::pin(query_future);
+
+        loop {
+            tokio::select! {
+                _ = interval.tick() => {
+                    // Send a keepalive message
+                    if tx.send(Ok(Bytes::from("keepalive\n"))).await.is_err() {
+                        // If the receiver is dropped, stop the loop
+                        return;
+                    }
+                },
+                result = &mut query_result => {
+                    // Handle the query result and send it
+                    match result {
+                        Ok(response) => {
+                            if let Ok(res_body) = response.respond_to(&req).into_body().try_into_bytes() {
+                                let _ = tx.send(Ok(res_body)).await;
+                            }
+                        }
+                        Err(e) => {
+                            let _ = tx.send(Ok(Bytes::from(format!("Error: {}", e)))).await;
+                        }
+                    }
+                    // Break the loop after sending the query result
+                    break;
                 }
             }
-            Err(e) => {
-                let _ = tx.send(Ok(Bytes::from(format!("Error: {}", e)))).await;
-            }
         }
+
+        // After the loop, close the channel to signal completion
+        drop(tx);
     });
 
     // Return the streaming response
@@ -110,7 +121,7 @@ pub async fn query_keep_alive(
         .streaming(rx))
 }
 
-async fn query(req: HttpRequest, query_request: Query) -> Result<impl Responder, QueryError> {
+pub async fn query(req: HttpRequest, query_request: Query) -> Result<impl Responder, QueryError> {
     let session_state = QUERY_SESSION.state();
 
     // get the logical plan and extract the table name

From 37a41919f536b224d5b9be81fede10900cae4efa Mon Sep 17 00:00:00 2001
From: Nikhil Sinha
Date: Tue, 20 Aug 2024 16:55:45 +0530
Subject: [PATCH 05/14] keep alive for server binding

---
 server/src/handlers/http/modal/server.rs | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)

diff --git a/server/src/handlers/http/modal/server.rs b/server/src/handlers/http/modal/server.rs
index e17080e31..a316d094a 100644
--- a/server/src/handlers/http/modal/server.rs
+++ b/server/src/handlers/http/modal/server.rs
@@ -37,6 +37,7 @@ use crate::sync;
 use crate::users::dashboards::DASHBOARDS;
 use crate::users::filters::FILTERS;
 use std::sync::Arc;
+use std::time::Duration;
 
 use actix_web::web::resource;
 use actix_web::Resource;
@@ -96,8 +97,10 @@ impl ParseableServer for Server {
             &CONFIG.parseable.tls_key_path,
         )?;
 
+        let keep_alive_timeout = 120;
+
         // concurrent workers equal to number of cores on the cpu
-        let http_server = HttpServer::new(create_app_fn).workers(num_cpus::get());
+        let http_server = HttpServer::new(create_app_fn).workers(num_cpus::get()).keep_alive(Duration::from_secs(keep_alive_timeout));
         if let Some(config) = ssl {
             http_server
                 .bind_rustls_0_22(&CONFIG.parseable.address, config)?
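
[Editor's note: PATCHES 01-05 keep the HTTP connection alive by streaming
"keepalive\n" chunks while the query executes, and PATCH 04 distills this
into a tokio::select! race between an interval tick and the pinned query
future. A minimal self-contained sketch of that pattern, with simplified
types (a plain String result instead of an actix response body):

    use std::time::Duration;
    use futures::{channel::mpsc, SinkExt};

    async fn run_with_heartbeat(
        mut tx: mpsc::Sender<String>,
        work: impl std::future::Future<Output = String>,
    ) {
        // Note: a tokio interval's first tick completes immediately,
        // so the first heartbeat goes out right away.
        let mut ticker = tokio::time::interval(Duration::from_secs(30));
        // The future must be pinned to be polled across loop iterations.
        tokio::pin!(work);
        loop {
            tokio::select! {
                _ = ticker.tick() => {
                    // A send error means the receiver (client) went away.
                    if tx.send("keepalive\n".into()).await.is_err() {
                        return;
                    }
                }
                result = &mut work => {
                    let _ = tx.send(result).await;
                    break; // dropping tx closes the stream for the client
                }
            }
        }
    }

One design caveat this makes visible: the keepalive chunks and the final
JSON body arrive on the same octet stream, so the client has to split
"keepalive\n" frames from the payload itself.]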
From 6c83c3740b024ea296caa537f11a0384f1860c3f Mon Sep 17 00:00:00 2001
From: vishalkrishnads <321vishalds@gmail.com>
Date: Thu, 22 Aug 2024 20:33:32 +0530
Subject: [PATCH 06/14] converted cache to `hash.parquet` files

---
 server/src/handlers/http/query.rs | 222 +++++++-----------------------
 server/src/querycache.rs          | 146 ++++++++++----------
 2 files changed, 129 insertions(+), 239 deletions(-)

diff --git a/server/src/handlers/http/query.rs b/server/src/handlers/http/query.rs
index a1a16ecaa..91feac4be 100644
--- a/server/src/handlers/http/query.rs
+++ b/server/src/handlers/http/query.rs
@@ -16,31 +16,25 @@
  *
  */
 
-use actix_web::body::MessageBody;
 use actix_web::http::header::ContentType;
-use actix_web::web::{self, Bytes, Json};
+use actix_web::web::{self, Json};
-use actix_web::{Error, FromRequest, HttpRequest, HttpResponse, Responder, rt};
+use actix_web::{FromRequest, HttpRequest, Responder};
-use anyhow::anyhow;
 use chrono::{DateTime, Utc};
 use datafusion::common::tree_node::TreeNode;
 use datafusion::error::DataFusionError;
 use datafusion::execution::context::SessionState;
-use futures::SinkExt;
-use futures::channel::mpsc;
 use futures_util::Future;
 use http::StatusCode;
 use std::collections::HashMap;
 use std::pin::Pin;
 use std::sync::Arc;
-use std::time::{Duration, Instant};
-use tokio::time::interval;
+use std::time::Instant;
 
 use crate::event::error::EventError;
 use crate::handlers::http::fetch_schema;
 use arrow_array::RecordBatch;
 
 use crate::event::commit_schema;
-use crate::handlers::{CACHE_RESULTS_HEADER_KEY, CACHE_VIEW_HEADER_KEY, USER_ID_HEADER_KEY};
 use crate::localcache::CacheError;
 use crate::metrics::QUERY_EXECUTE_TIME;
 use crate::option::{Mode, CONFIG};
@@ -70,55 +64,9 @@ pub struct Query {
     pub filter_tags: Option<Vec<String>>,
 }
 
-pub async fn query_keep_alive(
-    req: HttpRequest,
-    query_request: Query,
-) -> Result<HttpResponse, Error> {
-    let (mut tx, rx) = mpsc::channel::<Result<Bytes, Error>>(10);
-
-    // Spawn a task to handle the query and keepalive messages
-    rt::spawn(async move {
-        let heartbeat_interval = 30;
-        let mut interval = interval(Duration::from_secs(heartbeat_interval));
-
-        let query_future = query(req.clone(), query_request);
-        let mut query_result = Box::pin(query_future);
-
-        loop {
-            tokio::select! {
-                _ = interval.tick() => {
-                    // Send a keepalive message
-                    if tx.send(Ok(Bytes::from("keepalive\n"))).await.is_err() {
-                        // If the receiver is dropped, stop the loop
-                        return;
-                    }
-                },
-                result = &mut query_result => {
-                    // Handle the query result and send it
-                    match result {
-                        Ok(response) => {
-                            if let Ok(res_body) = response.respond_to(&req).into_body().try_into_bytes() {
-                                let _ = tx.send(Ok(res_body)).await;
-                            }
-                        }
-                        Err(e) => {
-                            let _ = tx.send(Ok(Bytes::from(format!("Error: {}", e)))).await;
-                        }
-                    }
-                    // Break the loop after sending the query result
-                    break;
-                }
-            }
-        }
-
-        // After the loop, close the channel to signal completion
-        drop(tx);
-    });
-
-    // Return the streaming response
-    Ok(HttpResponse::Ok()
-        .content_type("application/octet-stream")
-        .streaming(rx))
+enum QueryStatus {
+    Processing,
+    Result(Query),
 }
 
 pub async fn query(req: HttpRequest, query_request: Query) -> Result<impl Responder, QueryError> {
@@ -132,33 +80,15 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result
[...]
 
 #[allow(clippy::too_many_arguments)]
 pub async fn put_results_in_cache(
-    cache_results: Option<&str>,
-    user_id: Option<&str>,
-    query_cache_manager: Option<&QueryCacheManager>,
+    query_cache_manager: &QueryCacheManager,
     stream: &str,
     records: &[RecordBatch],
     start: String,
     end: String,
     query: String,
 ) -> Result<(), QueryError> {
-    match (cache_results, query_cache_manager) {
-        (Some(_), None) => {
-            log::warn!(
-                "Instructed to cache query results but Query Caching is not Enabled in Server"
-            );
-
-            Ok(())
-        }
-        // do cache
-        (Some(should_cache), Some(query_cache_manager)) => {
-            if should_cache != "true" {
-                log::error!("value of cache results header is false");
-                return Err(QueryError::CacheError(CacheError::Other(
-                    "should not cache results",
-                )));
-            }
+    let mut cache = query_cache_manager.get_cache(&start, &end, &query).await?;
 
-            let user_id = user_id.ok_or(CacheError::Other("User Id not provided"))?;
-            let mut cache = query_cache_manager.get_cache(stream, user_id).await?;
+    let cache_key = CacheMetadata::new(query.clone(), start.clone(), end.clone());
 
-            let cache_key = CacheMetadata::new(query.clone(), start.clone(), end.clone());
-
-            // guard to stop multiple caching of the same content
-            if let Some(path) = cache.get_file(&cache_key) {
-                log::info!("File already exists in cache, Removing old file");
-                cache.delete(&cache_key, path).await?;
-            }
+    // guard to stop multiple caching of the same content
+    if let Some(path) = cache.get_file(&cache_key) {
+        log::info!("File already exists in cache, Removing old file");
+        cache.delete(&cache_key, path).await?;
+    }
 
-            if let Err(err) = query_cache_manager
-                .create_parquet_cache(stream, records, user_id, start, end, query)
-                .await
-            {
-                log::error!("Error occured while caching query results: {:?}", err);
-                if query_cache_manager
-                    .clear_cache(stream, user_id)
-                    .await
-                    .is_err()
-                {
-                    log::error!("Error Clearing Unwanted files from cache dir");
-                }
-            }
-            // fallthrough
-            Ok(())
+    if let Err(err) = query_cache_manager
+        .create_parquet_cache(stream, records, &start, &end, &query)
+        .await
+    {
+        log::error!("Error occurred while caching query results: {:?}", err);
+        if query_cache_manager
+            .clear_cache(&start, &end, &query)
+            .await
+            .is_err()
+        {
+            log::error!("Error Clearing Unwanted files from cache dir");
         }
-        (None, _) => Ok(()),
     }
+    // fallthrough
+    Ok(())
 }
 
 #[allow(clippy::too_many_arguments)]
 pub async fn get_results_from_cache(
-    show_cached: Option<&str>,
-    query_cache_manager: Option<&QueryCacheManager>,
-    stream: &str,
-    user_id: Option<&str>,
+    query_cache_manager: &QueryCacheManager,
     start_time: &str,
     end_time: &str,
     query: &str,
     send_null: bool,
     send_fields: bool,
 ) -> Result<QueryResponse, QueryError> {
-    match (show_cached, query_cache_manager) {
-        (Some(_), None) => {
-            log::warn!(
-                "Instructed to show cached results but Query Caching is not Enabled on Server"
-            );
-            None
-        }
-        (Some(should_show), Some(query_cache_manager)) => {
-            if should_show != "true" {
-                log::error!("value of show cached header is false");
-                return Err(QueryError::CacheError(CacheError::Other(
-                    "should not return cached results",
-                )));
-            }
+    let mut query_cache = query_cache_manager
+        .get_cache(start_time, end_time, query)
+        .await?;
 
-            let user_id =
-                user_id.ok_or_else(|| QueryError::Anyhow(anyhow!("User Id not provided")))?;
-
-            let mut query_cache = query_cache_manager.get_cache(stream, user_id).await?;
-
-            let (start, end) = parse_human_time(start_time, end_time)?;
-
-            let file_path = query_cache.get_file(&CacheMetadata::new(
-                query.to_string(),
-                start.to_rfc3339(),
-                end.to_rfc3339(),
-            ));
-            if let Some(file_path) = file_path {
-                let (records, fields) = query_cache.get_cached_records(&file_path).await?;
-                let response = QueryResponse {
-                    records,
-                    fields,
-                    fill_null: send_null,
-                    with_fields: send_fields,
-                };
-
-                Some(Ok(response))
-            } else {
-                None
-            }
-        }
-        (_, _) => None,
+    let (start, end) = parse_human_time(start_time, end_time)?;
+
+    let file_path = query_cache.get_file(&CacheMetadata::new(
+        query.to_string(),
+        start.to_rfc3339(),
+        end.to_rfc3339(),
+    ));
+    if let Some(file_path) = file_path {
+        let (records, fields) = query_cache.get_cached_records(&file_path).await?;
+        let response = QueryResponse {
+            records,
+            fields,
+            fill_null: send_null,
+            with_fields: send_fields,
+        };
+
+        Ok(response)
+    } else {
+        Err(QueryError::CacheMiss)
     }
-    .map_or_else(|| Err(QueryError::CacheMiss), |ret_val| ret_val)
 }
 
 pub fn authorize_and_set_filter_tags(
diff --git a/server/src/querycache.rs b/server/src/querycache.rs
index 4086de7b8..751b8a61b 100644
--- a/server/src/querycache.rs
+++ b/server/src/querycache.rs
@@ -17,7 +17,6 @@
  */
 
 use arrow_array::RecordBatch;
-use chrono::Utc;
 use futures::TryStreamExt;
 use futures_util::TryFutureExt;
 use hashlru::Cache;
@@ -26,7 +25,8 @@ use itertools::Itertools;
 use object_store::{local::LocalFileSystem, ObjectStore};
 use once_cell::sync::OnceCell;
 use parquet::arrow::{AsyncArrowWriter, ParquetRecordBatchStreamBuilder};
-use std::collections::HashMap;
+use std::collections::{hash_map::DefaultHasher, HashMap};
+use std::hash::{Hash, Hasher};
 use std::path::{Path, PathBuf};
 use tokio::fs as AsyncFs;
 use tokio::{fs, sync::Mutex};
@@ -34,7 +34,7 @@
 use crate::handlers::http::users::USERS_ROOT_DIR;
 use crate::metadata::STREAM_INFO;
 use crate::storage::staging::parquet_writer_props;
-use crate::{localcache::CacheError, option::CONFIG, utils::hostname_unchecked};
+use crate::{localcache::CacheError, option::CONFIG};
 
 pub const QUERY_CACHE_FILENAME: &str = ".cache.json";
 pub const QUERY_CACHE_META_FILENAME: &str = ".cache_meta.json";
@@ -148,44 +148,29 @@ pub struct QueryCacheManager {
 }
 
 impl QueryCacheManager {
-    pub fn gen_file_path(query_staging_path: &str, stream: &str, user_id: &str) -> PathBuf {
+    pub fn gen_file_path(query_staging_path: &str, start: &str, end: &str, query: &str) -> PathBuf {
         PathBuf::from_iter([
             query_staging_path,
             USERS_ROOT_DIR,
-            user_id,
-            stream,
-            &format!(
-                "{}.{}.parquet",
-                hostname_unchecked(),
-                Utc::now().timestamp()
-            ),
+            &format!("{}.parquet", generate_hash(start, end, query)),
         ])
     }
 
-    pub async fn global(config_capacity: u64) -> Result<Option<&'static Self>, CacheError> {
+    pub async fn global(config_capacity: u64) -> Result<&'static Self, CacheError> {
         static INSTANCE: OnceCell<QueryCacheManager> = OnceCell::new();
 
-        let cache_path = CONFIG.parseable.query_cache_path.as_ref();
-
-        if cache_path.is_none() {
-            return Ok(None);
-        }
-
-        let cache_path = cache_path.unwrap();
-
         let cache_manager = INSTANCE.get_or_init(|| {
-            let cache_path = cache_path.clone();
+            let cache_path = String::from("/query-cache");
             std::fs::create_dir_all(&cache_path).unwrap();
             Self {
                 filesystem: LocalFileSystem::new(),
-                cache_path,
+                cache_path: cache_path.into(),
                 total_cache_capacity: CONFIG.parseable.query_cache_size,
                 semaphore: Mutex::new(()),
             }
         });
 
         cache_manager.validate(config_capacity).await?;
-
-        Ok(Some(cache_manager))
+        Ok(cache_manager)
     }
 
     async fn validate(&self, config_capacity: u64) -> Result<(), CacheError> {
@@ -241,8 +226,13 @@ impl QueryCacheManager {
         Ok(())
     }
 
-    pub async fn get_cache(&self, stream: &str, user_id: &str) -> Result<QueryCache, CacheError> {
-        let path = query_cache_file_path(&self.cache_path, stream, user_id).unwrap();
+    pub async fn get_cache(
+        &self,
+        start: &str,
+        end: &str,
+        query: &str,
+    ) -> Result<QueryCache, CacheError> {
+        let path = query_cache_file_path(&self.cache_path, start, end, query).unwrap();
         let res = self
             .filesystem
             .get(&path)
@@ -259,27 +249,30 @@ impl QueryCacheManager {
     pub async fn remove_from_cache(
         &self,
         key: CacheMetadata,
-        stream: &str,
-        user_id: &str,
+        start: &str,
+        end: &str,
+        query: &str,
     ) -> Result<(), CacheError> {
-        let mut cache = self.get_cache(stream, user_id).await?;
-
-        if let Some(remove_result) = cache.remove(&key) {
-            self.put_cache(stream, &cache, user_id).await?;
-            tokio::spawn(fs::remove_file(remove_result));
+        let mut cache = self.get_cache(start, end, query).await?;
+
+        if let Some(file_for_removal) = cache.remove(&key) {
+            self.put_cache(start, end, query, &cache).await?;
+            tokio::spawn(fs::remove_file(file_for_removal));
             Ok(())
         } else {
            Err(CacheError::DoesNotExist)
        }
     }
 
+
     pub async fn put_cache(
         &self,
-        stream: &str,
+        start: &str,
+        end: &str,
+        query: &str,
         cache: &QueryCache,
-        user_id: &str,
     ) -> Result<(), CacheError> {
-        let path = query_cache_file_path(&self.cache_path, stream, user_id).unwrap();
+        let path = query_cache_file_path(&self.cache_path, start, end, query).unwrap();
 
         let bytes = serde_json::to_vec(cache)?.into();
         let result = self.filesystem.put(&path, bytes).await?;
@@ -289,14 +282,15 @@ impl QueryCacheManager {
 
     pub async fn move_to_cache(
         &self,
-        stream: &str,
         key: CacheMetadata,
         file_path: &Path,
-        user_id: &str,
+        start: &str,
+        end: &str,
+        query: &str,
     ) -> Result<(), CacheError> {
         let lock = self.semaphore.lock().await;
         let file_size = std::fs::metadata(file_path)?.len();
-        let mut cache = self.get_cache(stream, user_id).await?;
+        let mut cache = self.get_cache(start, end, query).await?;
 
         while cache.current_size + file_size > self.total_cache_capacity {
             if let Some((_, file_for_removal)) = cache.files.pop_lru() {
@@ -315,7 +309,7 @@ impl QueryCacheManager {
         }
         cache.files.push(key, file_path.to_path_buf());
         cache.current_size += file_size;
-        self.put_cache(stream, &cache, user_id).await?;
+        self.put_cache(start, end, query, &cache).await?;
         drop(lock);
         Ok(())
     }
@@ -324,15 +318,15 @@ impl QueryCacheManager {
         &self,
         table_name: &str,
         records: &[RecordBatch],
-        user_id: &str,
-        start: String,
-        end: String,
-        query: String,
+        start: &str,
+        end: &str,
+        query: &str,
     ) -> Result<(), CacheError> {
         let parquet_path = Self::gen_file_path(
             self.cache_path.to_str().expect("utf-8 compat path"),
-            user_id,
-            table_name,
+            &start,
+            &end,
+            &query,
         );
         AsyncFs::create_dir_all(parquet_path.parent().expect("parent path exists")).await?;
         let parquet_file = AsyncFs::File::create(&parquet_path).await?;
@@ -356,32 +350,39 @@ impl QueryCacheManager {
         arrow_writer.close().await?;
 
         self.move_to_cache(
-            table_name,
-            CacheMetadata::new(query, start, end),
+            CacheMetadata::new(query.to_string(), start.to_string(), end.to_string()),
            &parquet_path,
-            user_id,
+            &start,
+            &end,
+            &query,
         )
         .await
     }
 
-    pub async fn clear_cache(&self, stream: &str, user_id: &str) -> Result<(), CacheError> {
-        let cache = self.get_cache(stream, user_id).await?;
-        let map = cache.files.values().collect_vec();
-        let p_path = PathBuf::from_iter([USERS_ROOT_DIR, stream, user_id]);
-        let path = self.cache_path.join(p_path);
-        let mut paths = fs::read_dir(path).await?;
-        while let Some(path) = paths.next_entry().await? {
-            let check = path.path().is_file()
-                && map.contains(&&path.path())
+    pub async fn clear_cache(&self, start: &str, end: &str, query: &str) -> Result<(), CacheError> {
+        // Generate the hash using start, end, and query
+        let hash = generate_hash(start, end, query);
+
+        // Construct the path to the cache directory
+        let p_path = self.cache_path.join(format!("{}", hash));
+
+        // Get the list of files in the cache directory
+        let mut paths = fs::read_dir(&p_path).await?;
+
+        while let Some(entry) = paths.next_entry().await? {
+            let path = entry.path();
+
+            // Check if the file should be deleted
+            if path.is_file()
                 && !path
-                    .path()
                     .file_name()
                     .expect("File Name is Proper")
                     .to_str()
-                    .expect("Path is Proper utf-8 ")
-                    .ends_with(".json");
-            if check {
-                fs::remove_file(path.path()).await?;
+                    .expect("Path is Proper utf-8")
+                    .ends_with(".json")
+            {
+                // Remove the file asynchronously
+                fs::remove_file(path).await?;
             }
         }
 
         Ok(())
     }
 }
 
+fn generate_hash(start: &str, end: &str, query: &str) -> u64 {
+    let mut hasher = DefaultHasher::new();
+    start.hash(&mut hasher);
+    end.hash(&mut hasher);
+    query.hash(&mut hasher);
+    hasher.finish()
+}
+
 fn query_cache_file_path(
     root: impl AsRef<Path>,
-    stream: &str,
-    user_id: &str,
+    start: &str,
+    end: &str,
+    query: &str,
 ) -> Result<object_store::path::Path, object_store::path::Error> {
-    let local_meta_path = PathBuf::from_iter([USERS_ROOT_DIR, stream, user_id]);
-    let mut path = root.as_ref().join(local_meta_path);
-
-    path.push(QUERY_CACHE_FILENAME);
+    let mut path = root.as_ref().to_path_buf();
+    path.push(format!("{}.parquet", generate_hash(start, end, query)));
     object_store::path::Path::from_absolute_path(path)
 }
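
[Editor's note: PATCH 06 keys cache files on a hash of (start, end, query).
One property worth flagging: std's DefaultHasher does not guarantee a
stable algorithm across Rust releases, so hashes persisted as file names
may stop matching their entries after a toolchain upgrade. A self-contained
sketch of the naming scheme as introduced here (generate_hash is copied
from the diff; cache_file mirrors query_cache_file_path):

    use std::collections::hash_map::DefaultHasher;
    use std::hash::{Hash, Hasher};
    use std::path::PathBuf;

    // Same hashing order as the patch: start, then end, then query.
    fn generate_hash(start: &str, end: &str, query: &str) -> u64 {
        let mut hasher = DefaultHasher::new();
        start.hash(&mut hasher);
        end.hash(&mut hasher);
        query.hash(&mut hasher);
        hasher.finish()
    }

    fn cache_file(root: &str, start: &str, end: &str, query: &str) -> PathBuf {
        PathBuf::from(root).join(format!("{}.parquet", generate_hash(start, end, query)))
    }

Because the key is the raw query string, two textually different but
semantically identical queries (extra whitespace, different casing) hash
to different cache entries.]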
From b286a73aa66390aa40fef3ceb470a871b4c8eb7c Mon Sep 17 00:00:00 2001
From: vishalkrishnads <321vishalds@gmail.com>
Date: Thu, 22 Aug 2024 22:58:51 +0530
Subject: [PATCH 07/14] implemented logic

---
 server/src/handlers/http/modal/server.rs |   7 +-
 server/src/handlers/http/query.rs        | 148 ++++++++++++++++-------
 server/src/query.rs                      |   2 +-
 server/src/querycache.rs                 |   2 +-
 server/src/response.rs                   |   7 +-
 5 files changed, 118 insertions(+), 48 deletions(-)

diff --git a/server/src/handlers/http/modal/server.rs b/server/src/handlers/http/modal/server.rs
index a316d094a..5a84e9915 100644
--- a/server/src/handlers/http/modal/server.rs
+++ b/server/src/handlers/http/modal/server.rs
@@ -36,16 +36,18 @@ use crate::storage;
 use crate::sync;
 use crate::users::dashboards::DASHBOARDS;
 use crate::users::filters::FILTERS;
+use std::collections::HashMap;
 use std::sync::Arc;
 use std::time::Duration;
 
-use actix_web::web::resource;
+use actix_web::web::{resource, Data};
 use actix_web::Resource;
 use actix_web::Scope;
 use actix_web::{web, App, HttpServer};
 use actix_web_prometheus::PrometheusMetrics;
 use actix_web_static_files::ResourceFiles;
 use async_trait::async_trait;
+use tokio::sync::Mutex;
 
 use crate::{
     handlers::http::{
@@ -84,7 +86,10 @@ impl ParseableServer for Server {
         };
 
         let create_app_fn = move || {
+            let query_map: query::QueryMap = Arc::new(Mutex::new(HashMap::new()));
+
             App::new()
+                .app_data(Data::new(query_map))
                 .wrap(prometheus.clone())
                 .configure(|cfg| Server::configure_routes(cfg, oidc_client.clone()))
                 .wrap(actix_web::middleware::Logger::default())
diff --git a/server/src/handlers/http/query.rs b/server/src/handlers/http/query.rs
index 91feac4be..871bd0d78 100644
--- a/server/src/handlers/http/query.rs
+++ b/server/src/handlers/http/query.rs
@@ -18,7 +18,7 @@
 
 use actix_web::http::header::ContentType;
 use actix_web::web::{self, Json};
-use actix_web::{FromRequest, HttpRequest, Responder};
+use actix_web::{FromRequest, HttpRequest, HttpResponse};
 use chrono::{DateTime, Utc};
 use datafusion::common::tree_node::TreeNode;
 use datafusion::error::DataFusionError;
@@ -28,10 +28,13 @@ use http::StatusCode;
 use std::collections::HashMap;
 use std::pin::Pin;
 use std::sync::Arc;
-use std::time::Instant;
+use std::time::{Duration, Instant};
+use tokio::sync::Mutex;
+use tokio::time::sleep;
 
 use crate::event::error::EventError;
 use crate::handlers::http::fetch_schema;
+use crate::rbac::map::SessionKey;
 use arrow_array::RecordBatch;
 
 use crate::event::commit_schema;
@@ -41,7 +44,7 @@ use crate::option::{Mode, CONFIG};
 use crate::query::error::ExecuteError;
 use crate::query::Query as LogicalQuery;
 use crate::query::{TableScanVisitor, QUERY_SESSION};
-use crate::querycache::{CacheMetadata, QueryCacheManager};
+use crate::querycache::{generate_hash, CacheMetadata, QueryCacheManager};
 use crate::rbac::role::{Action, Permission};
 use crate::rbac::Users;
 use crate::response::QueryResponse;
@@ -50,7 +53,7 @@ use crate::storage::ObjectStorageError;
 use crate::utils::actix::extract_session_key_from_req;
 
 /// Query Request through http endpoint.
-#[derive(Debug, serde::Deserialize, serde::Serialize)]
+#[derive(Debug, serde::Deserialize, serde::Serialize, Clone)]
 #[serde(rename_all = "camelCase")]
 pub struct Query {
     pub query: String,
@@ -64,81 +67,142 @@ pub struct Query {
     pub filter_tags: Option<Vec<String>>,
 }
 
+pub type QueryMap = Arc<Mutex<HashMap<u64, QueryStatus>>>;
 enum QueryStatus {
     Processing,
-    Result(Query),
+    Result(QueryResponse),
 }
 
-pub async fn query(req: HttpRequest, query_request: Query) -> Result<impl Responder, QueryError> {
+pub async fn query(
+    req: HttpRequest,
+    query_request: Query,
+    query_map: web::Data<QueryMap>,
+) -> Result<HttpResponse, QueryError> {
     let session_state = QUERY_SESSION.state();
 
-    // get the logical plan and extract the table name
+    // Generate hash for the query based on start, end, and query string
+    let hash = generate_hash(
+        &query_request.start_time,
+        &query_request.end_time,
+        &query_request.query,
+    );
+
+    // Check if the query is already being processed or completed
+    {
+        let query_map = query_map.lock().await;
+        if let Some(status) = query_map.get(&hash) {
+            match status {
+                QueryStatus::Processing => {
+                    // Wait for 55 seconds and check again
+                    sleep(Duration::from_secs(55)).await;
+                    if let Some(QueryStatus::Result(response)) = query_map.get(&hash) {
+                        return Ok(response.clone().to_http()?);
+                    } else {
+                        return Ok(HttpResponse::Accepted().finish());
+                    }
+                }
+                QueryStatus::Result(response) => {
+                    return Ok(response.clone().to_http()?);
+                }
+            }
+        }
+    }
+
+    // Insert the query into the map as Processing
+    {
+        let mut query_map = query_map.lock().await;
+        query_map.insert(hash.clone(), QueryStatus::Processing);
+    }
+
+    // Clone necessary data for the spawned task
+    let query_request_clone = query_request.clone();
+    let hash_clone = hash.clone();
+    let session_state_clone = session_state.clone();
+    let creds = extract_session_key_from_req(&req).unwrap().to_owned();
+
+    // Spawn a separate task to process the query and cache the results
+    tokio::spawn(async move {
+        let mut query_map = query_map.lock().await;
+
+        let result = process_query(
+            query_request_clone,
+            Arc::new(session_state_clone),
+            creds,
+        ).await;
+
+        // Update the query status in the map
+        match result {
+            Ok(response) => {
+                query_map.insert(hash_clone, QueryStatus::Result(response));
+            }
+            Err(err) => {
+                log::error!("Error processing query: {:?}", err);
+                query_map.remove(&hash_clone);
+            }
+        }
+    });
+
+    // Wait for 55 seconds and respond with HTTP 202
+    sleep(Duration::from_secs(55)).await;
+    Ok(HttpResponse::Accepted().finish())
+}
+
+async fn process_query(
+    query_request: Query,
+    session_state: Arc<SessionState>,
+    creds: SessionKey,
+) -> Result<QueryResponse, QueryError> {
     let raw_logical_plan = session_state
         .create_logical_plan(&query_request.query)
-        .await?;
+        .await.unwrap();
 
-    // create a visitor to extract the table name
     let mut visitor = TableScanVisitor::default();
     let _ = raw_logical_plan.visit(&mut visitor);
-    let _stream = visitor
+
+    let visitor_clone = visitor.clone();
+    let stream = visitor
         .top()
         .ok_or_else(|| QueryError::MalformedQuery("Table Name not found in SQL"))?;
 
-    let query_cache_manager = QueryCacheManager::global(CONFIG.parseable.query_cache_size).await?;
-
-    // deal with cached data
-    if let Ok(results) = get_results_from_cache(
-        query_cache_manager,
-        &query_request.start_time,
-        &query_request.end_time,
-        &query_request.query,
-        query_request.send_null,
-        query_request.fields,
-    )
-    .await
-    {
-        return results.to_http();
-    };
-
-    let tables = visitor.into_inner();
+    // Process the query
+    let tables = visitor_clone.into_inner();
     update_schema_when_distributed(tables).await?;
+
     let mut query: LogicalQuery = into_query(&query_request, &session_state).await?;
 
-    let creds = extract_session_key_from_req(&req)?;
     let permissions = Users.get_permissions(&creds);
 
-    let table_name = query
-        .first_table_name()
+    let table_name = query.first_table_name()
         .ok_or_else(|| QueryError::MalformedQuery("No table name found in query"))?;
 
     authorize_and_set_filter_tags(&mut query, permissions, &table_name)?;
 
     let time = Instant::now();
     let (records, fields) = query.execute(table_name.clone()).await?;
-    // deal with cache saving
-    if let Err(err) = put_results_in_cache(
-        query_cache_manager,
-        &table_name,
+
+    let query_cache_manager = QueryCacheManager::global(CONFIG.parseable.query_cache_size)
+        .await?;
+
+    // Cache the results
+    put_results_in_cache(
+        &query_cache_manager,
+        stream,
         &records,
         query.start.to_rfc3339(),
         query.end.to_rfc3339(),
-        query_request.query,
+        query_request.query.clone(),
     )
-    .await
-    {
-        log::error!("{}", err);
-    };
+    .await?;
 
+    // Create the response
     let response = QueryResponse {
         records,
         fields,
         fill_null: query_request.send_null,
         with_fields: query_request.fields,
-    }
-    .to_http()?;
+    };
 
     let time = time.elapsed().as_secs_f64();
-
     QUERY_EXECUTE_TIME
         .with_label_values(&[&table_name])
         .observe(time);
diff --git a/server/src/query.rs b/server/src/query.rs
index 4e345148a..6b78c6079 100644
--- a/server/src/query.rs
+++ b/server/src/query.rs
@@ -176,7 +176,7 @@ impl Query {
     }
 }
 
-#[derive(Debug, Default)]
+#[derive(Debug, Default, Clone)]
 pub(crate) struct TableScanVisitor {
     tables: Vec<String>,
 }
diff --git a/server/src/querycache.rs b/server/src/querycache.rs
index 751b8a61b..4891896c5 100644
--- a/server/src/querycache.rs
+++ b/server/src/querycache.rs
@@ -390,7 +390,7 @@ impl QueryCacheManager {
     }
 }
 
-fn generate_hash(start: &str, end: &str, query: &str) -> u64 {
+pub fn generate_hash(start: &str, end: &str, query: &str) -> u64 {
     let mut hasher = DefaultHasher::new();
     start.hash(&mut hasher);
     end.hash(&mut hasher);
     query.hash(&mut hasher);
     hasher.finish()
diff --git a/server/src/response.rs b/server/src/response.rs
index e2abfa2d2..32dc3463e 100644
--- a/server/src/response.rs
+++ b/server/src/response.rs
@@ -23,12 +23,13 @@ use crate::{
         record_batches_to_json,
     },
 };
-use actix_web::{web, Responder};
+use actix_web::HttpResponse;
 use datafusion::arrow::record_batch::RecordBatch;
 use itertools::Itertools;
 use serde_json::{json, Value};
 use tonic::{Response, Status};
 
+#[derive(Clone)]
 pub struct QueryResponse {
     pub records: Vec<RecordBatch>,
     pub fields: Vec<String>,
@@ -37,7 +38,7 @@ pub struct QueryResponse {
 }
 
 impl QueryResponse {
-    pub fn to_http(&self) -> Result<impl Responder, QueryError> {
+    pub fn to_http(&self) -> Result<HttpResponse, QueryError> {
         log::info!("{}", "Returning query results");
         let records: Vec<&RecordBatch> = self.records.iter().collect();
         let mut json_records = record_batches_to_json(&records)?;
@@ -62,7 +63,7 @@ impl QueryResponse {
             Value::Array(values)
         };
 
-        Ok(web::Json(response))
+        Ok(HttpResponse::Ok().json(response))
     }
 
     pub fn into_flight(self) -> Result, Status> {
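
[Editor's note: in PATCH 07 both the handler and the spawned task call
query_map.lock().await and then hold the guard across long awaits: the
handler sleeps for 55 seconds with the map locked, and the spawned task
keeps it locked for the entire process_query run. With a single shared
Mutex this serializes every request, and the re-check after the sleep can
never observe a result written by a task still waiting on the same lock.
A sketch of the usual fix, keyed to the handler above — scope the guard so
it drops before any await (assumes QueryStatus derives Clone, which the
patch does not add):

    // Take the lock only long enough to copy the current status out.
    let status = {
        let map = query_map.lock().await;
        map.get(&hash).cloned()
    }; // guard dropped here

    if matches!(status, Some(QueryStatus::Processing)) {
        sleep(Duration::from_secs(55)).await; // no lock held while sleeping
        let map = query_map.lock().await;     // re-acquire briefly to re-check
        // ... inspect map.get(&hash) as in the handler above ...
    }

The same scoping applies in the spawned task: run process_query first,
then lock the map just long enough to insert the result.]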
From 3d9de04ffdd9e1c98398197ffbccd81e1160ee3e Mon Sep 17 00:00:00 2001
From: vishalkrishnads <321vishalds@gmail.com>
Date: Fri, 23 Aug 2024 11:36:12 +0530
Subject: [PATCH 08/14] converted wait time to `P_PROXY_TIMEOUT`

---
 server/src/cli.rs                 | 14 ++++++++++++++
 server/src/handlers/http/query.rs |  8 ++++----
 2 files changed, 18 insertions(+), 4 deletions(-)

diff --git a/server/src/cli.rs b/server/src/cli.rs
index 6f4160738..d9a339047 100644
--- a/server/src/cli.rs
+++ b/server/src/cli.rs
@@ -109,6 +109,8 @@ pub struct Cli {
     pub max_disk_usage: f64,
 
     pub ms_clarity_tag: Option<String>,
+
+    pub proxy_timeout: u64
 }
 
 impl Cli {
@@ -145,6 +147,7 @@ impl Cli {
     pub const HOT_TIER_PATH: &'static str = "hot-tier-path";
     pub const MAX_DISK_USAGE: &'static str = "max-disk-usage";
     pub const MS_CLARITY_TAG: &'static str = "ms-clarity-tag";
+    pub const PROXY_TIMEOUT: &'static str = "proxy-timeout";
 
     pub fn local_stream_data_path(&self, stream_name: &str) -> PathBuf {
         self.local_staging_path.join(stream_name)
@@ -434,6 +437,16 @@ impl Cli {
                     .required(false)
                     .help("Tag for MS Clarity"),
             )
+            .arg(
+                Arg::new(Self::PROXY_TIMEOUT)
+                    .long(Self::PROXY_TIMEOUT)
+                    .env("P_PROXY_TIMEOUT")
+                    .value_name("NUMBER")
+                    .required(false)
+                    .default_value("60")
+                    .value_parser(value_parser!(u64))
+                    .help("Time to wait before responding to a query request.")
+            )
             .group(
                 ArgGroup::new("oidc")
                     .args([Self::OPENID_CLIENT_ID, Self::OPENID_CLIENT_SECRET, Self::OPENID_ISSUER])
@@ -578,6 +591,7 @@ impl FromArgMatches for Cli {
             .expect("default for max disk usage");
 
         self.ms_clarity_tag = m.get_one::<String>(Self::MS_CLARITY_TAG).cloned();
+        self.proxy_timeout = m.get_one::<u64>(Self::PROXY_TIMEOUT).cloned().expect("Please specify a default timeout for query requests.");
 
         Ok(())
     }
diff --git a/server/src/handlers/http/query.rs b/server/src/handlers/http/query.rs
index 871bd0d78..1553473fc 100644
--- a/server/src/handlers/http/query.rs
+++ b/server/src/handlers/http/query.rs
@@ -93,8 +93,8 @@ pub async fn query(
         if let Some(status) = query_map.get(&hash) {
             match status {
                 QueryStatus::Processing => {
-                    // Wait for 55 seconds and check again
-                    sleep(Duration::from_secs(55)).await;
+                    // Wait and check again
+                    sleep(Duration::from_secs(CONFIG.parseable.proxy_timeout)).await;
                     if let Some(QueryStatus::Result(response)) = query_map.get(&hash) {
                         return Ok(response.clone().to_http()?);
                     } else {
@@ -142,8 +142,8 @@ pub async fn query(
         }
     });
 
-    // Wait for 55 seconds and respond with HTTP 202
-    sleep(Duration::from_secs(55)).await;
+    // Wait and respond with HTTP 202
+    sleep(Duration::from_secs(CONFIG.parseable.proxy_timeout)).await;
     Ok(HttpResponse::Accepted().finish())
 }
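
[Editor's note: with PATCH 08 the wait becomes configurable through the
--proxy-timeout flag / P_PROXY_TIMEOUT env var (default "60"). Two small
points: the .expect() in FromArgMatches can only fire if the default value
were removed, so its message is misleading; and PATCH 12 later computes
proxy_timeout - 5 on this u64, which underflows (panicking in debug
builds) for configured values below 5. A hedged one-line guard for the
latter:

    // Hypothetical hardening for the subtraction introduced in PATCH 12.
    let timeout = Duration::from_secs(CONFIG.parseable.proxy_timeout.saturating_sub(5));
]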
From 57e9491b50da68ff8492462ffe4a6849fc47c301 Mon Sep 17 00:00:00 2001
From: vishalkrishnads <321vishalds@gmail.com>
Date: Sat, 24 Aug 2024 19:47:58 +0530
Subject: [PATCH 09/14] disabled code that uses old method signature

---
 server/src/handlers/airplane.rs   | 64 +++++++++++++-------------
 server/src/handlers/http/cache.rs | 75 ++++++++++++++++---------------
 2 files changed, 72 insertions(+), 67 deletions(-)

diff --git a/server/src/handlers/airplane.rs b/server/src/handlers/airplane.rs
index c803ba194..39e726989 100644
--- a/server/src/handlers/airplane.rs
+++ b/server/src/handlers/airplane.rs
@@ -155,9 +155,9 @@ impl FlightService for AirServiceImpl {
 
         let streams = visitor.into_inner();
 
-        let query_cache_manager = QueryCacheManager::global(CONFIG.parseable.query_cache_size)
-            .await
-            .unwrap_or(None);
+        // let query_cache_manager = QueryCacheManager::global(CONFIG.parseable.query_cache_size)
+        //     .await
+        //     .unwrap_or(None);
 
         let cache_results = req
             .metadata()
@@ -179,21 +179,21 @@ impl FlightService for AirServiceImpl {
             .to_owned();
 
         // send the cached results
-        if let Ok(cache_results) = get_results_from_cache(
-            show_cached,
-            query_cache_manager,
-            &stream_name,
-            user_id,
-            &ticket.start_time,
-            &ticket.end_time,
-            &ticket.query,
-            ticket.send_null,
-            ticket.fields,
-        )
-        .await
-        {
-            return cache_results.into_flight();
-        }
+        // if let Ok(cache_results) = get_results_from_cache(
+        //     show_cached,
+        //     query_cache_manager,
+        //     &stream_name,
+        //     user_id,
+        //     &ticket.start_time,
+        //     &ticket.end_time,
+        //     &ticket.query,
+        //     ticket.send_null,
+        //     ticket.fields,
+        // )
+        // .await
+        // {
+        //     return cache_results.into_flight();
+        // }
 
         update_schema_when_distributed(streams)
             .await
@@ -257,20 +257,20 @@ impl FlightService for AirServiceImpl {
             .await
             .map_err(|err| Status::internal(err.to_string()))?;
 
-        if let Err(err) = put_results_in_cache(
-            cache_results,
-            user_id,
-            query_cache_manager,
-            &stream_name,
-            &records,
-            query.start.to_rfc3339(),
-            query.end.to_rfc3339(),
-            ticket.query,
-        )
-        .await
-        {
-            log::error!("{}", err);
-        };
+        // if let Err(err) = put_results_in_cache(
+        //     cache_results,
+        //     user_id,
+        //     query_cache_manager,
+        //     &stream_name,
+        //     &records,
+        //     query.start.to_rfc3339(),
+        //     query.end.to_rfc3339(),
+        //     ticket.query,
+        // )
+        // .await
+        // {
+        //     log::error!("{}", err);
+        // };
 
         /*
          * INFO: No returning the schema with the data.
diff --git a/server/src/handlers/http/cache.rs b/server/src/handlers/http/cache.rs
index 29efd09e4..88710d600 100644
--- a/server/src/handlers/http/cache.rs
+++ b/server/src/handlers/http/cache.rs
@@ -40,30 +40,33 @@ pub async fn list(req: HttpRequest) -> Result<impl Responder, PostError> {
         .get("user_id")
         .ok_or_else(|| PostError::Invalid(anyhow!("Invalid User ID not in Resource path")))?;
 
-    let query_cache_manager = QueryCacheManager::global(CONFIG.parseable.query_cache_size)
-        .await
-        .unwrap_or(None);
-
-    if let Some(query_cache_manager) = query_cache_manager {
-        let cache = query_cache_manager
-            .get_cache(stream, user_id)
-            .await
-            .map_err(PostError::CacheError)?;
-
-        let size = cache.used_cache_size();
-        let queries = cache.queries();
-
-        let out = json!({
-            "used_capacity": size,
-            "cache": queries
-        });
-
-        Ok((web::Json(out), StatusCode::OK))
-    } else {
-        Err(PostError::Invalid(anyhow!(
-            "Query Caching is not active on server "
-        )))
-    }
+    // let query_cache_manager = QueryCacheManager::global(CONFIG.parseable.query_cache_size)
+    //     .await
+    //     .unwrap_or(None);
+
+    // if let Some(query_cache_manager) = query_cache_manager {
+    //     let cache = query_cache_manager
+    //         .get_cache(stream, user_id)
+    //         .await
+    //         .map_err(PostError::CacheError)?;
+
+    //     let size = cache.used_cache_size();
+    //     let queries = cache.queries();
+
+    //     let out = json!({
+    //         "used_capacity": size,
+    //         "cache": queries
+    //     });
+
+    //     Ok((web::Json(out), StatusCode::OK))
+    // } else {
+    //     Err(PostError::Invalid(anyhow!(
+    //         "Query Caching is not active on server "
+    //     )))
+    // }
+
+    Ok(HttpResponse::Ok().finish())
+
 }
 
 pub async fn remove(req: HttpRequest, body: Bytes) -> Result<impl Responder, PostError> {
@@ -79,17 +82,19 @@ pub async fn remove(req: HttpRequest, body: Bytes) -> Result<impl Responder, PostError> {
 
     let query = serde_json::from_slice::<CacheMetadata>(&body)?;
 
-    let query_cache_manager = QueryCacheManager::global(CONFIG.parseable.query_cache_size)
-        .await
-        .unwrap_or(None);
+    // let query_cache_manager = QueryCacheManager::global(CONFIG.parseable.query_cache_size)
+    //     .await
+    //     .unwrap_or(None);
+
+    // if let Some(query_cache_manager) = query_cache_manager {
+    //     query_cache_manager
+    //         .remove_from_cache(query, stream, user_id)
+    //         .await?;
 
-    if let Some(query_cache_manager) = query_cache_manager {
-        query_cache_manager
-            .remove_from_cache(query, stream, user_id)
-            .await?;
+    //     Ok(HttpResponse::Ok().finish())
+    // } else {
+    //     Err(PostError::Invalid(anyhow!("Query Caching is not enabled")))
+    // }
 
-        Ok(HttpResponse::Ok().finish())
-    } else {
-        Err(PostError::Invalid(anyhow!("Query Caching is not enabled")))
-    }
+    Ok(HttpResponse::Ok().finish())
 }
From df48a86ba12234dcd07b44d2163fb8ae3be60712 Mon Sep 17 00:00:00 2001
From: vishalkrishnads <321vishalds@gmail.com>
Date: Sat, 24 Aug 2024 19:48:22 +0530
Subject: [PATCH 10/14] corrected logic

---
 server/src/handlers/http/query.rs | 17 ++++++++++++++++-
 1 file changed, 16 insertions(+), 1 deletion(-)

diff --git a/server/src/handlers/http/query.rs b/server/src/handlers/http/query.rs
index 1553473fc..2327453a8 100644
--- a/server/src/handlers/http/query.rs
+++ b/server/src/handlers/http/query.rs
@@ -68,7 +68,7 @@ pub struct Query {
 }
 
 pub type QueryMap = Arc<Mutex<HashMap<u64, QueryStatus>>>;
-enum QueryStatus {
+pub enum QueryStatus {
     Processing,
     Result(QueryResponse),
 }
@@ -78,6 +78,21 @@ pub async fn query(
     query_request: Query,
     query_map: web::Data<QueryMap>,
 ) -> Result<HttpResponse, QueryError> {
+
+    // If result is there in cache, just return it.
+    let query_cache_manager = QueryCacheManager::global(CONFIG.parseable.query_cache_size)
+        .await?; // since query_cache_manager is used inside get_results_from_cache() only, it'd be better to move it inside function body.
+    if let Ok(result) = get_results_from_cache(
+        query_cache_manager,
+        &query_request.start_time,
+        &query_request.end_time,
+        &query_request.query,
+        true,
+        true
+    ).await {
+        return Ok(result.to_http()?);
+    }
+
     let session_state = QUERY_SESSION.state();
 
     // Generate hash for the query based on start, end, and query string
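
[Editor's note: the in-diff comment in PATCH 10 suggests moving the cache
manager lookup into get_results_from_cache() itself. A sketch of that
refactor (body otherwise unchanged from the version in PATCH 06):

    pub async fn get_results_from_cache(
        start_time: &str,
        end_time: &str,
        query: &str,
        send_null: bool,
        send_fields: bool,
    ) -> Result<QueryResponse, QueryError> {
        // Resolve the manager here so callers don't have to.
        let query_cache_manager =
            QueryCacheManager::global(CONFIG.parseable.query_cache_size).await?;
        let mut query_cache = query_cache_manager
            .get_cache(start_time, end_time, query)
            .await?;
        // ... rest of the body as in PATCH 06 ...
    }

Note also that the early return here passes true/true instead of the
send_null and fields values the client actually requested; PATCH 12 fixes
this by threading query_request.send_null and query_request.fields
through.]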
From ab836bb8bd97c34b6d9b4f0f6b423b5a7cc9b288 Mon Sep 17 00:00:00 2001
From: vishalkrishnads <321vishalds@gmail.com>
Date: Mon, 26 Aug 2024 17:00:00 +0530
Subject: [PATCH 11/14] new put & get cache methods

---
 server/src/handlers/http/query.rs | 107 ++++++++++++++++++++++++------
 1 file changed, 86 insertions(+), 21 deletions(-)

diff --git a/server/src/handlers/http/query.rs b/server/src/handlers/http/query.rs
index 2327453a8..c4a42d31e 100644
--- a/server/src/handlers/http/query.rs
+++ b/server/src/handlers/http/query.rs
@@ -23,12 +23,17 @@ use chrono::{DateTime, Utc};
 use datafusion::common::tree_node::TreeNode;
 use datafusion::error::DataFusionError;
 use datafusion::execution::context::SessionState;
+use futures::TryStreamExt;
 use futures_util::Future;
 use http::StatusCode;
+use itertools::Itertools;
+use parquet::arrow::{AsyncArrowWriter, ParquetRecordBatchStreamBuilder};
 use std::collections::HashMap;
+use std::path::PathBuf;
 use std::pin::Pin;
 use std::sync::Arc;
 use std::time::{Duration, Instant};
+use tokio::fs as AsyncFs;
 use tokio::sync::Mutex;
 use tokio::time::sleep;
@@ -78,18 +83,12 @@ pub async fn query(
     query_request: Query,
     query_map: web::Data<QueryMap>,
 ) -> Result<HttpResponse, QueryError> {
-
-    // If result is there in cache, just return it.
-    let query_cache_manager = QueryCacheManager::global(CONFIG.parseable.query_cache_size)
-        .await?; // since query_cache_manager is used inside get_results_from_cache() only, it'd be better to move it inside function body.
-    if let Ok(result) = get_results_from_cache(
-        query_cache_manager,
-        &query_request.start_time,
-        &query_request.end_time,
-        &query_request.query,
-        true,
-        true
-    ).await {
+    // If result is there in cache, just return it.
+    if let Some(result) = find_from_cache(
+        &query_request.start_time,
+        &query_request.end_time,
+        &query_request.query,
+    ).await.unwrap() {
         return Ok(result.to_http()?);
     }
@@ -139,11 +138,7 @@ pub async fn query(
     tokio::spawn(async move {
         let mut query_map = query_map.lock().await;
 
-        let result = process_query(
-            query_request_clone,
-            Arc::new(session_state_clone),
-            creds,
-        ).await;
+        let result = process_query(query_request_clone, Arc::new(session_state_clone), creds).await;
 
         // Update the query status in the map
         match result {
@@ -162,9 +157,12 @@ async fn process_query(
     session_state: Arc<SessionState>,
     creds: SessionKey,
 ) -> Result<QueryResponse, QueryError> {
+    sleep(Duration::from_secs(120)).await;
+
     let raw_logical_plan = session_state
         .create_logical_plan(&query_request.query)
-        .await.unwrap();
+        .await
+        .unwrap();
 
     let mut visitor = TableScanVisitor::default();
     let _ = raw_logical_plan.visit(&mut visitor);
@@ -183,27 +181,25 @@ async fn process_query(
 
     let permissions = Users.get_permissions(&creds);
 
-    let table_name = query.first_table_name()
+    let table_name = query
+        .first_table_name()
         .ok_or_else(|| QueryError::MalformedQuery("No table name found in query"))?;
 
     authorize_and_set_filter_tags(&mut query, permissions, &table_name)?;
 
     let time = Instant::now();
     let (records, fields) = query.execute(table_name.clone()).await?;
 
-    let query_cache_manager = QueryCacheManager::global(CONFIG.parseable.query_cache_size)
-        .await?;
+    let query_cache_manager = QueryCacheManager::global(CONFIG.parseable.query_cache_size).await?;
 
     // Cache the results
     put_results_in_cache(
         &query_cache_manager,
         stream,
         &records,
         query.start.to_rfc3339(),
         query.end.to_rfc3339(),
         query_request.query.clone(),
     )
     .await?;
@@ -240,6 +238,73 @@ pub async fn update_schema_when_distributed(tables: Vec) -> Result<(), Q
     Ok(())
 }
 
+async fn cache_query_results(
+    records: &[RecordBatch],
+    start: &str,
+    end: &str,
+    query: &str,
+) -> Result<(), Box<dyn std::error::Error>> {
+    let parquet_path = PathBuf::from_iter([
+        "/home/vishalds/query-cache",
+        &format!("{}.parquet", generate_hash(start, end, query)),
+    ]);
+    AsyncFs::create_dir_all(parquet_path.parent().expect("parent path exists")).await?;
+    let parquet_file = AsyncFs::File::create(&parquet_path).await?;
+
+    let sch = if let Some(record) = records.first() {
+        record.schema()
+    } else {
+        // the record batch is empty, do not cache and return early
+        return Ok(());
+    };
+
+    let mut arrow_writer = AsyncArrowWriter::try_new(parquet_file, sch, None)?;
+
+    for record in records {
+        if let Err(e) = arrow_writer.write(record).await {
+            log::error!("Error While Writing to Query Cache: {}", e);
+        }
+    }
+
+    arrow_writer.close().await?;
+    Ok(())
+}
+
+async fn find_from_cache(
+    start: &str,
+    end: &str,
+    query: &str,
+) -> Result<Option<QueryResponse>, Box<dyn std::error::Error>> {
+    let parquet_path = PathBuf::from_iter([
+        "/home/vishalds/query-cache",
+        &format!("{}.parquet", generate_hash(start, end, query)),
+    ]);
+
+    if let Ok(file) = AsyncFs::File::open(parquet_path).await {
+        let builder = ParquetRecordBatchStreamBuilder::new(file).await?;
+        // Build an async parquet reader.
+        let stream = builder.build()?;
+
+        let records = stream.try_collect::<Vec<_>>().await?;
+        let fields = records.first().map_or_else(Vec::new, |record| {
+            record
+                .schema()
+                .fields()
+                .iter()
+                .map(|field| field.name())
+                .cloned()
+                .collect_vec()
+        });
+
+        Ok(Some(QueryResponse {
+            records,
+            fields,
+            fill_null: true,
+            with_fields: true,
+        }))
+    } else{ Ok(None) }
+}
+
 #[allow(clippy::too_many_arguments)]
 pub async fn put_results_in_cache(
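
[Editor's note: two things in PATCH 11 get revisited later. The cache
directory is hard-coded to /home/vishalds/query-cache (PATCH 12 switches
it to CONFIG.parseable.query_cache_path with a "~/.query-cache" fallback,
though a literal "~" is not expanded by Rust's path APIs). And the handler
calls find_from_cache(...).await.unwrap(), so any I/O or parquet decode
error — for example a half-written file from a concurrent cache write —
panics inside the handler. A sketch that degrades a read failure into a
cache miss instead (names as in the patch):

    let cached = match find_from_cache(
        &query_request.start_time,
        &query_request.end_time,
        &query_request.query,
    )
    .await
    {
        Ok(hit) => hit, // Some(response) on a hit, None on a miss
        Err(e) => {
            log::warn!("query cache read failed, treating as a miss: {}", e);
            None
        }
    };
    if let Some(result) = cached {
        return Ok(result.to_http()?);
    }
]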
diff --git a/server/src/handlers/http/modal/server.rs b/server/src/handlers/http/modal/server.rs index 5a84e9915..34309d2a6 100644 --- a/server/src/handlers/http/modal/server.rs +++ b/server/src/handlers/http/modal/server.rs @@ -36,7 +36,7 @@ use crate::storage; use crate::sync; use crate::users::dashboards::DASHBOARDS; use crate::users::filters::FILTERS; -use std::collections::HashMap; +use std::collections::HashSet; use std::sync::Arc; use std::time::Duration; @@ -86,10 +86,10 @@ impl ParseableServer for Server { }; let create_app_fn = move || { - let query_map: query::QueryMap = Arc::new(Mutex::new(HashMap::new())); - + let query_set: query::QuerySet = Arc::new(Mutex::new(HashSet::new())); + App::new() - .app_data(Data::new(query_map)) + .app_data(Data::new(query_set)) .wrap(prometheus.clone()) .configure(|cfg| Server::configure_routes(cfg, oidc_client.clone())) .wrap(actix_web::middleware::Logger::default()) diff --git a/server/src/handlers/http/query.rs b/server/src/handlers/http/query.rs index c4a42d31e..7c9ce359d 100644 --- a/server/src/handlers/http/query.rs +++ b/server/src/handlers/http/query.rs @@ -28,7 +28,7 @@ use futures_util::Future; use http::StatusCode; use itertools::Itertools; use parquet::arrow::{AsyncArrowWriter, ParquetRecordBatchStreamBuilder}; -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use std::path::PathBuf; use std::pin::Pin; use std::sync::Arc; @@ -39,6 +39,7 @@ use tokio::time::sleep; use crate::event::error::EventError; use crate::handlers::http::fetch_schema; +use crate::metadata::STREAM_INFO; use crate::rbac::map::SessionKey; use arrow_array::RecordBatch; @@ -55,6 +56,7 @@ use crate::rbac::Users; use crate::response::QueryResponse; use crate::storage::object_storage::commit_schema_to_storage; use crate::storage::ObjectStorageError; +use crate::storage::staging::parquet_writer_props; use crate::utils::actix::extract_session_key_from_req; /// Query Request through http endpoint. @@ -72,28 +74,13 @@ pub struct Query { pub filter_tags: Option>, } -pub type QueryMap = Arc>>; -pub enum QueryStatus { - Processing, - Result(QueryResponse), -} +pub type QuerySet = Arc>>; pub async fn query( req: HttpRequest, query_request: Query, - query_map: web::Data, + query_set: web::Data, ) -> Result { - // If result is there in cache, just return it. - if let Some(result) = find_from_cache( - &query_request.start_time, - &query_request.end_time, - &query_request.query, - ).await.unwrap() { - return Ok(result.to_http()?); - } - - let session_state = QUERY_SESSION.state(); - // Generate hash for the query based on start, end, and query string let hash = generate_hash( &query_request.start_time, @@ -101,66 +88,68 @@ pub async fn query( &query_request.query, ); - // Check if the query is already being processed or completed - { - let query_map = query_map.lock().await; - if let Some(status) = query_map.get(&hash) { - match status { - QueryStatus::Processing => { - // Wait and check again - sleep(Duration::from_secs(CONFIG.parseable.proxy_timeout)).await; - if let Some(QueryStatus::Result(response)) = query_map.get(&hash) { - return Ok(response.clone().to_http()?); - } else { - return Ok(HttpResponse::Accepted().finish()); - } - } - QueryStatus::Result(response) => { - return Ok(response.clone().to_http()?); - } - } - } + // If result is in cache, just return it. 
+ if let Some(result) = find_from_cache( + hash, + query_request.send_null, + query_request.fields + ).await.unwrap() { + return Ok(result.to_http()?); } - // Insert the query into the map as Processing - { - let mut query_map = query_map.lock().await; - query_map.insert(hash.clone(), QueryStatus::Processing); - } + let session_state = QUERY_SESSION.state(); - // Clone necessary data for the spawned task - let query_request_clone = query_request.clone(); - let hash_clone = hash.clone(); - let session_state_clone = session_state.clone(); - let creds = extract_session_key_from_req(&req).unwrap().to_owned(); + let should_spawn = { + let mut query_set = query_set.lock().await; + query_set.insert(hash.clone()) + }; - // Spawn a separate task to process the query and cache the results - tokio::spawn(async move { - let mut query_map = query_map.lock().await; + // insert the hash into the set anyway, it'll return true if not previously present + if should_spawn { // if this hash is new to the set, + // Clone necessary data for the spawned task + let query_request_clone = query_request.clone(); + let hash_clone = hash.clone(); + let session_state_clone = session_state.clone(); + let creds = extract_session_key_from_req(&req).unwrap().to_owned(); - let result = process_query(query_request_clone, Arc::new(session_state_clone), creds).await; + // Spawn a separate task to process the query and cache the results + tokio::spawn(async move { + let mut query_set = query_set.lock().await; - // Update the query status in the map - match result { - Ok(response) => { - query_map.insert(hash_clone, QueryStatus::Result(response)); - } - Err(err) => { + if let Err(err) = + process_query(query_request_clone, Arc::new(session_state_clone), creds, hash_clone).await + { log::error!("Error processing query: {:?}", err); - query_map.remove(&hash_clone); } + + query_set.remove(&hash_clone); + }); + } + + // wait for a (proxy timeout - 5) seconds and at each second, check if the query has finished processing + let start_time = Instant::now(); + let timeout = Duration::from_secs(CONFIG.parseable.proxy_timeout - 5); + + while start_time.elapsed() < timeout { + if let Some(result) = find_from_cache( + hash, + query_request.send_null, + query_request.fields + ).await.unwrap() { + return Ok(result.to_http()?); } - }); + sleep(Duration::from_secs(1)).await; + } - // Wait and respond with HTTP 202 - sleep(Duration::from_secs(CONFIG.parseable.proxy_timeout)).await; - Ok(HttpResponse::Accepted().finish()) + // If we've timed out, return HTTP 202 + return Ok(HttpResponse::Accepted().finish()); } async fn process_query( query_request: Query, session_state: Arc, creds: SessionKey, + hash: u64 ) -> Result { sleep(Duration::from_secs(120)).await; @@ -173,9 +162,6 @@ async fn process_query( let _ = raw_logical_plan.visit(&mut visitor); let visitor_clone = visitor.clone(); - let stream = visitor - .top() - .ok_or_else(|| QueryError::MalformedQuery("Table Name not found in SQL"))?; // Process the query let tables = visitor_clone.into_inner(); @@ -194,18 +180,14 @@ async fn process_query( let time = Instant::now(); let (records, fields) = query.execute(table_name.clone()).await?; - let query_cache_manager = QueryCacheManager::global(CONFIG.parseable.query_cache_size).await?; - // Cache the results - put_results_in_cache( - &query_cache_manager, - stream, + cache_query_results( &records, - query.start.to_rfc3339(), - query.end.to_rfc3339(), - query_request.query.clone(), + &table_name, + hash ) - .await?; + .await + .unwrap(); // Create 
     let response = QueryResponse {
         records,
         fields,
         fill_null: query_request.send_null,
         with_fields: query_request.fields,
     };
@@ -240,16 +222,19 @@ pub async fn update_schema_when_distributed(tables: Vec<String>) -> Result<(), Q

 async fn cache_query_results(
     records: &[RecordBatch],
-    start: &str,
-    end: &str,
-    query: &str,
+    table_name: &str,
+    hash: u64
 ) -> Result<(), Box<dyn std::error::Error>> {
-    let parquet_path = PathBuf::from_iter([
-        "/home/vishalds/query-cache",
-        &format!("{}.parquet", generate_hash(start, end, query)),
-    ]);
+    let root_path = match &CONFIG.parseable.query_cache_path {
+        Some(path) => path,
+        None => &PathBuf::from("~/.query-cache")
+    };
+    let parquet_path = root_path.join(&format!("{}.parquet", hash));
+    AsyncFs::create_dir_all(parquet_path.parent().expect("parent path exists")).await?;
     let parquet_file = AsyncFs::File::create(&parquet_path).await?;
+    let time_partition = STREAM_INFO.get_time_partition(table_name)?;
+    let props = parquet_writer_props(time_partition.clone(), 0, HashMap::new()).build();

     let sch = if let Some(record) = records.first() {
         record.schema()
@@ -258,7 +243,7 @@ async fn cache_query_results(
         return Ok(());
     };

-    let mut arrow_writer = AsyncArrowWriter::try_new(parquet_file, sch, None)?;
+    let mut arrow_writer = AsyncArrowWriter::try_new(parquet_file, sch, Some(props))?;

     for record in records {
         if let Err(e) = arrow_writer.write(record).await {
@@ -267,20 +252,39 @@ async fn cache_query_results(
         }
     }

     arrow_writer.close().await?;
+
+    // delete this file after the preset time
+    let parquet_path_clone = parquet_path.clone();
+    tokio::spawn(async move {
+
+        // 24 hours for now
+        sleep(Duration::from_secs(24 * 60 * 60)).await;
+        if let Err(e) = AsyncFs::remove_file(&parquet_path_clone).await {
+            log::error!("Error deleting cached query file: {}", e);
+        } else {
+            log::info!("Successfully deleted cached query file: {:?}", parquet_path_clone);
+        }
+    });
+
     Ok(())
 }

-async fn find_from_cache(
-    start: &str,
-    end: &str,
-    query: &str,
-) -> Result<Option<QueryResponse>, Box<dyn std::error::Error>> {
-    let parquet_path = PathBuf::from_iter([
-        "/home/vishalds/query-cache",
-        &format!("{}.parquet", generate_hash(start, end, query)),
-    ]);
+async fn find_from_cache(hash: u64, fill_null: bool, with_fields: bool) -> Result<Option<QueryResponse>, Box<dyn std::error::Error>> {
+    let root_path = match &CONFIG.parseable.query_cache_path {
+        Some(path) => path,
+        None => &PathBuf::from("~/.query-cache")
+    };
+    let parquet_path = root_path.join(&format!("{}.parquet", hash));

     if let Ok(file) = AsyncFs::File::open(parquet_path).await {
+
+        // check if this is an empty response
+        let length = file.metadata().await.unwrap().len();
+
+        if length < 1 {
+            return Ok(Some(QueryResponse { records: vec![], fields: vec![], fill_null, with_fields}))
+        }
+
         let builder = ParquetRecordBatchStreamBuilder::new(file).await?;
         // Build an async parquet reader.
         let stream = builder.build()?;
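The read side of this cache is symmetric and short; for reference, collecting every cached batch back out of a parquet file with the same ParquetRecordBatchStreamBuilder comes down to a sketch like this (assuming the parquet crate's async feature; the helper name and error type are illustrative):

    use arrow_array::RecordBatch;
    use futures_util::TryStreamExt;
    use parquet::arrow::ParquetRecordBatchStreamBuilder;

    async fn read_cached(
        path: &std::path::Path,
    ) -> Result<(Vec<String>, Vec<RecordBatch>), Box<dyn std::error::Error>> {
        let file = tokio::fs::File::open(path).await?;
        // The builder reads the parquet footer, so the schema is known
        // before any row group is decoded.
        let builder = ParquetRecordBatchStreamBuilder::new(file).await?;
        let fields: Vec<String> = builder
            .schema()
            .fields()
            .iter()
            .map(|f| f.name().clone())
            .collect();
        // build() yields an async stream of Result<RecordBatch>.
        let records: Vec<RecordBatch> = builder.build()?.try_collect().await?;
        Ok((fields, records))
    }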
@@ -299,12 +303,15 @@ async fn find_from_cache(
         Ok(Some(QueryResponse {
             records,
             fields,
-            fill_null: true,
-            with_fields: true,
+            fill_null,
+            with_fields,
         }))
-    } else{ Ok(None) }
+    } else {
+        Ok(None)
+    }
 }

+#[allow(dead_code)]
 #[allow(clippy::too_many_arguments)]
 pub async fn put_results_in_cache(
     query_cache_manager: &QueryCacheManager,
@@ -341,6 +348,7 @@ pub async fn put_results_in_cache(
     Ok(())
 }

+#[allow(dead_code)]
 #[allow(clippy::too_many_arguments)]
 pub async fn get_results_from_cache(
     query_cache_manager: &QueryCacheManager,
@@ -473,7 +481,7 @@ pub async fn into_query(
     })
 }

-fn parse_human_time(
+pub fn parse_human_time(
     start_time: &str,
     end_time: &str,
 ) -> Result<(DateTime<Utc>, DateTime<Utc>), QueryError> {
diff --git a/server/src/query.rs b/server/src/query.rs
index 6b78c6079..bfaec8092 100644
--- a/server/src/query.rs
+++ b/server/src/query.rs
@@ -185,6 +185,8 @@ impl TableScanVisitor {
     pub fn into_inner(self) -> Vec<String> {
         self.tables
     }
+
+    #[allow(dead_code)]
     pub fn top(&self) -> Option<&str> {
         self.tables.first().map(|s| s.as_ref())
     }
diff --git a/server/src/querycache.rs b/server/src/querycache.rs
index 4891896c5..5998cbb15 100644
--- a/server/src/querycache.rs
+++ b/server/src/querycache.rs
@@ -36,7 +36,7 @@ use crate::metadata::STREAM_INFO;
 use crate::storage::staging::parquet_writer_props;
 use crate::{localcache::CacheError, option::CONFIG};

-pub const QUERY_CACHE_FILENAME: &str = ".cache.json";
+// pub const QUERY_CACHE_FILENAME: &str = ".cache.json";
 pub const QUERY_CACHE_META_FILENAME: &str = ".cache_meta.json";
 pub const CURRENT_QUERY_CACHE_VERSION: &str = "v1";

@@ -159,11 +159,11 @@ impl QueryCacheManager {
         static INSTANCE: OnceCell<QueryCacheManager> = OnceCell::new();

         let cache_manager = INSTANCE.get_or_init(|| {
-            let cache_path = String::from("/query-cache");
+            let cache_path = PathBuf::from("/home/vishalds/query-cache");
             std::fs::create_dir_all(&cache_path).unwrap();
             Self {
                 filesystem: LocalFileSystem::new(),
-                cache_path: cache_path.into(),
+                cache_path,
                 total_cache_capacity: CONFIG.parseable.query_cache_size,
                 semaphore: Mutex::new(()),
             }
@@ -254,16 +254,15 @@ impl QueryCacheManager {
         query: &str,
     ) -> Result<(), CacheError> {
         let mut cache = self.get_cache(start, end, query).await?;
-
+
         if let Some(file_for_removal) = cache.remove(&key) {
             self.put_cache(start, end, query, &cache).await?;
-            tokio::spawn(fs::remove_file(file_for_removal));
+            tokio::spawn(fs::remove_file(file_for_removal));
             Ok(())
         } else {
             Err(CacheError::DoesNotExist)
         }
     }
-
     pub async fn put_cache(
         &self,
@@ -392,6 +391,9 @@ impl QueryCacheManager {
 pub fn generate_hash(start: &str, end: &str, query: &str) -> u64 {
     let mut hasher = DefaultHasher::new();
+    // let (start_t, end_t) = parse_human_time(start, end).unwrap();
+    // start_t.to_string().as_str().hash(&mut hasher);
+    // end_t.to_string().as_str().hash(&mut hasher);
     start.hash(&mut hasher);
     end.hash(&mut hasher);
     query.hash(&mut hasher);
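One subtlety in generate_hash is worth spelling out: it hashes the raw start/end strings, so a relative-time query like "1h"/"now" keeps hitting the same cache file even as its absolute window slides (the commented-out lines above are the abandoned alternative of hashing resolved timestamps). A standalone sketch of the keying; the caveat is that DefaultHasher's algorithm is only guaranteed stable within a single Rust build, not across releases:

    use std::collections::hash_map::DefaultHasher;
    use std::hash::{Hash, Hasher};

    fn generate_hash(start: &str, end: &str, query: &str) -> u64 {
        let mut hasher = DefaultHasher::new(); // deterministic fixed keys, unlike RandomState
        start.hash(&mut hasher);
        end.hash(&mut hasher);
        query.hash(&mut hasher);
        hasher.finish()
    }

    fn main() {
        let a = generate_hash("1h", "now", "select * from mystream");
        let b = generate_hash("1h", "now", "select * from mystream");
        assert_eq!(a, b); // identical request bodies map to one cache file
    }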
From 4769908f8c9365e6a1e36d75a51027740a181d02 Mon Sep 17 00:00:00 2001
From: vishalkrishnads <321vishalds@gmail.com>
Date: Wed, 28 Aug 2024 10:55:12 +0530
Subject: [PATCH 13/14] add disk usage check

---
 server/src/handlers/http/query.rs | 91 ++++++++++++++++++++-----------
 server/src/hottier.rs             | 35 ++----------
 server/src/utils.rs               | 30 +++++++++-
 3 files changed, 94 insertions(+), 62 deletions(-)

diff --git a/server/src/handlers/http/query.rs b/server/src/handlers/http/query.rs
index 7c9ce359d..c6c1365d0 100644
--- a/server/src/handlers/http/query.rs
+++ b/server/src/handlers/http/query.rs
@@ -41,6 +41,7 @@ use crate::event::error::EventError;
 use crate::handlers::http::fetch_schema;
 use crate::metadata::STREAM_INFO;
 use crate::rbac::map::SessionKey;
+use crate::utils::get_disk_usage;
 use arrow_array::RecordBatch;

 use crate::event::commit_schema;
@@ -55,8 +56,8 @@ use crate::rbac::role::{Action, Permission};
 use crate::rbac::Users;
 use crate::response::QueryResponse;
 use crate::storage::object_storage::commit_schema_to_storage;
-use crate::storage::ObjectStorageError;
 use crate::storage::staging::parquet_writer_props;
+use crate::storage::ObjectStorageError;
 use crate::utils::actix::extract_session_key_from_req;

 /// Query Request through http endpoint.
@@ -89,11 +90,10 @@ pub async fn query(
     );

     // If result is in cache, just return it.
-    if let Some(result) = find_from_cache(
-        hash,
-        query_request.send_null,
-        query_request.fields
-    ).await.unwrap() {
+    if let Some(result) = find_from_cache(hash, query_request.send_null, query_request.fields)
+        .await
+        .unwrap()
+    {
         return Ok(result.to_http()?);
     }

@@ -105,7 +105,8 @@ pub async fn query(
     };

     // insert the hash into the set anyway, it'll return true if not previously present
-    if should_spawn { // if this hash is new to the set,
+    if should_spawn {
+        // if this hash is new to the set,
         // Clone necessary data for the spawned task
         let query_request_clone = query_request.clone();
         let hash_clone = hash.clone();
@@ -116,8 +117,13 @@ pub async fn query(
         tokio::spawn(async move {
             let mut query_set = query_set.lock().await;

-            if let Err(err) =
-                process_query(query_request_clone, Arc::new(session_state_clone), creds, hash_clone).await
+            if let Err(err) = process_query(
+                query_request_clone,
+                Arc::new(session_state_clone),
+                creds,
+                hash_clone,
+            )
+            .await
             {
                 log::error!("Error processing query: {:?}", err);
             }
@@ -131,11 +137,10 @@ pub async fn query(
     let timeout = Duration::from_secs(CONFIG.parseable.proxy_timeout - 5);

     while start_time.elapsed() < timeout {
-        if let Some(result) = find_from_cache(
-            hash,
-            query_request.send_null,
-            query_request.fields
-        ).await.unwrap() {
+        if let Some(result) = find_from_cache(hash, query_request.send_null, query_request.fields)
+            .await
+            .unwrap()
+        {
             return Ok(result.to_http()?);
         }
         sleep(Duration::from_secs(1)).await;
@@ -149,7 +154,7 @@ async fn process_query(
     query_request: Query,
     session_state: Arc<SessionState>,
     creds: SessionKey,
-    hash: u64
+    hash: u64,
 ) -> Result<QueryResponse, QueryError> {
     sleep(Duration::from_secs(120)).await;

@@ -181,13 +186,9 @@ async fn process_query(
     let (records, fields) = query.execute(table_name.clone()).await?;

     // Cache the results
-    cache_query_results(
-        &records,
-        &table_name,
-        hash
-    )
-    .await
-    .unwrap();
+    cache_query_results(&records, &table_name, hash)
+        .await
+        .unwrap();

     // Create the response
     let response = QueryResponse {
@@ -223,12 +224,30 @@ async fn cache_query_results(
     records: &[RecordBatch],
     table_name: &str,
-    hash: u64
+    hash: u64,
 ) -> Result<(), Box<dyn std::error::Error>> {
     let root_path = match &CONFIG.parseable.query_cache_path {
         Some(path) => path,
-        None => &PathBuf::from("~/.query-cache")
+        None => &PathBuf::from("~/.query-cache"),
     };
+
+    let (total_disk_space, available_disk_space, used_disk_space) =
+        get_disk_usage(root_path);
+    let size_to_download = records.len().try_into().unwrap();
+    if let (Some(total_disk_space), Some(available_disk_space), Some(used_disk_space)) =
+        (total_disk_space, available_disk_space, used_disk_space)
+    {
+        if available_disk_space < size_to_download {
+            return Err("parseable is out of disk space to cache this query.".into());
+        }
+
+        if ((used_disk_space + size_to_download) as f64 * 100.0 / total_disk_space as f64)
+            > CONFIG.parseable.max_disk_usage
+        {
+            return Err("parseable is out of disk space to cache this query.".into());
+        }
+    }
+
     let parquet_path = root_path.join(&format!("{}.parquet", hash));
     AsyncFs::create_dir_all(parquet_path.parent().expect("parent path exists")).await?;
     let parquet_file = AsyncFs::File::create(&parquet_path).await?;
@@ -256,35 +275,45 @@ async fn cache_query_results(
     // delete this file after the preset time
     let parquet_path_clone = parquet_path.clone();
     tokio::spawn(async move {
-
         // 24 hours for now
         sleep(Duration::from_secs(24 * 60 * 60)).await;
         if let Err(e) = AsyncFs::remove_file(&parquet_path_clone).await {
             log::error!("Error deleting cached query file: {}", e);
         } else {
-            log::info!("Successfully deleted cached query file: {:?}", parquet_path_clone);
+            log::info!(
+                "Successfully deleted cached query file: {:?}",
+                parquet_path_clone
+            );
         }
     });

     Ok(())
 }

-async fn find_from_cache(hash: u64, fill_null: bool, with_fields: bool) -> Result<Option<QueryResponse>, Box<dyn std::error::Error>> {
+async fn find_from_cache(
+    hash: u64,
+    fill_null: bool,
+    with_fields: bool,
+) -> Result<Option<QueryResponse>, Box<dyn std::error::Error>> {
     let root_path = match &CONFIG.parseable.query_cache_path {
         Some(path) => path,
-        None => &PathBuf::from("~/.query-cache")
+        None => &PathBuf::from("~/.query-cache"),
     };
     let parquet_path = root_path.join(&format!("{}.parquet", hash));

     if let Ok(file) = AsyncFs::File::open(parquet_path).await {
-
         // check if this is an empty response
         let length = file.metadata().await.unwrap().len();

         if length < 1 {
-            return Ok(Some(QueryResponse { records: vec![], fields: vec![], fill_null, with_fields}))
+            return Ok(Some(QueryResponse {
+                records: vec![],
+                fields: vec![],
+                fill_null,
+                with_fields,
+            }));
         }
-
+
         let builder = ParquetRecordBatchStreamBuilder::new(file).await?;
         // Build an async parquet reader.
         let stream = builder.build()?;
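The guard this patch adds is two independent checks: the write must fit into free space, and it must keep overall usage under the configured ceiling. Note that size_to_download comes from records.len().try_into(), i.e. a count of record batches rather than bytes, so the percentage math runs on a rough proxy for the real write size. The arithmetic itself, as a pure function (names and numbers illustrative):

    fn can_cache(total: u64, available: u64, used: u64, incoming: u64, max_usage_percent: f64) -> bool {
        if available < incoming {
            return false; // would not fit at all
        }
        // reject if the write would push usage past the ceiling
        ((used + incoming) as f64 * 100.0 / total as f64) <= max_usage_percent
    }

    fn main() {
        let gib = 1024 * 1024 * 1024u64;
        // 100 GiB disk, 94 GiB used: a 2 GiB write lands at 96% usage.
        assert!(!can_cache(100 * gib, 6 * gib, 94 * gib, 2 * gib, 95.0));
        assert!(can_cache(100 * gib, 6 * gib, 94 * gib, 2 * gib, 97.0));
    }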
diff --git a/server/src/hottier.rs b/server/src/hottier.rs
index 475a559a3..0fec37c0c 100644
--- a/server/src/hottier.rs
+++ b/server/src/hottier.rs
@@ -29,7 +29,7 @@ use crate::{
     metadata::{error::stream_info::MetadataError, STREAM_INFO},
     option::{validation::bytes_to_human_size, CONFIG},
     storage::{ObjectStorage, ObjectStorageError},
-    utils::extract_datetime,
+    utils::{extract_datetime, get_disk_usage},
     validator::error::HotTierValidationError,
 };
 use chrono::NaiveDate;
@@ -41,7 +41,6 @@ use once_cell::sync::OnceCell;
 use parquet::errors::ParquetError;
 use relative_path::RelativePathBuf;
 use std::time::Duration;
-use sysinfo::{Disks, System};
 use tokio::fs::{self, DirEntry};
 use tokio::io::AsyncWriteExt;
 use tokio_stream::wrappers::ReadDirStream;
@@ -133,7 +132,8 @@ impl HotTierManager {
             }
         }

-        let (total_disk_space, available_disk_space, used_disk_space) = get_disk_usage();
+        let (total_disk_space, available_disk_space, used_disk_space) =
+            get_disk_usage(CONFIG.parseable.hot_tier_storage_path.as_ref().unwrap());

         if let (Some(total_disk_space), _, Some(used_disk_space)) =
             (total_disk_space, available_disk_space, used_disk_space)
@@ -653,7 +653,8 @@ impl HotTierManager {
     ///check if the disk is available to download the parquet file
     /// check if the disk usage is above the threshold
     pub async fn is_disk_available(&self, size_to_download: u64) -> Result<bool, HotTierError> {
-        let (total_disk_space, available_disk_space, used_disk_space) = get_disk_usage();
+        let (total_disk_space, available_disk_space, used_disk_space) =
+            get_disk_usage(CONFIG.parseable.hot_tier_storage_path.as_ref().unwrap());

         if let (Some(total_disk_space), Some(available_disk_space), Some(used_disk_space)) =
             (total_disk_space, available_disk_space, used_disk_space)
@@ -751,32 +752,6 @@ pub fn hot_tier_file_path(
     object_store::path::Path::from_absolute_path(path)
 }

-///get the disk usage for the hot tier storage path
-pub fn get_disk_usage() -> (Option<u64>, Option<u64>, Option<u64>) {
-    let mut sys = System::new_all();
-    sys.refresh_all();
-    let path = CONFIG.parseable.hot_tier_storage_path.as_ref().unwrap();
-
-    let mut disks = Disks::new_with_refreshed_list();
-    disks.sort_by_key(|disk| disk.mount_point().to_str().unwrap().len());
-    disks.reverse();
-
-    for disk in disks.iter() {
-        if path.starts_with(disk.mount_point().to_str().unwrap()) {
-            let total_disk_space = disk.total_space();
-            let available_disk_space = disk.available_space();
-            let used_disk_space = total_disk_space - available_disk_space;
-            return (
-                Some(total_disk_space),
-                Some(available_disk_space),
-                Some(used_disk_space),
-            );
-        }
-    }
-
-    (None, None, None)
-}
-
 async fn delete_empty_directory_hot_tier(path: &Path) -> io::Result<()> {
     async fn delete_helper(path: &Path) -> io::Result<()> {
         if path.is_dir() {
diff --git a/server/src/utils.rs b/server/src/utils.rs
index 34630c375..26f91cb94 100644
--- a/server/src/utils.rs
+++ b/server/src/utils.rs
@@ -28,8 +28,10 @@ use itertools::Itertools;
 use regex::Regex;
 use sha2::{Digest, Sha256};
 use std::collections::HashMap;
-use std::env;
+use std::{env, path::PathBuf};
 use url::Url;
+use sysinfo::{Disks, System};
+
 #[allow(dead_code)]
 pub fn hostname() -> Option<String> {
     hostname::get()
@@ -321,6 +323,32 @@ pub fn extract_datetime(path: &str) -> Option<NaiveDateTime> {
     }
 }

+pub fn get_disk_usage(
+    path: &PathBuf
+) -> (Option<u64>, Option<u64>, Option<u64>) {
+    let mut sys = System::new_all();
+    sys.refresh_all();
+
+    let mut disks = Disks::new_with_refreshed_list();
+    disks.sort_by_key(|disk| disk.mount_point().to_str().unwrap().len());
+    disks.reverse();
+
+    for disk in disks.iter() {
+        if path.starts_with(disk.mount_point().to_str().unwrap()) {
+            let total_disk_space = disk.total_space();
+            let available_disk_space = disk.available_space();
+            let used_disk_space = total_disk_space - available_disk_space;
+            return (
+                Some(total_disk_space),
+                Some(available_disk_space),
+                Some(used_disk_space),
+            );
+        }
+    }
+
+    (None, None, None)
+}
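The now-shared get_disk_usage resolves a path to its disk by sorting mount points longest-first and taking the first prefix match, so a cache under /data is attributed to the disk mounted at /data rather than /. That selection rule in isolation (Path::starts_with matches whole components, mirroring the loop above; mount names are illustrative):

    use std::path::Path;

    fn pick_mount<'a>(mut mounts: Vec<&'a str>, path: &Path) -> Option<&'a str> {
        // longest mount point first, so the most specific disk wins
        mounts.sort_by_key(|m| m.len());
        mounts.reverse();
        mounts.into_iter().find(|m| path.starts_with(*m))
    }

    fn main() {
        let mounts = vec!["/", "/data", "/var"];
        assert_eq!(pick_mount(mounts, Path::new("/data/query-cache")), Some("/data"));
    }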
+
 #[cfg(test)]
 mod tests {
     use chrono::DateTime;

From 6b9827ef0ab90faa30371395573e16d63994def9 Mon Sep 17 00:00:00 2001
From: vishalkrishnads <321vishalds@gmail.com>
Date: Wed, 28 Aug 2024 16:41:30 +0530
Subject: [PATCH 14/14] return errors

---
 .gitignore                                    |   2 +-
 server/src/cli.rs                             |   8 +-
 .../src/handlers/http/modal/query_server.rs   |   4 +-
 server/src/handlers/http/modal/server.rs      |   4 +-
 server/src/handlers/http/query.rs             | 104 +++++++++---------
 5 files changed, 65 insertions(+), 57 deletions(-)

diff --git a/.gitignore b/.gitignore
index 5bbc1b194..dc6fcaed6 100644
--- a/.gitignore
+++ b/.gitignore
@@ -14,4 +14,4 @@ parseable
 parseable_*
 parseable-env-secret
 cache
-
+query-cache/
diff --git a/server/src/cli.rs b/server/src/cli.rs
index d9a339047..41b0c81c1 100644
--- a/server/src/cli.rs
+++ b/server/src/cli.rs
@@ -94,7 +94,7 @@ pub struct Cli {
     pub flight_port: u16,

     /// to query cached data
-    pub query_cache_path: Option<PathBuf>,
+    pub query_cache_path: PathBuf,

     /// Size for local cache
     pub query_cache_size: u64,
@@ -222,6 +222,7 @@ impl Cli {
                 .long(Self::QUERY_CACHE)
                 .env("P_QUERY_CACHE_DIR")
                 .value_name("DIR")
+                .default_value("./query-cache")
                 .value_parser(validation::canonicalize_path)
                 .help("Local path on this device to be used for caching data")
                 .next_line_help(true),
@@ -465,7 +466,6 @@ impl FromArgMatches for Cli {
     fn update_from_arg_matches(&mut self, m: &clap::ArgMatches) -> Result<(), clap::Error> {
         self.local_cache_path = m.get_one::<PathBuf>(Self::CACHE).cloned();
-        self.query_cache_path = m.get_one::<PathBuf>(Self::QUERY_CACHE).cloned();
         self.tls_cert_path = m.get_one::<PathBuf>(Self::TLS_CERT).cloned();
         self.tls_key_path = m.get_one::<PathBuf>(Self::TLS_KEY).cloned();
         self.domain_address = m.get_one::<Url>(Self::DOMAIN_URI).cloned();
@@ -492,6 +492,10 @@ impl FromArgMatches for Cli {
             .get_one(Self::QUERY_CACHE_SIZE)
             .cloned()
             .expect("default value for query cache size");
+        self.query_cache_path = m
+            .get_one(Self::QUERY_CACHE)
+            .cloned()
+            .expect("default value for query cache directory");
         self.username = m
             .get_one::<String>(Self::USERNAME)
             .cloned()
diff --git a/server/src/handlers/http/modal/query_server.rs b/server/src/handlers/http/modal/query_server.rs
index 900bec4e2..cac3033eb 100644
--- a/server/src/handlers/http/modal/query_server.rs
+++ b/server/src/handlers/http/modal/query_server.rs
@@ -32,7 +32,7 @@ use actix_web::web::ServiceConfig;
 use actix_web::{App, HttpServer};
 use async_trait::async_trait;
 use tokio::sync::Mutex;
-use std::collections::HashSet;
+use std::collections::HashMap;
 use std::sync::Arc;
 use std::time::Duration;
@@ -69,7 +69,7 @@ impl ParseableServer for QueryServer {
         )?;

         let create_app_fn = move || {
-            let query_set: query::QuerySet = Arc::new(Mutex::new(HashSet::new()));
+            let query_set: query::QueryMap = Arc::new(Mutex::new(HashMap::new()));

             App::new()
                 .app_data(Data::new(query_set))
diff --git a/server/src/handlers/http/modal/server.rs b/server/src/handlers/http/modal/server.rs
index 34309d2a6..3bc1bd66b 100644
--- a/server/src/handlers/http/modal/server.rs
+++ b/server/src/handlers/http/modal/server.rs
@@ -36,7 +36,7 @@ use crate::storage;
 use crate::sync;
 use crate::users::dashboards::DASHBOARDS;
 use crate::users::filters::FILTERS;
-use std::collections::HashSet;
+use std::collections::HashMap;
 use std::sync::Arc;
 use std::time::Duration;
@@ -86,7 +86,7 @@ impl ParseableServer for Server {
         };

         let create_app_fn = move || {
-            let query_set: query::QuerySet = Arc::new(Mutex::new(HashSet::new()));
+            let query_set: query::QueryMap = Arc::new(Mutex::new(HashMap::new()));

             App::new()
                 .app_data(Data::new(query_set))
diff --git a/server/src/handlers/http/query.rs b/server/src/handlers/http/query.rs
index c6c1365d0..7a078aa57 100644
--- a/server/src/handlers/http/query.rs
+++ b/server/src/handlers/http/query.rs
@@ -18,7 +18,7 @@
 use actix_web::http::header::ContentType;
 use actix_web::web::{self, Json};
-use actix_web::{FromRequest, HttpRequest, HttpResponse};
+use actix_web::{FromRequest, HttpRequest, HttpResponse, ResponseError};
 use chrono::{DateTime, Utc};
 use datafusion::common::tree_node::TreeNode;
 use datafusion::error::DataFusionError;
@@ -28,8 +28,7 @@ use futures_util::Future;
 use http::StatusCode;
 use itertools::Itertools;
 use parquet::arrow::{AsyncArrowWriter, ParquetRecordBatchStreamBuilder};
-use std::collections::{HashMap, HashSet};
-use std::path::PathBuf;
+use std::collections::HashMap;
 use std::pin::Pin;
 use std::sync::Arc;
 use std::time::{Duration, Instant};
@@ -75,21 +74,26 @@ pub struct Query {
     pub filter_tags: Option<Vec<String>>,
 }

-pub type QuerySet = Arc<Mutex<HashSet<u64>>>;
+pub type QueryMap = Arc<Mutex<HashMap<u64, QueryStatus>>>;
+
+pub enum QueryStatus {
+    Processing,
+    Error(QueryError),
+}
+unsafe impl Send for QueryStatus {}
+unsafe impl Sync for QueryStatus {}
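The shape change from set to map is what lets failures propagate: a worker that errors out leaves QueryStatus::Error behind, so every waiter polling that hash can return the real error instead of spinning until the 202 timeout. (The unsafe Send/Sync impls are a blunt instrument here; they assert rather than prove that QueryError is safe to share across the spawn boundary.) The state machine in miniature, with String standing in for QueryError:

    use std::collections::HashMap;

    enum Status {
        Processing,
        Error(String),
    }

    // One poll step: the first caller registers Processing and spawns;
    // later callers either wait or surface a stored failure.
    fn poll(map: &mut HashMap<u64, Status>, hash: u64) -> &'static str {
        match map.get(&hash) {
            Some(Status::Error(_)) => "return the stored error",
            Some(Status::Processing) => "keep waiting",
            None => {
                map.insert(hash, Status::Processing);
                "spawn the worker"
            }
        }
    }

    fn main() {
        let mut map = HashMap::new();
        assert_eq!(poll(&mut map, 7), "spawn the worker");
        assert_eq!(poll(&mut map, 7), "keep waiting");
        map.insert(7, Status::Error("stream not found".into()));
        assert_eq!(poll(&mut map, 7), "return the stored error");
    }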

 pub async fn query(
     req: HttpRequest,
     query_request: Query,
-    query_set: web::Data<QuerySet>,
+    query_map: web::Data<QueryMap>,
 ) -> Result<HttpResponse, QueryError> {
-    // Generate hash for the query based on start, end, and query string
     let hash = generate_hash(
         &query_request.start_time,
         &query_request.end_time,
         &query_request.query,
     );
-
     // If result is in cache, just return it.
     if let Some(result) = find_from_cache(hash, query_request.send_null, query_request.fields)
         .await
         .unwrap()
     {
         return Ok(result.to_http()?);
     }

     let session_state = QUERY_SESSION.state();

-    let should_spawn = {
-        let mut query_set = query_set.lock().await;
-        query_set.insert(hash.clone())
+    let should_spawn_task = {
+        let mut query_map = query_map.lock().await;
+        match query_map.get(&hash) {
+            Some(QueryStatus::Error(error)) => {
+                return Ok(error.error_response());
+            }
+            Some(QueryStatus::Processing) => false,
+            None => {
+                query_map.insert(hash, QueryStatus::Processing);
+                true
+            }
+        }
     };

-    // insert the hash into the set anyway, it'll return true if not previously present
-    if should_spawn {
-        // if this hash is new to the set,
-        // Clone necessary data for the spawned task
+    if should_spawn_task {
         let query_request_clone = query_request.clone();
-        let hash_clone = hash.clone();
-        let session_state_clone = session_state.clone();
+        let hash_clone = hash;
+        let session_state_clone = Arc::new(session_state);
         let creds = extract_session_key_from_req(&req).unwrap().to_owned();
+        let query_map_clone = query_map.clone();

-        // Spawn a separate task to process the query and cache the results
         tokio::spawn(async move {
-            let mut query_set = query_set.lock().await;
+            let mut query_map = query_map_clone.lock().await;

-            if let Err(err) = process_query(
-                query_request_clone,
-                Arc::new(session_state_clone),
-                creds,
-                hash_clone,
-            )
-            .await
-            {
-                log::error!("Error processing query: {:?}", err);
-            }
-
-            query_set.remove(&hash_clone);
+            match process_query(query_request_clone, session_state_clone, creds, hash_clone).await {
+                Ok(_) => {
+                    query_map.remove(&hash_clone);
+                }
+                Err(err) => {
+                    query_map.insert(hash_clone, QueryStatus::Error(err));
+                }
+            };
         });
     }

-    // wait for (proxy_timeout - 5) seconds and at each second, check if the query has finished processing
     let start_time = Instant::now();
     let timeout = Duration::from_secs(CONFIG.parseable.proxy_timeout - 5);

@@ -143,11 +148,16 @@ pub async fn query(
             return Ok(result.to_http()?);
         }
+
+        let query_map = query_map.lock().await;
+        if let Some(QueryStatus::Error(err)) = query_map.get(&hash) {
+            return Ok(err.error_response());
+        }
+
         sleep(Duration::from_secs(1)).await;
     }

-    // If we've timed out, return HTTP 202
-    return Ok(HttpResponse::Accepted().finish());
+    Ok(HttpResponse::Accepted().finish())
 }

 async fn process_query(
@@ -156,12 +166,9 @@ async fn process_query(
     creds: SessionKey,
     hash: u64,
 ) -> Result<QueryResponse, QueryError> {
-    sleep(Duration::from_secs(120)).await;
-
     let raw_logical_plan = session_state
         .create_logical_plan(&query_request.query)
-        .await
-        .unwrap();
+        .await?;

     let mut visitor = TableScanVisitor::default();
     let _ = raw_logical_plan.visit(&mut visitor);
@@ -226,13 +233,8 @@ async fn cache_query_results(
     table_name: &str,
     hash: u64,
 ) -> Result<(), Box<dyn std::error::Error>> {
-    let root_path = match &CONFIG.parseable.query_cache_path {
-        Some(path) => path,
-        None => &PathBuf::from("~/.query-cache"),
-    };
-
     let (total_disk_space, available_disk_space, used_disk_space) =
-        get_disk_usage(root_path);
+        get_disk_usage(&CONFIG.parseable.query_cache_path);
     let size_to_download = records.len().try_into().unwrap();
@@ -248,7 +250,10 @@ async fn cache_query_results(
         }
     }

-    let parquet_path = root_path.join(&format!("{}.parquet", hash));
+    let parquet_path = CONFIG
+        .parseable
+        .query_cache_path
+        .join(&format!("{}.parquet", hash));
     AsyncFs::create_dir_all(parquet_path.parent().expect("parent path exists")).await?;
     let parquet_file = AsyncFs::File::create(&parquet_path).await?;
@@ -276,7 +281,7 @@ async fn cache_query_results(
     let parquet_path_clone = parquet_path.clone();
     tokio::spawn(async move {
         // 24 hours for now
-        sleep(Duration::from_secs(24 * 60 * 60)).await;
+        sleep(Duration::from_secs(60 * 60)).await;
         if let Err(e) = AsyncFs::remove_file(&parquet_path_clone).await {
             log::error!("Error deleting cached query file: {}", e);
         } else {
@@ -295,11 +300,10 @@ async fn find_from_cache(
     fill_null: bool,
     with_fields: bool,
 ) -> Result<Option<QueryResponse>, Box<dyn std::error::Error>> {
-    let root_path = match &CONFIG.parseable.query_cache_path {
-        Some(path) => path,
-        None => &PathBuf::from("~/.query-cache"),
-    };
-    let parquet_path = root_path.join(&format!("{}.parquet", hash));
+    let parquet_path = CONFIG
+        .parseable
+        .query_cache_path
+        .join(&format!("{}.parquet", hash));

     if let Ok(file) = AsyncFs::File::open(parquet_path).await {
         // check if this is an empty response
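Taken together, the series leaves clients with a simple contract: POST the query, treat 202 Accepted as "still running", and re-POST the identical body until a terminal status arrives; the server dedupes by hash and answers from the parquet cache once the result lands. A hypothetical client loop; reqwest, the credentials, and the endpoint/body shape are illustrative assumptions, not part of this patch:

    use std::time::Duration;

    #[tokio::main]
    async fn main() -> Result<(), Box<dyn std::error::Error>> {
        let client = reqwest::Client::new();
        let body = serde_json::json!({
            "query": "select count(*) from mystream",
            "startTime": "1h",
            "endTime": "now",
        });

        loop {
            let resp = client
                .post("http://localhost:8000/api/v1/query")
                .basic_auth("admin", Some("admin"))
                .json(&body)
                .send()
                .await?;

            if resp.status() == reqwest::StatusCode::ACCEPTED {
                // still processing server-side; the repeat POST is cheap
                tokio::time::sleep(Duration::from_secs(5)).await;
                continue;
            }

            println!("{}", resp.text().await?);
            return Ok(());
        }
    }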