Skip to content

refactor: get_stats merge and simplify #1150

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 27 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
a016af7
refactor: simplify query param extraction
Jan 30, 2025
28764cd
refactor: stats params in query API
Jan 31, 2025
3703343
DRY `get_stats`
Jan 31, 2025
dad51d2
semantic locality
Jan 31, 2025
3cb375e
Merge branch 'main' into query-param
Jan 31, 2025
150eb71
style imports
Jan 31, 2025
46fac83
Merge remote-tracking branch 'origin' into query-param
Feb 1, 2025
4eee17a
Merge remote-tracking branch 'origin/main' into query-param
Feb 4, 2025
ede0f45
fix: failure to deserialize `StatsParams`
Feb 4, 2025
14c79c5
Merge branch 'main' into query-param
Feb 4, 2025
5e62452
Merge branch 'main' into query-param
Feb 5, 2025
1138006
Merge branch 'main' into query-param
Feb 5, 2025
8b35a9f
Merge branch 'main' into query-param
Feb 6, 2025
82f04d6
Merge remote-tracking branch 'origin/main' into query-param
Feb 13, 2025
cc689b4
test makes no sense
Feb 13, 2025
3902bd2
style: less code == better
Feb 13, 2025
6340a8e
don't handle test path
Feb 13, 2025
92fba76
refactor: with less code
Feb 13, 2025
9f54e9e
coderabbit suggestions
Feb 13, 2025
5e7d8d1
refactor: date validation
Feb 13, 2025
2a3825a
Merge remote-tracking branch 'origin/main' into query-param
Feb 16, 2025
4b0c0dc
style: fmt
Feb 16, 2025
fba51f1
Merge remote-tracking branch 'origin/main' into query-param
Mar 6, 2025
43db1b6
fix: `DatedStats` merge
Mar 6, 2025
fe1bb95
Merge branch 'main' into query-param
Mar 11, 2025
0a07fbc
Merge remote-tracking branch 'origin/main' into query-param
Mar 24, 2025
f820090
refactor: separate constructors
Mar 24, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/analytics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ use crate::{
},
option::Mode,
parseable::PARSEABLE,
stats::{self, Stats},
stats::{FullStats, Stats},
storage, HTTP_CLIENT,
};

