Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
223 changes: 223 additions & 0 deletions benchmark/data_generation/experiments/throughput_saturation/main.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,223 @@
import logging
import yaml
import numpy as np
from pathlib import Path
from typing import Any, Dict, List

from data_generation.experiments.base.base_experiment import (
BaseExperiment,
)
from data_generation.experiments.throughput_saturation.utils.plots import (
ThroughputSaturationPlotter,
)
from data_generation.utils.schemas import (
BenchmarkConfig,
EngineIDs,
WorkloadConfig,
WorkloadResult,
)
from data_generation.workload.langgraph import LangraphWorkload

logger = logging.getLogger(__name__)

# NOTE(review): relative path resolved against the current working directory,
# so the experiment must be launched from the directory that contains
# config.yml. Consider passing a resolved path down from the benchmark
# manager instead — TODO confirm with the caller.
CONFIG_PATH = Path("config.yml")


def extract_invocation_metrics(events: List[Dict[str, Any]]) -> Dict[str, Any]:
    """
    Extract per-invocation timing metrics from the event model.

    Groups events by invocation_id and computes:
        D_resolve  = Ts_resolve_end - Ts_invoke_start   (FlowGentic registry lookup)
        D_backend  = Ts_collect_start - Ts_resolve_end  (AsyncFlow execution, ~0 with noop)
        D_collect  = Ts_collect_end - Ts_collect_start  (FlowGentic result handling)
        D_overhead = D_resolve + D_collect              (total FlowGentic overhead)
        D_total    = Ts_collect_end - Ts_invoke_start   (end-to-end per invocation)

        T_run      = max(Ts_collect_end) - min(Ts_invoke_start)
        throughput = N_completions / T_run

    Args:
        events: Flat list of event dicts; each is expected to carry
            "invocation_id", "event" (event name) and "ts" (timestamp).
            Events without an invocation_id are ignored.

    Returns:
        A dict of aggregate metrics (means, p95s, box summaries, cache-hit
        counts), or ``{"n_completions": 0}`` when no invocation has all four
        timestamps present.
    """
    # Group timestamps by invocation_id: {inv_id: {event_name: ts}}
    by_id: Dict[str, Dict[str, float]] = {}
    for event in events:
        inv_id = event.get("invocation_id")
        if not inv_id:
            continue
        by_id.setdefault(inv_id, {})[event["event"]] = event["ts"]

    d_resolve_list: List[float] = []
    d_backend_list: List[float] = []
    d_collect_list: List[float] = []
    d_overhead_list: List[float] = []
    d_total_list: List[float] = []
    invoke_starts: List[float] = []
    collect_ends: List[float] = []

    for ts in by_id.values():
        ts_invoke_start = ts.get("tool_invoke_start")
        ts_resolve_end = ts.get("tool_resolve_end")
        ts_collect_start = ts.get("tool_collect_start")
        ts_collect_end = ts.get("tool_invoke_end")  # tool_invoke_end == Ts_collect_end

        # Only include complete invocations. Compare against None explicitly:
        # a legitimate timestamp of 0.0 is falsy and must not be dropped.
        if any(
            t is None
            for t in (ts_invoke_start, ts_resolve_end, ts_collect_start, ts_collect_end)
        ):
            continue

        d_resolve = ts_resolve_end - ts_invoke_start
        d_backend = ts_collect_start - ts_resolve_end
        d_collect = ts_collect_end - ts_collect_start
        d_overhead = d_resolve + d_collect
        d_total = ts_collect_end - ts_invoke_start

        d_resolve_list.append(d_resolve)
        d_backend_list.append(d_backend)
        d_collect_list.append(d_collect)
        d_overhead_list.append(d_overhead)
        d_total_list.append(d_total)
        invoke_starts.append(ts_invoke_start)
        collect_ends.append(ts_collect_end)

    # Cache hit tracking (from tool_resolve_end events)
    cache_hit_events = [e for e in events if e.get("event") == "tool_resolve_end"]
    n_cache_hits = sum(1 for e in cache_hit_events if e.get("cache_hit"))
    n_total_resolve = len(cache_hit_events)

    n_completions = len(d_total_list)
    if n_completions == 0:
        return {"n_completions": 0}

    def _box(arr: List[float]) -> Dict[str, float]:
        # Five-number summary used by the boxplot plotter.
        return {
            "p5": float(np.percentile(arr, 5)),
            "p25": float(np.percentile(arr, 25)),
            "p50": float(np.percentile(arr, 50)),
            "p75": float(np.percentile(arr, 75)),
            "p95": float(np.percentile(arr, 95)),
        }

    t_run = max(collect_ends) - min(invoke_starts)
    # Guard against a zero-length run (single instantaneous invocation).
    throughput = n_completions / t_run if t_run > 0 else 0.0

    return {
        "n_completions": n_completions,
        "t_run": t_run,
        "throughput": throughput,
        # D_resolve
        "d_resolve_mean": float(np.mean(d_resolve_list)),
        "d_resolve_p95": float(np.percentile(d_resolve_list, 95)),
        # D_backend (~0 with noop tools)
        "d_backend_mean": float(np.mean(d_backend_list)),
        "d_backend_p95": float(np.percentile(d_backend_list, 95)),
        # D_collect
        "d_collect_mean": float(np.mean(d_collect_list)),
        "d_collect_p95": float(np.percentile(d_collect_list, 95)),
        # D_overhead = D_resolve + D_collect
        "d_overhead_mean": float(np.mean(d_overhead_list)),
        "d_overhead_p95": float(np.percentile(d_overhead_list, 95)),
        # D_total
        "d_total_mean": float(np.mean(d_total_list)),
        "d_total_p95": float(np.percentile(d_total_list, 95)),
        # Overhead fraction (should be ~1.0 with noop tools)
        "overhead_fraction_mean": float(np.mean(
            [oh / tot for oh, tot in zip(d_overhead_list, d_total_list) if tot > 0]
        )),
        # Cache hits
        "n_cache_hits": n_cache_hits,
        "n_total_resolve": n_total_resolve,
        # Box summary stats for boxplots (p5/p25/p50/p75/p95)
        "d_resolve_box": _box(d_resolve_list),
        "d_collect_box": _box(d_collect_list),
        "d_overhead_box": _box(d_overhead_list),
        "d_backend_box": _box(d_backend_list),
    }


