Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
182 changes: 182 additions & 0 deletions domains/query_engine/analyzer/src/diff.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,182 @@
// SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

use std::collections::{HashMap, HashSet};

use quent_attributes::Value;
use quent_query_engine_ui::{
self as ui,
diff::{
Compatibility, DiffDelta, DiffOperatorDelta, DiffOperatorRef, DiffQuerySummary, QueryDiff,
QueryStatDiffs,
},
};
use quent_time::TimeSec;
use uuid::Uuid;

/// Per-query operator data needed to compute a diff.
///
/// Returned by [`UiAnalyzer::query_operator_stats`].
pub struct QueryOperatorStats {
pub engine_id: Uuid,
pub instance_name: Option<String>,
pub query_group_id: Option<Uuid>,
pub query_group_name: Option<String>,
pub duration_s: TimeSec,
/// All operators that worked on this query, keyed by operator ID.
pub operators: HashMap<Uuid, ui::Operator>,
}

fn to_f64(v: &Value) -> Option<f64> {
match v {
Value::U8(n) => Some(*n as f64),
Value::U16(n) => Some(*n as f64),
Value::U32(n) => Some(*n as f64),
Value::U64(n) => Some(*n as f64),
Value::I8(n) => Some(*n as f64),
Value::I16(n) => Some(*n as f64),
Value::I32(n) => Some(*n as f64),
Value::I64(n) => Some(*n as f64),
Value::F32(n) => Some(*n as f64),
Value::F64(n) => Some(*n),
_ => None,
}
}

/// Groups operators by type name and sums all numeric stats within each group.
///
/// Returns a map of `operator_type_name → (count, summed_stats)`.
fn aggregate_by_type(
operators: &HashMap<Uuid, ui::Operator>,
) -> HashMap<String, (usize, HashMap<String, f64>)> {
let mut by_type: HashMap<String, (usize, HashMap<String, f64>)> = HashMap::new();
for op in operators.values() {
if let Some(type_name) = op.operator_type_name.as_deref() {
let entry = by_type.entry(type_name.to_string()).or_default();
entry.0 += 1;
if let Some(stats) = &op.statistics {
for (key, val) in &stats.custom_statistics {
if let Some(v) = val.as_ref().and_then(to_f64) {
*entry.1.entry(key.clone()).or_insert(0.0) += v;
}
}
}
}
}
by_type
}

