"""
QueuedEngine - Open-loop queueing wrapper for backend engines.

This wrapper provides proper queueing semantics for saturation experiments:
- Explicit queue buffer between submission and execution
- Fire-and-forget submission (non-blocking)
- Worker pool that pulls from queue
- Event emission for queue timing analysis

Key events emitted:
- t_submit: Task submitted to queue (returns immediately)
- t_dispatched: Worker picks up task from queue (queue wait ends)
- t_complete: Task execution finished

This enables measuring:
- Queue delay: t_dispatched - t_submit
- Execution time: t_complete - t_dispatched
- End-to-end latency: t_complete - t_submit
"""

import asyncio
import time
import uuid
from typing import Any, Callable, Dict, List, Optional

from flowgentic.backend_engines.base import BaseEngine


class QueuedEngine(BaseEngine):
    """
    Wraps a backend engine with explicit queueing semantics.

    Unlike the direct await pattern (closed-loop with backpressure),
    this provides open-loop submission where callers don't block
    waiting for execution slots.
    """

    def __init__(
        self,
        engine: BaseEngine,
        max_workers: int,
        observer: Optional[Callable[[Dict[str, Any]], None]] = None,
    ):
        """
        Args:
            engine: The underlying execution engine (e.g., AsyncFlowEngine)
            max_workers: Number of concurrent workers pulling from queue
            observer: Optional callback for event emission
        """
        super().__init__(observer=observer)
        self._engine = engine
        self._max_workers = max_workers
        # Unbounded buffer: submit() never blocks, which is what makes the
        # load open-loop (offered load is decoupled from the service rate).
        self._queue: asyncio.Queue = asyncio.Queue()
        self._workers: List[asyncio.Task] = []
        self._running = False

    async def start(self) -> None:
        """Start the worker pool. Must be called before submitting tasks."""
        if self._running:
            return
        self._running = True
        self._workers = [
            asyncio.create_task(self._worker(worker_id=i))
            for i in range(self._max_workers)
        ]

    async def stop(self) -> None:
        """
        Stop the worker pool gracefully.

        One shutdown sentinel (None) is enqueued per worker. Because the
        queue is FIFO, each worker drains all work queued before stop()
        and only then consumes its sentinel, so no pending Future is left
        unresolved by shutdown.
        """
        if not self._running:
            return
        self._running = False

        # Signal workers to stop (one sentinel per worker).
        for _ in self._workers:
            await self._queue.put(None)

        # Wait for workers to finish
        await asyncio.gather(*self._workers, return_exceptions=True)
        self._workers = []

    async def submit(
        self,
        func: Callable,
        *args,
        task_kwargs: Optional[Dict[str, Any]] = None,
        **kwargs,
    ) -> asyncio.Future:
        """
        Submit a task for execution (fire-and-forget).

        Returns immediately with a Future that will be resolved
        when the task completes. The caller can await the Future
        later if they need the result.

        Args:
            func: The function to execute
            *args: Positional arguments for the function
            task_kwargs: Kwargs passed to the task wrapper
            **kwargs: Keyword arguments for the function

        Returns:
            asyncio.Future that will contain the result
        """
        task_id = str(uuid.uuid4())
        task_name = getattr(func, "__name__", str(func))
        # get_running_loop(): submit() always runs inside a coroutine, and
        # asyncio.get_event_loop() is deprecated in that context.
        future: asyncio.Future = asyncio.get_running_loop().create_future()

        # Single clock read shared by the emitted event and the work item,
        # so queue_delay computed by the worker is internally consistent.
        submit_ts = time.perf_counter()

        # Emit submit event (task enters queue)
        self.emit(
            {
                "event": "t_submit",
                "ts": submit_ts,
                "task_id": task_id,
                "task_name": task_name,
                "queue_size": self._queue.qsize(),
            }
        )

        # Package the work item
        work_item = {
            "task_id": task_id,
            "task_name": task_name,
            "func": func,
            "args": args,
            "kwargs": kwargs,
            "task_kwargs": task_kwargs or {},
            "future": future,
            "submit_ts": submit_ts,
        }

        # Non-blocking put (queue is unbounded)
        await self._queue.put(work_item)

        return future

    async def _worker(self, worker_id: int) -> None:
        """
        Worker coroutine that pulls from queue and executes tasks.

        This is where the actual execution happens. Each worker:
        1. Pulls a work item from the queue (may block here)
        2. Emits t_dispatched event
        3. Executes via the underlying engine
        4. Emits t_complete event
        5. Resolves the Future

        Workers loop until they consume a shutdown sentinel (None) or are
        cancelled; this drains any backlog queued before stop() instead of
        abandoning it with forever-pending Futures.
        """
        while True:
            try:
                work_item = await self._queue.get()

                # None is the shutdown signal. Mark it done so a later
                # wait_for_completion()/queue.join() cannot hang on it.
                if work_item is None:
                    self._queue.task_done()
                    break

                task_id = work_item["task_id"]
                task_name = work_item["task_name"]
                func = work_item["func"]
                args = work_item["args"]
                kwargs = work_item["kwargs"]
                task_kwargs = work_item["task_kwargs"]
                future = work_item["future"]
                submit_ts = work_item["submit_ts"]

                dispatch_ts = time.perf_counter()
                queue_delay = dispatch_ts - submit_ts

                # Emit dispatched event (worker picks up task)
                self.emit(
                    {
                        "event": "t_dispatched",
                        "ts": dispatch_ts,
                        "task_id": task_id,
                        "task_name": task_name,
                        "worker_id": worker_id,
                        "queue_delay": queue_delay,
                    }
                )

                try:
                    # Execute via underlying engine
                    result = await self._engine.execute_tool(
                        func, *args, task_kwargs=task_kwargs, **kwargs
                    )

                    complete_ts = time.perf_counter()
                    exec_time = complete_ts - dispatch_ts

                    # Emit completion event
                    self.emit(
                        {
                            "event": "t_complete",
                            "ts": complete_ts,
                            "task_id": task_id,
                            "task_name": task_name,
                            "worker_id": worker_id,
                            "exec_time": exec_time,
                            "total_latency": complete_ts - submit_ts,
                        }
                    )

                    # Resolve the future
                    if not future.done():
                        future.set_result(result)

                except Exception as e:
                    complete_ts = time.perf_counter()

                    # Emit error event
                    self.emit(
                        {
                            "event": "t_error",
                            "ts": complete_ts,
                            "task_id": task_id,
                            "task_name": task_name,
                            "worker_id": worker_id,
                            "error": str(e),
                        }
                    )

                    # Reject the future
                    if not future.done():
                        future.set_exception(e)

                finally:
                    self._queue.task_done()

            except asyncio.CancelledError:
                break
            except Exception as e:
                # Log but don't crash the worker
                self.emit(
                    {
                        "event": "worker_error",
                        "ts": time.perf_counter(),
                        "worker_id": worker_id,
                        "error": str(e),
                    }
                )

    async def execute_tool(
        self,
        func: Callable,
        *args,
        task_kwargs: Optional[Dict[str, Any]] = None,
        **kwargs,
    ) -> Any:
        """
        Execute a tool through the queue and wait for result.

        This provides compatibility with the BaseEngine interface
        while still using the queueing semantics internally.
        """
        future = await self.submit(func, *args, task_kwargs=task_kwargs, **kwargs)
        return await future

    def wrap_node(self, node_func: Callable):
        """Delegate to underlying engine."""
        return self._engine.wrap_node(node_func)

    async def wait_for_completion(self) -> None:
        """Wait for all queued tasks to complete."""
        await self._queue.join()

    @property
    def queue_size(self) -> int:
        """Current number of items in queue."""
        return self._queue.qsize()

    @property
    def is_running(self) -> bool:
        """Whether the worker pool is running."""
        return self._running
