diff --git a/src/lib.rs b/src/lib.rs
index b439de2..4b19004 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -23,11 +23,174 @@ use crate::types::*;
 pub mod parsers;
 mod templates;
 mod types;
+pub mod execution_order {
+    use fxhash::FxHashMap;
+
+    /// Issue types detected at a given execution index across ranks
+    #[derive(Debug, Clone, Copy, PartialEq, Eq)]
+    pub enum ExecOrderIssue {
+        ScheduleMismatch,
+        CacheSkew,
+    }
+
+    /// One row in the execution-order report
+    #[derive(Debug, Clone)]
+    pub struct ExecOrderIndexRow {
+        /// Zero-based index into per-rank execution orders
+        pub index: usize,
+        /// Mapping: rank -> compile_id (compile directory name)
+        pub by_rank: FxHashMap<u32, String>,
+        /// Issues found for this index
+        pub issues: Vec<ExecOrderIssue>,
+    }
+
+    /// Final report for execution-order diagnostics
+    #[derive(Debug, Clone, Default)]
+    pub struct ExecOrderReport {
+        pub by_index: Vec<ExecOrderIndexRow>,
+    }
+
+    /// Analyze per-rank execution orders, aligning entries by index and flagging issues
+    /// using the provided per-graph properties.
+    ///
+    /// - `exec_orders`: rank -> ordered list of compile_id directory names
+    /// - `collective_schedule_by_graph`: (rank, compile_id) -> schedule ops sequence
+    /// - `cache_status_by_graph`: (rank, compile_id) -> cache status marker (e.g., "✅", "❌", "❓")
+    ///
+    /// Notes:
+    /// - Single O(K·N) pass over the data (K ranks, N = longest per-rank order); property
+    ///   lookups are memoized internally
+    /// - Comparisons are evaluated only among the ranks present at each index
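+    ///
+    /// Illustrative sketch (the graph names and rank count below are made up, not taken
+    /// from real logs): with no per-graph properties supplied there is nothing to compare,
+    /// so the single aligned index is reported without issues.
+    ///
+    /// ```
+    /// use fxhash::FxHashMap;
+    ///
+    /// let mut exec_orders: FxHashMap<u32, Vec<String>> = FxHashMap::default();
+    /// exec_orders.insert(0, vec!["G1".to_string()]);
+    /// exec_orders.insert(1, vec!["G1".to_string()]);
+    /// let schedules: FxHashMap<(u32, String), Vec<String>> = FxHashMap::default();
+    /// let caches: FxHashMap<(u32, String), String> = FxHashMap::default();
+    ///
+    /// let report = tlparse::analyze_execution_order(&exec_orders, &schedules, &caches);
+    /// assert_eq!(report.by_index.len(), 1);
+    /// assert!(report.by_index[0].issues.is_empty());
+    /// ```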
+    pub fn analyze_execution_order(
+        exec_orders: &FxHashMap<u32, Vec<String>>,
+        collective_schedule_by_graph: &FxHashMap<(u32, String), Vec<String>>,
+        cache_status_by_graph: &FxHashMap<(u32, String), String>,
+    ) -> ExecOrderReport {
+        // Determine max length across ranks (N)
+        let max_len = exec_orders.values().map(|v| v.len()).max().unwrap_or(0);
+
+        // Fast no-op
+        if max_len == 0 || exec_orders.is_empty() {
+            return ExecOrderReport::default();
+        }
+
+        // Memoize property lookups per (rank, compile_id)
+        let mut sched_memo: FxHashMap<(u32, String), Option<Vec<String>>> = FxHashMap::default();
+        let mut cache_memo: FxHashMap<(u32, String), Option<String>> = FxHashMap::default();
+
+        let mut rows: Vec<ExecOrderIndexRow> = Vec::with_capacity(max_len);
+
+        for idx in 0..max_len {
+            // Gather present ranks and their compile_ids at this index
+            let mut by_rank: FxHashMap<u32, String> = FxHashMap::default();
+            for (&rank, order) in exec_orders.iter() {
+                if let Some(cid) = order.get(idx) {
+                    by_rank.insert(rank, cid.clone());
+                }
+            }
+
+            if by_rank.is_empty() {
+                continue;
+            }
+
+            // Evaluate issues among present ranks
+            let mut issues: Vec<ExecOrderIssue> = Vec::new();
+
+            // Schedule mismatch: compare collective op sequences
+            {
+                // Collect schedules for ranks that have them (cloned so the memo borrow ends here)
+                let mut schedules: Vec<Vec<String>> = Vec::new();
+                for (&rank, cid) in by_rank.iter() {
+                    let key = (rank, cid.clone());
+                    let entry = sched_memo
+                        .entry(key.clone())
+                        .or_insert_with(|| collective_schedule_by_graph.get(&key).cloned());
+                    if let Some(ops) = entry.as_ref() {
+                        schedules.push(ops.clone());
+                    }
+                }
+                if schedules.len() >= 2 {
+                    let first = &schedules[0];
+                    if schedules.iter().any(|s| s != first) {
+                        issues.push(ExecOrderIssue::ScheduleMismatch);
+                    }
+                }
+            }
+
+            // Cache skew: compare cache status markers (blank markers are ignored)
+            {
+                let mut statuses: Vec<String> = Vec::new();
+                for (&rank, cid) in by_rank.iter() {
+                    let key = (rank, cid.clone());
+                    let entry = cache_memo
+                        .entry(key.clone())
+                        .or_insert_with(|| cache_status_by_graph.get(&key).cloned());
+                    if let Some(s) = entry.as_ref() {
+                        if !s.is_empty() {
+                            statuses.push(s.clone());
+                        }
+                    }
+                }
+                if statuses.len() >= 2 {
+                    let first = &statuses[0];
+                    if statuses.iter().any(|s| s != first) {
+                        issues.push(ExecOrderIssue::CacheSkew);
+                    }
+                }
+            }
+
+            rows.push(ExecOrderIndexRow { index: idx, by_rank, issues });
+        }
+
+        ExecOrderReport { by_index: rows }
+    }
+
+    /// Parse the JSON payload for `artifact.name = "graph_execution"` and extract
+    /// the `graph_execution_order` sequence. The expected payload shape is
+    /// `{ "graph_execution_order": [ "<compile_id>", ... ] }`.
+    /// This function is tolerant to variations where entries are objects with a
+    /// `graph`/`compile_id`/`id` string field; in such cases it prefers `graph`, then
+    /// `compile_id`, then `id`.
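+    ///
+    /// Illustrative sketch (the payload below is made up, not taken from a real log):
+    ///
+    /// ```
+    /// let payload = r#"{"graph_execution_order": ["G1", {"graph": "G2"}]}"#;
+    /// let order = tlparse::parse_graph_execution_order(payload).unwrap();
+    /// assert_eq!(order, vec!["G1".to_string(), "G2".to_string()]);
+    /// ```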
+    pub fn parse_graph_execution_order(payload: &str) -> anyhow::Result<Vec<String>> {
+        let value: serde_json::Value = serde_json::from_str(payload)?;
+        let arr = value
+            .get("graph_execution_order")
+            .and_then(|v| v.as_array())
+            .ok_or_else(|| anyhow::anyhow!("missing graph_execution_order array"))?;
+
+        let mut out = Vec::with_capacity(arr.len());
+        for item in arr {
+            match item {
+                serde_json::Value::String(s) => out.push(s.clone()),
+                serde_json::Value::Object(map) => {
+                    if let Some(s) = map
+                        .get("graph")
+                        .and_then(|v| v.as_str())
+                        .or_else(|| map.get("compile_id").and_then(|v| v.as_str()))
+                        .or_else(|| map.get("id").and_then(|v| v.as_str()))
+                    {
+                        out.push(s.to_string());
+                    }
+                }
+                _ => {}
+            }
+        }
+        Ok(out)
+    }
+}
 
 pub use types::{
     ArtifactFlags, Diagnostics, DivergenceFlags, DivergenceGroup, GraphAnalysis, GraphRuntime,
     RankMetaData, RuntimeAnalysis, RuntimeRankDetail,
 };
+pub use execution_order::{analyze_execution_order, parse_graph_execution_order, ExecOrderIssue, ExecOrderIndexRow, ExecOrderReport};
 
 #[derive(Debug)]
 enum ParserResult {
diff --git a/tests/integration_test.rs b/tests/integration_test.rs
index 2569cb7..9144a9b 100644
--- a/tests/integration_test.rs
+++ b/tests/integration_test.rs
@@ -397,6 +397,70 @@ fn test_export_guard_report() {
     }
 }
 
+#[test]
+fn test_execution_order_end_to_end() {
+    // Build exec orders from JSON payloads (parser + analyzer together)
+    let p0 = r#"{"graph_execution_order": ["G1", {"graph":"G2"}, {"compile_id":"G3"}, {"id":"G4"}]}"#;
+    let p1 = r#"{"graph_execution_order": ["G1", "G2", "G3"]}"#;
+    let seq0 = tlparse::execution_order::parse_graph_execution_order(p0).unwrap();
+    let seq1 = tlparse::execution_order::parse_graph_execution_order(p1).unwrap();
+    let exec_orders: fxhash::FxHashMap<u32, Vec<String>> =
+        vec![(0_u32, seq0), (1_u32, seq1)].into_iter().collect();
+
+    // Collective schedules: equal at idx0, mismatch at idx1, equal at idx2, only rank0 at idx3
+    let collective: fxhash::FxHashMap<(u32, String), Vec<String>> = vec![
+        ((0_u32, "G1".into()), vec!["op".into()]),
+        ((1_u32, "G1".into()), vec!["op".into()]),
+        ((0_u32, "G2".into()), vec!["all_reduce".into(), "barrier".into()]),
+        ((1_u32, "G2".into()), vec!["barrier".into(), "all_reduce".into()]),
+        ((0_u32, "G3".into()), vec!["noop".into()]),
+        ((1_u32, "G3".into()), vec!["noop".into()]),
+        ((0_u32, "G4".into()), vec!["noop".into()]),
+    ]
+    .into_iter()
+    .collect();
+
+    // Cache statuses: equal at idx0, blank on rank1 at idx1 (ignored), skew at idx2, rank0-only at idx3
+    let cache: fxhash::FxHashMap<(u32, String), String> = vec![
+        ((0_u32, "G1".into()), "✅".into()),
+        ((1_u32, "G1".into()), "✅".into()),
+        ((0_u32, "G2".into()), "✅".into()),
+        ((1_u32, "G2".into()), "".into()),
+        ((0_u32, "G3".into()), "✅".into()),
+        ((1_u32, "G3".into()), "❌".into()),
+        ((0_u32, "G4".into()), "❌".into()),
+    ]
+    .into_iter()
+    .collect();
+
+    let report = tlparse::execution_order::analyze_execution_order(&exec_orders, &collective, &cache);
+    assert_eq!(report.by_index.len(), 4);
+
+    // idx0: both ranks present, equal schedules and cache => no issues
+    assert_eq!(report.by_index[0].by_rank.len(), 2);
+    assert!(report.by_index[0].issues.is_empty());
+
+    // idx1: both ranks present, schedule mismatch; blank cache ignored => only ScheduleMismatch
+    assert!(report.by_index[1]
+        .issues
+        .iter()
+        .any(|&i| i == tlparse::execution_order::ExecOrderIssue::ScheduleMismatch));
+    assert!(!report.by_index[1]
+        .issues
+        .iter()
+        .any(|&i| i == tlparse::execution_order::ExecOrderIssue::CacheSkew));
+
+    // idx2: cache skew across ranks
+    assert!(report.by_index[2]
+        .issues
+        .iter()
+        .any(|&i| i == tlparse::execution_order::ExecOrderIssue::CacheSkew));
+
+    // idx3: only rank0 present => nothing to compare
+    assert_eq!(report.by_index[3].by_rank.len(), 1);
+    assert!(report.by_index[3].issues.is_empty());
+}
+
 #[test]
 fn test_provenance_tracking_aot_cuda() {
     let expected_files = [