pub fn compute_diff(
comparison_query_id: Uuid,
baseline: &QueryOperatorStats,
comparison: &QueryOperatorStats,
) -> QueryDiff {
let summary = DiffQuerySummary {
id: comparison_query_id,
engine_id: comparison.engine_id,
instance_name: comparison.instance_name.clone(),
query_group_id: comparison.query_group_id,
query_group_name: comparison.query_group_name.clone(),
};

let duration_delta = comparison.duration_s - baseline.duration_s;
let duration_pct = if baseline.duration_s != 0.0 {
Some(duration_delta / baseline.duration_s * 100.0)
} else {
None
};
let stat_diff = QueryStatDiffs {
duration: DiffDelta {
stats: (
Some(Value::F64(baseline.duration_s)),
Some(Value::F64(comparison.duration_s)),
),
delta: Some(duration_delta),
percent_delta: duration_pct,
},
};

let baseline_by_type = aggregate_by_type(&baseline.operators);
let comparison_by_type = aggregate_by_type(&comparison.operators);

let all_type_names: HashSet<&str> = baseline_by_type
.keys()
.map(|k| k.as_str())
.chain(comparison_by_type.keys().map(|k| k.as_str()))
.collect();

let empty: HashMap<String, f64> = HashMap::new();
let mut warnings: Vec<String> = Vec::new();
let mut operator_diffs: Vec<DiffOperatorDelta> = Vec::new();

for type_name in all_type_names {
let b_entry = baseline_by_type.get(type_name);
let c_entry = comparison_by_type.get(type_name);

if b_entry.is_none() {
warnings.push(format!("operator type '{type_name}' not found in baseline"));
}
if c_entry.is_none() {
warnings.push(format!("operator type '{type_name}' not found in comparison"));
}

let (b_count, b_stats) = b_entry.map(|(c, s)| (*c, s)).unwrap_or((0, &empty));
let (c_count, c_stats) = c_entry.map(|(c, s)| (*c, s)).unwrap_or((0, &empty));

let all_stat_keys: HashSet<&str> = b_stats
.keys()
.map(|k| k.as_str())
.chain(c_stats.keys().map(|k| k.as_str()))
.collect();

let stats: HashMap<String, DiffDelta> = all_stat_keys
.into_iter()
.map(|key| {
let b_val = b_stats.get(key).copied();
let c_val = c_stats.get(key).copied();

let (delta, percent_delta) = match (b_val, c_val) {
(Some(b), Some(c)) => {
let d = c - b;
let pct = if b != 0.0 { Some(d / b * 100.0) } else { None };
(Some(d), pct)
}
_ => (None, None),
};

(
key.to_string(),
DiffDelta {
stats: (b_val.map(Value::F64), c_val.map(Value::F64)),
delta,
percent_delta,
},
)
})
.collect();

operator_diffs.push(DiffOperatorDelta {
operators: (
DiffOperatorRef {
label: type_name.to_string(),
operator_type_name: Some(type_name.to_string()),
count: b_count,
},
DiffOperatorRef {
label: type_name.to_string(),
operator_type_name: Some(type_name.to_string()),
count: c_count,
},
),
stats,
});
}

QueryDiff {
compatibility: Compatibility::Compatible,
query: Some(summary),
operator_diffs: Some(operator_diffs),
stat_diffs: Some(stat_diff),
warnings: if warnings.is_empty() { None } else { Some(warnings) },
}
}
1 change: 1 addition & 0 deletions domains/query_engine/analyzer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ pub mod model;
pub mod view;

// UI related mods
pub mod diff;
pub mod ui;