def compute_latency_stats(events: List[Dict[str, Any]]) -> Dict[str, float]:
    """
    Compute latency statistics from QueuedEngine events.

    Extracts end-to-end latency (t_complete - t_submit) for each invocation
    and computes percentile statistics.

    Args:
        events: List of engine events containing t_submit, t_dispatched, t_complete

    Returns:
        Dict with latency statistics: p50, p95, p99, mean, max, and component breakdowns
    """
    # Bucket timestamps per task, keyed by event phase.
    phase_ts: Dict[str, Dict[str, float]] = {
        "t_submit": {},
        "t_dispatched": {},
        "t_complete": {},
    }
    for ev in events:
        tid = ev.get("task_id")
        phase = ev.get("event")
        if tid and phase in phase_ts:
            phase_ts[phase][tid] = ev.get("ts", 0)

    submitted = phase_ts["t_submit"]
    dispatched = phase_ts["t_dispatched"]
    completed = phase_ts["t_complete"]

    # Latency components, only for tasks with a matching submit event.
    e2e_latencies: List[float] = []
    queue_waits: List[float] = []
    run_times: List[float] = []

    for tid, done_ts in completed.items():
        if tid not in submitted:
            continue
        start_ts = submitted[tid]
        e2e_latencies.append(done_ts - start_ts)
        if tid in dispatched:
            picked_ts = dispatched[tid]
            queue_waits.append(picked_ts - start_ts)
            run_times.append(done_ts - picked_ts)

    if not e2e_latencies:
        return {
            "latency_p50": 0.0,
            "latency_p95": 0.0,
            "latency_p99": 0.0,
            "latency_mean": 0.0,
            "latency_max": 0.0,
            "queue_delay_p95": 0.0,
            "exec_time_p95": 0.0,
            "n_completed": 0,
        }

    arr = np.array(e2e_latencies)
    stats = {
        "latency_p50": float(np.percentile(arr, 50)),
        "latency_p95": float(np.percentile(arr, 95)),
        "latency_p99": float(np.percentile(arr, 99)),
        "latency_mean": float(np.mean(arr)),
        "latency_max": float(np.max(arr)),
        "n_completed": len(e2e_latencies),
    }

    # Component breakdowns exist only when dispatch events were recorded.
    stats["queue_delay_p95"] = (
        float(np.percentile(queue_waits, 95)) if queue_waits else 0.0
    )
    stats["exec_time_p95"] = (
        float(np.percentile(run_times, 95)) if run_times else 0.0
    )

    return stats


