Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ examples-ca-asyncflow: ## CrewAI + Asyncfow
$(VENV_ACTIVATE) && python3 -m examples.crewai_asyncflow
examples-ag-asyncflow: ## AutoGen + Asyncfow
$(VENV_ACTIVATE) && python3 -m examples.autogen_asyncflow
examples-ac-asyncflow: ## Academy + Asyncflow
$(VENV_ACTIVATE) && python3 -m examples.academy_asyncflow
examples-lg-parsl: ## AutoGen + Parsl
$(VENV_ACTIVATE) && python3 -m examples.langgraph_parsl
examples-ag-parsl: ## AutoGen + Parsl
Expand Down
85 changes: 85 additions & 0 deletions examples/academy_asyncflow.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
import asyncio
import logging
import time
from dotenv import load_dotenv

from radical.asyncflow import LocalExecutionBackend, WorkflowEngine
from concurrent.futures import ThreadPoolExecutor

from academy.agent import Agent, action
from academy.exchange.local import LocalExchangeFactory
from academy.manager import Manager

from flowgentic.backend_engines.radical_asyncflow import AsyncFlowEngine
from flowgentic.agent_orchestration_frameworks.academy import AcademyOrchestrator

load_dotenv()

# Configure Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class WeatherAgent(Agent):
"""Academy agent that fetches weather data via HPC-backed tools."""

def __init__(self, fetch_temperature_fn, fetch_humidity_fn) -> None:
self._fetch_temperature = fetch_temperature_fn
self._fetch_humidity = fetch_humidity_fn

@action
async def get_weather(self, location: str = "SFO") -> dict:
"""Fetch temperature and humidity for a location."""
temperature = await self._fetch_temperature(location=location)
humidity = await self._fetch_humidity(location=location)
Comment on lines +33 to +34
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The _fetch_temperature and _fetch_humidity calls are independent and can be executed concurrently. Using asyncio.gather will run these operations in parallel, which can improve performance by reducing the total execution time from ~4 seconds to ~2 seconds in this example.

Suggested change
temperature = await self._fetch_temperature(location=location)
humidity = await self._fetch_humidity(location=location)
temperature, humidity = await asyncio.gather(
self._fetch_temperature(location=location),
self._fetch_humidity(location=location)
)

return {**temperature, **humidity}


async def start_app():
# --- SETUP HPC BACKEND ---
backend = await LocalExecutionBackend(ThreadPoolExecutor(max_workers=2))
flow = await WorkflowEngine.create(backend)

# --- INITIALIZE FLOWGENTIC ---
engine = AsyncFlowEngine(flow)
orchestrator = AcademyOrchestrator(engine)

# --- DEFINE HPC TOOLS ---
@orchestrator.hpc_task
async def fetch_temperature(location: str = "SFO") -> dict:
"""Fetches temperature of a given city."""
logger.info(f"Executing temperature tool for {location}")
await asyncio.sleep(2)
return {"temperature": 70, "location": location}

@orchestrator.hpc_task
async def fetch_humidity(location: str = "SFO") -> dict:
"""Fetches humidity of a given city."""
logger.info(f"Executing humidity tool for {location}")
await asyncio.sleep(2)
return {"humidity": 50, "location": location}

# --- LAUNCH AGENT AND EXECUTE ---
t_start = time.perf_counter()

manager = await Manager.from_exchange_factory(
factory=LocalExchangeFactory(),
executors=None,
)

async with manager:
handle = await manager.launch(
WeatherAgent,
args=(fetch_temperature, fetch_humidity),
)
result = await handle.get_weather(location="SFO")
logger.info(f"Weather result: {result}")

t_end = time.perf_counter()
logger.info(f"Workflow finished in {t_end - t_start:.4f} seconds")

await flow.shutdown()


if __name__ == "__main__":
asyncio.run(start_app())
92 changes: 92 additions & 0 deletions src/flowgentic/agent_orchestration_frameworks/academy.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
from functools import wraps
import time
import uuid
from typing import Callable