pub trait QueryEngineModel: Model {
Expand Down
7 changes: 7 additions & 0 deletions domains/query_engine/analyzer/src/ui.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ use std::collections::HashMap;
use quent_analyzer::AnalyzerResult;
use quent_events::Event;
use quent_query_engine_ui as ui;

use crate::diff::QueryOperatorStats;
use quent_ui::timeline::{
request::{BulkChunkedTimelineRequest, BulkTimelineRequest, SingleTimelineRequest},
response::{
Expand Down Expand Up @@ -52,6 +54,11 @@ pub trait UiAnalyzer {
/// non-volumous information related to this query.
fn query_bundle(&self, query_id: Uuid) -> AnalyzerResult<ui::QueryBundle<Self::EntityRef>>;

/// Return the per-operator stats for a single query.
///
/// Used as input to the workload diff computation.
fn query_operator_stats(&self, query_id: Uuid) -> AnalyzerResult<QueryOperatorStats>;

/// Access the underlying query engine model of this analyzer.
fn query_engine_model(&self) -> &impl QueryEngineModel;

Expand Down
4 changes: 3 additions & 1 deletion domains/query_engine/server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,9 @@ where
timelines: TimelineCache::new(),
};

let mut http_routes = axum::Router::new().nest("/api/engines", ui::routes(state));
let mut http_routes = axum::Router::new()
.nest("/api/engines", ui::routes(state.clone()))
.nest("/api", ui::diff_routes(state));

#[cfg(feature = "swagger")]
{
Expand Down
10 changes: 9 additions & 1 deletion domains/query_engine/server/src/timeline_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -823,7 +823,8 @@ mod tests {
use quent_analyzer::AnalyzerResult;
use quent_events::Event;
use quent_query_engine_analyzer::{
QueryEngineModel, engine::Engine, model::InMemoryQueryEngineModel, ui::UiAnalyzer,
QueryEngineModel, diff::QueryOperatorStats, engine::Engine,
model::InMemoryQueryEngineModel, ui::UiAnalyzer,
};
use quent_query_engine_model::engine::{EngineEvent, Exit, Init};
use quent_ui::{
Expand Down Expand Up @@ -961,6 +962,13 @@ mod tests {
unimplemented!("not needed by timeline cache tests")
}

fn query_operator_stats(
&self,
_query_id: Uuid,
) -> AnalyzerResult<QueryOperatorStats> {
unimplemented!("not needed by timeline cache tests")
}

fn query_engine_model(&self) -> &impl QueryEngineModel {
&self.model
}
Expand Down
60 changes: 59 additions & 1 deletion domains/query_engine/server/src/ui.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use axum::{
};

use quent_analyzer::AnalyzerResult;
use quent_query_engine_analyzer::diff::compute_diff;
use quent_query_engine_analyzer::{QueryEngineModel, query_group::QueryGroup, ui::UiAnalyzer};
use quent_query_engine_ui as ui;
use quent_ui::timeline::{
Expand All @@ -32,7 +33,6 @@ pub(crate) mod embedded {
#[derive(Embed)]
#[folder = "../../../ui/dist/"]
struct UiAssets;

pub async fn serve(uri: axum::http::Uri) -> impl IntoResponse {
let path = uri.path().trim_start_matches('/');
let file = UiAssets::get(path).or_else(|| UiAssets::get("index.html"));
Expand Down Expand Up @@ -278,6 +278,47 @@ where
))
}

#[cfg_attr(feature = "swagger", utoipa::path(
post,
path = "/api/workload-diff",
tag = "diff",
request_body = Object,
responses(
(status = 200, description = "Workload diff statistics", body = Object)
)
))]
#[tracing::instrument(skip_all, err)]
async fn workload_diff<A>(
State(state): State<ServiceState<A>>,
Json(request): Json<ui::diff::DiffRequest>,
) -> ServerResult<Json<ui::diff::DiffResponse>>
where
A: UiAnalyzer + Send + Sync + 'static,
{
let baseline_analyzer = state
.analyzers
.get(request.baseline_query.engine_id)
.await?;
let baseline_stats =
baseline_analyzer.query_operator_stats(request.baseline_query.query_id)?;

let mut diffs = Vec::new();
for comparison_ref in &request.comparison_queries {
let comparison_analyzer = state.analyzers.get(comparison_ref.engine_id).await?;
let comparison_stats =
comparison_analyzer.query_operator_stats(comparison_ref.query_id)?;
diffs.push(compute_diff(
comparison_ref.query_id,
&baseline_stats,
&comparison_stats,
));
}

Ok(Json(ui::diff::DiffResponse {
comparison_queries: diffs,
}))
}

#[cfg(feature = "swagger")]
#[derive(utoipa::OpenApi)]
#[openapi(
Expand All @@ -289,14 +330,31 @@ where
query,
single_timeline,
bulk_timelines,
workload_diff,
),
tags(
(name = "engines", description = "Engine, query group, and query management"),
(name = "timelines", description = "Resource timeline data"),
(name = "diff", description = "Cross-engine query comparison"),
)
)]
pub(crate) struct ApiDoc;

pub fn diff_routes<A>(state: ServiceState<A>) -> Router<()>
where
A: UiAnalyzer + Send + Sync + 'static,
<A as UiAnalyzer>::EntityRef: serde::Serialize,
<A as UiAnalyzer>::TimelineGlobalParams:
Send + Sync + Clone + serde::Serialize + Hash + Eq + 'static,
<A as UiAnalyzer>::TimelineParams: Send + Sync + Clone + serde::Serialize + Hash + Eq + 'static,
for<'de> <A as UiAnalyzer>::TimelineGlobalParams: serde::Deserialize<'de>,
for<'de> <A as UiAnalyzer>::TimelineParams: serde::Deserialize<'de>,
{
Router::new()
.route("/workload-diff", post(workload_diff))
.with_state(state)
}

pub fn routes<A>(state: ServiceState<A>) -> Router<()>
where
A: UiAnalyzer + Send + Sync + 'static,
Expand Down
Loading
Loading