Fix LB timeout in QueryServer #893

Open · wants to merge 14 commits into main
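In a load-balanced deployment, a query that runs longer than the balancer's idle timeout (commonly 60 seconds) gets its connection cut before Parseable can answer. This change hashes each query by its start time, end time, and query text, runs it in a background task, and tracks it in a shared in-memory map; if no result is ready within 55 seconds the server replies with HTTP 202 Accepted, and re-submitting the identical query returns the stored result once the task finishes. A rough client-side sketch of the intended interaction follows — the endpoint path, port, and credentials are assumptions for illustration, not part of this diff, and it assumes the reqwest, tokio, and serde_json crates:

use reqwest::StatusCode;

#[tokio::main]
async fn main() -> Result<(), reqwest::Error> {
    let client = reqwest::Client::new();
    let body = serde_json::json!({
        "query": "select * from mystream",
        "startTime": "2024-07-01T00:00:00Z",
        "endTime": "2024-07-01T01:00:00Z",
    });

    loop {
        let resp = client
            .post("http://localhost:8000/api/v1/query") // assumed path and port
            .basic_auth("admin", Some("admin"))          // assumed credentials
            .json(&body)
            .send()
            .await?;

        match resp.status() {
            // Query finished within the window: results come back directly
            StatusCode::OK => {
                println!("{}", resp.text().await?);
                break;
            }
            // Still processing: the server answered before the LB timeout;
            // re-sending the identical query picks up the result once ready.
            // No client-side delay is needed: the server paces each poll.
            StatusCode::ACCEPTED => continue,
            other => {
                eprintln!("query failed: {other}");
                break;
            }
        }
    }
    Ok(())
}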
7 changes: 6 additions & 1 deletion server/src/handlers/http/modal/server.rs
@@ -36,16 +36,18 @@ use crate::storage;
use crate::sync;
use crate::users::dashboards::DASHBOARDS;
use crate::users::filters::FILTERS;
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;

use actix_web::web::{resource, Data};
use actix_web::Resource;
use actix_web::Scope;
use actix_web::{web, App, HttpServer};
use actix_web_prometheus::PrometheusMetrics;
use actix_web_static_files::ResourceFiles;
use async_trait::async_trait;
use tokio::sync::Mutex;

use crate::{
handlers::http::{
@@ -84,7 +86,10 @@ impl ParseableServer for Server {
};

// Build the map once, outside the factory closure, so every worker shares
// the same map; a per-worker map would miss duplicates served by other workers
let query_map: query::QueryMap = Arc::new(Mutex::new(HashMap::new()));

let create_app_fn = move || {
    App::new()
        .app_data(Data::new(query_map.clone()))
.wrap(prometheus.clone())
.configure(|cfg| Server::configure_routes(cfg, oidc_client.clone()))
.wrap(actix_web::middleware::Logger::default())
148 changes: 106 additions & 42 deletions server/src/handlers/http/query.rs
@@ -18,7 +18,7 @@

use actix_web::http::header::ContentType;
use actix_web::web::{self, Json};
use actix_web::{FromRequest, HttpRequest, HttpResponse};
use chrono::{DateTime, Utc};
use datafusion::common::tree_node::TreeNode;
use datafusion::error::DataFusionError;
@@ -28,10 +28,13 @@ use http::StatusCode;
use std::collections::HashMap;
use std::pin::Pin;
use std::sync::Arc;
use std::time::Instant;
use std::time::{Duration, Instant};
use tokio::sync::Mutex;
use tokio::time::sleep;

use crate::event::error::EventError;
use crate::handlers::http::fetch_schema;
use crate::rbac::map::SessionKey;
use arrow_array::RecordBatch;

use crate::event::commit_schema;
@@ -41,7 +44,7 @@ use crate::option::{Mode, CONFIG};
use crate::query::error::ExecuteError;
use crate::query::Query as LogicalQuery;
use crate::query::{TableScanVisitor, QUERY_SESSION};
use crate::querycache::{generate_hash, CacheMetadata, QueryCacheManager};
use crate::rbac::role::{Action, Permission};
use crate::rbac::Users;
use crate::response::QueryResponse;
@@ -50,7 +53,7 @@ use crate::storage::ObjectStorageError;
use crate::utils::actix::extract_session_key_from_req;

/// Query Request through http endpoint.
#[derive(Debug, serde::Deserialize, serde::Serialize, Clone)]
#[serde(rename_all = "camelCase")]
pub struct Query {
pub query: String,
@@ -64,81 +67,142 @@ pub struct Query {
pub filter_tags: Option<Vec<String>>,
}

pub type QueryMap = Arc<Mutex<HashMap<u64, QueryStatus>>>;
pub enum QueryStatus {
    Processing,
    Result(QueryResponse),
}

pub async fn query(
    req: HttpRequest,
    query_request: Query,
    query_map: web::Data<QueryMap>,
) -> Result<HttpResponse, QueryError> {
    let session_state = QUERY_SESSION.state();

    // Generate hash for the query based on start, end, and query string
    let hash = generate_hash(
        &query_request.start_time,
        &query_request.end_time,
        &query_request.query,
    );

    // Check if the query is already being processed or completed; inserting
    // Processing under the same lock keeps concurrent duplicates from both
    // starting work
    {
        let mut guard = query_map.lock().await;
        match guard.get(&hash) {
            Some(QueryStatus::Result(response)) => {
                return Ok(response.to_http()?);
            }
            Some(QueryStatus::Processing) => {
                // Release the lock before waiting so the worker task can store its result
                drop(guard);
                // Wait for 55 seconds and check again
                sleep(Duration::from_secs(55)).await;
                if let Some(QueryStatus::Result(response)) = query_map.lock().await.get(&hash) {
                    return Ok(response.to_http()?);
                }
                return Ok(HttpResponse::Accepted().finish());
            }
            None => {
                // First request for this query: mark it as in flight
                guard.insert(hash, QueryStatus::Processing);
            }
        }
    }

    // Clone the data the spawned task needs to own
    let query_request_clone = query_request.clone();
    let session_state_clone = session_state.clone();
    let creds = extract_session_key_from_req(&req)?;
    let worker_map = query_map.clone();

    // Spawn a separate task to process the query and record the result
    tokio::spawn(async move {
        let result = process_query(query_request_clone, Arc::new(session_state_clone), creds).await;

        // Take the lock only once the query has finished, so waiters polling
        // the map are never blocked behind a running query
        let mut guard = worker_map.lock().await;
        match result {
            Ok(response) => {
                guard.insert(hash, QueryStatus::Result(response));
            }
            Err(err) => {
                log::error!("Error processing query: {:?}", err);
                guard.remove(&hash);
            }
        }
    });

    // Respond within 55 seconds, before a typical 60-second LB idle timeout:
    // with the result if it is ready by then, otherwise with HTTP 202
    sleep(Duration::from_secs(55)).await;
    if let Some(QueryStatus::Result(response)) = query_map.lock().await.get(&hash) {
        return Ok(response.to_http()?);
    }
    Ok(HttpResponse::Accepted().finish())
}

async fn process_query(
    query_request: Query,
    session_state: Arc<SessionState>,
    creds: SessionKey,
) -> Result<QueryResponse, QueryError> {
    // get the logical plan and extract the table name
    let raw_logical_plan = session_state
        .create_logical_plan(&query_request.query)
        .await?;

    // create a visitor to extract the table name
    let mut visitor = TableScanVisitor::default();
    let _ = raw_logical_plan.visit(&mut visitor);

    // keep a copy: `top` borrows the visitor, `into_inner` consumes it later
    let visitor_clone = visitor.clone();
    let stream = visitor
        .top()
        .ok_or_else(|| QueryError::MalformedQuery("Table Name not found in SQL"))?;

    let query_cache_manager = QueryCacheManager::global(CONFIG.parseable.query_cache_size).await?;

    // deal with cached data
    if let Ok(results) = get_results_from_cache(
        query_cache_manager,
        &query_request.start_time,
        &query_request.end_time,
        &query_request.query,
        query_request.send_null,
        query_request.fields,
    )
    .await
    {
        return Ok(results);
    };

    // Process the query
    let tables = visitor_clone.into_inner();
    update_schema_when_distributed(tables).await?;

    let mut query: LogicalQuery = into_query(&query_request, &session_state).await?;

    let permissions = Users.get_permissions(&creds);

    let table_name = query
        .first_table_name()
        .ok_or_else(|| QueryError::MalformedQuery("No table name found in query"))?;

    authorize_and_set_filter_tags(&mut query, permissions, &table_name)?;

    let time = Instant::now();
    let (records, fields) = query.execute(table_name.clone()).await?;

    // Cache the results
    put_results_in_cache(
        query_cache_manager,
        stream,
        &records,
        query.start.to_rfc3339(),
        query.end.to_rfc3339(),
        query_request.query.clone(),
    )
    .await?;

    // Create the response
    let response = QueryResponse {
        records,
        fields,
        fill_null: query_request.send_null,
        with_fields: query_request.fields,
    };

    let time = time.elapsed().as_secs_f64();

    QUERY_EXECUTE_TIME
        .with_label_values(&[&table_name])
        .observe(time);

    Ok(response)
}
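The handler above boils down to a small map-keyed dedup pattern. Here is a minimal, self-contained sketch of that pattern, with illustrative names and a string standing in for the real query result; it assumes only the tokio runtime:

use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::Mutex;
use tokio::time::sleep;

#[derive(Clone)]
enum Status {
    Processing,
    Done(String),
}

type Map = Arc<Mutex<HashMap<u64, Status>>>;

// First caller for a key kicks off the work and gets None (HTTP 202 in the
// real handler); later callers get the stored result once it is ready.
async fn submit(map: Map, key: u64) -> Option<String> {
    {
        let mut guard = map.lock().await;
        match guard.get(&key) {
            Some(Status::Done(result)) => return Some(result.clone()),
            Some(Status::Processing) => return None,
            None => {
                guard.insert(key, Status::Processing);
            }
        }
    }
    let worker = map.clone();
    tokio::spawn(async move {
        sleep(Duration::from_millis(50)).await; // stand-in for query execution
        worker.lock().await.insert(key, Status::Done("rows".to_string()));
    });
    None
}

#[tokio::main]
async fn main() {
    let map: Map = Arc::new(Mutex::new(HashMap::new()));
    assert!(submit(map.clone(), 7).await.is_none()); // first call: accepted
    sleep(Duration::from_millis(100)).await;
    assert_eq!(submit(map, 7).await.as_deref(), Some("rows")); // retry: result
}

The important detail, mirrored in the handler, is that the worker takes the map lock only after the work is done, so callers checking the map are never blocked behind a running query.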
2 changes: 1 addition & 1 deletion server/src/query.rs
@@ -176,7 +176,7 @@ impl Query {
}
}

#[derive(Debug, Default, Clone)]
pub(crate) struct TableScanVisitor {
tables: Vec<String>,
}
2 changes: 1 addition & 1 deletion server/src/querycache.rs
@@ -390,7 +390,7 @@ impl QueryCacheManager {
}
}

pub fn generate_hash(start: &str, end: &str, query: &str) -> u64 {
let mut hasher = DefaultHasher::new();
start.hash(&mut hasher);
end.hash(&mut hasher);
query.hash(&mut hasher);
hasher.finish()
}
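generate_hash is promoted to pub so the HTTP handler can key its in-flight map with it. A standalone sketch of the same scheme (the values are illustrative): identical (start, end, query) triples always land on one map entry within a process. Note that DefaultHasher's output is not guaranteed stable across Rust versions or processes, which is acceptable here because the map lives only in one server's memory.

use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

// Mirrors querycache.rs: hash start, end, and query, in that order
fn generate_hash(start: &str, end: &str, query: &str) -> u64 {
    let mut hasher = DefaultHasher::new();
    start.hash(&mut hasher);
    end.hash(&mut hasher);
    query.hash(&mut hasher);
    hasher.finish()
}

fn main() {
    let a = generate_hash("2024-07-01T00:00:00Z", "2024-07-01T01:00:00Z", "select * from mystream");
    let b = generate_hash("2024-07-01T00:00:00Z", "2024-07-01T01:00:00Z", "select * from mystream");
    let c = generate_hash("2024-07-01T00:00:00Z", "2024-07-01T02:00:00Z", "select * from mystream");
    assert_eq!(a, b); // identical requests share one map entry
    assert_ne!(a, c); // a different time range is a different query
}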
7 changes: 4 additions & 3 deletions server/src/response.rs
@@ -23,12 +23,13 @@ use crate::{
record_batches_to_json,
},
};
use actix_web::HttpResponse;
use datafusion::arrow::record_batch::RecordBatch;
use itertools::Itertools;
use serde_json::{json, Value};
use tonic::{Response, Status};

#[derive(Clone)]
pub struct QueryResponse {
pub records: Vec<RecordBatch>,
pub fields: Vec<String>,
@@ -37,7 +38,7 @@ pub struct QueryResponse {
}

impl QueryResponse {
pub fn to_http(&self) -> Result<HttpResponse, QueryError> {
log::info!("Returning query results");
let records: Vec<&RecordBatch> = self.records.iter().collect();
let mut json_records = record_batches_to_json(&records)?;
@@ -62,7 +63,7 @@
Value::Array(values)
};

Ok(HttpResponse::Ok().json(response))
}

pub fn into_flight(self) -> Result<Response<DoGetStream>, Status> {