Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
163 changes: 163 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,174 @@ use crate::types::*;
pub mod parsers;
mod templates;
mod types;
pub mod execution_order {
use fxhash::FxHashMap;

/// Issue types detected at a given execution index across ranks
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ExecOrderIssue {
/// At least two ranks present at the index have a collective schedule entry,
/// and those schedules are not all equal.
ScheduleMismatch,
/// At least two ranks present at the index have a non-empty cache status
/// marker, and those markers are not all equal.
CacheSkew,
}

/// One row in the execution-order report
#[derive(Debug, Clone)]
pub struct ExecOrderIndexRow {
/// Zero-based index into per-rank execution orders
pub index: usize,
/// Mapping: rank -> compile_id (compile directory name).
/// Only ranks whose execution order has an entry at `index` appear here.
pub by_rank: FxHashMap<u32, String>,
/// Issues found for this index; empty when no comparison flagged a difference
pub issues: Vec<ExecOrderIssue>,
}

/// Final report for execution-order diagnostics
#[derive(Debug, Clone, Default)]
pub struct ExecOrderReport {
/// Report rows in ascending `index` order; the `Default` value holds no rows
pub by_index: Vec<ExecOrderIndexRow>,
}

/// Analyze per-rank execution orders, aligning entries by index and flagging issues
/// using provided per-graph properties.
///
/// - `exec_orders`: rank -> ordered list of compile_id directory names
/// - `collective_schedule_by_graph`: (rank, compile_id) -> schedule ops sequence
/// - `cache_status_by_graph`: (rank, compile_id) -> cache status marker (e.g., "✅", "❌", "❓")
///
/// Notes:
/// - O(K·N) pass over data; uses internal memoization for property lookups
/// - Evaluates comparisons only among present ranks at each index
pub fn analyze_execution_order(
exec_orders: &FxHashMap<u32, Vec<String>>,
collective_schedule_by_graph: &FxHashMap<(u32, String), Vec<String>>,
cache_status_by_graph: &FxHashMap<(u32, String), String>,
) -> ExecOrderReport {
// Determine max length across ranks (N)
let max_len = exec_orders.values().map(|v| v.len()).max().unwrap_or(0);

// Fast no-op
if max_len == 0 || exec_orders.is_empty() {
return ExecOrderReport::default();
}

// Memoize property lookups per (rank, compile_id)
let mut sched_memo: FxHashMap<(u32, String), Option<Vec<String>>> = FxHashMap::default();
let mut cache_memo: FxHashMap<(u32, String), Option<String>> = FxHashMap::default();

let mut rows: Vec<ExecOrderIndexRow> = Vec::with_capacity(max_len);

for idx in 0..max_len {
// Gather present ranks and their compile_ids at this index
let mut by_rank: FxHashMap<u32, String> = FxHashMap::default();
for (&rank, order) in exec_orders.iter() {
if let Some(cid) = order.get(idx) {
by_rank.insert(rank, cid.clone());
}
}

if by_rank.is_empty() {
continue;
}

// Evaluate issues among present ranks
let mut issues: Vec<ExecOrderIssue> = Vec::new();

// Schedule mismatch: compare collective op sequences
{
// Collect schedules for ranks that have them (clone to avoid borrowing issues)
let mut schedules: Vec<Vec<String>> = Vec::new();
for (&rank, cid) in by_rank.iter() {
let key = (rank, cid.clone());
let entry = sched_memo.entry((rank, cid.clone())).or_insert_with(|| {
collective_schedule_by_graph
.get(&key)
.cloned()
.map(Some)
.unwrap_or(None)
});
if let Some(ref ops) = entry {
schedules.push(ops.clone());
}
}
if schedules.len() >= 2 {
let first = &schedules[0];
if schedules.iter().any(|s| s != first) {
issues.push(ExecOrderIssue::ScheduleMismatch);
}
}
}

// Cache skew: compare cache status markers
{
let mut statuses: Vec<String> = Vec::new();
for (&rank, cid) in by_rank.iter() {
let key = (rank, cid.clone());
let entry = cache_memo.entry((rank, cid.clone())).or_insert_with(|| {
cache_status_by_graph
.get(&key)
.cloned()
.map(Some)
.unwrap_or(None)
});
if let Some(ref s) = entry {
if !s.is_empty() {
statuses.push(s.clone());
}
}
}
if statuses.len() >= 2 {
let first = &statuses[0];
if statuses.iter().any(|s| s != first) {
issues.push(ExecOrderIssue::CacheSkew);
}
}
}

rows.push(ExecOrderIndexRow { index: idx, by_rank, issues });
}

ExecOrderReport { by_index: rows }
}

/// Extract the `graph_execution_order` sequence from the JSON payload of
/// `artifact.name = "graph_execution"`.
///
/// Expected payload shape:
///     { "graph_execution_order": [ "<compile_id_dir>", ... ] }
///
/// Each array entry may be a plain string, or an object carrying the name in a
/// string field; the fields are tried in the order `graph`, `compile_id`, `id`
/// and the first one holding a string wins.
///
/// Entries that are neither a string nor an object with one of those string
/// fields are skipped, so the result can be shorter than the input array.
///
/// # Errors
/// Fails when `payload` is not valid JSON, or when the top-level value has no
/// `graph_execution_order` array.
pub fn parse_graph_execution_order(payload: &str) -> anyhow::Result<Vec<String>> {
    let root: serde_json::Value = serde_json::from_str(payload)?;

    let entries = match root.get("graph_execution_order") {
        Some(serde_json::Value::Array(items)) => items,
        _ => return Err(anyhow::anyhow!("missing graph_execution_order array")),
    };

    // Candidate object fields, in order of preference.
    let name_fields = ["graph", "compile_id", "id"];

    let order = entries
        .iter()
        .filter_map(|entry| match entry {
            serde_json::Value::String(name) => Some(name.clone()),
            serde_json::Value::Object(fields) => name_fields
                .iter()
                .find_map(|&field| fields.get(field).and_then(serde_json::Value::as_str))
                .map(str::to_owned),
            _ => None,
        })
        .collect();

    Ok(order)
}
}

