diff --git a/crates/analyzer/src/timeline/binned/resource.rs b/crates/analyzer/src/timeline/binned/resource.rs index 7cb86d63..cd4b84dd 100644 --- a/crates/analyzer/src/timeline/binned/resource.rs +++ b/crates/analyzer/src/timeline/binned/resource.rs @@ -6,7 +6,7 @@ use std::hash::Hash; use quent_time::{SpanNanoSec, TimeNanoSec, bin::BinnedSpan}; -use rustc_hash::{FxHashMap as HashMap, FxHashSet as HashSet}; +use rustc_hash::FxHashMap as HashMap; use uuid::Uuid; use crate::{ @@ -25,24 +25,34 @@ fn convert_capacity( Ok(capacity_type.reinterpret_capacity_value(capacity_value.value.unwrap_or_default(), span)) } +/// An entity that exceeded the long-entities threshold on this resource, with +/// its longest usage span there. +#[derive(Clone, Debug, PartialEq, Eq)] +pub struct LongUsageEntity { + pub entity_id: Uuid, + pub longest_usage: TimeNanoSec, +} + #[derive(Clone, Debug)] pub struct ResourceTimeline<'a> { pub config: BinnedSpan, pub data: HashMap<&'a str, Vec>, - pub long_entities: Vec, + /// Long entities ordered longest-usage-first, or `None` when not requested. + pub long_usage_entities: Option>, } #[derive(Clone, Debug)] pub struct ResourceTimelineByKey<'a, K> { pub config: BinnedSpan, pub data: HashMap<(K, &'a str), Vec>, - pub long_entities: Vec, + /// Long entities ordered longest-usage-first, or `None` when not requested. + pub long_usage_entities: Option>, } pub struct ResourceTimelineBuilder<'a> { resource_type: &'a ResourceTypeDecl, aggregator: KeyedAggregator<&'a str>, - long_entities: HashSet, + long_entities: HashMap, long_entities_threshold: Option, } @@ -57,7 +67,7 @@ impl<'a> ResourceTimelineBuilder<'a> { Ok(Self { resource_type, aggregator, - long_entities: HashSet::default(), + long_entities: HashMap::default(), long_entities_threshold, }) } @@ -76,7 +86,11 @@ impl<'a> ResourceTimelineBuilder<'a> { && usage.span().duration() > threshold && usage.span().intersects(&self.aggregator.config.span) { - self.long_entities.insert(usage.entity_id()); + let duration = usage.span().duration(); + self.long_entities + .entry(usage.entity_id()) + .and_modify(|d| *d = (*d).max(duration)) + .or_insert(duration); } Ok(()) } @@ -95,7 +109,22 @@ impl<'a> ResourceTimelineBuilder<'a> { ResourceTimeline { config: self.aggregator.config, data: self.aggregator.finish(), - long_entities: self.long_entities.into_iter().collect(), + long_usage_entities: self.long_entities_threshold.is_some().then(|| { + let mut entities: Vec = self + .long_entities + .into_iter() + .map(|(entity_id, longest_usage)| LongUsageEntity { + entity_id, + longest_usage, + }) + .collect(); + entities.sort_by(|a, b| { + b.longest_usage + .cmp(&a.longest_usage) + .then_with(|| a.entity_id.cmp(&b.entity_id)) + }); + entities + }), } } } @@ -103,7 +132,7 @@ impl<'a> ResourceTimelineBuilder<'a> { pub struct ResourceTimelineByKeyBuilder<'a, K> { resource_type: &'a ResourceTypeDecl, aggregator: KeyedAggregator<(K, &'a str)>, - long_entities: HashSet, + long_entities: HashMap, long_entities_threshold: Option, } @@ -121,7 +150,7 @@ where Ok(Self { resource_type, aggregator, - long_entities: HashSet::default(), + long_entities: HashMap::default(), long_entities_threshold, }) } @@ -139,7 +168,11 @@ where && usage.span().duration() > threshold && usage.span().intersects(&self.aggregator.config.span) { - self.long_entities.insert(usage.entity_id()); + let duration = usage.span().duration(); + self.long_entities + .entry(usage.entity_id()) + .and_modify(|d| *d = (*d).max(duration)) + .or_insert(duration); } Ok(()) } @@ -158,7 +191,23 @@ where ResourceTimelineByKey { config: self.aggregator.config, data: self.aggregator.finish(), - long_entities: self.long_entities.into_iter().collect(), + long_usage_entities: self.long_entities_threshold.is_some().then(|| { + let mut entities: Vec = self + .long_entities + .into_iter() + .map(|(entity_id, longest_usage)| LongUsageEntity { + entity_id, + longest_usage, + }) + .collect(); + // Longest usage first; id tie-break for determinism. + entities.sort_by(|a, b| { + b.longest_usage + .cmp(&a.longest_usage) + .then_with(|| a.entity_id.cmp(&b.entity_id)) + }); + entities + }), } } } @@ -167,6 +216,8 @@ where mod tests { use std::num::NonZero; + use rustc_hash::FxHashSet as HashSet; + use crate::{ fsm::{ FsmUsages, @@ -778,7 +829,12 @@ mod tests { let mut outside_builder = ResourceTimelineBuilder::try_new(resource_type, config, Some(threshold)).unwrap(); outside_builder.try_extend(outside_fsms.usages()).unwrap(); - assert!(!outside_builder.build().long_entities.contains(&resource_id)); + assert!( + !outside_builder + .build() + .long_usage_entities + .is_some_and(|e| e.iter().any(|lu| lu.entity_id == resource_id)) + ); let mut inside_fsms = InMemoryFsms::::new(); inside_fsms.insert(make_fsm(500, 1500)); @@ -787,6 +843,11 @@ mod tests { let mut inside_builder = ResourceTimelineBuilder::try_new(resource_type, config, Some(threshold)).unwrap(); inside_builder.try_extend(inside_fsms.usages()).unwrap(); - assert!(inside_builder.build().long_entities.contains(&resource_id)); + assert!( + inside_builder + .build() + .long_usage_entities + .is_some_and(|e| e.iter().any(|lu| lu.entity_id == resource_id)) + ); } } diff --git a/crates/ui/src/timeline/request.rs b/crates/ui/src/timeline/request.rs index 89fe8745..f15f9aa9 100644 --- a/crates/ui/src/timeline/request.rs +++ b/crates/ui/src/timeline/request.rs @@ -48,13 +48,34 @@ pub struct EntityFilter { // TODO(johanpel): instance name } +/// Parameters of paginated APIs. +#[derive(TS, Debug, Clone, Serialize, Deserialize)] +pub struct PageParams { + /// Maximum number of items per page. + pub max: u32, + /// The zero-based page index requested. + pub page: u32, +} + +/// Parameters for requesting long entities in a resource timeline +#[derive(TS, Debug, Clone, Serialize, Deserialize)] +pub struct LongEntitiesParams { + /// Fully include entities that have usages exceeding this amount of time. + pub threshold_s: TimeSec, + /// Parameters of the page of long entities. If this is not set, return + /// all long entities. + pub page: Option, +} + /// Parameters for requesting a resource timeline. #[derive(TS, Debug, Clone, Serialize, Deserialize)] pub struct ResourceTimelineRequest { /// The ID of the resource pub resource_id: Uuid, - /// If set, fully include entities that have usages exceeding this amount of time. - pub long_entities_threshold_s: Option, + /// Parameters for dealing with long entities. + /// + /// If this is not set, then no long entities are returned. + pub long_entities: Option, /// Entity filters. pub entity_filter: EntityFilter, /// Application-specific request parameters, e.g. for filtering. @@ -71,9 +92,10 @@ pub struct ResourceGroupTimelineRequest { /// The type name of the leaf resources for which to produce the timeline /// for this group. pub resource_type_name: String, - /// If set, fully include entities that have usages exceeding this amount of - /// time in seconds. - pub long_entities_threshold_s: Option, + /// Parameters for dealing with long entities. + /// + /// If this is not set, then no long entities are returned. + pub long_entities: Option, /// Entity filters. pub entity_filter: EntityFilter, /// Application-specific request parameters, e.g. for filtering. diff --git a/crates/ui/src/timeline/response.rs b/crates/ui/src/timeline/response.rs index 3e5bddf5..ab6f559e 100644 --- a/crates/ui/src/timeline/response.rs +++ b/crates/ui/src/timeline/response.rs @@ -9,6 +9,18 @@ use ts_rs::TS; use crate::FiniteStateMachine; +/// Long entities in resource timelines. +#[derive(TS, Debug, Clone, Serialize)] +pub struct LongEntities { + /// FSMs that have usage spans exceeding the long entities threshold. + /// + /// This may be empty if no long entities exist or if they were not requested. + /// If pagination is requested, this will only hold one page. + pub long_fsms: Vec, + /// Total number of long FSMs, before pagination. + pub long_fsms_total: u32, +} + #[derive(TS, Debug, Clone, Serialize)] pub struct ResourceTimelineBinned { /// The configuration of the binned timeline. @@ -16,8 +28,8 @@ pub struct ResourceTimelineBinned { /// Maps a resource capacity name to a vector where each element holds an /// aggregated value of a time bin. pub capacities_values: HashMap>, - /// FSMs that have usage spans exceeding the long_entities_threshold. - pub long_fsms: Vec, + /// Entities with long usage durations of this resource, if requested. + pub long_entities: Option, } #[derive(TS, Debug, Clone, Serialize)] @@ -27,8 +39,8 @@ pub struct ResourceTimelineBinnedByState { /// Maps a resource capacity name to a map of a state name to a vector where /// each element holds an aggregated value of a time bin. pub capacities_states_values: HashMap>>, - /// FSMs that have usage spans exceeding the long_entities_threshold. - pub long_fsms: Vec, + /// Entities with long usage durations of this resource, if requested. + pub long_entities: Option, } #[derive(TS, Debug, Clone, Serialize)] diff --git a/domains/query_engine/server/src/timeline_cache.rs b/domains/query_engine/server/src/timeline_cache.rs index 259953b2..42b70e79 100644 --- a/domains/query_engine/server/src/timeline_cache.rs +++ b/domains/query_engine/server/src/timeline_cache.rs @@ -12,13 +12,14 @@ use moka::future::Cache; use quent_analyzer::Span; use quent_query_engine_analyzer::{QueryEngineModel, ui::UiAnalyzer}; use quent_time::{SpanNanoSec, TimeNanoSec, bin::BinnedSpan, to_nanosecs, to_secs_relative}; +use quent_ui::FiniteStateMachine; use quent_ui::timeline::{ request::{ - BulkChunkedTimelineRequest, BulkTimelineRequest, SingleTimelineRequest, TimelineConfig, - TimelineRequest, + BulkChunkedTimelineRequest, BulkTimelineRequest, PageParams, SingleTimelineRequest, + TimelineConfig, TimelineRequest, }, response::{ - BulkTimelinesResponse, BulkTimelinesResponseEntry, ResourceTimeline, + BulkTimelinesResponse, BulkTimelinesResponseEntry, LongEntities, ResourceTimeline, ResourceTimelineBinned, ResourceTimelineBinnedByState, SingleTimelineResponse, }, }; @@ -69,14 +70,20 @@ impl<'a, EntryParams> EntryParamsKey<'a, EntryParams> { match entry { TimelineRequest::Resource(r) => Self::Resource { resource_id: r.resource_id, - long_entities_threshold_s: r.long_entities_threshold_s.map(HashableF64::from), + long_entities_threshold_s: r + .long_entities + .as_ref() + .map(|le| HashableF64::from(le.threshold_s)), entity_type_name: r.entity_filter.entity_type_name.as_deref(), application: &r.application, }, TimelineRequest::ResourceGroup(rg) => Self::ResourceGroup { resource_group_id: rg.resource_group_id, resource_type_name: &rg.resource_type_name, - long_entities_threshold_s: rg.long_entities_threshold_s.map(HashableF64::from), + long_entities_threshold_s: rg + .long_entities + .as_ref() + .map(|le| HashableF64::from(le.threshold_s)), entity_type_name: rg.entity_filter.entity_type_name.as_deref(), app_params: &rg.app_params, }, @@ -151,8 +158,40 @@ impl TimelineCache { } } + /// Fetch bulk timelines, paginating each entry's long FSMs after merge. + pub(crate) async fn paginate_bulk_timeline( + &self, + analyzer: Arc, + engine_id: Uuid, + request: BulkTimelineRequest< + ::TimelineGlobalParams, + ::TimelineParams, + >, + ) -> ServerResult + where + A: UiAnalyzer + Send + Sync + 'static, + ::TimelineGlobalParams: Hash + Eq + Clone + Send + 'static, + ::TimelineParams: Hash + Eq + Clone + Send + 'static, + { + let pagination: HashMap> = request + .entries + .iter() + .map(|(k, entry)| (k.clone(), pagination_of(entry))) + .collect(); + let mut response = self + .cached_bulk_timeline(analyzer, engine_id, request) + .await?; + for (key, entry) in response.entries.iter_mut() { + if let BulkTimelinesResponseEntry::Ok { data, .. } = entry { + let page = pagination.get(key).cloned().flatten(); + paginate_long_fsms(data, page); + } + } + Ok(response) + } + /// Fetch bulk timelines, serving as many chunks from cache as possible. - pub(crate) async fn cached_bulk_timeline( + async fn cached_bulk_timeline( &self, analyzer: Arc, engine_id: Uuid, @@ -386,7 +425,30 @@ impl TimelineCache { Ok(()) } - pub(crate) async fn cached_single_timeline( + /// Fetch a single timeline, paginating its long FSMs after merge. + pub(crate) async fn paginate_single_timeline( + &self, + analyzer: Arc, + engine_id: Uuid, + request: SingleTimelineRequest< + ::TimelineGlobalParams, + ::TimelineParams, + >, + ) -> ServerResult + where + A: UiAnalyzer + Send + Sync + 'static, + ::TimelineGlobalParams: Hash + Eq + Clone + Send + 'static, + ::TimelineParams: Hash + Eq + Clone + Send + 'static, + { + let page = pagination_of(&request.entry); + let mut response = self + .cached_single_timeline(analyzer, engine_id, request) + .await?; + paginate_long_fsms(&mut response.data, page); + Ok(response) + } + + async fn cached_single_timeline( &self, analyzer: Arc, engine_id: Uuid, @@ -502,10 +564,15 @@ impl TimelineCache { if chunk_start_ns == req_span.start() && chunk_end_ns == req_span.end() { return Ok(chunk); } - return combine_chunks(&[chunk], req_span, epoch); + return combine_chunks(&[chunk], req_span, epoch, resource_of(&request.entry)); } - combine_chunks(&chunk_responses, req_span, epoch) + combine_chunks( + &chunk_responses, + req_span, + epoch, + resource_of(&request.entry), + ) } } @@ -622,7 +689,12 @@ fn assemble_bulk_response( Err(_) => continue, }; - let combined = combine_chunks(chunks, chunk_span, geometry.epoch)?; + let combined = combine_chunks( + chunks, + chunk_span, + geometry.epoch, + resource_of(&entries[key]), + )?; result_entries.insert( key.clone(), BulkTimelinesResponseEntry::Ok { @@ -638,10 +710,78 @@ fn assemble_bulk_response( }) } +/// Extract the long-entities page request from a timeline request entry. +/// +/// `None` means return all long entities (no paging); the threshold lives in +/// `long_entities` and is handled separately as part of the cache key. +fn pagination_of(entry: &TimelineRequest) -> Option { + let long_entities = match entry { + TimelineRequest::Resource(r) => r.long_entities.as_ref(), + TimelineRequest::ResourceGroup(rg) => rg.long_entities.as_ref(), + }; + long_entities.and_then(|le| le.page.clone()) +} + +/// The single resource a timeline targets, or `None` for a resource group +/// (whose leaf set isn't known to the cache layer). +fn resource_of(entry: &TimelineRequest) -> Option { + match entry { + TimelineRequest::Resource(r) => Some(r.resource_id), + TimelineRequest::ResourceGroup(_) => None, + } +} + +/// Retain only the requested page of a timeline's already-ordered long +/// entities, setting `long_fsms_total` to the full count first. +/// +/// Ordering is established upstream — by the builder, and re-established by +/// `combine_chunks` after its merge — so this only slices. `page` `None` or a +/// `max` of `0` keeps the whole set. +fn paginate_long_fsms(data: &mut ResourceTimeline, page: Option) { + let long_entities = match data { + ResourceTimeline::Binned(d) => d.long_entities.as_mut(), + ResourceTimeline::BinnedByState(d) => d.long_entities.as_mut(), + }; + let Some(le) = long_entities else { + return; + }; + + le.long_fsms_total = le.long_fsms.len() as u32; + + if let Some(PageParams { max, page }) = page + && max > 0 + { + let len = le.long_fsms.len(); + let start = ((page as u64) * max as u64).min(len as u64) as usize; + let keep = (len - start).min(max as usize); + le.long_fsms.drain(..start); + le.long_fsms.truncate(keep); + } +} + +/// Longest usage span (seconds) of an FSM on the timelined resource, derived +/// from its own transitions: the largest gap from a usage-bearing transition +/// to the next. `resource` is `Some(id)` for a single-resource timeline (only +/// usages on that resource count) and `None` for a resource-group timeline, +/// where membership can't be resolved here so all usages count (best effort). +/// Used to rank long entities after the chunk merge — derived, never carried. +fn long_entity_rank_secs(fsm: &FiniteStateMachine, resource: Option) -> f64 { + fsm.transitions + .windows(2) + .filter(|w| { + w[0].usages + .iter() + .any(|u| resource.is_none_or(|r| u.resource == r)) + }) + .map(|w| w[1].timestamp - w[0].timestamp) + .fold(0.0_f64, f64::max) +} + fn combine_chunks( chunks: &[SingleTimelineResponse], req_span: SpanNanoSec, epoch: TimeNanoSec, + resource: Option, ) -> ServerResult { let mut sorted: Vec<&SingleTimelineResponse> = chunks.iter().collect(); sorted.sort_by(|a, b| { @@ -654,20 +794,41 @@ fn combine_chunks( let is_binned_by_state = matches!(&sorted[0].data, ResourceTimeline::BinnedByState(_)); - // Collect long_fsms from all chunks, deduplicated by ID. + // Chunking concern: merge each chunk's long entities and dedup by FSM id. + // If every chunk reports `None` (not requested) the result stays `None`. let mut seen_fsm_ids = std::collections::HashSet::new(); - let mut long_fsms = Vec::new(); + let mut long_fsms: Vec = Vec::new(); + let mut requested = false; for chunk in &sorted { - let chunk_fsms = match &chunk.data { - ResourceTimeline::Binned(data) => &data.long_fsms, - ResourceTimeline::BinnedByState(data) => &data.long_fsms, + let chunk_le = match &chunk.data { + ResourceTimeline::Binned(data) => data.long_entities.as_ref(), + ResourceTimeline::BinnedByState(data) => data.long_entities.as_ref(), }; - for fsm in chunk_fsms { - if seen_fsm_ids.insert(fsm.id) { - long_fsms.push(fsm.clone()); + if let Some(le) = chunk_le { + requested = true; + for fsm in &le.long_fsms { + if seen_fsm_ids.insert(fsm.id) { + long_fsms.push(fsm.clone()); + } } } } + let long_entities = if requested { + // Chunking broke the per-chunk order; recover it by re-ranking the merged + // set on a rank derived from each FSM's transitions. + long_fsms.sort_by(|a, b| { + long_entity_rank_secs(b, resource) + .total_cmp(&long_entity_rank_secs(a, resource)) + .then_with(|| a.id.cmp(&b.id)) + }); + let long_fsms_total = long_fsms.len() as u32; + Some(LongEntities { + long_fsms, + long_fsms_total, + }) + } else { + None + }; if is_binned_by_state { let mut combined: std::collections::HashMap< @@ -718,7 +879,7 @@ fn combine_chunks( data: ResourceTimeline::BinnedByState(ResourceTimelineBinnedByState { config, capacities_states_values: combined, - long_fsms, + long_entities, }), }) } else { @@ -765,7 +926,7 @@ fn combine_chunks( data: ResourceTimeline::Binned(ResourceTimelineBinned { config, capacities_values: combined, - long_fsms, + long_entities, }), }) } @@ -827,14 +988,14 @@ mod tests { }; use quent_query_engine_model::engine::{EngineEvent, Exit, Init}; use quent_ui::{ - FiniteStateMachine, FsmTransition, + FsmTransition, timeline::{ request::{ - BulkTimelineRequest, EntityFilter, ResourceTimelineRequest, TimelineConfig, - TimelineRequest, + BulkTimelineRequest, EntityFilter, LongEntitiesParams, PageParams, + ResourceTimelineRequest, TimelineConfig, TimelineRequest, }, response::{ - BulkTimelinesResponse, BulkTimelinesResponseEntry, ResourceTimeline, + BulkTimelinesResponse, BulkTimelinesResponseEntry, LongEntities, ResourceTimeline, ResourceTimelineBinned, SingleTimelineResponse, }, }, @@ -843,6 +1004,93 @@ mod tests { use super::*; + fn fsm(id: u128) -> FiniteStateMachine { + FiniteStateMachine { + id: Uuid::from_u128(id), + type_name: "task".to_string(), + instance_name: format!("t{id}"), + transitions: vec![], + } + } + + fn ids(fsms: &[FiniteStateMachine]) -> Vec { + fsms.iter().map(|f| f.id.as_u128()).collect() + } + + // A binned timeline whose long entities are the given FSM ids, in order. + // Paginate only slices (ordering is the builder's/merge's job), so the ids + // are supplied already ranked; ranks are placeholders here. + fn binned_with_long_entities(entity_ids: &[u128]) -> ResourceTimeline { + let config = TimelineConfig { + num_bins: 1, + start: 0.0, + end: 1.0, + } + .try_into_binned_span(0) + .unwrap() + .try_to_secs_relative(0) + .unwrap(); + ResourceTimeline::Binned(ResourceTimelineBinned { + config, + capacities_values: HashMap::new(), + long_entities: Some(LongEntities { + long_fsms: entity_ids.iter().map(|&id| fsm(id)).collect(), + long_fsms_total: 0, + }), + }) + } + + fn page_ids(data: &ResourceTimeline) -> Vec { + match data { + ResourceTimeline::Binned(d) => ids(&d.long_entities.as_ref().unwrap().long_fsms), + _ => panic!("expected binned"), + } + } + + fn page_total(data: &ResourceTimeline) -> u32 { + match data { + ResourceTimeline::Binned(d) => d.long_entities.as_ref().unwrap().long_fsms_total, + _ => panic!("expected binned"), + } + } + + #[test] + fn paginate_slices_preserving_order() { + let mut data = binned_with_long_entities(&[2, 3, 1, 4]); + paginate_long_fsms(&mut data, None); + assert_eq!(page_total(&data), 4); + assert_eq!(page_ids(&data), vec![2, 3, 1, 4]); + } + + #[test] + fn paginate_pages_are_disjoint() { + let mut p0 = binned_with_long_entities(&[2, 3, 1, 4]); + let mut p1 = binned_with_long_entities(&[2, 3, 1, 4]); + paginate_long_fsms(&mut p0, Some(PageParams { max: 2, page: 0 })); + paginate_long_fsms(&mut p1, Some(PageParams { max: 2, page: 1 })); + assert_eq!((page_total(&p0), page_total(&p1)), (4, 4)); + assert_eq!(page_ids(&p0), vec![2, 3]); + assert_eq!(page_ids(&p1), vec![1, 4]); + } + + #[test] + fn paginate_page_past_end_is_empty_with_full_total() { + let mut data = binned_with_long_entities(&[2, 3, 1, 4]); + paginate_long_fsms(&mut data, Some(PageParams { max: 2, page: 5 })); + assert_eq!(page_total(&data), 4); + assert!(page_ids(&data).is_empty()); + } + + #[test] + fn paginate_max_zero_or_none_returns_all() { + for page in [None, Some(PageParams { max: 0, page: 0 })] { + let mut data = binned_with_long_entities(&[2, 3, 1, 4]); + paginate_long_fsms(&mut data, page); + assert_eq!(page_total(&data), 4); + assert_eq!(page_ids(&data), vec![2, 3, 1, 4]); + } + } + #[derive(Clone, Debug, PartialEq, Eq, Hash)] struct TestGlobalParams { query_id: Uuid, @@ -1031,7 +1279,7 @@ mod tests { to_secs_relative(bin.start(), 0) + params.series_offset as f64 }) .collect::>(); - let long_fsms = params + let long_fsms: Vec = params .operator_id .map(|id| FiniteStateMachine { id, @@ -1055,7 +1303,10 @@ mod tests { let data = ResourceTimeline::Binned(ResourceTimelineBinned { config: config_secs, capacities_values: HashMap::from([("capacity".to_string(), values)]), - long_fsms, + long_entities: Some(LongEntities { + long_fsms_total: long_fsms.len() as u32, + long_fsms, + }), }); Ok(( @@ -1079,7 +1330,10 @@ mod tests { key.to_string(), TimelineRequest::Resource(ResourceTimelineRequest { resource_id: Uuid::from_u128(2), - long_entities_threshold_s: Some(0.0), + long_entities: Some(LongEntitiesParams { + threshold_s: 0.0, + page: None, + }), entity_filter: EntityFilter { entity_type_name: None, }, @@ -1126,7 +1380,12 @@ mod tests { BulkTimelinesResponseEntry::Ok { data: ResourceTimeline::Binned(data), .. - } => data.long_fsms.iter().map(|fsm| fsm.id).collect(), + } => data + .long_entities + .iter() + .flat_map(|le| &le.long_fsms) + .map(|fsm| fsm.id) + .collect(), _ => panic!("expected binned ok response"), } } @@ -1167,7 +1426,7 @@ mod tests { let cache = TimelineCache::new(); let response = cache - .cached_bulk_timeline( + .paginate_bulk_timeline( Arc::clone(&analyzer), analyzer.engine_id, request(vec![("a", 30.0, 80.0, 0, None)]), @@ -1202,7 +1461,7 @@ mod tests { let cache = TimelineCache::new(); cache - .cached_bulk_timeline( + .paginate_bulk_timeline( Arc::clone(&analyzer), analyzer.engine_id, request(vec![("a", 25.0, 75.0, 0, None)]), @@ -1210,7 +1469,7 @@ mod tests { .await .unwrap(); let response = cache - .cached_bulk_timeline( + .paginate_bulk_timeline( Arc::clone(&analyzer), analyzer.engine_id, request(vec![("a", 30.0, 80.0, 0, None)]), @@ -1238,7 +1497,7 @@ mod tests { let cache = TimelineCache::new(); cache - .cached_bulk_timeline( + .paginate_bulk_timeline( Arc::clone(&analyzer), analyzer.engine_id, request(vec![ @@ -1249,7 +1508,7 @@ mod tests { .await .unwrap(); let response = cache - .cached_bulk_timeline( + .paginate_bulk_timeline( Arc::clone(&analyzer), analyzer.engine_id, request(vec![ @@ -1289,7 +1548,7 @@ mod tests { let second_operator = Uuid::from_u128(7); cache - .cached_bulk_timeline( + .paginate_bulk_timeline( Arc::clone(&analyzer), analyzer.engine_id, request(vec![("a", 25.0, 75.0, 0, Some(first_operator))]), @@ -1297,7 +1556,7 @@ mod tests { .await .unwrap(); let response = cache - .cached_bulk_timeline( + .paginate_bulk_timeline( Arc::clone(&analyzer), analyzer.engine_id, request(vec![("a", 25.0, 75.0, 0, Some(second_operator))]), @@ -1335,7 +1594,7 @@ mod tests { let cache = TimelineCache::new(); let cold_response = cache - .cached_bulk_timeline( + .paginate_bulk_timeline( Arc::clone(&analyzer), analyzer.engine_id, request(vec![("bad", 25.0, 75.0, 999, None)]), @@ -1345,7 +1604,7 @@ mod tests { assert_error(&cold_response, "bad"); cache - .cached_bulk_timeline( + .paginate_bulk_timeline( Arc::clone(&analyzer), analyzer.engine_id, request(vec![("a", 25.0, 75.0, 0, None)]), @@ -1353,7 +1612,7 @@ mod tests { .await .unwrap(); let partial_response = cache - .cached_bulk_timeline( + .paginate_bulk_timeline( Arc::clone(&analyzer), analyzer.engine_id, request(vec![ diff --git a/domains/query_engine/server/src/ui.rs b/domains/query_engine/server/src/ui.rs index 4efd8027..17f77985 100644 --- a/domains/query_engine/server/src/ui.rs +++ b/domains/query_engine/server/src/ui.rs @@ -234,7 +234,7 @@ where Ok(Json( state .timelines - .cached_single_timeline(analyzer, engine_id, request) + .paginate_single_timeline(analyzer, engine_id, request) .await?, )) } @@ -273,7 +273,7 @@ where Ok(Json( state .timelines - .cached_bulk_timeline(analyzer, engine_id, request) + .paginate_bulk_timeline(analyzer, engine_id, request) .await?, )) } diff --git a/examples/simulator/analyzer/src/lib.rs b/examples/simulator/analyzer/src/lib.rs index 35b8d39c..4560d7c4 100644 --- a/examples/simulator/analyzer/src/lib.rs +++ b/examples/simulator/analyzer/src/lib.rs @@ -15,7 +15,7 @@ use quent_ui::{ }, response::{ BulkChunkedTimelinesResponse, BulkTimelinesResponse, BulkTimelinesResponseEntry, - ResourceTimeline as UiResourceTimeline, ResourceTimelineBinned, + LongEntities, ResourceTimeline as UiResourceTimeline, ResourceTimelineBinned, ResourceTimelineBinnedByState, SingleTimelineResponse, }, }, @@ -32,7 +32,7 @@ use quent_analyzer::{ ResourceTypeDecl, Usage, Using, collection::ResourceCollection, tree::ResourceTreeNode, }, timeline::binned::resource::{ - ResourceTimeline, ResourceTimelineBuilder, ResourceTimelineByKey, + LongUsageEntity, ResourceTimeline, ResourceTimelineBuilder, ResourceTimelineByKey, ResourceTimelineByKeyBuilder, }, }; @@ -261,7 +261,10 @@ impl UiAnalyzer for SimulatorUiAnalyzer { match request.entry { TimelineRequest::Resource(req) => { let resource_type = self.model.resource_type_of(req.resource_id)?; - let long_entities_threshold = req.long_entities_threshold_s.map(to_nanosecs); + let long_entities_threshold = req + .long_entities + .as_ref() + .map(|le| to_nanosecs(le.threshold_s)); let task_filter = req.application; if req.entity_filter.entity_type_name.is_some() { @@ -304,7 +307,10 @@ impl UiAnalyzer for SimulatorUiAnalyzer { } TimelineRequest::ResourceGroup(req) => { let resource_type = self.model.resource_type(&req.resource_type_name)?; - let long_entities_threshold = req.long_entities_threshold_s.map(to_nanosecs); + let long_entities_threshold = req + .long_entities + .as_ref() + .map(|le| to_nanosecs(le.threshold_s)); // Build the resource tree for this group let tree = ResourceTreeNode::try_new(&self.model, req.resource_group_id)?; @@ -758,7 +764,10 @@ impl SimulatorUiAnalyzer { resource_id_filter: [r.resource_id].into_iter().collect(), entity_filter: r.entity_filter, task_filter: r.application, - long_entities_threshold: r.long_entities_threshold_s.map(to_nanosecs), + long_entities_threshold: r + .long_entities + .as_ref() + .map(|le| to_nanosecs(le.threshold_s)), }, TimelineRequest::ResourceGroup(rg) => { let resource_type = self.model.resource_type(&rg.resource_type_name)?; @@ -779,7 +788,10 @@ impl SimulatorUiAnalyzer { resource_id_filter: resource_ids, entity_filter: rg.entity_filter, task_filter: rg.app_params, - long_entities_threshold: rg.long_entities_threshold_s.map(to_nanosecs), + long_entities_threshold: rg + .long_entities + .as_ref() + .map(|le| to_nanosecs(le.threshold_s)), } } }) @@ -802,21 +814,27 @@ impl SimulatorUiAnalyzer { Ok(()) } - /// Turn a list of entity ids into UI-compatible FSM data. - fn task_entities_to_ui_fsm( + /// Resolve ordered long entities into UI FSMs, preserving order. Ranking + /// and pagination are the cache layer's concern, not this one. + fn long_entities_to_ui( &self, - entity_ids: &[Uuid], + entities: &[LongUsageEntity], epoch: TimeUnixNanoSec, - ) -> AnalyzerResult> { - entity_ids + ) -> AnalyzerResult { + let long_fsms = entities .iter() - .filter_map(|&id| { + .filter_map(|lu| { self.model .tasks - .get(&id) + .get(&lu.entity_id) .map(|task| task.try_to_ui_fsm(epoch)) }) - .collect() + .collect::>>()?; + let long_fsms_total = long_fsms.len() as u32; + Ok(LongEntities { + long_fsms, + long_fsms_total, + }) } /// Convert a timeline to a UI-compatible one. @@ -831,11 +849,14 @@ impl SimulatorUiAnalyzer { .into_iter() .map(|(k, v)| (k.to_owned(), v)) .collect(); - let long_fsms = self.task_entities_to_ui_fsm(&result.long_entities, epoch)?; + let long_entities = result + .long_usage_entities + .map(|entities| self.long_entities_to_ui(&entities, epoch)) + .transpose()?; Ok(UiResourceTimeline::Binned(ResourceTimelineBinned { config, capacities_values, - long_fsms, + long_entities, })) } @@ -853,12 +874,15 @@ impl SimulatorUiAnalyzer { .or_insert_with(StdHashMap::new) .insert(state_name.to_owned(), values); } - let long_fsms = self.task_entities_to_ui_fsm(&result.long_entities, epoch)?; + let long_entities = result + .long_usage_entities + .map(|entities| self.long_entities_to_ui(&entities, epoch)) + .transpose()?; Ok(UiResourceTimeline::BinnedByState( ResourceTimelineBinnedByState { config, capacities_states_values, - long_fsms, + long_entities, }, )) } diff --git a/examples/simulator/server/ts-bindings/LongEntities.ts b/examples/simulator/server/ts-bindings/LongEntities.ts new file mode 100644 index 00000000..33f33ac5 --- /dev/null +++ b/examples/simulator/server/ts-bindings/LongEntities.ts @@ -0,0 +1,18 @@ +// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually. +import type { FiniteStateMachine } from "./FiniteStateMachine"; + +/** + * Long entities in resource timelines. + */ +export type LongEntities = { +/** + * FSMs that have usage spans exceeding the long entities threshold. + * + * This may be empty if no long entities exist or if they were not requested. + * If pagination is requested, this will only hold one page. + */ +long_fsms: Array, +/** + * Total number of long FSMs, before pagination. + */ +long_fsms_total: number, }; diff --git a/examples/simulator/server/ts-bindings/LongEntitiesParams.ts b/examples/simulator/server/ts-bindings/LongEntitiesParams.ts new file mode 100644 index 00000000..0b93e886 --- /dev/null +++ b/examples/simulator/server/ts-bindings/LongEntitiesParams.ts @@ -0,0 +1,16 @@ +// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually. +import type { PageParams } from "./PageParams"; + +/** + * Parameters for requesting long entities in a resource timeline + */ +export type LongEntitiesParams = { +/** + * Fully include entities that have usages exceeding this amount of time. + */ +threshold_s: number, +/** + * Parameters of the page of long entities. If this is not set, return + * all long entities. + */ +page: PageParams | null, }; diff --git a/examples/simulator/server/ts-bindings/PageParams.ts b/examples/simulator/server/ts-bindings/PageParams.ts new file mode 100644 index 00000000..7aff16a2 --- /dev/null +++ b/examples/simulator/server/ts-bindings/PageParams.ts @@ -0,0 +1,14 @@ +// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually. + +/** + * Parameters of paginated APIs. + */ +export type PageParams = { +/** + * Maximum number of items per page. + */ +max: number, +/** + * The zero-based page index requested. + */ +page: number, }; diff --git a/examples/simulator/server/ts-bindings/ResourceGroupTimelineRequest.ts b/examples/simulator/server/ts-bindings/ResourceGroupTimelineRequest.ts index 26960f0c..b68ff2a8 100644 --- a/examples/simulator/server/ts-bindings/ResourceGroupTimelineRequest.ts +++ b/examples/simulator/server/ts-bindings/ResourceGroupTimelineRequest.ts @@ -1,5 +1,6 @@ // This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually. import type { EntityFilter } from "./EntityFilter"; +import type { LongEntitiesParams } from "./LongEntitiesParams"; import type { TimelineConfig } from "./TimelineConfig"; /** @@ -16,10 +17,11 @@ resource_group_id: string, */ resource_type_name: string, /** - * If set, fully include entities that have usages exceeding this amount of - * time in seconds. + * Parameters for dealing with long entities. + * + * If this is not set, then no long entities are returned. */ -long_entities_threshold_s: number | null, +long_entities: LongEntitiesParams | null, /** * Entity filters. */ diff --git a/examples/simulator/server/ts-bindings/ResourceTimelineBinned.ts b/examples/simulator/server/ts-bindings/ResourceTimelineBinned.ts index 6d37cd4d..172c5798 100644 --- a/examples/simulator/server/ts-bindings/ResourceTimelineBinned.ts +++ b/examples/simulator/server/ts-bindings/ResourceTimelineBinned.ts @@ -1,6 +1,6 @@ // This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually. import type { BinnedSpanSec } from "./BinnedSpanSec"; -import type { FiniteStateMachine } from "./FiniteStateMachine"; +import type { LongEntities } from "./LongEntities"; export type ResourceTimelineBinned = { /** @@ -13,6 +13,6 @@ config: BinnedSpanSec, */ capacities_values: { [key in string]?: Array }, /** - * FSMs that have usage spans exceeding the long_entities_threshold. + * Entities with long usage durations of this resource, if requested. */ -long_fsms: Array, }; +long_entities: LongEntities | null, }; diff --git a/examples/simulator/server/ts-bindings/ResourceTimelineBinnedByState.ts b/examples/simulator/server/ts-bindings/ResourceTimelineBinnedByState.ts index e7d791c5..238b7506 100644 --- a/examples/simulator/server/ts-bindings/ResourceTimelineBinnedByState.ts +++ b/examples/simulator/server/ts-bindings/ResourceTimelineBinnedByState.ts @@ -1,6 +1,6 @@ // This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually. import type { BinnedSpanSec } from "./BinnedSpanSec"; -import type { FiniteStateMachine } from "./FiniteStateMachine"; +import type { LongEntities } from "./LongEntities"; export type ResourceTimelineBinnedByState = { /** @@ -13,6 +13,6 @@ config: BinnedSpanSec, */ capacities_states_values: { [key in string]?: { [key in string]?: Array } }, /** - * FSMs that have usage spans exceeding the long_entities_threshold. + * Entities with long usage durations of this resource, if requested. */ -long_fsms: Array, }; +long_entities: LongEntities | null, }; diff --git a/examples/simulator/server/ts-bindings/ResourceTimelineRequest.ts b/examples/simulator/server/ts-bindings/ResourceTimelineRequest.ts index abbd3a67..2835bc06 100644 --- a/examples/simulator/server/ts-bindings/ResourceTimelineRequest.ts +++ b/examples/simulator/server/ts-bindings/ResourceTimelineRequest.ts @@ -1,5 +1,6 @@ // This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually. import type { EntityFilter } from "./EntityFilter"; +import type { LongEntitiesParams } from "./LongEntitiesParams"; import type { TimelineConfig } from "./TimelineConfig"; /** @@ -11,9 +12,11 @@ export type ResourceTimelineRequest = { */ resource_id: string, /** - * If set, fully include entities that have usages exceeding this amount of time. + * Parameters for dealing with long entities. + * + * If this is not set, then no long entities are returned. */ -long_entities_threshold_s: number | null, +long_entities: LongEntitiesParams | null, /** * Entity filters. */ diff --git a/ui/packages/@quent/components/src/lib/timeline.utils.test.ts b/ui/packages/@quent/components/src/lib/timeline.utils.test.ts index 77d241db..6f263567 100644 --- a/ui/packages/@quent/components/src/lib/timeline.utils.test.ts +++ b/ui/packages/@quent/components/src/lib/timeline.utils.test.ts @@ -53,7 +53,7 @@ function makeResourceEntry(): TimelineRequest { return { Resource: { resource_id: 'r1', - long_entities_threshold_s: null, + long_entities: null, entity_filter: baseFilter, application: { operator_id: null }, config: baseConfig, @@ -66,7 +66,7 @@ function makeGroupEntry(): TimelineRequest { ResourceGroup: { resource_group_id: 'g1', resource_type_name: 'disk', - long_entities_threshold_s: null, + long_entities: null, entity_filter: baseFilter, app_params: { operator_id: null }, config: baseConfig, diff --git a/ui/packages/@quent/components/src/lib/timeline.utils.ts b/ui/packages/@quent/components/src/lib/timeline.utils.ts index 1507ac33..6ad4e708 100644 --- a/ui/packages/@quent/components/src/lib/timeline.utils.ts +++ b/ui/packages/@quent/components/src/lib/timeline.utils.ts @@ -154,10 +154,10 @@ export function getTimelineConfig(response: SingleTimelineResponse): BinnedSpanS return response.config; } -/** Extract long_fsms from a ResourceTimeline response. */ +/** Extract the long-entity FSMs (current page) from a ResourceTimeline response. */ export function getLongFsms(data: ResourceTimeline): FiniteStateMachine[] { - if ('Binned' in data) return data.Binned.long_fsms; - if ('BinnedByState' in data) return data.BinnedByState.long_fsms; + if ('Binned' in data) return data.Binned.long_entities?.long_fsms ?? []; + if ('BinnedByState' in data) return data.BinnedByState.long_entities?.long_fsms ?? []; return []; } @@ -638,7 +638,7 @@ export function buildBulkParamsForItem( ResourceGroup: { resource_group_id: item.id, resource_type_name: resourceTypeName || '', - long_entities_threshold_s: null, + long_entities: null, entity_filter: { entity_type_name: fsmTypeName }, app_params: { operator_id: operatorId }, config, @@ -649,7 +649,7 @@ export function buildBulkParamsForItem( return { Resource: { resource_id: item.id, - long_entities_threshold_s: threshold, + long_entities: { threshold_s: threshold, page: null }, entity_filter: { entity_type_name: fsmTypeName }, application: { operator_id: operatorId }, config, diff --git a/ui/packages/@quent/components/src/timeline/ResourceTimeline.tsx b/ui/packages/@quent/components/src/timeline/ResourceTimeline.tsx index 5c716045..e8c8e965 100644 --- a/ui/packages/@quent/components/src/timeline/ResourceTimeline.tsx +++ b/ui/packages/@quent/components/src/timeline/ResourceTimeline.tsx @@ -156,7 +156,7 @@ export function ResourceTimeline({ ResourceGroup: { resource_group_id: resourceId, resource_type_name: resourceTypeName ?? '', - long_entities_threshold_s: getLongEntitiesThreshold(windowSeconds), + long_entities: { threshold_s: getLongEntitiesThreshold(windowSeconds), page: null }, entity_filter: { entity_type_name: fsmTypeName ?? null }, app_params: { operator_id: null }, config, @@ -165,7 +165,7 @@ export function ResourceTimeline({ : { Resource: { resource_id: resourceId, - long_entities_threshold_s: getLongEntitiesThreshold(windowSeconds), + long_entities: { threshold_s: getLongEntitiesThreshold(windowSeconds), page: null }, entity_filter: { entity_type_name: fsmTypeName ?? null }, application: { operator_id: null }, config, diff --git a/ui/packages/@quent/hooks/src/timeline/timeline.utils.test.ts b/ui/packages/@quent/hooks/src/timeline/timeline.utils.test.ts index 5e48f087..5774201f 100644 --- a/ui/packages/@quent/hooks/src/timeline/timeline.utils.test.ts +++ b/ui/packages/@quent/hooks/src/timeline/timeline.utils.test.ts @@ -18,7 +18,7 @@ function makeResourceRequest( return { Resource: { resource_id: resourceId, - long_entities_threshold_s: null, + long_entities: null, entity_filter: { entity_type_name: entityTypeName }, application: { operator_id: operatorId }, config: { window_start_s: 0, window_end_s: 1, num_bins: 10 } as never, @@ -36,7 +36,7 @@ function makeGroupRequest( ResourceGroup: { resource_group_id: groupId, resource_type_name: resourceTypeName, - long_entities_threshold_s: null, + long_entities: null, entity_filter: { entity_type_name: entityTypeName }, app_params: { operator_id: operatorId }, config: { window_start_s: 0, window_end_s: 1, num_bins: 10 } as never, diff --git a/ui/src/components/QueryResourceTree.tsx b/ui/src/components/QueryResourceTree.tsx index 3638f3be..31c42ad6 100644 --- a/ui/src/components/QueryResourceTree.tsx +++ b/ui/src/components/QueryResourceTree.tsx @@ -161,7 +161,7 @@ function QueryResourceTreeContent({ queryBundle, engineId }: QueryResourceTreePr ResourceGroup: { resource_group_id: rootResourceGroupId!, resource_type_name: rootResourceType ?? '', - long_entities_threshold_s: null, + long_entities: null, entity_filter: { entity_type_name: null }, app_params: { operator_id: null }, config: {