diff --git a/src/analytics.rs b/src/analytics.rs
index 42af72d36..c8c3e8038 100644
--- a/src/analytics.rs
+++ b/src/analytics.rs
@@ -41,7 +41,7 @@ use crate::{
     },
     option::Mode,
     parseable::PARSEABLE,
-    stats::{self, Stats},
+    stats::{FullStats, Stats},
     storage, HTTP_CLIENT,
 };
@@ -170,7 +170,7 @@ fn total_event_stats() -> (Stats, Stats, Stats) {
     let mut deleted_json_bytes: u64 = 0;
 
     for stream in PARSEABLE.streams.list() {
-        let Some(stats) = stats::get_current_stats(&stream, "json") else {
+        let Some(stats) = FullStats::get_current(&stream, "json") else {
             continue;
         };
         total_events += stats.lifetime_stats.events;
diff --git a/src/catalog/mod.rs b/src/catalog/mod.rs
index d44ac877b..3b0c8617e 100644
--- a/src/catalog/mod.rs
+++ b/src/catalog/mod.rs
@@ -33,7 +33,7 @@ use crate::{
     option::Mode,
     parseable::PARSEABLE,
     query::PartialTimeFilter,
-    stats::{event_labels_date, get_current_stats, storage_size_labels_date, update_deleted_stats},
+    stats::{event_labels_date, storage_size_labels_date, update_deleted_stats, FullStats},
     storage::{
         object_storage::manifest_path, ObjectStorage, ObjectStorageError, ObjectStoreFormat,
     },
@@ -181,7 +181,7 @@ pub async fn update_snapshot(
         if let Some(mut manifest) = storage.get_manifest(&path).await? {
             manifest.apply_change(change);
             storage.put_manifest(&path, manifest).await?;
-            let stats = get_current_stats(stream_name, "json");
+            let stats = FullStats::get_current(stream_name, "json");
             if let Some(stats) = stats {
                 meta.stats = stats;
             }
@@ -307,7 +307,7 @@ async fn create_manifest(
         };
         manifests.push(new_snapshot_entry);
         meta.snapshot.manifest_list = manifests;
-        let stats = get_current_stats(stream_name, "json");
+        let stats = FullStats::get_current(stream_name, "json");
         if let Some(stats) = stats {
             meta.stats = stats;
         }
diff --git a/src/handlers/http/cluster/mod.rs b/src/handlers/http/cluster/mod.rs
index 05d56e028..bbf9cee3e 100644
--- a/src/handlers/http/cluster/mod.rs
+++ b/src/handlers/http/cluster/mod.rs
@@ -42,7 +42,6 @@ use crate::metrics::prom_utils::Metrics;
 use crate::parseable::PARSEABLE;
 use crate::rbac::role::model::DefaultPrivilege;
 use crate::rbac::user::User;
-use crate::stats::Stats;
 use crate::storage::{
     ObjectStorageError, ObjectStoreFormat, PARSEABLE_ROOT_DIRECTORY, STREAM_ROOT_DIRECTORY,
 };
@@ -369,33 +368,6 @@ pub async fn sync_role_update_with_ingestors(
     Ok(())
 }
 
-pub fn fetch_daily_stats_from_ingestors(
-    date: &str,
-    stream_meta_list: &[ObjectStoreFormat],
-) -> Result<Stats, StreamError> {
-    // for the given date, get the stats from the ingestors
-    let mut events_ingested = 0;
-    let mut ingestion_size = 0;
-    let mut storage_size = 0;
-
-    for meta in stream_meta_list.iter() {
-        for manifest in meta.snapshot.manifest_list.iter() {
-            if manifest.time_lower_bound.date_naive().to_string() == date {
-                events_ingested += manifest.events_ingested;
-                ingestion_size += manifest.ingestion_size;
-                storage_size += manifest.storage_size;
-            }
-        }
-    }
-
-    let stats = Stats {
-        events: events_ingested,
-        ingestion: ingestion_size,
-        storage: storage_size,
-    };
-    Ok(stats)
-}
-
 /// get the cumulative stats from all ingestors
 pub async fn fetch_stats_from_ingestors(
     stream_name: &str,
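The helper deleted above is not dropped from the codebase: it re-surfaces later in this diff as `Stats::fetch_from_ingestors` in `src/stats.rs`, where the stringly `date_naive().to_string() == date` comparison also tightens into a typed `NaiveDate` equality. A minimal, self-contained sketch of the aggregation it performs; `Manifest` is a hypothetical stand-in for the entries of `meta.snapshot.manifest_list`:

```rust
use chrono::NaiveDate;

// Hypothetical stand-in for the manifest entries carried by ObjectStoreFormat;
// field names mirror the ones summed in the diff.
struct Manifest {
    time_lower_bound: NaiveDate,
    events_ingested: u64,
    ingestion_size: u64,
    storage_size: u64,
}

#[derive(Debug, Default)]
struct Stats {
    events: u64,
    ingestion: u64,
    storage: u64,
}

// Sum every manifest whose lower time bound falls on the requested date.
fn daily_stats(date: NaiveDate, manifests: &[Manifest]) -> Stats {
    manifests
        .iter()
        .filter(|m| m.time_lower_bound == date)
        .fold(Stats::default(), |acc, m| Stats {
            events: acc.events + m.events_ingested,
            ingestion: acc.ingestion + m.ingestion_size,
            storage: acc.storage + m.storage_size,
        })
}

fn main() {
    let date = NaiveDate::from_ymd_opt(2025, 1, 1).unwrap();
    let manifests = [Manifest {
        time_lower_bound: date,
        events_ingested: 42,
        ingestion_size: 1 << 20,
        storage_size: 1 << 18,
    }];
    assert_eq!(daily_stats(date, &manifests).events, 42);
}
```

Comparing `NaiveDate` values directly avoids both the per-manifest `to_string()` allocation and any dependence on the date's string formatting.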
diff --git a/src/handlers/http/cluster/utils.rs b/src/handlers/http/cluster/utils.rs
index b88e95bb1..d91902b58 100644
--- a/src/handlers/http/cluster/utils.rs
+++ b/src/handlers/http/cluster/utils.rs
@@ -45,6 +45,46 @@ impl QueriedStats {
             storage,
         }
     }
+
+    pub fn merge(stats: Vec<Self>) -> Self {
+        // get the stream name; every entry refers to the same stream
+        let stream_name = stats[0].stream.clone();
+
+        let min_time = stats.iter().map(|x| x.time).min().unwrap_or_else(Utc::now);
+
+        let cumulative_ingestion =
+            stats
+                .iter()
+                .map(|x| &x.ingestion)
+                .fold(IngestionStats::default(), |acc, x| IngestionStats {
+                    count: acc.count + x.count,
+                    size: acc.size + x.size,
+                    format: x.format.clone(),
+                    lifetime_count: acc.lifetime_count + x.lifetime_count,
+                    lifetime_size: acc.lifetime_size + x.lifetime_size,
+                    deleted_count: acc.deleted_count + x.deleted_count,
+                    deleted_size: acc.deleted_size + x.deleted_size,
+                });
+
+        let cumulative_storage =
+            stats
+                .iter()
+                .map(|x| &x.storage)
+                .fold(StorageStats::default(), |acc, x| StorageStats {
+                    size: acc.size + x.size,
+                    format: x.format.clone(),
+                    lifetime_size: acc.lifetime_size + x.lifetime_size,
+                    deleted_size: acc.deleted_size + x.deleted_size,
+                });
+
+        QueriedStats::new(
+            &stream_name,
+            min_time,
+            cumulative_ingestion,
+            cumulative_storage,
+        )
+    }
 }
 
 #[derive(Debug, Default, Serialize, Deserialize)]
@@ -129,46 +169,6 @@ impl StorageStats {
     }
 }
 
-pub fn merge_quried_stats(stats: Vec<QueriedStats>) -> QueriedStats {
-    // get the stream name
-    let stream_name = stats[1].stream.clone();
-
-    let min_time = stats.iter().map(|x| x.time).min().unwrap_or_else(Utc::now);
-
-    let cumulative_ingestion =
-        stats
-            .iter()
-            .map(|x| &x.ingestion)
-            .fold(IngestionStats::default(), |acc, x| IngestionStats {
-                count: acc.count + x.count,
-                size: acc.size + x.size,
-                format: x.format.clone(),
-                lifetime_count: acc.lifetime_count + x.lifetime_count,
-                lifetime_size: acc.lifetime_size + x.lifetime_size,
-                deleted_count: acc.deleted_count + x.deleted_count,
-                deleted_size: acc.deleted_size + x.deleted_size,
-            });
-
-    let cumulative_storage =
-        stats
-            .iter()
-            .map(|x| &x.storage)
-            .fold(StorageStats::default(), |acc, x| StorageStats {
-                size: acc.size + x.size,
-                format: x.format.clone(),
-                lifetime_size: acc.lifetime_size + x.lifetime_size,
-                deleted_size: acc.deleted_size + x.deleted_size,
-            });
-
-    QueriedStats::new(
-        &stream_name,
-        min_time,
-        cumulative_ingestion,
-        cumulative_storage,
-    )
-}
-
 pub async fn check_liveness(domain_name: &str) -> bool {
     let uri = match Url::parse(&format!(
         "{}{}/liveness",
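For readers skimming the move: `QueriedStats::merge` is a pure fold. Counts and sizes accumulate, `format` is carried through from the last element, and the earliest capture time wins. A trimmed, runnable model of the same fold (assuming a cut-down `IngestionStats` with three fields; the real struct has seven):

```rust
#[derive(Debug, Default, Clone)]
struct IngestionStats {
    count: u64,
    size: u64,
    format: String,
}

// Same shape as the fold in QueriedStats::merge: start from Default and
// add each node's counters; the format tag just passes through.
fn merge(stats: &[IngestionStats]) -> IngestionStats {
    stats
        .iter()
        .fold(IngestionStats::default(), |acc, x| IngestionStats {
            count: acc.count + x.count,
            size: acc.size + x.size,
            format: x.format.clone(),
        })
}

fn main() {
    let querier = IngestionStats { count: 10, size: 100, format: "json".into() };
    let ingestor = IngestionStats { count: 5, size: 50, format: "json".into() };
    let total = merge(&[querier, ingestor]);
    assert_eq!((total.count, total.size), (15, 150));
}
```

Starting from `Default::default()` keeps the fold total even for an empty or single-element input, which is also why the method takes the stream name from `stats[0]`: every element describes the same stream, and a vector holding only the querier's own entry must not panic.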
diff --git a/src/handlers/http/logstream.rs b/src/handlers/http/logstream.rs
index a3a2096ae..51bc071ce 100644
--- a/src/handlers/http/logstream.rs
+++ b/src/handlers/http/logstream.rs
@@ -16,33 +16,33 @@
  *
  */
 
-use self::error::StreamError;
-use super::cluster::utils::{IngestionStats, QueriedStats, StorageStats};
-use super::query::update_schema_when_distributed;
+use actix_web::http::StatusCode;
+use actix_web::web::{Json, Path, Query};
+use actix_web::{web, HttpRequest, HttpResponse, Responder};
+use arrow_json::reader::infer_json_schema_from_iterator;
+use bytes::Bytes;
+use chrono::Utc;
+use error::StreamError;
+use itertools::Itertools;
+use serde_json::{json, Value};
+use std::fs;
+use std::sync::Arc;
+use tracing::warn;
+
 use crate::event::format::override_data_type;
 use crate::hottier::{HotTierManager, StreamHotTier, CURRENT_HOT_TIER_VERSION};
 use crate::metadata::SchemaVersion;
-use crate::metrics::{EVENTS_INGESTED_DATE, EVENTS_INGESTED_SIZE_DATE, EVENTS_STORAGE_SIZE_DATE};
 use crate::parseable::{StreamNotFound, PARSEABLE};
 use crate::rbac::role::Action;
 use crate::rbac::Users;
-use crate::stats::{event_labels_date, storage_size_labels_date, Stats};
+use crate::stats::{FullStats, Stats, StatsParams};
 use crate::storage::retention::Retention;
 use crate::storage::{StreamInfo, StreamType};
 use crate::utils::actix::extract_session_key_from_req;
 use crate::{stats, validator, LOCK_EXPECT};
-use actix_web::http::StatusCode;
-use actix_web::web::{Json, Path};
-use actix_web::{web, HttpRequest, Responder};
-use arrow_json::reader::infer_json_schema_from_iterator;
-use bytes::Bytes;
-use chrono::Utc;
-use itertools::Itertools;
-use serde_json::{json, Value};
-use std::fs;
-use std::sync::Arc;
-use tracing::warn;
+
+use super::cluster::utils::{IngestionStats, QueriedStats, StorageStats};
+use super::query::update_schema_when_distributed;
 
 pub async fn delete(stream_name: Path<String>) -> Result<impl Responder, StreamError> {
     let stream_name = stream_name.into_inner();
@@ -198,34 +198,10 @@ pub async fn put_retention(
     ))
 }
 
-pub async fn get_stats_date(stream_name: &str, date: &str) -> Result<Stats, StreamError> {
-    let event_labels = event_labels_date(stream_name, "json", date);
-    let storage_size_labels = storage_size_labels_date(stream_name, date);
-    let events_ingested = EVENTS_INGESTED_DATE
-        .get_metric_with_label_values(&event_labels)
-        .unwrap()
-        .get() as u64;
-    let ingestion_size = EVENTS_INGESTED_SIZE_DATE
-        .get_metric_with_label_values(&event_labels)
-        .unwrap()
-        .get() as u64;
-    let storage_size = EVENTS_STORAGE_SIZE_DATE
-        .get_metric_with_label_values(&storage_size_labels)
-        .unwrap()
-        .get() as u64;
-
-    let stats = Stats {
-        events: events_ingested,
-        ingestion: ingestion_size,
-        storage: storage_size,
-    };
-    Ok(stats)
-}
-
 pub async fn get_stats(
-    req: HttpRequest,
     stream_name: Path<String>,
-) -> Result<impl Responder, StreamError> {
+    Query(params): Query<StatsParams>,
+) -> Result<HttpResponse, StreamError> {
     let stream_name = stream_name.into_inner();
 
     // For query mode, if the stream not found in memory map,
@@ -235,30 +211,14 @@ pub async fn get_stats(
         return Err(StreamNotFound(stream_name.clone()).into());
     }
 
-    let query_string = req.query_string();
-    if !query_string.is_empty() {
-        let tokens = query_string.split('=').collect::<Vec<&str>>();
-        let date_key = tokens[0];
-        let date_value = tokens[1];
-        if date_key != "date" {
-            return Err(StreamError::Custom {
-                msg: "Invalid query parameter".to_string(),
-                status: StatusCode::BAD_REQUEST,
-            });
-        }
-
-        if !date_value.is_empty() {
-            let stats = get_stats_date(&stream_name, date_value).await?;
-            let stats = serde_json::to_value(stats)?;
-            return Ok((web::Json(stats), StatusCode::OK));
-        }
+    if let Some(date) = params.date {
+        let stats = Stats::for_stream_on_date(date, &stream_name);
+        return Ok(HttpResponse::build(StatusCode::OK).json(stats));
     }
 
-    let stats = stats::get_current_stats(&stream_name, "json")
+    let stats = FullStats::get_current(&stream_name, "json")
         .ok_or_else(|| StreamNotFound(stream_name.clone()))?;
-
     let time = Utc::now();
-
     let stats = {
         let ingestion_stats = IngestionStats::new(
             stats.current_stats.events,
@@ -279,9 +239,7 @@ pub async fn get_stats(
         QueriedStats::new(&stream_name, time, ingestion_stats, storage_stats)
     };
 
-    let stats = serde_json::to_value(stats)?;
-
-    Ok((web::Json(stats), StatusCode::OK))
+    Ok(HttpResponse::build(StatusCode::OK).json(stats))
 }
 
 pub async fn get_stream_info(stream_name: Path<String>) -> Result<impl Responder, StreamError> {
@@ -572,26 +530,9 @@ pub mod error {
 
 #[cfg(test)]
 mod tests {
-    use crate::handlers::http::modal::utils::logstream_utils::PutStreamHeaders;
     use actix_web::test::TestRequest;
 
-    // TODO: Fix this test with routes
-    // #[actix_web::test]
-    // #[should_panic]
-    // async fn get_stats_panics_without_logstream() {
-    //     let req = TestRequest::default().to_http_request();
-    //     let _ = get_stats(req).await;
-    // }
-
-    // #[actix_web::test]
-    // async fn get_stats_stream_not_found_error_for_unknown_logstream() -> anyhow::Result<()> {
-    //     let req = TestRequest::default().to_http_request();
-    //
-    //     match get_stats(req, web::Path::from("test".to_string())).await {
-    //         Err(StreamError::StreamNotFound(_)) => Ok(()),
-    //         _ => bail!("expected StreamNotFound error"),
-    //     }
-    // }
+    use crate::handlers::http::modal::utils::logstream_utils::PutStreamHeaders;
 
     #[actix_web::test]
     async fn header_without_log_source() {
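The hand-rolled `query_string().split('=')` parsing above gives way to a typed extractor: actix's `Query<T>` (backed by `serde_urlencoded`) now rejects a malformed `?date=` before the handler body ever runs. A self-contained sketch of that deserialization path; `StatsParams` is re-declared here so the example stands alone, mirroring the definition added to `src/stats.rs` at the end of this diff:

```rust
use chrono::NaiveDate;
use serde::{Deserialize, Deserializer};

#[derive(Debug, Deserialize)]
struct StatsParams {
    // `default` lets a bare `/stats` request deserialize to `date: None`.
    #[serde(default, deserialize_with = "deserialize_date")]
    date: Option<NaiveDate>,
}

fn deserialize_date<'de, D>(deserializer: D) -> Result<Option<NaiveDate>, D::Error>
where
    D: Deserializer<'de>,
{
    let Some(s) = Option::<String>::deserialize(deserializer)? else {
        return Ok(None);
    };
    NaiveDate::parse_from_str(&s, "%Y-%m-%d")
        .map(Some)
        .map_err(serde::de::Error::custom)
}

fn main() {
    // Well-formed: parsed into a typed NaiveDate.
    let p: StatsParams = serde_urlencoded::from_str("date=2025-01-01").unwrap();
    assert!(p.date.is_some());

    // Malformed: rejected during extraction, so the handler never sees it.
    assert!(serde_urlencoded::from_str::<StatsParams>("date=2025/01/01").is_err());

    // Absent: defaults to None, and the handler serves cumulative stats.
    let p: StatsParams = serde_urlencoded::from_str("").unwrap();
    assert!(p.date.is_none());
}
```

Compared to the old code, this also fixes a latent panic: `tokens[1]` would index out of bounds on a query string with no `=` at all.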
diff --git a/src/handlers/http/modal/query/querier_logstream.rs b/src/handlers/http/modal/query/querier_logstream.rs
index 76d282f76..61f51007d 100644
--- a/src/handlers/http/modal/query/querier_logstream.rs
+++ b/src/handlers/http/modal/query/querier_logstream.rs
@@ -15,18 +15,11 @@
  * along with this program. If not, see <https://www.gnu.org/licenses/>.
  *
  */
-
-use core::str;
 use std::fs;
 
-use actix_web::{
-    web::{self, Path},
-    HttpRequest, Responder,
-};
+use actix_web::{web::Path, HttpRequest, Responder};
 use bytes::Bytes;
-use chrono::Utc;
 use http::StatusCode;
-use relative_path::RelativePathBuf;
 use tokio::sync::Mutex;
 use tracing::{error, warn};
 
@@ -35,17 +28,12 @@ static CREATE_STREAM_LOCK: Mutex<()> = Mutex::const_new(());
 use crate::{
     handlers::http::{
         base_path_without_preceding_slash,
-        cluster::{
-            self, fetch_daily_stats_from_ingestors, fetch_stats_from_ingestors,
-            sync_streams_with_ingestors,
-            utils::{merge_quried_stats, IngestionStats, QueriedStats, StorageStats},
-        },
-        logstream::{error::StreamError, get_stats_date},
+        cluster::{self, sync_streams_with_ingestors},
+        logstream::error::StreamError,
     },
     hottier::HotTierManager,
     parseable::{StreamNotFound, PARSEABLE},
-    stats::{self, Stats},
-    storage::{ObjectStoreFormat, StreamType, STREAM_ROOT_DIRECTORY},
+    stats,
 };
 
 pub async fn delete(stream_name: Path<String>) -> Result<impl Responder, StreamError> {
@@ -121,119 +109,3 @@ pub async fn put_stream(
 
     Ok(("Log stream created", StatusCode::OK))
 }
-
-pub async fn get_stats(
-    req: HttpRequest,
-    stream_name: Path<String>,
-) -> Result<impl Responder, StreamError> {
-    let stream_name = stream_name.into_inner();
-
-    // if the stream not found in memory map,
-    //check if it exists in the storage
-    //create stream and schema from storage
-    if !PARSEABLE.streams.contains(&stream_name)
-        && !PARSEABLE
-            .create_stream_and_schema_from_storage(&stream_name)
-            .await
-            .unwrap_or(false)
-    {
-        return Err(StreamNotFound(stream_name.clone()).into());
-    }
-
-    let query_string = req.query_string();
-    if !query_string.is_empty() {
-        let date_key = query_string.split('=').collect::<Vec<&str>>()[0];
-        let date_value = query_string.split('=').collect::<Vec<&str>>()[1];
-        if date_key != "date" {
-            return Err(StreamError::Custom {
-                msg: "Invalid query parameter".to_string(),
-                status: StatusCode::BAD_REQUEST,
-            });
-        }
-
-        if !date_value.is_empty() {
-            let querier_stats = get_stats_date(&stream_name, date_value).await?;
-
-            // this function requires all the ingestor stream jsons
-            let path = RelativePathBuf::from_iter([&stream_name, STREAM_ROOT_DIRECTORY]);
-            let obs = PARSEABLE
-                .storage
-                .get_object_store()
-                .get_objects(
-                    Some(&path),
-                    Box::new(|file_name| {
-                        file_name.starts_with(".ingestor") && file_name.ends_with("stream.json")
-                    }),
-                )
-                .await?;
-
-            let mut ingestor_stream_jsons = Vec::new();
-            for ob in obs {
-                let stream_metadata: ObjectStoreFormat = match serde_json::from_slice(&ob) {
-                    Ok(d) => d,
-                    Err(e) => {
-                        error!("Failed to parse stream metadata: {:?}", e);
-                        continue;
-                    }
-                };
-                ingestor_stream_jsons.push(stream_metadata);
-            }
-
-            let ingestor_stats =
-                fetch_daily_stats_from_ingestors(date_value, &ingestor_stream_jsons)?;
-
-            let total_stats = Stats {
-                events: querier_stats.events + ingestor_stats.events,
-                ingestion: querier_stats.ingestion + ingestor_stats.ingestion,
-                storage: querier_stats.storage + ingestor_stats.storage,
-            };
-            let stats = serde_json::to_value(total_stats)?;
-
-            return Ok((web::Json(stats), StatusCode::OK));
-        }
-    }
-
-    let stats = stats::get_current_stats(&stream_name, "json")
-        .ok_or_else(|| StreamNotFound(stream_name.clone()))?;
-
-    let ingestor_stats = if PARSEABLE
-        .get_stream(&stream_name)
-        .is_ok_and(|stream| stream.get_stream_type() == StreamType::UserDefined)
-    {
-        Some(fetch_stats_from_ingestors(&stream_name).await?)
-    } else {
-        None
-    };
-
-    let time = Utc::now();
-
-    let stats = {
-        let ingestion_stats = IngestionStats::new(
-            stats.current_stats.events,
-            stats.current_stats.ingestion,
-            stats.lifetime_stats.events,
-            stats.lifetime_stats.ingestion,
-            stats.deleted_stats.events,
-            stats.deleted_stats.ingestion,
-            "json",
-        );
-        let storage_stats = StorageStats::new(
-            stats.current_stats.storage,
-            stats.lifetime_stats.storage,
-            stats.deleted_stats.storage,
-            "parquet",
-        );
-
-        QueriedStats::new(&stream_name, time, ingestion_stats, storage_stats)
-    };
-
-    let stats = if let Some(mut ingestor_stats) = ingestor_stats {
-        ingestor_stats.push(stats);
-        merge_quried_stats(ingestor_stats)
-    } else {
-        stats
-    };
-
-    let stats = serde_json::to_value(stats)?;
-
-    Ok((web::Json(stats), StatusCode::OK))
-}
diff --git a/src/handlers/http/modal/query_server.rs b/src/handlers/http/modal/query_server.rs
index 161bd88f5..831543606 100644
--- a/src/handlers/http/modal/query_server.rs
+++ b/src/handlers/http/modal/query_server.rs
@@ -291,7 +291,7 @@ impl QueryServer {
                     // GET "/logstream/{logstream}/stats" ==> Get stats for given log stream
                     web::resource("/stats").route(
                         web::get()
-                            .to(querier_logstream::get_stats)
+                            .to(logstream::get_stats)
                             .authorize_for_stream(Action::GetStats),
                     ),
                 )
diff --git a/src/prism/home/mod.rs b/src/prism/home/mod.rs
index 95e03b8ce..a13e2caf8 100644
--- a/src/prism/home/mod.rs
+++ b/src/prism/home/mod.rs
@@ -29,13 +29,10 @@ use crate::{
     alerts::{get_alerts_info, AlertError, AlertsInfo, ALERTS},
     correlation::{CorrelationError, CORRELATIONS},
-    handlers::http::{
-        cluster::fetch_daily_stats_from_ingestors,
-        logstream::{error::StreamError, get_stats_date},
-    },
+    handlers::http::logstream::error::StreamError,
     parseable::PARSEABLE,
     rbac::{map::SessionKey, role::Action, Users},
-    stats::Stats,
+    stats::{DatedStats, Stats},
     storage::{ObjectStorageError, ObjectStoreFormat, STREAM_ROOT_DIRECTORY},
     users::{dashboards::DASHBOARDS, filters::FILTERS},
 };
@@ -47,14 +44,6 @@ struct StreamInfo {
     stats_summary: Stats,
 }
 
-#[derive(Debug, Serialize, Default)]
-struct DatedStats {
-    date: String,
-    events: u64,
-    ingestion_size: u64,
-    storage_size: u64,
-}
-
 #[derive(Debug, Serialize)]
 struct TitleAndId {
     title: String,
@@ -155,8 +144,8 @@ pub async fn generate_home_response(key: &SessionKey) -> Result<HomeResponse, PrismHomeError> {
-        let dated_stats = stats_for_date(date, stream_wise_meta).await?;
+        let Some(dated_stats) = DatedStats::for_all_streams(date, &stream_wise_meta)? else {
+            continue;
+        };
@@ ... @@ pub async fn generate_home_response(key: &SessionKey) -> Result<HomeResponse, PrismHomeError> {
-async fn stats_for_date(
-    date: String,
-    stream_wise_meta: HashMap<String, Vec<ObjectStoreFormat>>,
-) -> Result<DatedStats, PrismHomeError> {
-    // collect stats for all the streams for the given date
-    let mut details = DatedStats {
-        date: date.clone(),
-        ..Default::default()
-    };
-
-    for (stream, meta) in stream_wise_meta {
-        let querier_stats = get_stats_date(&stream, &date).await?;
-        let ingestor_stats = fetch_daily_stats_from_ingestors(&date, &meta)?;
-        // collect date-wise stats for all streams
-        details.events += querier_stats.events + ingestor_stats.events;
-        details.ingestion_size += querier_stats.ingestion + ingestor_stats.ingestion;
-        details.storage_size += querier_stats.storage + ingestor_stats.storage;
-    }
-
-    Ok(details)
-}
-
 #[derive(Debug, thiserror::Error)]
 pub enum PrismHomeError {
     #[error("Error: {0}")]
#[error("Error: {0}")] diff --git a/src/prism/logstream/mod.rs b/src/prism/logstream/mod.rs index b15e1907d..a04d5fa81 100644 --- a/src/prism/logstream/mod.rs +++ b/src/prism/logstream/mod.rs @@ -30,7 +30,7 @@ use crate::{ handlers::http::{ cluster::{ fetch_stats_from_ingestors, - utils::{merge_quried_stats, IngestionStats, QueriedStats, StorageStats}, + utils::{IngestionStats, QueriedStats, StorageStats}, }, logstream::error::StreamError, query::{into_query, update_schema_when_distributed, Query, QueryError}, @@ -39,7 +39,7 @@ use crate::{ parseable::{StreamNotFound, PARSEABLE}, query::{error::ExecuteError, execute, CountsRequest, CountsResponse, QUERY_SESSION}, rbac::{map::SessionKey, role::Action, Users}, - stats, + stats::FullStats, storage::{retention::Retention, StreamInfo, StreamType}, utils::{ arrow::record_batches_to_json, @@ -103,7 +103,7 @@ async fn get_stream_schema_helper(stream_name: &str) -> Result, Stre } async fn get_stats(stream_name: &str) -> Result { - let stats = stats::get_current_stats(stream_name, "json") + let stats = FullStats::get_current(stream_name, "json") .ok_or_else(|| StreamNotFound(stream_name.to_owned()))?; let ingestor_stats = if PARSEABLE @@ -139,7 +139,7 @@ async fn get_stats(stream_name: &str) -> Result Option { + let events = EVENTS_INGESTED + .get_metric_with_label_values(event_labels) + .ok()? + .get() as u64; + let ingestion = EVENTS_INGESTED_SIZE + .get_metric_with_label_values(event_labels) + .ok()? + .get() as u64; + let storage = STORAGE_SIZE + .get_metric_with_label_values(storage_size_labels) + .ok()? + .get() as u64; + + Some(Self { + events, + ingestion, + storage, + }) + } + + fn get_lifetime(event_labels: &[&str], storage_size_labels: &[&str]) -> Option { + let events = LIFETIME_EVENTS_INGESTED + .get_metric_with_label_values(event_labels) + .ok()? + .get() as u64; + let ingestion = LIFETIME_EVENTS_INGESTED_SIZE + .get_metric_with_label_values(event_labels) + .ok()? + .get() as u64; + let storage = LIFETIME_EVENTS_STORAGE_SIZE + .get_metric_with_label_values(storage_size_labels) + .ok()? + .get() as u64; + + Some(Self { + events, + ingestion, + storage, + }) + } + + fn get_deleted(event_labels: &[&str], storage_size_labels: &[&str]) -> Option { + let events = EVENTS_DELETED + .get_metric_with_label_values(event_labels) + .ok()? + .get() as u64; + let ingestion = EVENTS_DELETED_SIZE + .get_metric_with_label_values(event_labels) + .ok()? + .get() as u64; + let storage = DELETED_EVENTS_STORAGE_SIZE + .get_metric_with_label_values(storage_size_labels) + .ok()? 
diff --git a/src/stats.rs b/src/stats.rs
--- a/src/stats.rs
+++ b/src/stats.rs
@@ ... @@ pub struct Stats {
     pub events: u64,
     pub ingestion: u64,
     pub storage: u64,
 }
 
+impl Stats {
+    pub fn get_current(event_labels: &[&str], storage_size_labels: &[&str]) -> Option<Self> {
+        let events = EVENTS_INGESTED
+            .get_metric_with_label_values(event_labels)
+            .ok()?
+            .get() as u64;
+        let ingestion = EVENTS_INGESTED_SIZE
+            .get_metric_with_label_values(event_labels)
+            .ok()?
+            .get() as u64;
+        let storage = STORAGE_SIZE
+            .get_metric_with_label_values(storage_size_labels)
+            .ok()?
+            .get() as u64;
+
+        Some(Self {
+            events,
+            ingestion,
+            storage,
+        })
+    }
+
+    fn get_lifetime(event_labels: &[&str], storage_size_labels: &[&str]) -> Option<Self> {
+        let events = LIFETIME_EVENTS_INGESTED
+            .get_metric_with_label_values(event_labels)
+            .ok()?
+            .get() as u64;
+        let ingestion = LIFETIME_EVENTS_INGESTED_SIZE
+            .get_metric_with_label_values(event_labels)
+            .ok()?
+            .get() as u64;
+        let storage = LIFETIME_EVENTS_STORAGE_SIZE
+            .get_metric_with_label_values(storage_size_labels)
+            .ok()?
+            .get() as u64;
+
+        Some(Self {
+            events,
+            ingestion,
+            storage,
+        })
+    }
+
+    fn get_deleted(event_labels: &[&str], storage_size_labels: &[&str]) -> Option<Self> {
+        let events = EVENTS_DELETED
+            .get_metric_with_label_values(event_labels)
+            .ok()?
+            .get() as u64;
+        let ingestion = EVENTS_DELETED_SIZE
+            .get_metric_with_label_values(event_labels)
+            .ok()?
+            .get() as u64;
+        let storage = DELETED_EVENTS_STORAGE_SIZE
+            .get_metric_with_label_values(storage_size_labels)
+            .ok()?
+            .get() as u64;
+
+        Some(Self {
+            events,
+            ingestion,
+            storage,
+        })
+    }
+
+    pub fn for_stream_on_date(date: NaiveDate, stream_name: &str) -> Stats {
+        let date = date.to_string();
+        let event_labels = event_labels_date(stream_name, "json", &date);
+        let storage_size_labels = storage_size_labels_date(stream_name, &date);
+        let events_ingested = EVENTS_INGESTED_DATE
+            .get_metric_with_label_values(&event_labels)
+            .unwrap()
+            .get() as u64;
+        let ingestion_size = EVENTS_INGESTED_SIZE_DATE
+            .get_metric_with_label_values(&event_labels)
+            .unwrap()
+            .get() as u64;
+        let storage_size = EVENTS_STORAGE_SIZE_DATE
+            .get_metric_with_label_values(&storage_size_labels)
+            .unwrap()
+            .get() as u64;
+
+        Stats {
+            events: events_ingested,
+            ingestion: ingestion_size,
+            storage: storage_size,
+        }
+    }
+
+    pub fn fetch_from_ingestors(
+        date: NaiveDate,
+        stream_meta_list: &[ObjectStoreFormat],
+    ) -> Result<Stats, StreamError> {
+        // for the given date, get the stats from the ingestors
+        let mut events_ingested = 0;
+        let mut ingestion_size = 0;
+        let mut storage_size = 0;
+
+        for meta in stream_meta_list.iter() {
+            for manifest in meta.snapshot.manifest_list.iter() {
+                if manifest.time_lower_bound.date_naive() == date {
+                    events_ingested += manifest.events_ingested;
+                    ingestion_size += manifest.ingestion_size;
+                    storage_size += manifest.storage_size;
+                }
+            }
+        }
+
+        let stats = Stats {
+            events: events_ingested,
+            ingestion: ingestion_size,
+            storage: storage_size,
+        };
+        Ok(stats)
+    }
+}
+
+#[derive(Debug, Serialize, Default)]
+pub struct DatedStats {
+    pub date: NaiveDate,
+    pub events: u64,
+    pub ingestion_size: u64,
+    pub storage_size: u64,
+}
+
+impl DatedStats {
+    pub fn for_all_streams(
+        date: NaiveDate,
+        stream_wise_meta: &HashMap<String, Vec<ObjectStoreFormat>>,
+    ) -> Result<Option<DatedStats>, PrismHomeError> {
+        // collect stats for all the streams for the given date
+        let mut details = DatedStats {
+            date,
+            ..Default::default()
+        };
+
+        for (stream, meta) in stream_wise_meta {
+            let querier_stats = Stats::for_stream_on_date(date, stream);
+            let ingestor_stats = Stats::fetch_from_ingestors(date, meta)?;
+            // collect date-wise stats for all streams
+            details.events += querier_stats.events + ingestor_stats.events;
+            details.ingestion_size += querier_stats.ingestion + ingestor_stats.ingestion;
+            details.storage_size += querier_stats.storage + ingestor_stats.storage;
+        }
+
+        Ok(Some(details))
+    }
+}
+
+#[derive(Debug, Default, Serialize, Deserialize, Clone, Copy, PartialEq, Eq)]
 pub struct FullStats {
     pub lifetime_stats: Stats,
     pub current_stats: Stats,
     pub deleted_stats: Stats,
 }
 
-pub fn get_current_stats(stream_name: &str, format: &'static str) -> Option<FullStats> {
-    let event_labels = event_labels(stream_name, format);
-    let storage_size_labels = storage_size_labels(stream_name);
+impl FullStats {
+    pub fn get_current(stream_name: &str, format: &'static str) -> Option<FullStats> {
+        let event_labels = event_labels(stream_name, format);
+        let storage_size_labels = storage_size_labels(stream_name);
 
-    let events_ingested = EVENTS_INGESTED
-        .get_metric_with_label_values(&event_labels)
-        .ok()?
-        .get() as u64;
-    let ingestion_size = EVENTS_INGESTED_SIZE
-        .get_metric_with_label_values(&event_labels)
-        .ok()?
-        .get() as u64;
-    let storage_size = STORAGE_SIZE
-        .get_metric_with_label_values(&storage_size_labels)
-        .ok()?
-        .get() as u64;
-    let events_deleted = EVENTS_DELETED
-        .get_metric_with_label_values(&event_labels)
-        .ok()?
-        .get() as u64;
-    let events_deleted_size = EVENTS_DELETED_SIZE
-        .get_metric_with_label_values(&event_labels)
-        .ok()?
-        .get() as u64;
-    let deleted_events_storage_size = DELETED_EVENTS_STORAGE_SIZE
-        .get_metric_with_label_values(&storage_size_labels)
-        .ok()?
-        .get() as u64;
-    let lifetime_events_ingested = LIFETIME_EVENTS_INGESTED
-        .get_metric_with_label_values(&event_labels)
-        .ok()?
-        .get() as u64;
-    let lifetime_ingestion_size = LIFETIME_EVENTS_INGESTED_SIZE
-        .get_metric_with_label_values(&event_labels)
-        .ok()?
-        .get() as u64;
-    let lifetime_events_storage_size = LIFETIME_EVENTS_STORAGE_SIZE
-        .get_metric_with_label_values(&storage_size_labels)
-        .ok()?
-        .get() as u64;
-
-    Some(FullStats {
-        lifetime_stats: Stats {
-            events: lifetime_events_ingested,
-            ingestion: lifetime_ingestion_size,
-            storage: lifetime_events_storage_size,
-        },
-        current_stats: Stats {
-            events: events_ingested,
-            ingestion: ingestion_size,
-            storage: storage_size,
-        },
-        deleted_stats: Stats {
-            events: events_deleted,
-            ingestion: events_deleted_size,
-            storage: deleted_events_storage_size,
-        },
-    })
+        Some(FullStats {
+            lifetime_stats: Stats::get_lifetime(&event_labels, &storage_size_labels)?,
+            current_stats: Stats::get_current(&event_labels, &storage_size_labels)?,
+            deleted_stats: Stats::get_deleted(&event_labels, &storage_size_labels)?,
+        })
+    }
 }
 
 pub async fn update_deleted_stats(
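Since `for_all_streams` returns `Result<Option<DatedStats>, _>`, one plausible consumer shape (the home handler's loop and variable names below are assumptions, not the exact call site) is to surface real errors with `?` and skip a date that yields nothing:

```rust
use chrono::NaiveDate;

// Stand-ins for the real types; only the control flow is the point here.
#[derive(Debug)]
struct DatedStats {
    date: NaiveDate,
    events: u64,
}

fn for_all_streams(date: NaiveDate) -> Result<Option<DatedStats>, String> {
    Ok(Some(DatedStats { date, events: 7 }))
}

fn main() -> Result<(), String> {
    let mut stream_details = Vec::new();

    for day in ["2025-01-01", "2025-01-02"] {
        let date = NaiveDate::parse_from_str(day, "%Y-%m-%d").map_err(|e| e.to_string())?;
        // `?` surfaces real errors; `else { continue }` skips empty days.
        let Some(dated_stats) = for_all_streams(date)? else {
            continue;
        };
        stream_details.push(dated_stats);
    }

    assert_eq!(stream_details.len(), 2);
    Ok(())
}
```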
@@ -157,7 +263,7 @@ pub async fn update_deleted_stats(
     STORAGE_SIZE
         .with_label_values(&["data", stream_name, "parquet"])
         .sub(storage_size);
-    let stats = get_current_stats(stream_name, "json");
+    let stats = FullStats::get_current(stream_name, "json");
     if let Some(stats) = stats {
         if let Err(e) = storage.put_stats(stream_name, &stats).await {
             warn!("Error updating stats to objectstore due to error [{}]", e);
@@ -220,3 +326,22 @@ pub fn event_labels_date<'a>(
 
 pub fn storage_size_labels_date<'a>(stream_name: &'a str, date: &'a str) -> [&'a str; 4] {
     ["data", stream_name, "parquet", date]
 }
+
+#[derive(Debug, Deserialize)]
+pub struct StatsParams {
+    #[serde(default, deserialize_with = "deserialize_date")]
+    pub date: Option<NaiveDate>,
+}
+
+pub fn deserialize_date<'de, D>(deserializer: D) -> Result<Option<NaiveDate>, D::Error>
+where
+    D: Deserializer<'de>,
+{
+    let Some(s) = Option::<String>::deserialize(deserializer)? else {
+        return Ok(None);
+    };
+
+    NaiveDate::parse_from_str(&s, "%Y-%m-%d")
+        .map(Some)
+        .map_err(serde::de::Error::custom)
+}
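Seen from the wire, the rework keeps the endpoint shape but narrows the dated payload: `GET /logstream/{stream}/stats?date=YYYY-MM-DD` now returns the small `Stats` object served from the `*_DATE` metric families, while the bare call still returns the full `QueriedStats` document. A client-side sketch, assuming a local server on Parseable's default port, default credentials, and the documented `/api/v1` prefix:

```rust
use reqwest::blocking::Client;

fn main() -> Result<(), reqwest::Error> {
    let client = Client::new();

    // Dated stats: parsed by StatsParams, served from the *_DATE metrics.
    let dated = client
        .get("http://localhost:8000/api/v1/logstream/mystream/stats")
        .query(&[("date", "2025-01-01")])
        .basic_auth("admin", Some("admin"))
        .send()?
        .text()?;
    println!("{dated}");

    // Cumulative stats: FullStats::get_current rendered as QueriedStats.
    let full = client
        .get("http://localhost:8000/api/v1/logstream/mystream/stats")
        .basic_auth("admin", Some("admin"))
        .send()?
        .text()?;
    println!("{full}");

    Ok(())
}
```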