Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix LB timeout in QueryServer #893

Open
wants to merge 14 commits into
base: main
Choose a base branch
from
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,4 @@ parseable
parseable_*
parseable-env-secret
cache

query-cache/
22 changes: 20 additions & 2 deletions server/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ pub struct Cli {
pub flight_port: u16,

/// to query cached data
pub query_cache_path: Option<PathBuf>,
pub query_cache_path: PathBuf,

/// Size for local cache
pub query_cache_size: u64,
Expand All @@ -109,6 +109,8 @@ pub struct Cli {
pub max_disk_usage: f64,

pub ms_clarity_tag: Option<String>,

pub proxy_timeout: u64
}

impl Cli {
Expand Down Expand Up @@ -145,6 +147,7 @@ impl Cli {
pub const HOT_TIER_PATH: &'static str = "hot-tier-path";
pub const MAX_DISK_USAGE: &'static str = "max-disk-usage";
pub const MS_CLARITY_TAG: &'static str = "ms-clarity-tag";
pub const PROXY_TIMEOUT: &'static str = "proxy-timeout";

pub fn local_stream_data_path(&self, stream_name: &str) -> PathBuf {
self.local_staging_path.join(stream_name)
Expand Down Expand Up @@ -219,6 +222,7 @@ impl Cli {
.long(Self::QUERY_CACHE)
.env("P_QUERY_CACHE_DIR")
.value_name("DIR")
.default_value("./query-cache")
.value_parser(validation::canonicalize_path)
.help("Local path on this device to be used for caching data")
.next_line_help(true),
Expand Down Expand Up @@ -434,6 +438,16 @@ impl Cli {
.required(false)
.help("Tag for MS Clarity"),
)
.arg(
Arg::new(Self::PROXY_TIMEOUT)
.long(Self::PROXY_TIMEOUT)
.env("P_PROXY_TIMEOUT")
.value_name("NUMBER")
.required(false)
.default_value("60")
.value_parser(value_parser!(u64))
.help("Time to wait before responding to a query request.")
)
.group(
ArgGroup::new("oidc")
.args([Self::OPENID_CLIENT_ID, Self::OPENID_CLIENT_SECRET, Self::OPENID_ISSUER])
Expand All @@ -452,7 +466,6 @@ impl FromArgMatches for Cli {

fn update_from_arg_matches(&mut self, m: &clap::ArgMatches) -> Result<(), clap::Error> {
self.local_cache_path = m.get_one::<PathBuf>(Self::CACHE).cloned();
self.query_cache_path = m.get_one::<PathBuf>(Self::QUERY_CACHE).cloned();
self.tls_cert_path = m.get_one::<PathBuf>(Self::TLS_CERT).cloned();
self.tls_key_path = m.get_one::<PathBuf>(Self::TLS_KEY).cloned();
self.domain_address = m.get_one::<Url>(Self::DOMAIN_URI).cloned();
Expand All @@ -479,6 +492,10 @@ impl FromArgMatches for Cli {
.get_one(Self::QUERY_CACHE_SIZE)
.cloned()
.expect("default value for query cache size");
self.query_cache_path = m
.get_one(Self::QUERY_CACHE)
.cloned()
.expect("default value for query cache directory");
self.username = m
.get_one::<String>(Self::USERNAME)
.cloned()
Expand Down Expand Up @@ -578,6 +595,7 @@ impl FromArgMatches for Cli {
.expect("default for max disk usage");

self.ms_clarity_tag = m.get_one::<String>(Self::MS_CLARITY_TAG).cloned();
self.proxy_timeout = m.get_one::<u64>(Self::PROXY_TIMEOUT).cloned().expect("Please specify a default timeout for query requests.");

Ok(())
}
Expand Down
64 changes: 32 additions & 32 deletions server/src/handlers/airplane.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,10 @@
use crate::handlers::livetail::cross_origin_config;

use crate::handlers::http::query::{
authorize_and_set_filter_tags, into_query, put_results_in_cache, update_schema_when_distributed,

Check warning on line 44 in server/src/handlers/airplane.rs

View workflow job for this annotation

GitHub Actions / Unit tests

unused import: `put_results_in_cache`

Check failure on line 44 in server/src/handlers/airplane.rs

View workflow job for this annotation

GitHub Actions / Cargo Clippy check

unused import: `put_results_in_cache`
};
use crate::query::{TableScanVisitor, QUERY_SESSION};
use crate::querycache::QueryCacheManager;

Check warning on line 47 in server/src/handlers/airplane.rs

View workflow job for this annotation

GitHub Actions / Unit tests

unused import: `crate::querycache::QueryCacheManager`

Check failure on line 47 in server/src/handlers/airplane.rs

View workflow job for this annotation

GitHub Actions / Cargo Clippy check

unused import: `crate::querycache::QueryCacheManager`
use crate::utils::arrow::flight::{
append_temporary_events, get_query_from_ticket, into_flight_data, run_do_get_rpc,
send_to_ingester,
Expand All @@ -63,7 +63,7 @@
use crate::rbac;
use crate::rbac::Users;

use super::http::query::get_results_from_cache;

Check warning on line 66 in server/src/handlers/airplane.rs

View workflow job for this annotation

GitHub Actions / Unit tests

unused import: `super::http::query::get_results_from_cache`

Check failure on line 66 in server/src/handlers/airplane.rs

View workflow job for this annotation

GitHub Actions / Cargo Clippy check

unused import: `super::http::query::get_results_from_cache`

#[derive(Clone, Debug)]
pub struct AirServiceImpl {}
Expand Down Expand Up @@ -155,9 +155,9 @@

let streams = visitor.into_inner();

let query_cache_manager = QueryCacheManager::global(CONFIG.parseable.query_cache_size)
.await
.unwrap_or(None);
// let query_cache_manager = QueryCacheManager::global(CONFIG.parseable.query_cache_size)
// .await
// .unwrap_or(None);

let cache_results = req
.metadata()
Expand All @@ -179,21 +179,21 @@
.to_owned();

// send the cached results
if let Ok(cache_results) = get_results_from_cache(
show_cached,
query_cache_manager,
&stream_name,
user_id,
&ticket.start_time,
&ticket.end_time,
&ticket.query,
ticket.send_null,
ticket.fields,
)
.await
{
return cache_results.into_flight();
}
// if let Ok(cache_results) = get_results_from_cache(
// show_cached,
// query_cache_manager,
// &stream_name,
// user_id,
// &ticket.start_time,
// &ticket.end_time,
// &ticket.query,
// ticket.send_null,
// ticket.fields,
// )
// .await
// {
// return cache_results.into_flight();
// }

update_schema_when_distributed(streams)
.await
Expand Down Expand Up @@ -257,20 +257,20 @@
.await
.map_err(|err| Status::internal(err.to_string()))?;

if let Err(err) = put_results_in_cache(
cache_results,
user_id,
query_cache_manager,
&stream_name,
&records,
query.start.to_rfc3339(),
query.end.to_rfc3339(),
ticket.query,
)
.await
{
log::error!("{}", err);
};
// if let Err(err) = put_results_in_cache(
// cache_results,
// user_id,
// query_cache_manager,
// &stream_name,
// &records,
// query.start.to_rfc3339(),
// query.end.to_rfc3339(),
// ticket.query,
// )
// .await
// {
// log::error!("{}", err);
// };

/*
* INFO: No returning the schema with the data.
Expand Down
75 changes: 40 additions & 35 deletions server/src/handlers/http/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,58 +16,61 @@
*
*/

use actix_web::{web, HttpRequest, HttpResponse, Responder};

Check warning on line 19 in server/src/handlers/http/cache.rs

View workflow job for this annotation

GitHub Actions / Unit tests

unused import: `web`

Check failure on line 19 in server/src/handlers/http/cache.rs

View workflow job for this annotation

GitHub Actions / Cargo Clippy check

unused import: `web`
use anyhow::anyhow;
use bytes::Bytes;
use http::StatusCode;

Check warning on line 22 in server/src/handlers/http/cache.rs

View workflow job for this annotation

GitHub Actions / Unit tests

unused import: `http::StatusCode`

Check failure on line 22 in server/src/handlers/http/cache.rs

View workflow job for this annotation

GitHub Actions / Cargo Clippy check

unused import: `http::StatusCode`
use serde_json::json;

Check warning on line 23 in server/src/handlers/http/cache.rs

View workflow job for this annotation

GitHub Actions / Unit tests

unused import: `serde_json::json`

Check failure on line 23 in server/src/handlers/http/cache.rs

View workflow job for this annotation

GitHub Actions / Cargo Clippy check

unused import: `serde_json::json`

use crate::{
option::CONFIG,

Check warning on line 26 in server/src/handlers/http/cache.rs

View workflow job for this annotation

GitHub Actions / Unit tests

unused imports: `QueryCacheManager` and `option::CONFIG`

Check failure on line 26 in server/src/handlers/http/cache.rs

View workflow job for this annotation

GitHub Actions / Cargo Clippy check

unused imports: `QueryCacheManager` and `option::CONFIG`
querycache::{CacheMetadata, QueryCacheManager},
};

use super::ingest::PostError;

pub async fn list(req: HttpRequest) -> Result<impl Responder, PostError> {
let stream = req

Check warning on line 33 in server/src/handlers/http/cache.rs

View workflow job for this annotation

GitHub Actions / Unit tests

unused variable: `stream`

Check failure on line 33 in server/src/handlers/http/cache.rs

View workflow job for this annotation

GitHub Actions / Cargo Clippy check

unused variable: `stream`
.match_info()
.get("stream")
.ok_or_else(|| PostError::Invalid(anyhow!("Invalid Stream Name in resource path")))?;

let user_id = req

Check failure on line 38 in server/src/handlers/http/cache.rs

View workflow job for this annotation

GitHub Actions / Cargo Clippy check

unused variable: `user_id`
.match_info()
.get("user_id")
.ok_or_else(|| PostError::Invalid(anyhow!("Invalid User ID not in Resource path")))?;

let query_cache_manager = QueryCacheManager::global(CONFIG.parseable.query_cache_size)
.await
.unwrap_or(None);

if let Some(query_cache_manager) = query_cache_manager {
let cache = query_cache_manager
.get_cache(stream, user_id)
.await
.map_err(PostError::CacheError)?;

let size = cache.used_cache_size();
let queries = cache.queries();

let out = json!({
"used_capacity": size,
"cache": queries
});

Ok((web::Json(out), StatusCode::OK))
} else {
Err(PostError::Invalid(anyhow!(
"Query Caching is not active on server "
)))
}
// let query_cache_manager = QueryCacheManager::global(CONFIG.parseable.query_cache_size)
// .await
// .unwrap_or(None);

// if let Some(query_cache_manager) = query_cache_manager {
// let cache = query_cache_manager
// .get_cache(stream, user_id)
// .await
// .map_err(PostError::CacheError)?;

// let size = cache.used_cache_size();
// let queries = cache.queries();

// let out = json!({
// "used_capacity": size,
// "cache": queries
// });

// Ok((web::Json(out), StatusCode::OK))
// } else {
// Err(PostError::Invalid(anyhow!(
// "Query Caching is not active on server "
// )))
// }

Ok(HttpResponse::Ok().finish())

}

pub async fn remove(req: HttpRequest, body: Bytes) -> Result<impl Responder, PostError> {
let stream = req

Check failure on line 73 in server/src/handlers/http/cache.rs

View workflow job for this annotation

GitHub Actions / Cargo Clippy check

unused variable: `stream`
.match_info()
.get("stream")
.ok_or_else(|| PostError::Invalid(anyhow!("Invalid Stream Name in resource path")))?;
Expand All @@ -79,17 +82,19 @@

let query = serde_json::from_slice::<CacheMetadata>(&body)?;

let query_cache_manager = QueryCacheManager::global(CONFIG.parseable.query_cache_size)
.await
.unwrap_or(None);
// let query_cache_manager = QueryCacheManager::global(CONFIG.parseable.query_cache_size)
// .await
// .unwrap_or(None);

// if let Some(query_cache_manager) = query_cache_manager {
// query_cache_manager
// .remove_from_cache(query, stream, user_id)
// .await?;

if let Some(query_cache_manager) = query_cache_manager {
query_cache_manager
.remove_from_cache(query, stream, user_id)
.await?;
// Ok(HttpResponse::Ok().finish())
// } else {
// Err(PostError::Invalid(anyhow!("Query Caching is not enabled")))
// }

Ok(HttpResponse::Ok().finish())
} else {
Err(PostError::Invalid(anyhow!("Query Caching is not enabled")))
}
Ok(HttpResponse::Ok().finish())
}
16 changes: 13 additions & 3 deletions server/src/handlers/http/modal/query_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,21 @@ use crate::handlers::airplane;
use crate::handlers::http::cluster::{self, init_cluster_metrics_schedular};
use crate::handlers::http::logstream::create_internal_stream_if_not_exists;
use crate::handlers::http::middleware::RouteExt;
use crate::handlers::http::{base_path, cross_origin_config, API_BASE_PATH, API_VERSION};
use crate::handlers::http::{base_path, cross_origin_config, query, API_BASE_PATH, API_VERSION};
use crate::hottier::HotTierManager;
use crate::rbac::role::Action;
use crate::sync;
use crate::users::dashboards::DASHBOARDS;
use crate::users::filters::FILTERS;
use crate::{analytics, banner, metrics, migration, rbac, storage};
use actix_web::web;
use actix_web::web::{self, Data};
use actix_web::web::ServiceConfig;
use actix_web::{App, HttpServer};
use async_trait::async_trait;
use tokio::sync::Mutex;
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;

use crate::option::CONFIG;

Expand Down Expand Up @@ -66,16 +69,23 @@ impl ParseableServer for QueryServer {
)?;

let create_app_fn = move || {
let query_set: query::QueryMap = Arc::new(Mutex::new(HashMap::new()));

App::new()
.app_data(Data::new(query_set))
.wrap(prometheus.clone())
.configure(|config| QueryServer::configure_routes(config, oidc_client.clone()))
.wrap(actix_web::middleware::Logger::default())
.wrap(actix_web::middleware::Compress::default())
.wrap(cross_origin_config())
};

let keep_alive_timeout = 120;

// concurrent workers equal to number of cores on the cpu
let http_server = HttpServer::new(create_app_fn).workers(num_cpus::get());
let http_server = HttpServer::new(create_app_fn)
.workers(num_cpus::get())
.keep_alive(Duration::from_secs(keep_alive_timeout));
if let Some(config) = ssl {
http_server
.bind_rustls_0_22(&CONFIG.parseable.address, config)?
Expand Down
12 changes: 10 additions & 2 deletions server/src/handlers/http/modal/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,15 +36,18 @@ use crate::storage;
use crate::sync;
use crate::users::dashboards::DASHBOARDS;
use crate::users::filters::FILTERS;
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;

use actix_web::web::resource;
use actix_web::web::{resource, Data};
use actix_web::Resource;
use actix_web::Scope;
use actix_web::{web, App, HttpServer};
use actix_web_prometheus::PrometheusMetrics;
use actix_web_static_files::ResourceFiles;
use async_trait::async_trait;
use tokio::sync::Mutex;

use crate::{
handlers::http::{
Expand Down Expand Up @@ -83,7 +86,10 @@ impl ParseableServer for Server {
};

let create_app_fn = move || {
let query_set: query::QueryMap = Arc::new(Mutex::new(HashMap::new()));

App::new()
.app_data(Data::new(query_set))
.wrap(prometheus.clone())
.configure(|cfg| Server::configure_routes(cfg, oidc_client.clone()))
.wrap(actix_web::middleware::Logger::default())
Expand All @@ -96,8 +102,10 @@ impl ParseableServer for Server {
&CONFIG.parseable.tls_key_path,
)?;

let keep_alive_timeout = 120;

// concurrent workers equal to number of cores on the cpu
let http_server = HttpServer::new(create_app_fn).workers(num_cpus::get());
let http_server = HttpServer::new(create_app_fn).workers(num_cpus::get()).keep_alive(Duration::from_secs(keep_alive_timeout));
if let Some(config) = ssl {
http_server
.bind_rustls_0_22(&CONFIG.parseable.address, config)?
Expand Down
Loading
Loading