N_TOOLS = 2  # fetch_temperature, fetch_humidity — fixed by LangraphWorkload
CONFIG_PATH = Path("tests/benchmark/config.yml")
+ """ + + def __init__( + self, benchmark_config: BenchmarkConfig, data_dir: str, plots_dir: str + ) -> None: + super().__init__(data_dir, plots_dir) + self.benchmark_config = benchmark_config + self.plotter = ThroughputSaturationPlotter(plots_dir=plots_dir) + self._load_experiment_config() + + def _load_experiment_config(self): + """Read experiment-specific sweep parameters from shared config.yml.""" + with open(CONFIG_PATH) as f: + raw = yaml.safe_load(f) + exp_cfg = raw.get("throughput_saturation", {}) + + self.ensemble_sizes: List[int] = exp_cfg.get("ensemble_sizes", [2, 4, 8]) + self.tool_invocations_sweep: List[int] = exp_cfg.get( + "tool_invocations_sweep", [2, 4, 6, 8, 12, 16, 20, 24, 30, 40] + ) + self.tool_duration: int = exp_cfg.get("tool_execution_duration_time", 2) + + async def run_experiment(self) -> Dict[Any, Any]: + results: Dict[str, List[Dict[str, Any]]] = {} + + # Fixed: 1 agent to minimize orchestration overhead + n_agents = 1 + + for ensemble_size in self.ensemble_sizes: + series_key = f"ensemble_size_{ensemble_size}" + logger.info(f"=== {series_key} (n_of_backend_slots={ensemble_size}) ===") + series_results: List[Dict[str, Any]] = [] + + for total_invocations in self.tool_invocations_sweep: + # Derive calls_per_tool from total invocations + # total_invocations = n_agents * calls_per_tool * N_TOOLS + # With n_agents=1: calls_per_tool = total_invocations / N_TOOLS + assert total_invocations % N_TOOLS == 0, ( + f"total_invocations ({total_invocations}) must be divisible by N_TOOLS ({N_TOOLS})" + ) + calls_per_tool = total_invocations // N_TOOLS + + logger.info( + f" total_invocations={total_invocations} (calls_per_tool={calls_per_tool})" + ) + + workload_config = WorkloadConfig( + n_of_agents=n_agents, + n_of_tool_calls_per_agent=calls_per_tool, + n_of_backend_slots=ensemble_size, + tool_execution_duration_time=self.tool_duration, + engine_id=EngineIDs.ASYNCFLOW_QUEUED.value, # Open-loop queueing + ) + + workload_result: WorkloadResult = await 
self.run_workload( + workload_orchestrator=LangraphWorkload, + workload_config=workload_config, + ) + + offered_load = total_invocations / self.tool_duration + throughput = total_invocations / workload_result.total_makespan + + # Compute latency statistics from queue events + latency_stats = compute_latency_stats(workload_result.events) + + logger.info( + f" offered_load={offered_load:.2f} inv/s " + f"throughput={throughput:.2f} inv/s " + f"makespan={workload_result.total_makespan:.2f}s " + f"p95_latency={latency_stats['latency_p95']:.3f}s" + ) + + series_results.append( + { + "ensemble_size": ensemble_size, + "n_agents": n_agents, + "calls_per_tool": calls_per_tool, + "tool_execution_duration_time": self.tool_duration, + "total_invocations": total_invocations, + "offered_load": offered_load, + "throughput": throughput, + "total_makespan": workload_result.total_makespan, + # Latency statistics + "latency_p50": latency_stats["latency_p50"], + "latency_p95": latency_stats["latency_p95"], + "latency_p99": latency_stats["latency_p99"], + "latency_mean": latency_stats["latency_mean"], + "latency_max": latency_stats["latency_max"], + "queue_delay_p95": latency_stats["queue_delay_p95"], + "exec_time_p95": latency_stats["exec_time_p95"], + "n_completed": latency_stats["n_completed"], + # Raw events for offline analysis + "events": workload_result.events, + } + ) + + results[series_key] = series_results + + # Save results to disk for finalize() to read + self.store_data_to_disk(results) + + return results + + def generate_plots(self, data: Dict[Any, Any]): + self.plotter.plot_results(data) diff --git a/tests/benchmark/data_generation/experiments/throughput_saturation/utils/plots.py b/tests/benchmark/data_generation/experiments/throughput_saturation/utils/plots.py new file mode 100644 index 0000000..7a442a7 --- /dev/null +++ b/tests/benchmark/data_generation/experiments/throughput_saturation/utils/plots.py @@ -0,0 +1,239 @@ +import logging +from pathlib import Path +from 
import logging
from pathlib import Path
from typing import Any, Dict, Optional

import matplotlib.pyplot as plt

from tests.benchmark.data_generation.experiments.base.base_plots import BasePlotter

logger = logging.getLogger(__name__)


class ThroughputSaturationPlotter(BasePlotter):
    """
    Generates throughput saturation plots:
    1. Throughput vs Offered Load (saturation curve)
    2. p95 Latency vs Offered Load (tail latency under contention)

    Each series is an ensemble size (n_of_backend_slots).
    """

    def __init__(self, plots_dir: Optional[Path] = None) -> None:
        super().__init__()
        # Directory where figures are written; if None, figures are only
        # rendered and closed, never saved.
        self.plots_dir = plots_dir

    def plot_results(self, data: Dict[Any, Any]) -> None:
        """Generate both throughput and latency plots."""
        if not data:
            logger.warning("No data to plot.")
            return

        self._plot_throughput(data)
        self._plot_latency(data)
        self._plot_combined(data)

    def _plot_throughput(self, data: Dict[Any, Any]) -> None:
        """Plot throughput vs offered load (saturation curve)."""
        fig, ax = plt.subplots(figsize=(10, 7))

        # Sort so the legend is ordered by ensemble size; the key assumes
        # each series has at least one record (empty ones are skipped below).
        sorted_series = sorted(
            data.items(), key=lambda item: item[1][0]["ensemble_size"]
        )

        for series_key, records in sorted_series:
            if not records:
                continue

            ensemble_size = records[0]["ensemble_size"]
            offered_loads = [r["offered_load"] for r in records]
            throughputs = [r["throughput"] for r in records]

            ax.plot(
                offered_loads,
                throughputs,
                marker="o",
                linewidth=2,
                markersize=8,
                label=f"S={ensemble_size}",
            )

        ax.set_xlabel("Offered Load (invocations/s)", fontsize=13)
        ax.set_ylabel("Sustained Throughput (invocations/s)", fontsize=13)
        ax.set_title(
            "Coordination Throughput vs Invocation Rate\n(Saturation Curve)",
            fontsize=14,
        )
        ax.legend(loc="lower right", fontsize=11)
        ax.grid(True, alpha=0.3)
        ax.set_xlim(left=0)
        ax.set_ylim(bottom=0)

        plt.tight_layout()

        if self.plots_dir:
            plot_path = self.plots_dir / "throughput_vs_load.png"
            fig.savefig(plot_path, dpi=150, bbox_inches="tight")
            logger.info(f"Saved plot: {plot_path}")

        plt.close(fig)

    def _plot_latency(self, data: Dict[Any, Any]) -> None:
        """
        Plot p95 latency vs offered load.

        This reveals how the system behaves under contention. While throughput
        characterizes the maximum completion rate, p95 latency shows the
        latency experienced by the slowest 5% of invocations, which is
        sensitive to queueing, scheduling jitter, and control-plane contention.
        """
        fig, ax = plt.subplots(figsize=(10, 7))

        sorted_series = sorted(
            data.items(), key=lambda item: item[1][0]["ensemble_size"]
        )

        for series_key, records in sorted_series:
            if not records:
                continue

            ensemble_size = records[0]["ensemble_size"]
            offered_loads = [r["offered_load"] for r in records]
            latencies_p95 = [r.get("latency_p95", 0) for r in records]

            ax.plot(
                offered_loads,
                latencies_p95,
                marker="s",
                linewidth=2,
                markersize=8,
                label=f"S={ensemble_size}",
            )

        # Add reference line for tool duration (theoretical minimum latency).
        # The duration is read from the first series only — all series share
        # the same configured tool duration.
        if sorted_series:
            first_records = sorted_series[0][1]
            if first_records:
                tool_duration = first_records[0].get("tool_execution_duration_time", 2)
                ax.axhline(
                    y=tool_duration,
                    color="gray",
                    linestyle="--",
                    alpha=0.7,
                    label=f"Tool duration D={tool_duration}s",
                )

        ax.set_xlabel("Offered Load (invocations/s)", fontsize=13)
        ax.set_ylabel("p95 Latency (seconds)", fontsize=13)
        ax.set_title(
            "Tail Latency vs Invocation Rate\n(p95 End-to-End Latency)",
            fontsize=14,
        )
        ax.legend(loc="upper left", fontsize=11)
        ax.grid(True, alpha=0.3)
        ax.set_xlim(left=0)
        ax.set_ylim(bottom=0)

        plt.tight_layout()

        if self.plots_dir:
            plot_path = self.plots_dir / "latency_vs_load.png"
            fig.savefig(plot_path, dpi=150, bbox_inches="tight")
            logger.info(f"Saved plot: {plot_path}")

        plt.close(fig)

    def _plot_combined(self, data: Dict[Any, Any]) -> None:
        """
        Generate combined 2-panel figure with throughput and latency side-by-side.

        This provides a complete view of system behavior:
        - Left: Where does throughput saturate?
        - Right: How does latency degrade as we approach saturation?
        """
        fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(16, 6))

        sorted_series = sorted(
            data.items(), key=lambda item: item[1][0]["ensemble_size"]
        )

        colors = plt.cm.tab10.colors

        for idx, (series_key, records) in enumerate(sorted_series):
            if not records:
                continue

            ensemble_size = records[0]["ensemble_size"]
            offered_loads = [r["offered_load"] for r in records]
            throughputs = [r["throughput"] for r in records]
            latencies_p95 = [r.get("latency_p95", 0) for r in records]

            color = colors[idx % len(colors)]

            # Throughput plot
            ax1.plot(
                offered_loads,
                throughputs,
                marker="o",
                linewidth=2,
                markersize=8,
                color=color,
                label=f"S={ensemble_size}",
            )

            # Latency plot
            ax2.plot(
                offered_loads,
                latencies_p95,
                marker="s",
                linewidth=2,
                markersize=8,
                color=color,
                label=f"S={ensemble_size}",
            )

        # Throughput plot formatting
        ax1.set_xlabel("Offered Load (invocations/s)", fontsize=12)
        ax1.set_ylabel("Throughput (invocations/s)", fontsize=12)
        ax1.set_title("Saturation Curve", fontsize=13)
        ax1.legend(loc="lower right", fontsize=10)
        ax1.grid(True, alpha=0.3)
        ax1.set_xlim(left=0)
        ax1.set_ylim(bottom=0)

        # Latency plot formatting (duration reference line from first series)
        if sorted_series:
            first_records = sorted_series[0][1]
            if first_records:
                tool_duration = first_records[0].get("tool_execution_duration_time", 2)
                ax2.axhline(
                    y=tool_duration,
                    color="gray",
                    linestyle="--",
                    alpha=0.7,
                    label=f"D={tool_duration}s",
                )

        ax2.set_xlabel("Offered Load (invocations/s)", fontsize=12)
        ax2.set_ylabel("p95 Latency (seconds)", fontsize=12)
        ax2.set_title("Tail Latency Under Load", fontsize=13)
        ax2.legend(loc="upper left", fontsize=10)
        ax2.grid(True, alpha=0.3)
        ax2.set_xlim(left=0)
        ax2.set_ylim(bottom=0)

        plt.suptitle(
            "Flowgentic Throughput Saturation Analysis",
            fontsize=14,
            fontweight="bold",
        )
        plt.tight_layout()

        if self.plots_dir:
            plot_path = self.plots_dir / "saturation_analysis.png"
            fig.savefig(plot_path, dpi=150, bbox_inches="tight")
            logger.info(f"Saved combined plot: {plot_path}")

        plt.close(fig)
scaling tests""" @@ -37,15 +67,26 @@ class FlowGenticBenchmarkManager: def __init__(self, config_path: Path = Path("tests/benchmark/config.yml")): self.io_utils = IOUtils(config_path) self.benchmark_config = self.io_utils.benchmark_config + self.raw_config = _load_raw_config() self.results: Dict[str, List[Dict]] = {} self.experiments: Dict[str, BaseExperiment] = {} def register_experiment( self, experiment_name: str, experiment_class: BaseExperiment ): - data_dir, plots_dir = self.io_utils.create_experiment_directory( - experiment_name=experiment_name - ) + # Check if experiment has its own run_name in config + exp_config = self.raw_config.get(experiment_name, {}) + exp_run_name = exp_config.get("run_name") + + if exp_run_name: + # Use experiment-specific run_name + data_dir, plots_dir = _create_experiment_dirs(exp_run_name, experiment_name) + else: + # Fall back to global run_name (original behavior) + data_dir, plots_dir = self.io_utils.create_experiment_directory( + experiment_name=experiment_name + ) + self.experiments[experiment_name] = { "experiment_class": experiment_class, "data_dir": data_dir, @@ -82,6 +123,9 @@ async def main(): benchmark = FlowGenticBenchmarkManager() + # Throughput saturation experiment + benchmark.register_experiment("throughput_saturation", ThroughputSaturation) + # Experiment 2 benchmark.register_experiment("syntethic_adaptive", SynthethicAdaptive) diff --git a/tests/benchmark/data_generation/utils/schemas.py b/tests/benchmark/data_generation/utils/schemas.py index 81aaab5..a30477f 100644 --- a/tests/benchmark/data_generation/utils/schemas.py +++ b/tests/benchmark/data_generation/utils/schemas.py @@ -14,6 +14,7 @@ class WorkloadType(str, Enum): class EngineIDs(str, Enum): ASYNCFLOW = "asyncflow" + ASYNCFLOW_QUEUED = "asyncflow_queued" # Open-loop queueing wrapper PARSL = "parsl" diff --git a/tests/benchmark/data_generation/workload/utils/engine.py b/tests/benchmark/data_generation/workload/utils/engine.py index f2766da..cb3b962 
100644 --- a/tests/benchmark/data_generation/workload/utils/engine.py +++ b/tests/benchmark/data_generation/workload/utils/engine.py @@ -2,10 +2,10 @@ from contextlib import asynccontextmanager from typing import Any, Callable, Dict, Optional -from autogen.code_utils import ThreadPoolExecutor from radical.asyncflow import ConcurrentExecutionBackend, WorkflowEngine from flowgentic.backend_engines.radical_asyncflow import AsyncFlowEngine +from flowgentic.backend_engines.queued_engine import QueuedEngine import multiprocessing @@ -29,5 +29,32 @@ async def resolve_engine( # 3. Shutdown the flow, then manually shut down the executor await flow.shutdown() executor.shutdown(wait=True) + + elif engine_id == "asyncflow_queued": + # QueuedEngine wraps AsyncFlow with explicit open-loop queueing + # This provides proper saturation semantics (fire-and-forget submission) + ctx = multiprocessing.get_context("spawn") + + executor = ProcessPoolExecutor(max_workers=n_of_backend_slots, mp_context=ctx) + + try: + backend = await ConcurrentExecutionBackend(executor) + flow = await WorkflowEngine.create(backend) + base_engine = AsyncFlowEngine(flow, observer=observer) + + # QueuedEngine manages its own worker pool matching backend slots + queued = QueuedEngine( + engine=base_engine, + max_workers=n_of_backend_slots, + observer=observer, + ) + await queued.start() + + yield queued + finally: + await queued.stop() + await flow.shutdown() + executor.shutdown(wait=True) + else: raise Exception(f"Didnt match any engine for engine_id: {engine_id}")