From a521a075f550970e2a1801a36a4fd6210be3db44 Mon Sep 17 00:00:00 2001 From: HamidA1998 <165214817+HamidA1998@users.noreply.github.com> Date: Mon, 11 May 2026 14:01:46 +0100 Subject: [PATCH] feat: add CrewAI APort task decorator --- examples/agent-frameworks/crewai/README.md | 134 +++++++++ .../crewai/examples/basic_usage.py | 34 +++ .../agent-frameworks/crewai/pyproject.toml | 42 +++ .../crewai/src/aport_crewai/__init__.py | 15 + .../crewai/src/aport_crewai/decorator.py | 271 ++++++++++++++++++ .../crewai/tests/test_decorator.py | 142 +++++++++ 6 files changed, 638 insertions(+) create mode 100644 examples/agent-frameworks/crewai/README.md create mode 100644 examples/agent-frameworks/crewai/examples/basic_usage.py create mode 100644 examples/agent-frameworks/crewai/pyproject.toml create mode 100644 examples/agent-frameworks/crewai/src/aport_crewai/__init__.py create mode 100644 examples/agent-frameworks/crewai/src/aport_crewai/decorator.py create mode 100644 examples/agent-frameworks/crewai/tests/test_decorator.py diff --git a/examples/agent-frameworks/crewai/README.md b/examples/agent-frameworks/crewai/README.md new file mode 100644 index 0000000..293be75 --- /dev/null +++ b/examples/agent-frameworks/crewai/README.md @@ -0,0 +1,134 @@ +# APort CrewAI Task Decorator + +This integration adds an `@aport_verify` decorator that verifies an APort +agent before a CrewAI task function runs. + +It is dependency-light by design: + +- Works with normal Python task functions and async task functions +- Accepts any client with a `verify(policy, agent_id, context)` method +- Falls back to a small standard-library HTTP client when no client is provided +- Preserves task metadata with `functools.wraps` +- Raises a clear `APortVerificationError` before task execution when verification is denied + +## Installation + +```bash +cd examples/agent-frameworks/crewai +pip install -e . +``` + +CrewAI itself is optional for testing this decorator. Install the optional +extra when using it inside a real CrewAI project: + +```bash +pip install -e ".[crewai]" +``` + +## Basic Usage + +```python +from aport_crewai import aport_verify + + +@aport_verify( + "finance.payment.refund.v1", + agent_id="agt_inst_refund_bot_123", + context=lambda order_id, amount: { + "order_id": order_id, + "amount": amount, + "currency": "USD", + }, +) +def create_refund(order_id: str, amount: float) -> dict: + return {"order_id": order_id, "amount": amount, "status": "created"} +``` + +The task body only executes after APort verification succeeds. + +## Agent ID Sources + +Pass an agent ID directly: + +```python +@aport_verify("data.export.v1", agent_id="agt_inst_data_exporter_456") +def export_rows(rows: int) -> int: + return rows +``` + +Derive it from task arguments: + +```python +@aport_verify( + "data.export.v1", + agent_id=lambda agent_id, rows: agent_id, + context=lambda agent_id, rows: {"rows": rows}, +) +def export_rows(agent_id: str, rows: int) -> int: + return rows +``` + +Or provide it at runtime with `aport_agent_id`, `agent_id`, or the +`APORT_AGENT_ID` environment variable. + +## Client Configuration + +By default, the decorator uses `APortHTTPClient`, which reads: + +```bash +APORT_API_KEY=your_api_key_here +APORT_BASE_URL=https://api.aport.io +APORT_AGENT_ID=agt_inst_refund_bot_123 +``` + +For tests or custom SDK clients, inject your own client: + +```python +class MyAPortClient: + def verify(self, policy, agent_id, context=None): + return {"allowed": True} + + +@aport_verify("admin.access.v1", agent_id="agt_inst_admin_123", client=MyAPortClient()) +def admin_task(): + return "allowed" +``` + +## Async Tasks + +```python +@aport_verify("code.repository.merge.v1", agent_id="agt_inst_pr_merger_789") +async def merge_pull_request() -> str: + return "merged" +``` + +If the decorated task is async, async verification clients are awaited automatically. + +## Error Handling + +Verification denial always raises `APortVerificationError` before the task body runs. + +```python +from aport_crewai import APortVerificationError + +try: + create_refund("ord_123", 250) +except APortVerificationError as exc: + print(exc) +``` + +Set `strict=False` to let the task continue when the APort client itself is +temporarily unavailable. Explicit deny responses still block execution. + +## Run the Example + +```bash +python examples/basic_usage.py +``` + +## Tests + +```bash +python -m unittest discover -s tests +python -m compileall src tests examples +``` diff --git a/examples/agent-frameworks/crewai/examples/basic_usage.py b/examples/agent-frameworks/crewai/examples/basic_usage.py new file mode 100644 index 0000000..89434bd --- /dev/null +++ b/examples/agent-frameworks/crewai/examples/basic_usage.py @@ -0,0 +1,34 @@ +"""Basic CrewAI task usage with the APort verification decorator.""" + +from aport_crewai import aport_verify + + +class DemoAPortClient: + """Example client used to keep this demo runnable without credentials.""" + + def verify(self, policy, agent_id, context=None): + print(f"Verifying {agent_id} against {policy} with context {context}") + return {"allowed": True} + + +@aport_verify( + "finance.payment.refund.v1", + agent_id="agt_inst_refund_bot_123", + client=DemoAPortClient(), + context=lambda order_id, amount: { + "order_id": order_id, + "amount": amount, + "currency": "USD", + }, +) +def create_refund(order_id: str, amount: float) -> dict: + """CrewAI task body that only runs after APort verification succeeds.""" + return { + "refund_id": f"refund_{order_id}", + "amount": amount, + "status": "created", + } + + +if __name__ == "__main__": + print(create_refund("ord_123", 42.5)) diff --git a/examples/agent-frameworks/crewai/pyproject.toml b/examples/agent-frameworks/crewai/pyproject.toml new file mode 100644 index 0000000..f3c6692 --- /dev/null +++ b/examples/agent-frameworks/crewai/pyproject.toml @@ -0,0 +1,42 @@ +[build-system] +requires = ["setuptools>=61.0", "wheel"] +build-backend = "setuptools.build_meta" + +[project] +name = "aport-crewai" +version = "0.1.0" +description = "APort verification decorator for CrewAI tasks" +authors = [ + {name = "APort Community", email = "community@aport.io"} +] +license = {text = "MIT"} +readme = "README.md" +requires-python = ">=3.9" +classifiers = [ + "Development Status :: 3 - Alpha", + "Intended Audience :: Developers", + "License :: OSI Approved :: MIT License", + "Programming Language :: Python :: 3", + "Programming Language :: Python :: 3.9", + "Programming Language :: Python :: 3.10", + "Programming Language :: Python :: 3.11", + "Programming Language :: Python :: 3.12", +] +dependencies = [] + +[project.optional-dependencies] +crewai = [ + "crewai>=0.41.0" +] +dev = [ + "pytest>=7.0.0" +] + +[project.urls] +Homepage = "https://github.com/aporthq/aport-integrations" +Repository = "https://github.com/aporthq/aport-integrations" +Documentation = "https://aport.io/docs" +Issues = "https://github.com/aporthq/aport-integrations/issues" + +[tool.setuptools.packages.find] +where = ["src"] diff --git a/examples/agent-frameworks/crewai/src/aport_crewai/__init__.py b/examples/agent-frameworks/crewai/src/aport_crewai/__init__.py new file mode 100644 index 0000000..517ee51 --- /dev/null +++ b/examples/agent-frameworks/crewai/src/aport_crewai/__init__.py @@ -0,0 +1,15 @@ +"""APort verification helpers for CrewAI tasks.""" + +from .decorator import ( + APortHTTPClient, + APortVerificationError, + aport_verify, +) + +__all__ = [ + "APortHTTPClient", + "APortVerificationError", + "aport_verify", +] + +__version__ = "0.1.0" diff --git a/examples/agent-frameworks/crewai/src/aport_crewai/decorator.py b/examples/agent-frameworks/crewai/src/aport_crewai/decorator.py new file mode 100644 index 0000000..f04dda6 --- /dev/null +++ b/examples/agent-frameworks/crewai/src/aport_crewai/decorator.py @@ -0,0 +1,271 @@ +"""APort verification decorator for CrewAI task functions.""" + +from __future__ import annotations + +import asyncio +import functools +import inspect +import json +import os +from dataclasses import dataclass +from typing import Any, Awaitable, Callable, Dict, Mapping, Optional, Protocol, TypeVar, Union +from urllib import error, request + +F = TypeVar("F", bound=Callable[..., Any]) +ContextFactory = Callable[..., Mapping[str, Any]] +AgentIdFactory = Callable[..., Optional[str]] +VerificationResult = Union[Mapping[str, Any], Any] + + +class APortClientProtocol(Protocol): + """Client interface expected by the decorator.""" + + def verify( + self, + policy: str, + agent_id: str, + context: Optional[Mapping[str, Any]] = None, + ) -> Union[VerificationResult, Awaitable[VerificationResult]]: + """Verify an agent against an APort policy pack.""" + + +class APortVerificationError(Exception): + """Raised when APort verification fails or cannot be completed.""" + + def __init__(self, message: str, details: Optional[Mapping[str, Any]] = None): + super().__init__(message) + self.details = dict(details or {}) + + +@dataclass +class APortHTTPClient: + """Small dependency-free HTTP client for APort verification calls.""" + + api_key: Optional[str] = None + base_url: Optional[str] = None + timeout: float = 10.0 + + def __post_init__(self) -> None: + self.api_key = self.api_key or os.getenv("APORT_API_KEY") + self.base_url = ( + self.base_url or os.getenv("APORT_BASE_URL") or "https://api.aport.io" + ).rstrip("/") + + def verify( + self, + policy: str, + agent_id: str, + context: Optional[Mapping[str, Any]] = None, + ) -> Mapping[str, Any]: + """Verify an agent with APort's verify API.""" + payload = json.dumps( + { + "policy": policy, + "agent_id": agent_id, + "context": dict(context or {}), + } + ).encode("utf-8") + + headers = { + "Content-Type": "application/json", + "User-Agent": "aport-crewai/0.1.0", + } + if self.api_key: + headers["Authorization"] = f"Bearer {self.api_key}" + + req = request.Request( + f"{self.base_url}/api/verify", + data=payload, + headers=headers, + method="POST", + ) + + try: + with request.urlopen(req, timeout=self.timeout) as response: + return json.loads(response.read().decode("utf-8")) + except error.HTTPError as exc: + body = exc.read().decode("utf-8", errors="replace") + raise APortVerificationError( + f"APort verification request failed with HTTP {exc.code}", + {"status": exc.code, "body": body}, + ) from exc + + +def aport_verify( + policy_pack: str, + *, + agent_id: Optional[Union[str, AgentIdFactory]] = None, + client: Optional[APortClientProtocol] = None, + context: Optional[Union[Mapping[str, Any], ContextFactory]] = None, + strict: bool = True, +) -> Callable[[F], F]: + """Verify an APort agent before running a CrewAI task function. + + Args: + policy_pack: APort policy pack identifier, for example + ``finance.payment.refund.v1``. + agent_id: Static agent ID, a callable that derives the agent ID from + task arguments, or None to read from kwargs/env. + client: APort client with a ``verify(policy, agent_id, context)`` method. + context: Static context mapping or callable that derives context from + task arguments. + strict: If true, API/client errors stop the task. If false, client + errors are attached to context and the task continues. Explicit + verification denial always stops the task. + """ + + if not policy_pack: + raise ValueError("policy_pack is required") + + verifier = client or APortHTTPClient() + + def decorator(func: F) -> F: + if inspect.iscoroutinefunction(func): + + @functools.wraps(func) + async def async_wrapper(*args: Any, **kwargs: Any) -> Any: + await _verify_async( + verifier, + policy_pack, + _resolve_agent_id(agent_id, args, kwargs), + _build_context(func, context, args, kwargs), + strict, + ) + return await func(*args, **kwargs) + + return async_wrapper # type: ignore[return-value] + + @functools.wraps(func) + def sync_wrapper(*args: Any, **kwargs: Any) -> Any: + _verify_sync( + verifier, + policy_pack, + _resolve_agent_id(agent_id, args, kwargs), + _build_context(func, context, args, kwargs), + strict, + ) + return func(*args, **kwargs) + + return sync_wrapper # type: ignore[return-value] + + return decorator + + +async def _verify_async( + client: APortClientProtocol, + policy_pack: str, + agent_id: str, + context: Mapping[str, Any], + strict: bool, +) -> None: + try: + result = client.verify(policy_pack, agent_id, context) + if inspect.isawaitable(result): + result = await result + _raise_if_denied(result) + except APortVerificationError: + raise + except Exception as exc: + if strict: + raise APortVerificationError("APort verification failed before task execution") from exc + + +def _verify_sync( + client: APortClientProtocol, + policy_pack: str, + agent_id: str, + context: Mapping[str, Any], + strict: bool, +) -> None: + try: + result = client.verify(policy_pack, agent_id, context) + if inspect.isawaitable(result): + result = _run_awaitable(result) + _raise_if_denied(result) + except APortVerificationError: + raise + except Exception as exc: + if strict: + raise APortVerificationError("APort verification failed before task execution") from exc + + +def _run_awaitable(awaitable: Awaitable[VerificationResult]) -> VerificationResult: + try: + asyncio.get_running_loop() + except RuntimeError: + return asyncio.run(awaitable) + raise APortVerificationError( + "Synchronous task received an async APort client while an event loop is already running. " + "Use an async task function or a synchronous client." + ) + + +def _resolve_agent_id( + configured_agent_id: Optional[Union[str, AgentIdFactory]], + args: tuple[Any, ...], + kwargs: Dict[str, Any], +) -> str: + if callable(configured_agent_id): + value = configured_agent_id(*args, **kwargs) + else: + value = configured_agent_id + + value = ( + value + or kwargs.get("aport_agent_id") + or kwargs.get("agent_id") + or os.getenv("APORT_AGENT_ID") + ) + + if not value: + raise APortVerificationError( + "APort agent_id is required. Pass agent_id, aport_agent_id, or set APORT_AGENT_ID." + ) + + return str(value) + + +def _build_context( + func: Callable[..., Any], + configured_context: Optional[Union[Mapping[str, Any], ContextFactory]], + args: tuple[Any, ...], + kwargs: Dict[str, Any], +) -> Mapping[str, Any]: + if callable(configured_context): + user_context = dict(configured_context(*args, **kwargs)) + else: + user_context = dict(configured_context or {}) + + return { + "framework": "crewai", + "task_name": func.__name__, + **user_context, + } + + +def _raise_if_denied(result: VerificationResult) -> None: + allowed = _read_result_flag(result, ("allowed", "verified", "success", "ok")) + if allowed is False: + reason = ( + _read_result_value(result, ("reason", "message", "error")) + or "APort verification denied" + ) + raise APortVerificationError(str(reason), {"result": result}) + + +def _read_result_flag(result: VerificationResult, names: tuple[str, ...]) -> Optional[bool]: + value = _read_result_value(result, names) + return value if isinstance(value, bool) else None + + +def _read_result_value(result: VerificationResult, names: tuple[str, ...]) -> Any: + if isinstance(result, Mapping): + for name in names: + if name in result: + return result[name] + return None + + for name in names: + if hasattr(result, name): + return getattr(result, name) + return None diff --git a/examples/agent-frameworks/crewai/tests/test_decorator.py b/examples/agent-frameworks/crewai/tests/test_decorator.py new file mode 100644 index 0000000..36790e6 --- /dev/null +++ b/examples/agent-frameworks/crewai/tests/test_decorator.py @@ -0,0 +1,142 @@ +import os +import unittest + +from aport_crewai import APortVerificationError, aport_verify + + +class FakeClient: + def __init__(self, result=None, error=None): + self.result = result if result is not None else {"allowed": True} + self.error = error + self.calls = [] + + def verify(self, policy, agent_id, context=None): + self.calls.append((policy, agent_id, context)) + if self.error: + raise self.error + return self.result + + +class AsyncFakeClient(FakeClient): + async def verify(self, policy, agent_id, context=None): + self.calls.append((policy, agent_id, context)) + if self.error: + raise self.error + return self.result + + +class APortVerifyTests(unittest.TestCase): + def test_sync_task_runs_after_successful_verification(self): + client = FakeClient({"allowed": True}) + calls = [] + + @aport_verify( + "finance.payment.refund.v1", + agent_id="agt_inst_refund_bot_123", + client=client, + context={"amount": 50}, + ) + def refund_task(): + calls.append("task") + return "ok" + + self.assertEqual(refund_task(), "ok") + self.assertEqual(calls, ["task"]) + self.assertEqual(client.calls[0][0], "finance.payment.refund.v1") + self.assertEqual(client.calls[0][1], "agt_inst_refund_bot_123") + self.assertEqual(client.calls[0][2]["framework"], "crewai") + self.assertEqual(client.calls[0][2]["amount"], 50) + + def test_denied_verification_stops_task(self): + client = FakeClient({"allowed": False, "reason": "refund limit exceeded"}) + calls = [] + + @aport_verify( + "finance.payment.refund.v1", + agent_id="agt_inst_refund_bot_123", + client=client, + ) + def refund_task(): + calls.append("task") + + with self.assertRaises(APortVerificationError) as error: + refund_task() + + self.assertIn("refund limit exceeded", str(error.exception)) + self.assertEqual(calls, []) + + def test_agent_id_can_be_derived_from_task_arguments(self): + client = FakeClient() + + @aport_verify( + "data.export.v1", + agent_id=lambda agent_id, rows: agent_id, + client=client, + context=lambda agent_id, rows: {"rows": rows}, + ) + def export_task(agent_id, rows): + return rows + + self.assertEqual(export_task("agt_inst_data_exporter_456", 1000), 1000) + self.assertEqual(client.calls[0][1], "agt_inst_data_exporter_456") + self.assertEqual(client.calls[0][2]["rows"], 1000) + + def test_agent_id_can_come_from_environment(self): + client = FakeClient() + old_agent_id = os.environ.get("APORT_AGENT_ID") + os.environ["APORT_AGENT_ID"] = "agt_inst_env_123" + + try: + @aport_verify("admin.access.v1", client=client) + def admin_task(): + return "admin" + + self.assertEqual(admin_task(), "admin") + self.assertEqual(client.calls[0][1], "agt_inst_env_123") + finally: + if old_agent_id is None: + os.environ.pop("APORT_AGENT_ID", None) + else: + os.environ["APORT_AGENT_ID"] = old_agent_id + + def test_missing_agent_id_raises_clear_error(self): + client = FakeClient() + + @aport_verify("finance.payment.refund.v1", client=client) + def refund_task(): + return "ok" + + with self.assertRaises(APortVerificationError) as error: + refund_task() + + self.assertIn("agent_id is required", str(error.exception)) + + def test_strict_false_allows_client_errors(self): + client = FakeClient(error=RuntimeError("network down")) + + @aport_verify( + "finance.payment.refund.v1", + agent_id="agt_inst_refund_bot_123", + client=client, + strict=False, + ) + def refund_task(): + return "ok" + + self.assertEqual(refund_task(), "ok") + + +class APortVerifyAsyncTests(unittest.IsolatedAsyncioTestCase): + async def test_async_task_runs_after_async_verification(self): + client = AsyncFakeClient({"verified": True}) + + @aport_verify("code.repository.merge.v1", agent_id="agt_inst_pr_merger_789", client=client) + async def merge_task(): + return "merged" + + self.assertEqual(await merge_task(), "merged") + self.assertEqual(client.calls[0][0], "code.repository.merge.v1") + + +if __name__ == "__main__": + unittest.main()