pub use types::{
ArtifactFlags, Diagnostics, DivergenceFlags, DivergenceGroup, GraphAnalysis, GraphRuntime,
RankMetaData, RuntimeAnalysis, RuntimeRankDetail,
};
pub use execution_order::{analyze_execution_order, parse_graph_execution_order, ExecOrderIssue, ExecOrderIndexRow, ExecOrderReport};

#[derive(Debug)]
enum ParserResult {
Expand Down
64 changes: 64 additions & 0 deletions tests/integration_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -397,6 +397,70 @@ fn test_export_guard_report() {
}
}

#[test]
fn test_execution_order_end_to_end() {
    use tlparse::execution_order::{
        analyze_execution_order, parse_graph_execution_order, ExecOrderIssue,
    };

    // Parser + analyzer together: per-rank orders come from JSON payloads.
    // Rank 0 exercises every accepted entry shape; rank 1 is one entry shorter.
    let rank0_payload = r#"{"graph_execution_order": ["G1", {"graph":"G2"}, {"compile_id":"G3"}, {"id":"G4"}]}"#;
    let rank1_payload = r#"{"graph_execution_order": ["G1", "G2", "G3"]}"#;

    let mut exec_orders: fxhash::FxHashMap<u32, Vec<String>> = fxhash::FxHashMap::default();
    exec_orders.insert(0, parse_graph_execution_order(rank0_payload).unwrap());
    exec_orders.insert(1, parse_graph_execution_order(rank1_payload).unwrap());

    // Collective schedules: equal at idx0, mismatch at idx1, equal at idx2, only rank0 at idx3
    let collective: fxhash::FxHashMap<(u32, String), Vec<String>> = [
        (0_u32, "G1", &["op"][..]),
        (1_u32, "G1", &["op"][..]),
        (0_u32, "G2", &["all_reduce", "barrier"][..]),
        (1_u32, "G2", &["barrier", "all_reduce"][..]),
        (0_u32, "G3", &["noop"][..]),
        (1_u32, "G3", &["noop"][..]),
        (0_u32, "G4", &["noop"][..]),
    ]
    .iter()
    .map(|&(rank, graph, ops)| {
        let ops = ops.iter().map(|op| op.to_string()).collect::<Vec<String>>();
        ((rank, graph.to_string()), ops)
    })
    .collect();

    // Cache statuses: equal at idx0, blank on rank1 at idx1 (ignored), skew at idx2, rank0-only at idx3
    let cache: fxhash::FxHashMap<(u32, String), String> = [
        (0_u32, "G1", "✅"),
        (1_u32, "G1", "✅"),
        (0_u32, "G2", "✅"),
        (1_u32, "G2", ""),
        (0_u32, "G3", "✅"),
        (1_u32, "G3", "❌"),
        (0_u32, "G4", "❌"),
    ]
    .iter()
    .map(|&(rank, graph, status)| ((rank, graph.to_string()), status.to_string()))
    .collect();

    let report = analyze_execution_order(&exec_orders, &collective, &cache);
    let rows = &report.by_index;
    assert_eq!(rows.len(), 4);

    // idx0: both ranks present, equal schedules and cache => no issues
    assert_eq!(rows[0].by_rank.len(), 2);
    assert!(rows[0].issues.is_empty());

    // idx1: both ranks present, schedule mismatch; blank cache ignored => only ScheduleMismatch
    assert!(rows[1].issues.contains(&ExecOrderIssue::ScheduleMismatch));
    assert!(!rows[1].issues.contains(&ExecOrderIssue::CacheSkew));

    // idx2: cache skew across ranks
    assert!(rows[2].issues.contains(&ExecOrderIssue::CacheSkew));

    // idx3: only rank0 present => nothing to compare
    assert_eq!(rows[3].by_rank.len(), 1);
    assert!(rows[3].issues.is_empty());
}

#[test]
fn test_provenance_tracking_aot_cuda() {
let expected_files = [
Expand Down