Expand Down Expand Up @@ -170,7 +170,7 @@ fn total_event_stats() -> (Stats, Stats, Stats) {
let mut deleted_json_bytes: u64 = 0;

for stream in PARSEABLE.streams.list() {
let Some(stats) = stats::get_current_stats(&stream, "json") else {
let Some(stats) = FullStats::get_current(&stream, "json") else {
continue;
};
total_events += stats.lifetime_stats.events;
Expand Down
6 changes: 3 additions & 3 deletions src/catalog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use crate::{
option::Mode,
parseable::PARSEABLE,
query::PartialTimeFilter,
stats::{event_labels_date, get_current_stats, storage_size_labels_date, update_deleted_stats},
stats::{event_labels_date, storage_size_labels_date, update_deleted_stats, FullStats},
storage::{
object_storage::manifest_path, ObjectStorage, ObjectStorageError, ObjectStoreFormat,
},
Expand Down Expand Up @@ -181,7 +181,7 @@ pub async fn update_snapshot(
if let Some(mut manifest) = storage.get_manifest(&path).await? {
manifest.apply_change(change);
storage.put_manifest(&path, manifest).await?;
let stats = get_current_stats(stream_name, "json");
let stats = FullStats::get_current(stream_name, "json");
if let Some(stats) = stats {
meta.stats = stats;
}
Expand Down Expand Up @@ -307,7 +307,7 @@ async fn create_manifest(
};
manifests.push(new_snapshot_entry);
meta.snapshot.manifest_list = manifests;
let stats = get_current_stats(stream_name, "json");
let stats = FullStats::get_current(stream_name, "json");
if let Some(stats) = stats {
meta.stats = stats;
}
Expand Down
28 changes: 0 additions & 28 deletions src/handlers/http/cluster/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ use crate::metrics::prom_utils::Metrics;
use crate::parseable::PARSEABLE;
use crate::rbac::role::model::DefaultPrivilege;
use crate::rbac::user::User;
use crate::stats::Stats;
use crate::storage::{
ObjectStorageError, ObjectStoreFormat, PARSEABLE_ROOT_DIRECTORY, STREAM_ROOT_DIRECTORY,
};
Expand Down Expand Up @@ -369,33 +368,6 @@ pub async fn sync_role_update_with_ingestors(
Ok(())
}

pub fn fetch_daily_stats_from_ingestors(
date: &str,
stream_meta_list: &[ObjectStoreFormat],
) -> Result<Stats, StreamError> {
// for the given date, get the stats from the ingestors
let mut events_ingested = 0;
let mut ingestion_size = 0;
let mut storage_size = 0;

for meta in stream_meta_list.iter() {
for manifest in meta.snapshot.manifest_list.iter() {
if manifest.time_lower_bound.date_naive().to_string() == date {
events_ingested += manifest.events_ingested;
ingestion_size += manifest.ingestion_size;
storage_size += manifest.storage_size;
}
}
}

let stats = Stats {
events: events_ingested,
ingestion: ingestion_size,
storage: storage_size,
};
Ok(stats)
}

/// get the cumulative stats from all ingestors
pub async fn fetch_stats_from_ingestors(
stream_name: &str,
Expand Down
80 changes: 40 additions & 40 deletions src/handlers/http/cluster/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,46 @@ impl QueriedStats {
storage,
}
}

/// Merges per-node queried stats into one cumulative view.
///
/// All entries are expected to describe the same stream; ingestion and
/// storage counters are summed, and the earliest capture time is kept.
/// An empty input yields a stats object with a default (empty) stream
/// name and the current time.
pub fn merge(stats: Vec<Self>) -> Self {
    // Take the stream name from the FIRST entry. The previous
    // `stats[1]` skipped element 0 and panicked when fewer than two
    // entries were supplied.
    let stream_name = stats
        .first()
        .map(|x| x.stream.clone())
        .unwrap_or_default();

    // Earliest capture time across all nodes; `now` when empty.
    let min_time = stats.iter().map(|x| x.time).min().unwrap_or_else(Utc::now);

    // Sum every ingestion counter; `format` is assumed uniform across
    // nodes, so the last one seen wins.
    let cumulative_ingestion =
        stats
            .iter()
            .map(|x| &x.ingestion)
            .fold(IngestionStats::default(), |acc, x| IngestionStats {
                count: acc.count + x.count,
                size: acc.size + x.size,
                format: x.format.clone(),
                lifetime_count: acc.lifetime_count + x.lifetime_count,
                lifetime_size: acc.lifetime_size + x.lifetime_size,
                deleted_count: acc.deleted_count + x.deleted_count,
                deleted_size: acc.deleted_size + x.deleted_size,
            });

    // Same aggregation for the storage-side counters.
    let cumulative_storage =
        stats
            .iter()
            .map(|x| &x.storage)
            .fold(StorageStats::default(), |acc, x| StorageStats {
                size: acc.size + x.size,
                format: x.format.clone(),
                lifetime_size: acc.lifetime_size + x.lifetime_size,
                deleted_size: acc.deleted_size + x.deleted_size,
            });

    QueriedStats::new(
        &stream_name,
        min_time,
        cumulative_ingestion,
        cumulative_storage,
    )
}
}

#[derive(Debug, Default, Serialize, Deserialize)]
Expand Down Expand Up @@ -129,46 +169,6 @@ impl StorageStats {
}
}

/// Merges per-node queried stats into one cumulative view.
///
/// All entries are expected to describe the same stream; ingestion and
/// storage counters are summed, and the earliest capture time is kept.
/// An empty input yields a stats object with a default (empty) stream
/// name and the current time.
///
/// NOTE(review): name keeps the historical "quried" typo because
/// callers depend on it; renaming would be a separate breaking change.
pub fn merge_quried_stats(stats: Vec<QueriedStats>) -> QueriedStats {
    // Take the stream name from the FIRST entry. The previous
    // `stats[1]` skipped element 0 and panicked when fewer than two
    // entries were supplied.
    let stream_name = stats
        .first()
        .map(|x| x.stream.clone())
        .unwrap_or_default();

    // Earliest capture time across all nodes; `now` when empty.
    let min_time = stats.iter().map(|x| x.time).min().unwrap_or_else(Utc::now);

    // Sum every ingestion counter; `format` is assumed uniform across
    // nodes, so the last one seen wins.
    let cumulative_ingestion =
        stats
            .iter()
            .map(|x| &x.ingestion)
            .fold(IngestionStats::default(), |acc, x| IngestionStats {
                count: acc.count + x.count,
                size: acc.size + x.size,
                format: x.format.clone(),
                lifetime_count: acc.lifetime_count + x.lifetime_count,
                lifetime_size: acc.lifetime_size + x.lifetime_size,
                deleted_count: acc.deleted_count + x.deleted_count,
                deleted_size: acc.deleted_size + x.deleted_size,
            });

    // Same aggregation for the storage-side counters.
    let cumulative_storage =
        stats
            .iter()
            .map(|x| &x.storage)
            .fold(StorageStats::default(), |acc, x| StorageStats {
                size: acc.size + x.size,
                format: x.format.clone(),
                lifetime_size: acc.lifetime_size + x.lifetime_size,
                deleted_size: acc.deleted_size + x.deleted_size,
            });

    QueriedStats::new(
        &stream_name,
        min_time,
        cumulative_ingestion,
        cumulative_storage,
    )
}

pub async fn check_liveness(domain_name: &str) -> bool {
let uri = match Url::parse(&format!(
"{}{}/liveness",
Expand Down
107 changes: 24 additions & 83 deletions src/handlers/http/logstream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,33 +16,33 @@
*
*/

use self::error::StreamError;
use super::cluster::utils::{IngestionStats, QueriedStats, StorageStats};
use super::query::update_schema_when_distributed;
use actix_web::http::StatusCode;
use actix_web::web::{Json, Path, Query};
use actix_web::{web, HttpRequest, HttpResponse, Responder};
use arrow_json::reader::infer_json_schema_from_iterator;
use bytes::Bytes;
use chrono::Utc;
use error::StreamError;
use itertools::Itertools;
use serde_json::{json, Value};
use std::fs;
use std::sync::Arc;
use tracing::warn;

use crate::event::format::override_data_type;
use crate::hottier::{HotTierManager, StreamHotTier, CURRENT_HOT_TIER_VERSION};
use crate::metadata::SchemaVersion;
use crate::metrics::{EVENTS_INGESTED_DATE, EVENTS_INGESTED_SIZE_DATE, EVENTS_STORAGE_SIZE_DATE};
use crate::parseable::{StreamNotFound, PARSEABLE};
use crate::rbac::role::Action;
use crate::rbac::Users;
use crate::stats::{event_labels_date, storage_size_labels_date, Stats};
use crate::stats::{FullStats, Stats, StatsParams};
use crate::storage::retention::Retention;
use crate::storage::{StreamInfo, StreamType};
use crate::utils::actix::extract_session_key_from_req;
use crate::{stats, validator, LOCK_EXPECT};

use actix_web::http::StatusCode;
use actix_web::web::{Json, Path};
use actix_web::{web, HttpRequest, Responder};
use arrow_json::reader::infer_json_schema_from_iterator;
use bytes::Bytes;
use chrono::Utc;
use itertools::Itertools;
use serde_json::{json, Value};
use std::fs;
use std::sync::Arc;
use tracing::warn;
use super::cluster::utils::{IngestionStats, QueriedStats, StorageStats};
use super::query::update_schema_when_distributed;

pub async fn delete(stream_name: Path<String>) -> Result<impl Responder, StreamError> {
let stream_name = stream_name.into_inner();
Expand Down Expand Up @@ -198,34 +198,10 @@ pub async fn put_retention(
))
}

/// Reads the locally recorded stats for `stream_name` on `date`
/// (expected as `YYYY-MM-DD`) from the date-wise prometheus metrics.
///
/// The `unwrap`s on the metric lookups are deliberate: the label arity
/// is fixed by `event_labels_date` / `storage_size_labels_date`, so a
/// failure would be a programming error, not a runtime condition.
pub async fn get_stats_date(stream_name: &str, date: &str) -> Result<Stats, StreamError> {
    let event_labels = event_labels_date(stream_name, "json", date);
    let storage_size_labels = storage_size_labels_date(stream_name, date);

    Ok(Stats {
        events: EVENTS_INGESTED_DATE
            .get_metric_with_label_values(&event_labels)
            .unwrap()
            .get() as u64,
        ingestion: EVENTS_INGESTED_SIZE_DATE
            .get_metric_with_label_values(&event_labels)
            .unwrap()
            .get() as u64,
        storage: EVENTS_STORAGE_SIZE_DATE
            .get_metric_with_label_values(&storage_size_labels)
            .unwrap()
            .get() as u64,
    })
}

pub async fn get_stats(
req: HttpRequest,
stream_name: Path<String>,
) -> Result<impl Responder, StreamError> {
Query(params): Query<StatsParams>,
) -> Result<HttpResponse, StreamError> {
let stream_name = stream_name.into_inner();

// For query mode, if the stream not found in memory map,
Expand All @@ -235,30 +211,14 @@ pub async fn get_stats(
return Err(StreamNotFound(stream_name.clone()).into());
}

let query_string = req.query_string();
if !query_string.is_empty() {
let tokens = query_string.split('=').collect::<Vec<&str>>();
let date_key = tokens[0];
let date_value = tokens[1];
if date_key != "date" {
return Err(StreamError::Custom {
msg: "Invalid query parameter".to_string(),
status: StatusCode::BAD_REQUEST,
});
}

if !date_value.is_empty() {
let stats = get_stats_date(&stream_name, date_value).await?;
let stats = serde_json::to_value(stats)?;
return Ok((web::Json(stats), StatusCode::OK));
}
if let Some(date) = params.date {
let stats = Stats::for_stream_on_date(date, &stream_name);
return Ok(HttpResponse::build(StatusCode::OK).json(stats));
}

let stats = stats::get_current_stats(&stream_name, "json")
let stats = FullStats::get_current(&stream_name, "json")
.ok_or_else(|| StreamNotFound(stream_name.clone()))?;

let time = Utc::now();

let stats = {
let ingestion_stats = IngestionStats::new(
stats.current_stats.events,
Expand All @@ -279,9 +239,7 @@ pub async fn get_stats(
QueriedStats::new(&stream_name, time, ingestion_stats, storage_stats)
};

let stats = serde_json::to_value(stats)?;

Ok((web::Json(stats), StatusCode::OK))
Ok(HttpResponse::build(StatusCode::OK).json(stats))
}

pub async fn get_stream_info(stream_name: Path<String>) -> Result<impl Responder, StreamError> {
Expand Down Expand Up @@ -572,26 +530,9 @@ pub mod error {

#[cfg(test)]
mod tests {
use crate::handlers::http::modal::utils::logstream_utils::PutStreamHeaders;
use actix_web::test::TestRequest;

// TODO: Fix this test with routes
// #[actix_web::test]
// #[should_panic]
// async fn get_stats_panics_without_logstream() {
// let req = TestRequest::default().to_http_request();
// let _ = get_stats(req).await;
// }

// #[actix_web::test]
// async fn get_stats_stream_not_found_error_for_unknown_logstream() -> anyhow::Result<()> {
// let req = TestRequest::default().to_http_request();

// match get_stats(req, web::Path::from("test".to_string())).await {
// Err(StreamError::StreamNotFound(_)) => Ok(()),
// _ => bail!("expected StreamNotFound error"),
// }
// }
use crate::handlers::http::modal::utils::logstream_utils::PutStreamHeaders;

#[actix_web::test]
async fn header_without_log_source() {
Expand Down
Loading
Loading