class ThroughputSaturation(BaseExperiment):
    """
    FlowGentic coordination throughput saturation experiment.

    Uses noop tools (tool_execution_duration_time=0) so D_backend ~ 0.
    Sweeps n_of_agents to increase concurrent load on FlowGentic's async event loop.
    Measures where FlowGentic's coordination throughput saturates.

    Throughput = N_completions / T_run
    where T_run = max(Ts_collect_end) - min(Ts_invoke_start)
    """

    def __init__(
        self, benchmark_config: BenchmarkConfig, data_dir: str, plots_dir: str
    ) -> None:
        """Store the shared benchmark config, set up plotting, and load sweep params."""
        super().__init__(data_dir, plots_dir)
        self.benchmark_config = benchmark_config
        self.plotter = ThroughputSaturationPlotter(plots_dir=plots_dir)
        # Accumulated results keyed by experiment name; written to disk incrementally.
        self.results: Dict[str, Any] = {}
        self._load_experiment_config()

    def _load_experiment_config(self) -> None:
        """
        Read experiment-specific sweep parameters from config.yml.

        NOTE(review): this reopens the hard-coded CONFIG_PATH rather than
        reusing the already-loaded ``self.benchmark_config``, so custom config
        files or a different working directory can diverge from the rest of
        the benchmark run — confirm and unify with the benchmark manager.
        """
        with open(CONFIG_PATH) as f:
            raw = yaml.safe_load(f)
        exp_cfg = raw.get("throughput_saturation", {})

        # Defaults match the intended sweep when the config section is absent.
        self.agent_sweep: List[int] = exp_cfg.get(
            "agent_sweep", [1, 2, 4, 8, 16, 32, 64, 128]
        )
        self.n_of_tool_calls_per_agent: int = exp_cfg.get(
            "n_of_tool_calls_per_agent", 64
        )
        self.n_of_backend_slots: int = exp_cfg.get("n_of_backend_slots", 512)
        self.tool_execution_duration_time: int = exp_cfg.get(
            "tool_execution_duration_time", 0
        )

    async def run_experiment(self) -> None:
        """
        Sweep n_of_agents, run one workload per sweep point, and persist metrics.

        Results are saved to disk after every sweep iteration so a partial run
        still leaves usable data on disk.
        """
        workloads_results = []

        logger.info("=== FLOWGENTIC THROUGHPUT SATURATION (noop tools) ===")
        logger.info(
            f"agent_sweep={self.agent_sweep} "
            f"k={self.n_of_tool_calls_per_agent} "
            f"S={self.n_of_backend_slots} "
            f"D={self.tool_execution_duration_time}"
        )

        for n_agents in self.agent_sweep:
            total_invocations = n_agents * self.n_of_tool_calls_per_agent
            logger.info(
                f"\n--- n_agents={n_agents} total_invocations={total_invocations} ---"
            )

            workload_config = WorkloadConfig(
                n_of_agents=n_agents,
                n_of_tool_calls_per_agent=self.n_of_tool_calls_per_agent,
                n_of_backend_slots=self.n_of_backend_slots,
                tool_execution_duration_time=self.tool_execution_duration_time,
                engine_id=EngineIDs.ASYNCFLOW.value,
            )

            workload_result: WorkloadResult = await self.run_workload(
                workload_orchestrator=LangraphWorkload,
                workload_config=workload_config,
            )

            metrics = extract_invocation_metrics(workload_result.events)

            logger.info(
                f" throughput={metrics.get('throughput', 0):.2f} inv/s "
                f"t_run={metrics.get('t_run', 0):.3f}s "
                f"d_overhead_mean={metrics.get('d_overhead_mean', 0)*1000:.2f}ms "
                f"d_total_p95={metrics.get('d_total_p95', 0)*1000:.2f}ms"
            )

            workloads_results.append(
                {
                    "n_agents": n_agents,
                    "n_of_tool_calls_per_agent": self.n_of_tool_calls_per_agent,
                    "n_of_backend_slots": self.n_of_backend_slots,
                    "tool_execution_duration_time": self.tool_execution_duration_time,
                    "total_invocations": total_invocations,
                    "total_makespan": workload_result.total_makespan,
                    **metrics,
                }
            )

            # Incremental save after each iteration so partial sweeps are not lost.
            self.results["throughput_saturation"] = workloads_results
            self.store_data_to_disk(self.results)

    def generate_plots(self, data: Dict[Any, Any]) -> None:
        """Delegate plotting of the collected sweep data to the plotter."""
        self.plotter.plot_results(data)
Loading