from flowgentic.agent_orchestration_frameworks.base import AgentOrchestrator
from flowgentic.backend_engines.base import BaseEngine


class AcademyOrchestrator(AgentOrchestrator):
def __init__(self, engine: BaseEngine) -> None:
self.engine = engine

def hpc_task(self, func: Callable):
"""
Wraps a function to execute as an HPC task.
"""
task_name = getattr(func, "__name__", str(func))
wrap_id = str(uuid.uuid4())

self.engine.emit(
{
"event": "tool_wrap_start",
"ts": time.perf_counter(),
"tool_name": task_name,
"wrap_id": wrap_id,
}
)

@wraps(func)
async def wrapper(*args, **kwargs):
invocation_id = str(uuid.uuid4())
self.engine.emit(
{
"event": "tool_invoke_start",
"ts": time.perf_counter(),
"tool_name": task_name,
"invocation_id": invocation_id,
}
)
result = await self.engine.execute_tool(
func, *args, invocation_id=invocation_id, **kwargs
)
self.engine.emit(
{
"event": "tool_invoke_end",
"ts": time.perf_counter(),
"tool_name": task_name,
"invocation_id": invocation_id,
}
)
return result

self.engine.emit(
{
"event": "tool_wrap_end",
"ts": time.perf_counter(),
"tool_name": task_name,
"wrap_id": wrap_id,
}
)

return wrapper

def hpc_block(self, block_func: Callable):
"""
Wraps a function to execute as an HPC block.
"""
block_name = getattr(block_func, "__name__", str(block_func))
wrap_id = str(uuid.uuid4())

self.engine.emit(
{
"event": "block_wrap_start",
"ts": time.perf_counter(),
"block_name": block_name,
"wrap_id": wrap_id,
}
)

wrapped_block = self.engine.wrap_node(block_func)

self.engine.emit(
{
"event": "block_wrap_end",
"ts": time.perf_counter(),
"block_name": block_name,
"wrap_id": wrap_id,
}
)

return wrapped_block
Comment on lines +10 to +92
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The implementation of AcademyOrchestrator is nearly identical to the updated AutoGenOrchestrator. Both hpc_task and hpc_block methods contain the same logic for event emission and wrapping. To improve maintainability and follow the DRY (Don't Repeat Yourself) principle, this duplicated code should be moved to the AgentOrchestrator base class.

31 changes: 25 additions & 6 deletions src/flowgentic/agent_orchestration_frameworks/autogen.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,23 +22,42 @@ def hpc_task(self, func: Callable):
# Emit setup start event
self.engine.emit(
{
"event": "task_wrap_start",
"event": "tool_wrap_start",
"ts": time.perf_counter(),
"task_name": task_name,
"tool_name": task_name,
"wrap_id": wrap_id,
}
)

@wraps(func)
async def wrapper(*args, **kwargs):
return await self.engine.execute_tool(func, *args, **kwargs)
invocation_id = str(uuid.uuid4())
self.engine.emit(
{
"event": "tool_invoke_start",
"ts": time.perf_counter(),
"tool_name": task_name,
"invocation_id": invocation_id,
}
)
result = await self.engine.execute_tool(
func, *args, invocation_id=invocation_id, **kwargs
)
self.engine.emit(
{
"event": "tool_invoke_end",
"ts": time.perf_counter(),
"tool_name": task_name,
"invocation_id": invocation_id,
}
)
return result

# Emit setup end event
self.engine.emit(
{
"event": "task_wrap_end",
"event": "tool_wrap_end",
"ts": time.perf_counter(),
"task_name": task_name,
"tool_name": task_name,
"wrap_id": wrap_id,
}
)
Expand Down
1 change: 1 addition & 0 deletions src/flowgentic/backend_engines/parsl.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ def __init__(
observer: Optional[Callable[[Dict[str, Any]], None]] = None,
):
super().__init__(observer=observer)
parsl.clear()
parsl.load(config)

self._task_registry: Dict[str, Any] = {}
Expand Down
Loading
Loading