diff --git a/src/audit.rs b/src/audit/builder.rs
similarity index 57%
rename from src/audit.rs
rename to src/audit/builder.rs
index e62c53c0e..45719e087 100644
--- a/src/audit.rs
+++ b/src/audit/builder.rs
@@ -16,135 +16,18 @@
  *
  */
 
-use std::{
-    collections::HashMap,
-    fmt::{Debug, Display},
-};
+use std::fmt::Display;
 
-use crate::{about::current, parseable::PARSEABLE, storage::StorageMetadata, HTTP_CLIENT};
+use crate::{about::current, storage::StorageMetadata};
 
-use chrono::{DateTime, Utc};
-use once_cell::sync::Lazy;
-use serde::Serialize;
-use serde_json::{json, Value};
+use chrono::Utc;
 use tracing::error;
-
 use ulid::Ulid;
-use url::Url;
-
-static AUDIT_LOGGER: Lazy<Option<AuditLogger>> = Lazy::new(AuditLogger::new);
-
-// AuditLogger handles sending audit logs to a remote logging system
-pub struct AuditLogger {
-    log_endpoint: Url,
-}
-
-impl AuditLogger {
-    /// Create an audit logger that can be used to capture and push
-    /// audit logs to the appropriate logging system over HTTP
-    pub fn new() -> Option<Self> {
-        // Try to construct the log endpoint URL by joining the base URL
-        // with the ingest path, This can fail if the URL is not valid,
-        // when the base URL is not set or the ingest path is not valid
-        let log_endpoint = match PARSEABLE
-            .options
-            .audit_logger
-            .as_ref()?
-            .join("/api/v1/ingest")
-        {
-            Ok(url) => url,
-            Err(err) => {
-                eprintln!("Couldn't setup audit logger: {err}");
-                return None;
-            }
-        };
-
-        Some(AuditLogger { log_endpoint })
-    }
-
-    // Sends the audit log to the configured endpoint with proper authentication
-    async fn send_log(&self, json: Value) {
-        let mut req = HTTP_CLIENT
-            .post(self.log_endpoint.as_str())
-            .json(&json)
-            .header("x-p-stream", "audit_log");
-
-        // Use basic auth if credentials are configured
-        if let Some(username) = PARSEABLE.options.audit_username.as_ref() {
-            req = req.basic_auth(username, PARSEABLE.options.audit_password.as_ref())
-        }
-
-        match req.send().await {
-            Ok(r) => {
-                if let Err(e) = r.error_for_status() {
-                    error!("{e}")
-                }
-            }
-            Err(e) => error!("Failed to send audit event: {}", e),
-        }
-    }
-}
-
-// Represents the version of the audit log format
-#[non_exhaustive]
-#[repr(u8)]
-#[derive(Debug, Clone, Copy, Serialize, Default)]
-pub enum AuditLogVersion {
-    // NOTE: default should be latest version
-    #[default]
-    V1 = 1,
-}
-
-#[derive(Serialize, Default)]
-pub struct AuditDetails {
-    pub version: AuditLogVersion,
-    pub id: Ulid,
-    pub generated_at: DateTime<Utc>,
-}
-
-#[derive(Serialize, Default)]
-pub struct ServerDetails {
-    pub version: String,
-    pub deployment_id: Ulid,
-}
-
-// Contains information about the actor (user) who performed the action
-#[derive(Serialize, Default)]
-pub struct ActorDetails {
-    pub remote_host: String,
-    pub user_agent: String,
-    pub username: String,
-    pub authorization_method: String,
-}
-// Contains details about the HTTP request that was made
-#[derive(Serialize, Default)]
-pub struct RequestDetails {
-    pub stream: String,
-    pub start_time: DateTime<Utc>,
-    pub end_time: DateTime<Utc>,
-    pub method: String,
-    pub path: String,
-    pub protocol: String,
-    pub headers: HashMap<String, String>,
-}
-
-/// Contains information about the response sent back to the client
-#[derive(Default, Serialize)]
-pub struct ResponseDetails {
-    pub status_code: u16,
-    pub error: Option<String>,
-}
-
-/// The main audit log structure that combines all audit information
-#[derive(Serialize)]
-pub struct AuditLog {
-    pub audit: AuditDetails,
-    pub parseable_server: ServerDetails,
-    pub actor: ActorDetails,
-    pub request: RequestDetails,
-    pub response: ResponseDetails,
-}
+use super::{
+    ActorDetails, AuditDetails, AuditLog, AuditLogVersion, RequestDetails, ResponseDetails,
+    ServerDetails, AUDIT_LOG_TX,
+};
 
 /// Builder pattern implementation for constructing audit logs
 pub struct AuditLogBuilder {
@@ -156,7 +39,7 @@ pub struct AuditLogBuilder {
 impl Default for AuditLogBuilder {
     fn default() -> Self {
         AuditLogBuilder {
-            enabled: AUDIT_LOGGER.is_some(),
+            enabled: AUDIT_LOG_TX.get().is_some(),
             inner: AuditLog {
                 audit: AuditDetails {
                     version: AuditLogVersion::V1,
@@ -287,10 +170,14 @@ impl AuditLogBuilder {
         audit_log.audit.generated_at = now;
         audit_log.request.end_time = now;
 
-        AUDIT_LOGGER
-            .as_ref()
-            .unwrap()
-            .send_log(json!(audit_log))
+        // NOTE: we are fine with blocking here, as the user expects audit logs to be sent at all costs
+        if let Err(e) = AUDIT_LOG_TX
+            .get()
+            .expect("Audit logger not initialized")
+            .send(audit_log)
             .await
+        {
+            error!("Couldn't send to logger: {e}")
+        }
     }
 }
diff --git a/src/audit/logger.rs b/src/audit/logger.rs
new file mode 100644
index 000000000..b41e17e51
--- /dev/null
+++ b/src/audit/logger.rs
@@ -0,0 +1,235 @@
+/*
+ * Parseable Server (C) 2022 - 2024 Parseable, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <https://www.gnu.org/licenses/>.
+ *
+ */
+use std::fs::File;
+
+use tokio::{
+    fs::OpenOptions,
+    io::AsyncWriteExt,
+    select,
+    sync::{mpsc::channel, oneshot},
+    time::interval,
+};
+use tracing::{error, info, warn};
+use url::Url;
+
+use crate::{parseable::PARSEABLE, HTTP_CLIENT};
+
+use super::{AuditLog, AUDIT_LOG_TX};
+
+// AuditLogger handles sending audit logs to a remote logging system
+pub struct AuditLogger {
+    log_endpoint: Option<Url>,
+    batch: Vec<AuditLog>,
+
+    // NOTE: good until usize overflows
+    next_log_file_id: usize,
+    oldest_log_file_id: usize,
+}
+
+impl Default for AuditLogger {
+    /// Create an audit logger that can be used to capture and push
+    /// audit logs to the appropriate logging system over HTTP
+    fn default() -> Self {
+        let mut logger = AuditLogger {
+            log_endpoint: None,
+            batch: Vec::with_capacity(PARSEABLE.options.audit_batch_size),
+            next_log_file_id: 0,
+            oldest_log_file_id: 0,
+        };
+
+        // Try to construct the log endpoint URL by joining the base URL
+        // with the ingest path. This can fail if the URL is not valid,
+        // e.g. when the base URL is not set or the ingest path is not valid
+        let Some(url) = PARSEABLE.options.audit_logger.as_ref() else {
+            return logger;
+        };
+
+        logger.log_endpoint = url
+            .join("/api/v1/ingest")
+            .inspect_err(|err| error!("Couldn't setup audit logger: {err}"))
+            .ok();
+
+        // Create the directory for audit logs if it doesn't exist
+        std::fs::create_dir_all(&PARSEABLE.options.audit_log_dir)
+            .expect("Failed to create audit log directory");
+
+        // Figure out the latest and oldest log file in directory
+        let files = std::fs::read_dir(&PARSEABLE.options.audit_log_dir)
+            .expect("Failed to read audit log directory");
+        let (oldest_log_file_id, latest_log_file_id) =
+            files.fold((usize::MAX, 0), |(oldest, latest), r| {
+                let file_name = r.unwrap().file_name();
+                let Ok(file_id) = file_name
+                    .to_str()
+                    .expect("File name is not utf8")
+                    .split('.')
+                    .next()
+                    .expect("File name is not valid")
+                    .parse::<usize>()
+                    .inspect_err(|e| warn!("Unexpected file in logs directory: {e}"))
+                else {
+                    return (oldest, latest);
+                };
+                (oldest.min(file_id), latest.max(file_id))
+            });
+
+        logger.next_log_file_id = latest_log_file_id + 1;
+        if oldest_log_file_id != usize::MAX {
+            logger.oldest_log_file_id = oldest_log_file_id;
+        }
+
+        logger
+    }
+}
+
+impl AuditLogger {
+    /// Flushes audit logs to the remote logging system
+    async fn flush(&mut self) {
+        if self.batch.is_empty() {
+            return;
+        }
+
+        // swap the old batch with a new empty one
+        let mut logs_to_send = Vec::with_capacity(PARSEABLE.options.audit_batch_size);
+        std::mem::swap(&mut self.batch, &mut logs_to_send);
+
+        // send the logs to the remote logging system if there is no backlog, else write them to disk
+        if self.oldest_log_file_id >= self.next_log_file_id
+            && self.send_logs_to_remote(&logs_to_send).await.is_ok()
+        {
+            return;
+        }
+
+        // write the logs to the next log file
+        let log_file_path = PARSEABLE
+            .options
+            .audit_log_dir
+            .join(format!("{}.json", self.next_log_file_id));
+        let mut log_file = OpenOptions::new()
+            .create(true)
+            .write(true)
+            .truncate(true)
+            .open(log_file_path)
+            .await
+            .expect("Failed to open audit log file");
+        let buf = serde_json::to_vec(&logs_to_send).expect("Failed to serialize audit logs");
+        if let Err(e) = log_file.write_all(&buf).await {
+            error!("Failed to write audit logs to file: {e}");
+        }
+
+        // increment the next log file id
+        self.next_log_file_id += 1;
+    }
+
+    /// Inserts an audit log into the batch, and flushes the batch if it exceeds the configured batch size
+    async fn insert(&mut self, log: AuditLog) {
+        self.batch.push(log);
+
+        // Flush if batch size exceeds threshold
+        if self.batch.len() >= PARSEABLE.options.audit_batch_size {
+            self.flush().await
+        }
+    }
+
+    /// Reads the oldest log file and sends it to the audit logging backend
+    async fn send_logs(&self) -> anyhow::Result<()> {
+        // if there are no logs to send, do nothing
+        if self.oldest_log_file_id >= self.next_log_file_id {
+            return Ok(());
+        }
+
+        // read the oldest log file
+        let oldest_file_path = PARSEABLE
+            .options
+            .audit_log_dir
+            .join(format!("{}.json", self.oldest_log_file_id));
+        let mut oldest_file = File::open(&oldest_file_path)?;
+        let logs_to_send: Vec<AuditLog> = serde_json::from_reader(&mut oldest_file)?;
+        self.send_logs_to_remote(&logs_to_send).await?;
+
+        // Delete the oldest log file
+        std::fs::remove_file(oldest_file_path)?;
+
+        Ok(())
+    }
+
+    async fn send_logs_to_remote(&self, logs: &Vec<AuditLog>) -> anyhow::Result<()> {
+        // send the logs to the audit logging backend
+        let log_endpoint = self
+            .log_endpoint
+            .as_ref()
+            .expect("Audit logger was initialized!");
+        let mut req = HTTP_CLIENT
+            .post(log_endpoint.as_str())
+            .json(&logs)
+            .header("x-p-stream", "audit_log");
+
+        // Use basic auth if credentials are configured
+        if let Some(username) = PARSEABLE.options.audit_username.as_ref() {
+            req = req.basic_auth(username, PARSEABLE.options.audit_password.as_ref())
+        }
+
+        // Send batched logs to the audit logging backend
+        req.send().await?.error_for_status()?;
+
+        Ok(())
+    }
+
+    /// Spawns a background task for periodic flushing of audit logs, if configured
+    pub async fn spawn_batcher(mut self, mut shutdown_rx: oneshot::Receiver<()>) {
+        if self.log_endpoint.is_none() {
+            return;
+        }
+
+        // setup the audit log channel
+        let (audit_log_tx, mut audit_log_rx) = channel(PARSEABLE.options.audit_batch_size);
+        AUDIT_LOG_TX
+            .set(audit_log_tx)
+            .expect("Failed to set audit logger tx");
+
+        // spawn the batcher
+        tokio::spawn(async move {
+            let mut interval = interval(PARSEABLE.options.audit_flush_interval);
+            loop {
+                select! {
+                    _ = interval.tick() => {
+                        self.flush().await;
+                    }
+
+                    Some(log) = audit_log_rx.recv() => {
+                        self.insert(log).await;
+                    }
+
+                    r = self.send_logs() => {
+                        if let Err(e) = r {
+                            error!("Failed to send logs: {e}");
+                            continue;
+                        }
+                        self.oldest_log_file_id += 1;
+                    },
+
+                    _ = &mut shutdown_rx => {
+                        self.flush().await;
+                        info!("Audit logger shutting down");
+                        break;
+                    }
+                }
+            }
+        });
+    }
+}
diff --git a/src/audit/mod.rs b/src/audit/mod.rs
new file mode 100644
index 000000000..9665f205b
--- /dev/null
+++ b/src/audit/mod.rs
@@ -0,0 +1,31 @@
+/*
+ * Parseable Server (C) 2022 - 2024 Parseable, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <https://www.gnu.org/licenses/>.
+ *
+ */
+
+mod builder;
+mod logger;
+mod types;
+
+pub use builder::AuditLogBuilder;
+pub use logger::AuditLogger;
+pub use types::*;
+
+use once_cell::sync::OnceCell;
+use tokio::sync::mpsc::Sender;
+
+// Shared channel for handing audit logs to the batcher, which batches and sends them
+static AUDIT_LOG_TX: OnceCell<Sender<AuditLog>> = OnceCell::new();
diff --git a/src/audit/types.rs b/src/audit/types.rs
new file mode 100644
index 000000000..097875436
--- /dev/null
+++ b/src/audit/types.rs
@@ -0,0 +1,83 @@
+/*
+ * Parseable Server (C) 2022 - 2024 Parseable, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <https://www.gnu.org/licenses/>.
+ *
+ */
+
+use chrono::{DateTime, Utc};
+use serde::{Deserialize, Serialize};
+use std::collections::HashMap;
+use ulid::Ulid;
+
+// Represents the version of the audit log format
+#[non_exhaustive]
+#[repr(u8)]
+#[derive(Debug, Clone, Copy, Serialize, Default, Deserialize)]
+pub enum AuditLogVersion {
+    // NOTE: default should be latest version
+    #[default]
+    V1 = 1,
+}
+
+#[derive(Serialize, Default, Deserialize)]
+pub struct AuditDetails {
+    pub version: AuditLogVersion,
+    pub id: Ulid,
+    pub generated_at: DateTime<Utc>,
+}
+
+#[derive(Serialize, Default, Deserialize)]
+pub struct ServerDetails {
+    pub version: String,
+    pub deployment_id: Ulid,
+}
+
+// Contains information about the actor (user) who performed the action
+#[derive(Serialize, Default, Deserialize)]
+pub struct ActorDetails {
+    pub remote_host: String,
+    pub user_agent: String,
+    pub username: String,
+    pub authorization_method: String,
+}
+
+// Contains details about the HTTP request that was made
+#[derive(Serialize, Default, Deserialize)]
+pub struct RequestDetails {
+    pub stream: String,
+    pub start_time: DateTime<Utc>,
+    pub end_time: DateTime<Utc>,
+    pub method: String,
+    pub path: String,
+    pub protocol: String,
+    pub headers: HashMap<String, String>,
+}
+
+/// Contains information about the response sent back to the client
+#[derive(Default, Serialize, Deserialize)]
+pub struct ResponseDetails {
+    pub status_code: u16,
+    pub error: Option<String>,
+}
+
+/// The main audit log structure that combines all audit information
+#[derive(Serialize, Deserialize)]
+pub struct AuditLog {
+    pub audit: AuditDetails,
+    pub parseable_server: ServerDetails,
+    pub actor: ActorDetails,
+    pub request: RequestDetails,
+    pub response: ResponseDetails,
+}
diff --git a/src/cli.rs b/src/cli.rs
index ad2db244d..7549d3801 100644
--- a/src/cli.rs
+++ b/src/cli.rs
@@ -15,9 +15,8 @@
  * along with this program. If not, see <https://www.gnu.org/licenses/>.
  *
  */
-
 use clap::Parser;
-use std::{env, fs, path::PathBuf};
+use std::{env, fs, path::PathBuf, time::Duration};
 
 use url::Url;
 
@@ -346,6 +345,30 @@ pub struct Options {
     #[arg(long, env = "P_AUDIT_PASSWORD", help = "Audit logger password")]
     pub audit_password: Option<String>,
 
+    #[arg(
+        long,
+        env = "P_AUDIT_BATCH_SIZE",
+        default_value = "100",
+        help = "Audit log batch size"
+    )]
+    pub audit_batch_size: usize,
+
+    #[arg(
+        long,
+        env = "P_AUDIT_FLUSH_INTERVAL",
+        value_parser = validation::duration,
+        help = "Interval in seconds to flush audit logs into persistence"
+    )]
+    pub audit_flush_interval: Duration,
+
+    #[arg(
+        long,
+        env = "P_AUDIT_LOG_DIR",
+        default_value = "./auditlogs",
+        help = "Path for audit log persistence"
+    )]
+    pub audit_log_dir: PathBuf,
+
     #[arg(long, env = "P_MS_CLARITY_TAG", help = "Tag for MS Clarity")]
     pub ms_clarity_tag: Option<String>,
 }
diff --git a/src/lib.rs b/src/lib.rs
index 27809aab0..2276e1e2d 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -52,6 +52,7 @@ mod validator;
 
 use std::time::Duration;
 
+pub use audit::AuditLogger;
 pub use handlers::http::modal::{
     ingest_server::IngestServer, query_server::QueryServer, server::Server, ParseableServer,
 };
diff --git a/src/main.rs b/src/main.rs
index 5894b304f..fa19ad6b3 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -20,7 +20,7 @@ use std::process::exit;
 #[cfg(feature = "kafka")]
 use parseable::connectors;
 use parseable::{
-    banner, metrics, option::Mode, parseable::PARSEABLE, rbac, storage, IngestServer,
+    banner, metrics, option::Mode, parseable::PARSEABLE, rbac, storage, AuditLogger, IngestServer,
     ParseableServer, QueryServer, Server,
 };
 use tokio::signal::ctrl_c;
@@ -33,7 +33,14 @@ use tracing_subscriber::{fmt, EnvFilter, Registry};
 
 #[actix_web::main]
 async fn main() -> anyhow::Result<()> {
-    init_logger();
+    tracing_subscriber::fmt()
+        .with_env_filter(EnvFilter::from_default_env())
+        .compact()
+        .init();
+
+    // spawn audit log batcher
+    let (logger_shutdown_trigger, shutdown_rx) = oneshot::channel::<()>();
+    AuditLogger::default().spawn_batcher(shutdown_rx).await;
 
     // these are empty ptrs so mem footprint should be minimal
     let server: Box<dyn ParseableServer> = match &PARSEABLE.options.mode {
@@ -56,13 +63,14 @@ async fn main() -> anyhow::Result<()> {
     metadata.set_global();
 
     // Spawn a task to trigger graceful shutdown on appropriate signal
-    let (shutdown_trigger, shutdown_rx) = oneshot::channel::<()>();
+    let (server_shutdown_trigger, shutdown_rx) = oneshot::channel::<()>();
     tokio::spawn(async move {
         block_until_shutdown_signal().await;
 
         // Trigger graceful shutdown
         warn!("Received shutdown signal, notifying server to shut down...");
-        shutdown_trigger.send(()).unwrap();
+        server_shutdown_trigger.send(()).unwrap();
+        logger_shutdown_trigger.send(()).unwrap();
     });
 
     let prometheus = metrics::build_metrics_handler();
diff --git a/src/option.rs b/src/option.rs
index 0e659905b..d16536d00 100644
--- a/src/option.rs
+++ b/src/option.rs
@@ -76,6 +76,7 @@ pub mod validation {
         env, io,
         net::ToSocketAddrs,
         path::{Path, PathBuf},
+        time::Duration,
     };
 
     use path_clean::PathClean;
@@ -134,6 +135,14 @@
         }
     }
 
+    pub fn duration(secs: &str) -> Result<Duration, String> {
+        let Ok(secs) = secs.parse() else {
+            return Err("Couldn't parse as a number".to_string());
+        };
+
+        Ok(Duration::from_secs(secs))
+    }
+
     pub fn compression(s: &str) -> Result<Compression, String> {
         match s {
             "uncompressed" => Ok(Compression::Uncompressed),
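
As a quick orientation for how the pieces added above fit together (this is an illustrative sketch, not part of the patch): `AuditLogger::default()` reads the new `P_AUDIT_*` options, `spawn_batcher` installs the shared `AUDIT_LOG_TX` sender and starts the background task that batches incoming logs, flushes them every `P_AUDIT_FLUSH_INTERVAL` seconds, and spills them to numbered `.json` files under `P_AUDIT_LOG_DIR` when the remote endpoint cannot be reached. The `#[tokio::main]` entry point and the omitted server startup below are placeholders; Parseable's real entry point is the `src/main.rs` hunk above.

    use parseable::AuditLogger;
    use tokio::sync::oneshot;

    #[tokio::main]
    async fn main() {
        // Dedicated shutdown signal so the batcher can flush its last batch on exit.
        let (logger_shutdown_trigger, logger_shutdown_rx) = oneshot::channel::<()>();

        // No-op unless P_AUDIT_LOGGER is set; otherwise this sets AUDIT_LOG_TX and
        // spawns the batching/flushing task defined in src/audit/logger.rs.
        AuditLogger::default().spawn_batcher(logger_shutdown_rx).await;

        // ... start the Parseable server and wait for a shutdown signal ...

        // Ask the batcher to flush anything still buffered before the process exits.
        let _ = logger_shutdown_trigger.send(());
    }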