diff --git a/server/src/cli.rs b/server/src/cli.rs index 6f4160738..d479ab9b6 100644 --- a/server/src/cli.rs +++ b/server/src/cli.rs @@ -109,6 +109,13 @@ pub struct Cli { pub max_disk_usage: f64, pub ms_clarity_tag: Option, + + // Trino vars + pub trino_endpoint: Option, + pub trino_username: Option, + pub trino_auth: Option, + pub trino_schema: Option, + pub trino_catalog: Option, } impl Cli { @@ -146,6 +153,13 @@ impl Cli { pub const MAX_DISK_USAGE: &'static str = "max-disk-usage"; pub const MS_CLARITY_TAG: &'static str = "ms-clarity-tag"; + // Trino specific env vars + pub const TRINO_ENDPOINT: &'static str = "p-trino-end-point"; + pub const TRINO_CATALOG_NAME: &'static str = "p-trino-catalog-name"; + pub const TRINO_USER_NAME: &'static str = "p-trino-user-name"; + pub const TRINO_AUTHORIZATION: &'static str = "p-trino-authorization"; + pub const TRINO_SCHEMA: &'static str = "p-trino-schema"; + pub fn local_stream_data_path(&self, stream_name: &str) -> PathBuf { self.local_staging_path.join(stream_name) } @@ -159,287 +173,321 @@ impl Cli { pub fn create_cli_command_with_clap(name: &'static str) -> Command { Command::new(name).next_line_help(false) - .arg( - Arg::new(Self::TLS_CERT) - .long(Self::TLS_CERT) - .env("P_TLS_CERT_PATH") - .value_name("PATH") - .value_parser(validation::file_path) - .help("Local path on this device where certificate file is located. Required to enable TLS"), - ) - .arg( - Arg::new(Self::TLS_KEY) - .long(Self::TLS_KEY) - .env("P_TLS_KEY_PATH") - .value_name("PATH") - .value_parser(validation::file_path) - .help("Local path on this device where private key file is located. 
Required to enable TLS"), - ) - .arg( - Arg::new(Self::ADDRESS) - .long(Self::ADDRESS) - .env("P_ADDR") - .value_name("ADDR:PORT") - .default_value("0.0.0.0:8000") - .value_parser(validation::socket_addr) - .help("Address and port for Parseable HTTP(s) server"), - ) - .arg( - Arg::new(Self::STAGING) - .long(Self::STAGING) - .env("P_STAGING_DIR") - .value_name("DIR") - .default_value("./staging") - .value_parser(validation::canonicalize_path) - .help("Local path on this device to be used as landing point for incoming events") - .next_line_help(true), - ) .arg( - Arg::new(Self::CACHE) - .long(Self::CACHE) - .env("P_CACHE_DIR") - .value_name("DIR") - .value_parser(validation::canonicalize_path) - .help("Local path on this device to be used for caching data") - .next_line_help(true), - ) + Arg::new(Self::TRINO_ENDPOINT) + .long(Self::TRINO_ENDPOINT) + .env("P_TRINO_ENDPOINT") + .value_name("STRING") + .help("Address and port for Trino HTTP(s) server"), + ) .arg( - Arg::new(Self::CACHE_SIZE) - .long(Self::CACHE_SIZE) - .env("P_CACHE_SIZE") - .value_name("size") - .default_value("1GiB") - .value_parser(validation::cache_size) - .help("Maximum allowed cache size for all streams combined (In human readable format, e.g 1GiB, 2GiB, 100MB)") - .next_line_help(true), - ) - + Arg::new(Self::TRINO_CATALOG_NAME) + .long(Self::TRINO_CATALOG_NAME) + .env("P_TRINO_CATALOG_NAME") + .value_name("STRING") + .help("Name of the catalog to be queried (Translates to X-Trino-Catalog)"), + ) + .arg( + Arg::new(Self::TRINO_SCHEMA) + .long(Self::TRINO_SCHEMA) + .env("P_TRINO_SCHEMA") + .value_name("STRING") + .help("Name of schema to be queried (Translates to X-Trino-Schema)"), + ) + .arg( + Arg::new(Self::TRINO_USER_NAME) + .long(Self::TRINO_USER_NAME) + .env("P_TRINO_USER_NAME") + .value_name("STRING") + .help("Name of Trino user (Translates to X-Trino-User)"), + ) + .arg( + Arg::new(Self::TRINO_AUTHORIZATION) + .long(Self::TRINO_AUTHORIZATION) + .env("P_TRINO_AUTHORIZATION") + 
.value_name("STRING") + .help("Base 64 encoded in the format username:password"), + ) + .arg( + Arg::new(Self::TLS_CERT) + .long(Self::TLS_CERT) + .env("P_TLS_CERT_PATH") + .value_name("PATH") + .value_parser(validation::file_path) + .help("Local path on this device where certificate file is located. Required to enable TLS"), + ) + .arg( + Arg::new(Self::TLS_KEY) + .long(Self::TLS_KEY) + .env("P_TLS_KEY_PATH") + .value_name("PATH") + .value_parser(validation::file_path) + .help("Local path on this device where private key file is located. Required to enable TLS"), + ) + .arg( + Arg::new(Self::ADDRESS) + .long(Self::ADDRESS) + .env("P_ADDR") + .value_name("ADDR:PORT") + .default_value("0.0.0.0:8000") + .value_parser(validation::socket_addr) + .help("Address and port for Parseable HTTP(s) server"), + ) + .arg( + Arg::new(Self::STAGING) + .long(Self::STAGING) + .env("P_STAGING_DIR") + .value_name("DIR") + .default_value("./staging") + .value_parser(validation::canonicalize_path) + .help("Local path on this device to be used as landing point for incoming events") + .next_line_help(true), + ) + .arg( + Arg::new(Self::CACHE) + .long(Self::CACHE) + .env("P_CACHE_DIR") + .value_name("DIR") + .value_parser(validation::canonicalize_path) + .help("Local path on this device to be used for caching data") + .next_line_help(true), + ) + .arg( + Arg::new(Self::CACHE_SIZE) + .long(Self::CACHE_SIZE) + .env("P_CACHE_SIZE") + .value_name("size") + .default_value("1GiB") + .value_parser(validation::cache_size) + .help("Maximum allowed cache size for all streams combined (In human readable format, e.g 1GiB, 2GiB, 100MB)") + .next_line_help(true), + ) + .arg( + Arg::new(Self::QUERY_CACHE) + .long(Self::QUERY_CACHE) + .env("P_QUERY_CACHE_DIR") + .value_name("DIR") + .value_parser(validation::canonicalize_path) + .help("Local path on this device to be used for caching data") + .next_line_help(true), + ) + .arg( + Arg::new(Self::QUERY_CACHE_SIZE) + .long(Self::QUERY_CACHE_SIZE) + 
.env("P_QUERY_CACHE_SIZE") + .value_name("size") + .default_value("1GiB") + .value_parser(validation::cache_size) + .help("Maximum allowed cache size for all streams combined (In human readable format, e.g 1GiB, 2GiB, 100MB)") + .next_line_help(true), + ) + .arg( + Arg::new(Self::USERNAME) + .long(Self::USERNAME) + .env("P_USERNAME") + .value_name("STRING") + .required(true) + .help("Admin username to be set for this Parseable server"), + ) + .arg( + Arg::new(Self::PASSWORD) + .long(Self::PASSWORD) + .env("P_PASSWORD") + .value_name("STRING") + .required(true) + .help("Admin password to be set for this Parseable server"), + ) + .arg( + Arg::new(Self::CHECK_UPDATE) + .long(Self::CHECK_UPDATE) + .env("P_CHECK_UPDATE") + .value_name("BOOL") + .required(false) + .default_value("true") + .value_parser(value_parser!(bool)) + .help("Enable/Disable checking for new Parseable release"), + ) + .arg( + Arg::new(Self::SEND_ANALYTICS) + .long(Self::SEND_ANALYTICS) + .env("P_SEND_ANONYMOUS_USAGE_DATA") + .value_name("BOOL") + .required(false) + .default_value("true") + .value_parser(value_parser!(bool)) + .help("Enable/Disable anonymous telemetry data collection"), + ) + .arg( + Arg::new(Self::OPEN_AI_KEY) + .long(Self::OPEN_AI_KEY) + .env("P_OPENAI_API_KEY") + .value_name("STRING") + .required(false) + .help("OpenAI key to enable llm features"), + ) .arg( - Arg::new(Self::QUERY_CACHE) - .long(Self::QUERY_CACHE) - .env("P_QUERY_CACHE_DIR") - .value_name("DIR") - .value_parser(validation::canonicalize_path) - .help("Local path on this device to be used for caching data") - .next_line_help(true), - ) + Arg::new(Self::OPENID_CLIENT_ID) + .long(Self::OPENID_CLIENT_ID) + .env("P_OIDC_CLIENT_ID") + .value_name("STRING") + .required(false) + .help("Client id for OIDC provider"), + ) .arg( - Arg::new(Self::QUERY_CACHE_SIZE) - .long(Self::QUERY_CACHE_SIZE) - .env("P_QUERY_CACHE_SIZE") - .value_name("size") - .default_value("1GiB") - .value_parser(validation::cache_size) - .help("Maximum 
allowed cache size for all streams combined (In human readable format, e.g 1GiB, 2GiB, 100MB)") - .next_line_help(true), - ) - .arg( - Arg::new(Self::USERNAME) - .long(Self::USERNAME) - .env("P_USERNAME") - .value_name("STRING") - .required(true) - .help("Admin username to be set for this Parseable server"), - ) - .arg( - Arg::new(Self::PASSWORD) - .long(Self::PASSWORD) - .env("P_PASSWORD") - .value_name("STRING") - .required(true) - .help("Admin password to be set for this Parseable server"), - ) - .arg( - Arg::new(Self::CHECK_UPDATE) - .long(Self::CHECK_UPDATE) - .env("P_CHECK_UPDATE") - .value_name("BOOL") - .required(false) - .default_value("true") - .value_parser(value_parser!(bool)) - .help("Enable/Disable checking for new Parseable release"), - ) - .arg( - Arg::new(Self::SEND_ANALYTICS) - .long(Self::SEND_ANALYTICS) - .env("P_SEND_ANONYMOUS_USAGE_DATA") - .value_name("BOOL") - .required(false) - .default_value("true") - .value_parser(value_parser!(bool)) - .help("Enable/Disable anonymous telemetry data collection"), - ) - .arg( - Arg::new(Self::OPEN_AI_KEY) - .long(Self::OPEN_AI_KEY) - .env("P_OPENAI_API_KEY") - .value_name("STRING") - .required(false) - .help("OpenAI key to enable llm features"), - ) - .arg( - Arg::new(Self::OPENID_CLIENT_ID) - .long(Self::OPENID_CLIENT_ID) - .env("P_OIDC_CLIENT_ID") - .value_name("STRING") - .required(false) - .help("Client id for OIDC provider"), - ) - .arg( - Arg::new(Self::OPENID_CLIENT_SECRET) - .long(Self::OPENID_CLIENT_SECRET) - .env("P_OIDC_CLIENT_SECRET") - .value_name("STRING") - .required(false) - .help("Client secret for OIDC provider"), - ) - .arg( - Arg::new(Self::OPENID_ISSUER) - .long(Self::OPENID_ISSUER) - .env("P_OIDC_ISSUER") - .value_name("URL") - .required(false) - .value_parser(validation::url) - .help("OIDC provider's host address"), - ) - .arg( - Arg::new(Self::DOMAIN_URI) - .long(Self::DOMAIN_URI) - .env("P_ORIGIN_URI") - .value_name("URL") - .required(false) - .value_parser(validation::url) - 
.help("Parseable server global domain address"), - ) - .arg( - Arg::new(Self::GRPC_PORT) - .long(Self::GRPC_PORT) - .env("P_GRPC_PORT") - .value_name("PORT") - .default_value("8001") - .required(false) - .value_parser(value_parser!(u16)) - .help("Port for gRPC server"), - ) - .arg( - Arg::new(Self::FLIGHT_PORT) - .long(Self::FLIGHT_PORT) - .env("P_FLIGHT_PORT") - .value_name("PORT") - .default_value("8002") - .required(false) - .value_parser(value_parser!(u16)) - .help("Port for Arrow Flight Querying Engine"), - ) - .arg( - Arg::new(Self::CORS) - .long(Self::CORS) - .env("P_CORS") - .value_name("BOOL") - .required(false) - .default_value("true") - .value_parser(value_parser!(bool)) - .help("Enable/Disable CORS, default disabled"), - ) - .arg( - Arg::new(Self::LIVETAIL_CAPACITY) - .long(Self::LIVETAIL_CAPACITY) - .env("P_LIVETAIL_CAPACITY") - .value_name("NUMBER") - .default_value("1000") - .required(false) - .value_parser(value_parser!(usize)) - .help("Number of rows in livetail channel"), - ) - .arg( - Arg::new(Self::QUERY_MEM_POOL_SIZE) - .long(Self::QUERY_MEM_POOL_SIZE) - .env("P_QUERY_MEMORY_LIMIT") - .value_name("Gib") - .required(false) - .value_parser(value_parser!(u8)) - .help("Set a fixed memory limit for query"), - ) - .arg( - Arg::new(Self::ROW_GROUP_SIZE) - .long(Self::ROW_GROUP_SIZE) - .env("P_PARQUET_ROW_GROUP_SIZE") - .value_name("NUMBER") - .required(false) - .default_value("16384") - .value_parser(value_parser!(usize)) - .help("Number of rows in a row group"), - ).arg( - Arg::new(Self::MODE) - .long(Self::MODE) - .env("P_MODE") - .value_name("STRING") - .required(false) - .default_value("all") - .value_parser([ - "query", - "ingest", - "all"]) - .help("Mode of operation"), - ) - .arg( - Arg::new(Self::INGESTOR_ENDPOINT) - .long(Self::INGESTOR_ENDPOINT) - .env("P_INGESTOR_ENDPOINT") - .value_name("URL") - .required(false) - .help("URL to connect to this specific ingestor. 
Default is the address of the server.") - ) - .arg( - Arg::new(Self::PARQUET_COMPRESSION_ALGO) - .long(Self::PARQUET_COMPRESSION_ALGO) - .env("P_PARQUET_COMPRESSION_ALGO") - .value_name("[UNCOMPRESSED, SNAPPY, GZIP, LZO, BROTLI, LZ4, ZSTD]") - .required(false) - .default_value("lz4") - .value_parser([ - "uncompressed", - "snappy", - "gzip", - "lzo", - "brotli", - "lz4", - "zstd"]) - .help("Parquet compression algorithm"), - ) - .arg( - Arg::new(Self::HOT_TIER_PATH) - .long(Self::HOT_TIER_PATH) - .env("P_HOT_TIER_DIR") - .value_name("DIR") - .value_parser(validation::canonicalize_path) - .help("Local path on this device to be used for hot tier data") - .next_line_help(true), - ) - .arg( - Arg::new(Self::MAX_DISK_USAGE) - .long(Self::MAX_DISK_USAGE) - .env("P_MAX_DISK_USAGE_PERCENT") - .value_name("percentage") - .default_value("80.0") - .value_parser(validation::validate_disk_usage) - .help("Maximum allowed disk usage in percentage e.g 90.0 for 90%") - .next_line_help(true), - ) - .arg( - Arg::new(Self::MS_CLARITY_TAG) - .long(Self::MS_CLARITY_TAG) - .env("P_MS_CLARITY_TAG") - .value_name("STRING") - .required(false) - .help("Tag for MS Clarity"), - ) - .group( - ArgGroup::new("oidc") - .args([Self::OPENID_CLIENT_ID, Self::OPENID_CLIENT_SECRET, Self::OPENID_ISSUER]) - .requires_all([Self::OPENID_CLIENT_ID, Self::OPENID_CLIENT_SECRET, Self::OPENID_ISSUER]) - .multiple(true) - ) + Arg::new(Self::OPENID_CLIENT_SECRET) + .long(Self::OPENID_CLIENT_SECRET) + .env("P_OIDC_CLIENT_SECRET") + .value_name("STRING") + .required(false) + .help("Client secret for OIDC provider"), + ) + .arg( + Arg::new(Self::OPENID_ISSUER) + .long(Self::OPENID_ISSUER) + .env("P_OIDC_ISSUER") + .value_name("URL") + .required(false) + .value_parser(validation::url) + .help("OIDC provider's host address"), + ) + .arg( + Arg::new(Self::DOMAIN_URI) + .long(Self::DOMAIN_URI) + .env("P_ORIGIN_URI") + .value_name("URL") + .required(false) + .value_parser(validation::url) + .help("Parseable server global 
domain address"), + ) + .arg( + Arg::new(Self::GRPC_PORT) + .long(Self::GRPC_PORT) + .env("P_GRPC_PORT") + .value_name("PORT") + .default_value("8001") + .required(false) + .value_parser(value_parser!(u16)) + .help("Port for gRPC server"), + ) + .arg( + Arg::new(Self::FLIGHT_PORT) + .long(Self::FLIGHT_PORT) + .env("P_FLIGHT_PORT") + .value_name("PORT") + .default_value("8002") + .required(false) + .value_parser(value_parser!(u16)) + .help("Port for Arrow Flight Querying Engine"), + ) + .arg( + Arg::new(Self::CORS) + .long(Self::CORS) + .env("P_CORS") + .value_name("BOOL") + .required(false) + .default_value("true") + .value_parser(value_parser!(bool)) + .help("Enable/Disable CORS, default disabled"), + ) + .arg( + Arg::new(Self::LIVETAIL_CAPACITY) + .long(Self::LIVETAIL_CAPACITY) + .env("P_LIVETAIL_CAPACITY") + .value_name("NUMBER") + .default_value("1000") + .required(false) + .value_parser(value_parser!(usize)) + .help("Number of rows in livetail channel"), + ) + .arg( + Arg::new(Self::QUERY_MEM_POOL_SIZE) + .long(Self::QUERY_MEM_POOL_SIZE) + .env("P_QUERY_MEMORY_LIMIT") + .value_name("Gib") + .required(false) + .value_parser(value_parser!(u8)) + .help("Set a fixed memory limit for query"), + ) + .arg( + Arg::new(Self::ROW_GROUP_SIZE) + .long(Self::ROW_GROUP_SIZE) + .env("P_PARQUET_ROW_GROUP_SIZE") + .value_name("NUMBER") + .required(false) + .default_value("16384") + .value_parser(value_parser!(usize)) + .help("Number of rows in a row group"), + ).arg( + Arg::new(Self::MODE) + .long(Self::MODE) + .env("P_MODE") + .value_name("STRING") + .required(false) + .default_value("all") + .value_parser([ + "query", + "ingest", + "all"]) + .help("Mode of operation"), + ) + .arg( + Arg::new(Self::INGESTOR_ENDPOINT) + .long(Self::INGESTOR_ENDPOINT) + .env("P_INGESTOR_ENDPOINT") + .value_name("URL") + .required(false) + .help("URL to connect to this specific ingestor. 
Default is the address of the server.") + ) + .arg( + Arg::new(Self::PARQUET_COMPRESSION_ALGO) + .long(Self::PARQUET_COMPRESSION_ALGO) + .env("P_PARQUET_COMPRESSION_ALGO") + .value_name("[UNCOMPRESSED, SNAPPY, GZIP, LZO, BROTLI, LZ4, ZSTD]") + .required(false) + .default_value("lz4") + .value_parser([ + "uncompressed", + "snappy", + "gzip", + "lzo", + "brotli", + "lz4", + "zstd"]) + .help("Parquet compression algorithm"), + ) + .arg( + Arg::new(Self::HOT_TIER_PATH) + .long(Self::HOT_TIER_PATH) + .env("P_HOT_TIER_DIR") + .value_name("DIR") + .value_parser(validation::canonicalize_path) + .help("Local path on this device to be used for hot tier data") + .next_line_help(true), + ) + .arg( + Arg::new(Self::MAX_DISK_USAGE) + .long(Self::MAX_DISK_USAGE) + .env("P_MAX_DISK_USAGE_PERCENT") + .value_name("percentage") + .default_value("80.0") + .value_parser(validation::validate_disk_usage) + .help("Maximum allowed disk usage in percentage e.g 90.0 for 90%") + .next_line_help(true), + ) + .arg( + Arg::new(Self::MS_CLARITY_TAG) + .long(Self::MS_CLARITY_TAG) + .env("P_MS_CLARITY_TAG") + .value_name("STRING") + .required(false) + .help("Tag for MS Clarity"), + ) + .group( + ArgGroup::new("oidc") + .args([Self::OPENID_CLIENT_ID, Self::OPENID_CLIENT_SECRET, Self::OPENID_ISSUER]) + .requires_all([Self::OPENID_CLIENT_ID, Self::OPENID_CLIENT_SECRET, Self::OPENID_ISSUER]) + .multiple(true) + ) } } @@ -451,6 +499,12 @@ impl FromArgMatches for Cli { } fn update_from_arg_matches(&mut self, m: &clap::ArgMatches) -> Result<(), clap::Error> { + self.trino_catalog = m.get_one::(Self::TRINO_CATALOG_NAME).cloned(); + self.trino_endpoint = m.get_one::(Self::TRINO_ENDPOINT).cloned(); + self.trino_auth = m.get_one::(Self::TRINO_AUTHORIZATION).cloned(); + self.trino_schema = m.get_one::(Self::TRINO_SCHEMA).cloned(); + self.trino_username = m.get_one::(Self::TRINO_USER_NAME).cloned(); + self.local_cache_path = m.get_one::(Self::CACHE).cloned(); self.query_cache_path = 
m.get_one::(Self::QUERY_CACHE).cloned(); self.tls_cert_path = m.get_one::(Self::TLS_CERT).cloned(); diff --git a/server/src/handlers.rs b/server/src/handlers.rs index 60f64fd63..a69847239 100644 --- a/server/src/handlers.rs +++ b/server/src/handlers.rs @@ -40,6 +40,11 @@ const COOKIE_AGE_DAYS: usize = 7; const SESSION_COOKIE_NAME: &str = "session"; const USER_COOKIE_NAME: &str = "username"; +//constants for trino +const TRINO_SCHEMA: &str = "x-trino-schema"; +const TRINO_CATALOG: &str = "x-trino-catalog"; +const TRINO_USER: &str = "x-trino-user"; + // constants for log Source values for known sources and formats const LOG_SOURCE_KINESIS: &str = "kinesis"; diff --git a/server/src/handlers/http.rs b/server/src/handlers/http.rs index 24e8564e9..2a2279800 100644 --- a/server/src/handlers/http.rs +++ b/server/src/handlers/http.rs @@ -40,6 +40,7 @@ mod otel; pub(crate) mod query; pub(crate) mod rbac; pub(crate) mod role; +pub(crate) mod trino; pub mod users; pub const MAX_EVENT_PAYLOAD_SIZE: usize = 10485760; pub const API_BASE_PATH: &str = "api"; diff --git a/server/src/handlers/http/about.rs b/server/src/handlers/http/about.rs index 8e2d6ccf3..1b455d919 100644 --- a/server/src/handlers/http/about.rs +++ b/server/src/handlers/http/about.rs @@ -17,7 +17,6 @@ */ use actix_web::web::Json; -use human_size::SpecificSize; use serde_json::json; use crate::{ @@ -79,21 +78,6 @@ pub async fn about() -> Json { let is_oidc_active = CONFIG.parseable.openid.is_some(); let ui_version = option_env!("UI_VERSION").unwrap_or("development"); - let cache_details: String = if CONFIG.cache_dir().is_none() { - "Disabled".to_string() - } else { - let cache_dir: &Option = CONFIG.cache_dir(); - let cache_size: SpecificSize = - SpecificSize::new(CONFIG.cache_size() as f64, human_size::Byte) - .unwrap() - .into(); - format!( - "Enabled, Path: {} (Size: {})", - cache_dir.as_ref().unwrap().display(), - cache_size - ) - }; - let hot_tier_details: String = if CONFIG.hot_tier_dir().is_none() { 
"Disabled".to_string() } else { @@ -105,6 +89,16 @@ pub async fn about() -> Json { }; let ms_clarity_tag = &CONFIG.parseable.ms_clarity_tag; + let mut query_engine = "Parseable".to_string(); + if let (Some(_), Some(_), Some(_), Some(_)) = ( + CONFIG.parseable.trino_endpoint.as_ref(), + CONFIG.parseable.trino_catalog.as_ref(), + CONFIG.parseable.trino_schema.as_ref(), + CONFIG.parseable.trino_username.as_ref(), + ) { + // Trino is enabled + query_engine = "Trino".to_string(); + } Json(json!({ "version": current_version, @@ -119,7 +113,6 @@ pub async fn about() -> Json { "license": "AGPL-3.0-only", "mode": mode, "staging": staging, - "cache": cache_details, "hotTier": hot_tier_details, "grpcPort": grpc_port, "store": { @@ -128,7 +121,8 @@ pub async fn about() -> Json { }, "analytics": { "clarityTag": ms_clarity_tag - } + }, + "queryEngine": query_engine })) } diff --git a/server/src/handlers/http/modal/query_server.rs b/server/src/handlers/http/modal/query_server.rs index 575b0748c..9861990de 100644 --- a/server/src/handlers/http/modal/query_server.rs +++ b/server/src/handlers/http/modal/query_server.rs @@ -154,6 +154,7 @@ impl QueryServer { web::scope(&base_path()) // POST "/query" ==> Get results of the SQL query passed in request body .service(Server::get_query_factory()) + .service(Server::get_trino_factory()) .service(Server::get_cache_webscope()) .service(Server::get_liveness_factory()) .service(Server::get_readiness_factory()) diff --git a/server/src/handlers/http/modal/server.rs b/server/src/handlers/http/modal/server.rs index 67586be94..47a4f337d 100644 --- a/server/src/handlers/http/modal/server.rs +++ b/server/src/handlers/http/modal/server.rs @@ -24,6 +24,7 @@ use crate::handlers::http::base_path; use crate::handlers::http::cache; use crate::handlers::http::health_check; use crate::handlers::http::query; +use crate::handlers::http::trino; use crate::handlers::http::users::dashboards; use crate::handlers::http::users::filters; use 
crate::handlers::http::API_BASE_PATH;
@@ -169,6 +170,7 @@ impl Server {
             web::scope(&base_path())
                 // POST "/query" ==> Get results of the SQL query passed in request body
                 .service(Self::get_query_factory())
+                .service(Self::get_trino_factory())
                 .service(Self::get_cache_webscope())
                 .service(Self::get_ingest_factory())
                 .service(Self::get_liveness_factory())
@@ -187,6 +189,12 @@ impl Server {
             .service(Self::get_generated());
     }
 
+    // get the trino factory
+    pub fn get_trino_factory() -> Resource {
+        web::resource("/trinoquery")
+            .route(web::post().to(trino::trino_query).authorize(Action::Query))
+    }
+
     pub fn get_metrics_webscope() -> Scope {
         web::scope("/metrics").service(
             web::resource("").route(web::get().to(metrics::get).authorize(Action::Metrics)),
diff --git a/server/src/handlers/http/query.rs b/server/src/handlers/http/query.rs
index 576f838fe..8fe9c8229 100644
--- a/server/src/handlers/http/query.rs
+++ b/server/src/handlers/http/query.rs
@@ -511,3 +511,9 @@ impl actix_web::ResponseError for QueryError {
             .body(self.to_string())
     }
 }
+
+impl From<reqwest::Error> for QueryError {
+    fn from(value: reqwest::Error) -> Self {
+        QueryError::Anyhow(anyhow::Error::msg(value.to_string()))
+    }
+}
diff --git a/server/src/handlers/http/trino.rs b/server/src/handlers/http/trino.rs
new file mode 100644
index 000000000..893a8bc4c
--- /dev/null
+++ b/server/src/handlers/http/trino.rs
@@ -0,0 +1,308 @@
+/*
+ * Parseable Server (C) 2022 - 2024 Parseable, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <https://www.gnu.org/licenses/>.
+ *
+ */
+
+use std::{collections::HashMap, future::Future, pin::Pin};
+
+use actix_web::{
+    web::{self, Json},
+    FromRequest, HttpRequest, Responder,
+};
+use http::HeaderMap;
+use serde_json::Value;
+use trino_response::QueryResponse;
+
+use crate::{
+    handlers::{AUTHORIZATION_KEY, TRINO_CATALOG, TRINO_SCHEMA, TRINO_USER},
+    option::CONFIG,
+};
+
+use super::query::QueryError;
+
+#[derive(Debug, serde::Deserialize, serde::Serialize, Default)]
+#[serde(rename_all = "camelCase")]
+pub struct QueryResultsTrino {
+    pub id: String,
+    pub next_uri: Option<String>,
+    pub stats: Value,
+    pub error: Option<Value>,
+    pub warnings: Option<Value>,
+    pub columns: Option<Value>,
+    pub data: Option<Value>,
+}
+
+/// Query Request through http endpoint.
+#[derive(Debug, serde::Deserialize, serde::Serialize)]
+#[serde(rename_all = "camelCase")]
+pub struct TrinoQuery {
+    pub query: String,
+    #[serde(skip)]
+    pub fields: bool,
+}
+
+impl FromRequest for TrinoQuery {
+    type Error = actix_web::Error;
+    type Future = Pin<Box<dyn Future<Output = Result<Self, Self::Error>>>>;
+
+    fn from_request(req: &HttpRequest, payload: &mut actix_web::dev::Payload) -> Self::Future {
+        let query = Json::<TrinoQuery>::from_request(req, payload);
+        let params = web::Query::<HashMap<String, bool>>::from_request(req, payload)
+            .into_inner()
+            .map(|x| x.0)
+            .unwrap_or_default();
+
+        let fut = async move {
+            let mut query = query.await?.into_inner();
+            // format output json to include field names
+            query.fields = params.get("fields").cloned().unwrap_or(false);
+
+            Ok(query)
+        };
+
+        Box::pin(fut)
+    }
+}
+
+pub async fn trino_query(
+    _req: HttpRequest,
+    query_request: TrinoQuery,
+) -> Result<impl Responder, QueryError> {
+    let sql = query_request.query;
+
+    let (endpoint, catalog, schema, username) =
+        if let (Some(endpoint), Some(catalog), Some(schema), Some(username)) = (
+            CONFIG.parseable.trino_endpoint.as_ref(),
+            CONFIG.parseable.trino_catalog.as_ref(),
+            CONFIG.parseable.trino_schema.as_ref(),
+            CONFIG.parseable.trino_username.as_ref(),
+        ) {
+            let endpoint = if endpoint.ends_with('/') {
+                &endpoint[0..endpoint.len() - 1]
+            } else {
+                endpoint
+            };
+            (
+                endpoint.to_string(),
+                catalog.to_string(),
+                schema.to_string(),
+                username.to_string(),
+            )
+        } else {
+            return Err(QueryError::Anyhow(anyhow::Error::msg(
+                "Trino endpoint, catalog, schema, or username not set in config",
+            )));
+        };
+    let auth = &CONFIG.parseable.trino_auth;
+
+    trino_init(
+        &sql,
+        query_request.fields,
+        &endpoint,
+        &catalog,
+        &schema,
+        &username,
+        auth,
+    )
+    .await?
+    .to_http()
+}
+
+pub async fn trino_get(
+    with_fields: bool,
+    query_results: QueryResultsTrino,
+) -> Result<QueryResponse, QueryError> {
+    // initial check for nextUri
+    if let Some(mut next_uri) = query_results.next_uri {
+        let mut records: Vec<Value> = Vec::new();
+        let mut fields: Vec<String> = Vec::new();
+
+        let client = reqwest::Client::new();
+
+        // loop will handle batches being sent by server
+        loop {
+            let res: QueryResultsTrino = client.get(next_uri.clone()).send().await?.json().await?;
+
+            // check if columns and data present, collate
+            // if len of fields is not 0, then don't overwrite
+            if fields.is_empty() {
+                if let Some(columns) = res.columns {
+                    columns.as_array().unwrap().iter().for_each(|row| {
+                        let name = row
+                            .as_object()
+                            .unwrap()
+                            .get("name")
+                            .unwrap()
+                            .as_str()
+                            .unwrap()
+                            .to_string();
+                        fields.push(name);
+                    });
+                }
+            }
+
+            if let Some(data) = res.data {
+                if let Some(data) = data.as_array() {
+                    data.iter().for_each(|d| records.push(d.to_owned()));
+                }
+            }
+
+            // check if more data present
+            if res.next_uri.is_some() {
+                // more data to process
+                next_uri = res.next_uri.unwrap().to_string();
+            } else {
+                // check if state is FINISHED or FAILED, then return
+                let state = res
+                    .stats
+                    .as_object()
+                    .unwrap()
+                    .get("state")
+                    .unwrap()
+                    .as_str()
+                    .unwrap();
+
+                match state {
+                    "FAILED" => {
+                        // extract error
+                        if res.error.is_some() {
+                            let message = res
+                                .error
+                                .unwrap()
+                                .as_object()
+                                .unwrap()
+                                .get("message")
+                                .unwrap()
+                                .to_string();
+                            return Err(QueryError::Anyhow(anyhow::Error::msg(message)));
+                        } else {
+                            return Err(QueryError::Anyhow(anyhow::Error::msg("FAILED")));
+                        }
+                    }
+                    "FINISHED" => {
+                        // break
+                        break;
+                    }
+                    _ => {
+                        log::warn!("state '{state}' not covered");
+                        break;
+                    }
+                }
+            }
+        }
+
+        Ok(QueryResponse {
+            trino_records: Some(records),
+            fields,
+            with_fields,
+        })
+    } else {
+        // initial check for next_uri returned None
+        // check for error messages
+        Err(QueryError::Anyhow(anyhow::Error::msg(
+            "Did not receive nextUri for initial QueryResults",
+        )))
+    }
+}
+
+#[allow(clippy::too_many_arguments)]
+/// This is the entry point for a trino bound request
+/// The first POST request will happen here and the subsequent GET requests will happen in `trino_get()`
+pub async fn trino_init(
+    query: &str,
+    fields: bool,
+    endpoint: &str,
+    catalog: &str,
+    schema: &str,
+    user: &str,
+    auth: &Option<String>,
+) -> Result<QueryResponse, QueryError> {
+    let mut headers = HeaderMap::new();
+    headers.insert(TRINO_SCHEMA, schema.parse().unwrap());
+    headers.insert(TRINO_CATALOG, catalog.parse().unwrap());
+    headers.insert(TRINO_USER, user.parse().unwrap());
+
+    // add password if present
+    if let Some(auth) = auth {
+        headers.insert(AUTHORIZATION_KEY, format!("Basic {auth}").parse().unwrap());
+    }
+
+    let response: QueryResultsTrino = match reqwest::Client::new()
+        .post(format!("{endpoint}/v1/statement"))
+        .body(query.to_owned())
+        .headers(headers)
+        .send()
+        .await
+    {
+        Ok(r) => r.json().await?,
+        Err(e) => return Err(QueryError::Anyhow(anyhow::Error::msg(e.to_string()))),
+    };
+
+    trino_get(fields, response).await
+}
+
+mod trino_response {
+    use actix_web::{web, Responder};
+    use itertools::Itertools;
+    use serde_json::{json, Map, Value};
+
+    use crate::handlers::http::query::QueryError;
+
+    pub struct QueryResponse {
+        pub trino_records: Option<Vec<Value>>,
+        pub fields: Vec<String>,
+        pub with_fields: bool,
+    }
+
+    impl QueryResponse {
+        pub fn to_http(&self) -> Result<impl Responder, QueryError> {
+            log::info!("{}", "Returning query results");
+            let values = if let Some(trino_records) = self.trino_records.clone() {
+                // trino_records = Vec<Value>
+                let mut json_records: Vec<Map<String, Value>> = Vec::new();
+                for array in trino_records.into_iter() {
+                    let mut m: Map<String, Value> = Map::new();
+                    for (key, val) in self
+                        .fields
+                        .clone()
+                        .into_iter()
+                        .zip(array.as_array().unwrap())
+                    {
+                        m.insert(key, val.clone());
+                    }
+                    json_records.push(m);
+                }
+
+                json_records.into_iter().map(Value::Object).collect_vec()
+            } else {
+                return Err(QueryError::Anyhow(anyhow::Error::msg(
+                    "QueryResponse made improperly",
+                )));
+            };
+
+            let response = if self.with_fields {
+                json!({
+                    "fields": self.fields,
+                    "records": values
+                })
+            } else {
+                Value::Array(values)
+            };
+
+            Ok(web::Json(response))
+        }
+    }
+}
diff --git a/server/src/option.rs b/server/src/option.rs
index a5428db2f..00a699752 100644
--- a/server/src/option.rs
+++ b/server/src/option.rs
@@ -151,14 +151,6 @@ Cloud Native, log analytics platform for modern applications."#,
         &self.parseable.local_staging_path
     }
 
-    pub fn cache_size(&self) -> u64 {
-        self.parseable.local_cache_size
-    }
-
-    pub fn cache_dir(&self) -> &Option<PathBuf> {
-        &self.parseable.local_cache_path
-    }
-
     pub fn hot_tier_dir(&self) -> &Option<PathBuf> {
         &self.parseable.hot_tier_storage_path
     }