diff --git a/docker-compose.local.yml b/docker-compose.local.yml index e88c1da..c7d9da0 100644 --- a/docker-compose.local.yml +++ b/docker-compose.local.yml @@ -12,15 +12,13 @@ services: build: context: . dockerfile: Dockerfile + env_file: .env environment: - AGENTFIELD_SERVER=http://host.docker.internal:8080 - NODE_ID=swe-planner - PORT=8003 # Callback URL for control plane to reach this agent - AGENT_CALLBACK_URL=http://localhost:8003 - - ANTHROPIC_API_KEY=${ANTHROPIC_API_KEY} - - CLAUDE_CODE_OAUTH_TOKEN=${CLAUDE_CODE_OAUTH_TOKEN} - - GH_TOKEN=${GH_TOKEN} ports: - "8003:8003" volumes: diff --git a/pyproject.toml b/pyproject.toml index 2e2ae3a..e69320c 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -6,6 +6,9 @@ requires-python = ">=3.12" dependencies = [ "agentfield>=0.1.9", "pydantic>=2.0", + "httpx>=0.25", + "aiohttp>=3.9", + "python-dotenv>=1.0", # Compatibility pin: newer SDK builds have surfaced # "Unknown message type: rate_limit_event" during streaming. "claude-agent-sdk==0.1.20", diff --git a/requirements.txt b/requirements.txt index d2cbc81..3281be6 100644 --- a/requirements.txt +++ b/requirements.txt @@ -3,5 +3,6 @@ # Install: python -m pip install -r requirements.txt agentfield>=0.1.9 +hax-sdk>=0.1.0 pydantic>=2.0 claude-agent-sdk==0.1.20 diff --git a/swe_af/app.py b/swe_af/app.py index 1f7693d..6b6409a 100644 --- a/swe_af/app.py +++ b/swe_af/app.py @@ -13,11 +13,16 @@ import subprocess import uuid +from dotenv import load_dotenv + +load_dotenv() # Load .env file so HAX_API_KEY (and other vars) are available + from swe_af.reasoners import router from swe_af.reasoners.pipeline import _assign_sequence_numbers, _compute_levels, _validate_file_conflicts from swe_af.reasoners.schemas import PlanResult, ReviewResult from agentfield import Agent +from agentfield.client import ApprovalResult from swe_af.execution.envelope import unwrap_call_result as _unwrap from swe_af.execution.schemas import ( BuildConfig, @@ -169,6 +174,61 @@ def _run() -> str: ) +def _format_plan_for_approval( + plan_result: dict, +) -> tuple[str, str, str, list[dict]]: + """Format plan_result into template-ready fields. + + Returns (plan_summary, prd_markdown, architecture_markdown, issues_for_template). + """ + plan_summary = plan_result.get("rationale", "") + prd_data = plan_result.get("prd", {}) + arch_data = plan_result.get("architecture", {}) + + # Format PRD as markdown + prd_md_parts: list[str] = [] + if prd_data.get("validated_description"): + prd_md_parts.append(f"## Description\n{prd_data['validated_description']}") + if prd_data.get("must_have"): + prd_md_parts.append("## Must Have\n" + "\n".join(f"- {item}" for item in prd_data["must_have"])) + if prd_data.get("nice_to_have"): + prd_md_parts.append("## Nice to Have\n" + "\n".join(f"- {item}" for item in prd_data["nice_to_have"])) + if prd_data.get("acceptance_criteria"): + prd_md_parts.append("## Acceptance Criteria\n" + "\n".join(f"- {item}" for item in prd_data["acceptance_criteria"])) + prd_markdown = "\n\n".join(prd_md_parts) if prd_md_parts else "" + + # Format architecture as markdown + arch_md_parts: list[str] = [] + if arch_data.get("summary"): + arch_md_parts.append(f"## Summary\n{arch_data['summary']}") + if arch_data.get("components"): + arch_md_parts.append("## Components") + for comp in arch_data["components"]: + arch_md_parts.append(f"### {comp.get('name', 'Component')}\n{comp.get('responsibility', '')}") + if comp.get("touches_files"): + arch_md_parts.append("Files: " + ", ".join(f"`{f}`" for f in comp["touches_files"])) + if arch_data.get("decisions"): + arch_md_parts.append("## Key Decisions") + for dec in arch_data["decisions"]: + arch_md_parts.append(f"- **{dec.get('decision', '')}**: {dec.get('rationale', '')}") + architecture_markdown = "\n\n".join(arch_md_parts) if arch_md_parts else "" + + # Format issues for the template + issues_for_template = [] + for issue in plan_result.get("issues", []): + issues_for_template.append({ + "name": issue.get("name", ""), + "title": issue.get("title", ""), + "description": issue.get("description", ""), + "dependsOn": issue.get("depends_on", []), + "filesToModify": issue.get("files_to_modify", []), + "filesToCreate": issue.get("files_to_create", []), + "acceptanceCriteria": issue.get("acceptance_criteria", []), + }) + + return plan_summary, prd_markdown, architecture_markdown, issues_for_template + + @app.reasoner() async def build( goal: str, @@ -419,6 +479,229 @@ async def build( tags=["build", "git_init", "error"], ) + # 1.5 APPROVAL CHECKPOINT — pause for human plan review if configured + # Automatically enabled when HAX_API_KEY is set in the environment. + # SWE-AF calls hax-sdk directly; the CP only manages execution state. + # Supports iterative revision: reviewer can "request changes" and the agent + # will re-plan (Architect → TechLead → SprintPlanner) with the feedback. + _hax_api_key = os.environ.get("HAX_API_KEY", "").strip() + execution_id = app.ctx.execution_id if app.ctx else "" + _approval_enabled = bool(_hax_api_key) + app.note( + f"Approval check: HAX_API_KEY={'set' if _approval_enabled else 'NOT SET'}, " + f"execution_id={'present' if execution_id else 'MISSING'}", + tags=["build", "approval", "check"], + ) + if _approval_enabled and execution_id: + import json as _json + from hax import HaxClient + + hax_client = HaxClient( + api_key=_hax_api_key, + base_url=os.environ.get("HAX_SDK_URL", "http://localhost:3000") + "/api/v1", + ) + cp_base_url = (app.agentfield_server or "http://localhost:8080").rstrip("/") + approval_state_path = os.path.join(abs_artifacts_dir, "approval_state.json") + os.makedirs(os.path.dirname(approval_state_path), exist_ok=True) + revision_history: list[dict] = [] + + for revision_iter in range(cfg.max_plan_revision_iterations + 1): + app.note( + f"Phase 1.5: Requesting plan approval (iteration {revision_iter})", + tags=["build", "approval"], + ) + + plan_summary, prd_markdown, architecture_markdown, issues_for_template = ( + _format_plan_for_approval(plan_result) + ) + + # Create the approval request on hax-sdk + title = "SWE-AF Plan Review" + if revision_iter > 0: + title = f"SWE-AF Plan Review (Revision {revision_iter})" + + hax_payload = { + "planSummary": plan_summary, + "issues": issues_for_template, + "architecture": architecture_markdown, + "prd": prd_markdown, + "metadata": { + "repoUrl": cfg.repo_url, + "goalDescription": goal, + "agentNodeId": NODE_ID, + "executionId": execution_id, + }, + "revisionNumber": revision_iter, + "revisionHistory": revision_history, + } + + hax_create_kwargs: dict = { + "type": "plan-review-v2", + "title": title, + "description": "Review the proposed implementation plan before execution begins", + "payload": hax_payload, + "webhook_url": f"{cp_base_url}/api/v1/webhooks/approval-response", + "expires_in_seconds": cfg.approval_expires_in_hours * 3600, + } + approval_user_id = os.environ.get("AGENTFIELD_APPROVAL_USER_ID", "") + if approval_user_id: + hax_create_kwargs["user_id"] = approval_user_id + + hax_request = hax_client.create_request(**hax_create_kwargs) + + # Save pending state + with open(approval_state_path, "w") as _fp: + _json.dump({ + "decision": "pending", + "feedback": "", + "request_id": hax_request.id, + "request_url": hax_request.url, + "revision_number": revision_iter, + }, _fp, indent=2) + + # Pause execution — SDK transitions CP to "waiting" and waits for + # the webhook callback (hax-sdk → CP → agent /webhooks/approval) + approval_result = await app.pause( + approval_request_id=hax_request.id, + approval_request_url=hax_request.url, + expires_in_hours=cfg.approval_expires_in_hours, + ) + + # Persist decision + with open(approval_state_path, "w") as _f: + _json.dump({ + "decision": approval_result.decision, + "feedback": approval_result.feedback, + "request_id": approval_result.approval_request_id, + "request_url": hax_request.url, + "revision_number": revision_iter, + "revision_history": revision_history, + }, _f, indent=2) + + if approval_result.approved: + app.note( + "Plan approved — proceeding to execution", + tags=["build", "approval", "approved"], + ) + break + + if approval_result.changes_requested: + # Check if we've exhausted revision attempts + if revision_iter >= cfg.max_plan_revision_iterations: + app.note( + f"Max plan revision iterations ({cfg.max_plan_revision_iterations}) reached", + tags=["build", "approval", "exhausted"], + ) + return BuildResult( + plan_result=plan_result, + dag_state={}, + success=False, + summary=f"Plan revision limit reached after {revision_iter + 1} iterations", + ).model_dump() + + revision_history.append({ + "iteration": revision_iter, + "feedback": approval_result.feedback, + }) + + app.note( + f"Changes requested (iteration {revision_iter}): " + f"{approval_result.feedback[:200]}", + tags=["build", "approval", "request_changes"], + ) + + # Re-plan: Architect → Tech Lead review loop → Sprint Planner + # (skip PM — the PRD/scope doesn't change from reviewer feedback) + app.note( + f"Re-planning with human feedback (revision {revision_iter + 1})", + tags=["build", "replan"], + ) + + arch = _unwrap(await app.call( + f"{NODE_ID}.run_architect", + prd=plan_result.get("prd", {}), + repo_path=repo_path, + artifacts_dir=artifacts_dir, + feedback=approval_result.feedback, + model=resolved["architect_model"], + permission_mode=cfg.permission_mode, + ai_provider=cfg.ai_provider, + workspace_manifest=manifest.model_dump() if manifest else None, + ), "run_architect (human revision)") + + review = None + for tl_iter in range(cfg.max_review_iterations + 1): + review = _unwrap(await app.call( + f"{NODE_ID}.run_tech_lead", + prd=plan_result.get("prd", {}), + repo_path=repo_path, + artifacts_dir=artifacts_dir, + revision_number=tl_iter, + model=resolved["tech_lead_model"], + permission_mode=cfg.permission_mode, + ai_provider=cfg.ai_provider, + workspace_manifest=manifest.model_dump() if manifest else None, + ), "run_tech_lead") + if review["approved"]: + break + if tl_iter < cfg.max_review_iterations: + arch = _unwrap(await app.call( + f"{NODE_ID}.run_architect", + prd=plan_result.get("prd", {}), + repo_path=repo_path, + artifacts_dir=artifacts_dir, + feedback=review["feedback"], + model=resolved["architect_model"], + permission_mode=cfg.permission_mode, + ai_provider=cfg.ai_provider, + workspace_manifest=manifest.model_dump() if manifest else None, + ), "run_architect (tech lead revision)") + + # Force-approve if tech lead iterations exhausted + if review and not review["approved"]: + review = ReviewResult( + approved=True, + feedback=review["feedback"], + scope_issues=review.get("scope_issues", []), + complexity_assessment=review.get("complexity_assessment", "appropriate"), + summary=review["summary"] + " [auto-approved after max iterations]", + ).model_dump() + + sprint_result = _unwrap(await app.call( + f"{NODE_ID}.run_sprint_planner", + prd=plan_result.get("prd", {}), + architecture=arch, + repo_path=repo_path, + artifacts_dir=artifacts_dir, + model=resolved["sprint_planner_model"], + permission_mode=cfg.permission_mode, + ai_provider=cfg.ai_provider, + workspace_manifest=manifest.model_dump() if manifest else None, + ), "run_sprint_planner (revision)") + + # Update plan_result with revised architecture + issues + plan_result = { + **plan_result, + "architecture": arch, + "review": review, + "issues": sprint_result["issues"], + "rationale": sprint_result["rationale"], + } + continue + + # Terminal rejection or expired/error + reason = approval_result.feedback or approval_result.decision + app.note( + f"Plan {approval_result.decision} by human reviewer: {reason}", + tags=["build", "approval", approval_result.decision], + ) + return BuildResult( + plan_result=plan_result, + dag_state={}, + success=False, + summary=f"Plan {approval_result.decision}: {reason}", + ).model_dump() + # 2. EXECUTE exec_config = cfg.to_execution_config_dict() @@ -1069,6 +1352,56 @@ async def resume_build( "rationale": checkpoint.get("original_plan_summary", ""), } + # Check for pending approval state — if process crashed while waiting, + # resume polling instead of re-requesting approval. + approval_state_path = os.path.join(base, "approval_state.json") + if os.path.exists(approval_state_path): + with open(approval_state_path, "r") as f: + approval_state = json.load(f) + + prev_decision = approval_state.get("decision", "") + + if prev_decision == "approved": + app.note( + "Approval already granted in previous run — skipping", + tags=["build", "resume", "approval", "skipped"], + ) + elif prev_decision in ("rejected", "expired", "error"): + app.note( + f"Previous approval was {prev_decision} — cannot resume", + tags=["build", "resume", "approval", prev_decision], + ) + return { + "success": False, + "error": f"Plan was {prev_decision} in previous run", + } + elif prev_decision == "pending" and approval_state.get("request_id"): + # Process crashed while waiting — resume via SDK wait_for_resume + app.note( + "Resuming approval wait from previous run", + tags=["build", "resume", "approval", "waiting"], + ) + execution_id = app.ctx.execution_id if app.ctx else "" + if execution_id: + result = await app.wait_for_resume( + approval_request_id=approval_state["request_id"], + execution_id=execution_id, + ) + # Update saved state + with open(approval_state_path, "w") as f: + json.dump({ + "decision": result.decision, + "feedback": result.feedback, + "request_id": result.approval_request_id, + "request_url": approval_state.get("request_url", ""), + }, f, indent=2) + + if not result.approved: + return { + "success": False, + "error": f"Plan {result.decision}: {result.feedback}", + } + app.note("Resuming build from checkpoint", tags=["build", "resume"]) result = await app.call( diff --git a/swe_af/approval.py b/swe_af/approval.py new file mode 100644 index 0000000..130e58e --- /dev/null +++ b/swe_af/approval.py @@ -0,0 +1,467 @@ +"""Approval workflow client for pausing SWE-AF execution for human review. + +Calls hax-sdk directly to create approval requests, then notifies the +AgentField control plane to transition execution state to "waiting". +When the approval resolves (via hax-sdk webhook), notifies the CP to +transition back. +""" + +from __future__ import annotations + +import asyncio +import json +import logging +import os +import socket +from aiohttp import web +from dataclasses import dataclass, field +from typing import TYPE_CHECKING + +import httpx + +if TYPE_CHECKING: + from agentfield import Agent + +logger = logging.getLogger(__name__) + +# Environment variables for hax-sdk integration +HAX_API_KEY_ENV = "HAX_API_KEY" +HAX_SDK_URL_ENV = "HAX_SDK_URL" +HAX_SDK_URL_DEFAULT = "http://localhost:3000" + + +def is_approval_enabled() -> bool: + """Check whether approval is enabled (HAX_API_KEY is set).""" + return bool(os.environ.get(HAX_API_KEY_ENV, "").strip()) + + +@dataclass +class ApprovalResult: + """Outcome of a human approval request.""" + + decision: str # "approved", "rejected", "request_changes", "expired", "error" + feedback: str = "" + request_id: str = "" + request_url: str = "" + raw_response: dict = field(default_factory=dict) + + @property + def approved(self) -> bool: + return self.decision == "approved" + + @property + def changes_requested(self) -> bool: + return self.decision == "request_changes" + + +def _find_free_port() -> int: + """Find an available TCP port on localhost.""" + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: + s.bind(("127.0.0.1", 0)) + return s.getsockname()[1] + + +class _CallbackServer: + """Lightweight HTTP server that receives a single approval callback then shuts down.""" + + def __init__(self) -> None: + self._event: asyncio.Event = asyncio.Event() + self._result: dict | None = None + self._port: int = 0 + self._runner: web.AppRunner | None = None + + @property + def url(self) -> str: + return f"http://127.0.0.1:{self._port}/approval-callback" + + async def start(self) -> str: + """Start the callback server and return its URL.""" + app = web.Application() + app.router.add_post("/approval-callback", self._handle) + + self._port = _find_free_port() + self._runner = web.AppRunner(app) + await self._runner.setup() + site = web.TCPSite(self._runner, "127.0.0.1", self._port) + await site.start() + logger.info("Approval callback server listening on %s", self.url) + return self.url + + async def _handle(self, request: web.Request) -> web.Response: + """Handle the approval callback from hax-sdk.""" + try: + self._result = await request.json() + logger.info("Received approval callback: %s", self._result) + except Exception: + self._result = {"decision": "error", "feedback": "malformed callback payload"} + self._event.set() + return web.json_response({"status": "received"}) + + async def wait(self, timeout: float) -> dict | None: + """Wait for the callback. Returns the payload or None on timeout.""" + try: + await asyncio.wait_for(self._event.wait(), timeout=timeout) + return self._result + except asyncio.TimeoutError: + return None + + async def stop(self) -> None: + """Shut down the callback server.""" + if self._runner: + await self._runner.cleanup() + self._runner = None + + +class ApprovalClient: + """Handles approval workflow: hax-sdk for requests, CP for execution state. + + Parameters + ---------- + agent: + The AgentField ``Agent`` instance (provides ``agentfield_server``, + ``api_key``, ``node_id``, and ``note()``). + """ + + # Default timeout for waiting on the callback (matches default expiry) + DEFAULT_TIMEOUT = 72 * 3600 # 72 hours in seconds + + def __init__(self, agent: Agent) -> None: + self._agent = agent + self._cp_base_url = agent.agentfield_server.rstrip("/") + self._cp_api_key = agent.api_key or "" + self._hax_api_key = os.environ.get(HAX_API_KEY_ENV, "") + self._hax_sdk_url = os.environ.get(HAX_SDK_URL_ENV, HAX_SDK_URL_DEFAULT).rstrip("/") + + async def request_plan_approval( + self, + execution_id: str, + plan_summary: str, + issues: list[dict], + architecture: str, + prd: str, + goal_description: str = "", + repo_url: str = "", + expires_in_hours: int = 72, + on_request_created: callable | None = None, + revision_number: int = 0, + revision_history: list[dict] | None = None, + ) -> ApprovalResult: + """Request human approval for a plan. + + 1. Starts a local callback server + 2. Calls hax-sdk directly to create the approval request + 3. Notifies CP to transition execution to "waiting" + 4. Waits for hax-sdk webhook callback + 5. Notifies CP to resolve the approval + """ + node_id = self._agent.node_id + + # Build the plan-review-v2 template payload + payload = { + "planSummary": plan_summary, + "issues": issues, + "architecture": architecture, + "prd": prd, + "metadata": { + "repoUrl": repo_url, + "goalDescription": goal_description, + "agentNodeId": node_id, + "executionId": execution_id, + }, + "revisionNumber": revision_number, + "revisionHistory": revision_history or [], + } + + # Start callback server for hax-sdk webhook + callback = _CallbackServer() + callback_url = await callback.start() + + title = "SWE-AF Plan Review" + if revision_number > 0: + title = f"SWE-AF Plan Review (Revision {revision_number})" + + # --- Step 1: Create request on hax-sdk directly --- + hax_request_body = { + "title": title, + "description": "Review the proposed implementation plan before execution begins", + "type": "plan-review-v2", + "payload": payload, + "webhookUrl": callback_url, + "expiresInSeconds": expires_in_hours * 3600, + } + + # Optionally assign the request to a specific Hub user + approval_user_id = os.environ.get("AGENTFIELD_APPROVAL_USER_ID", "") + if approval_user_id: + hax_request_body["userId"] = approval_user_id + + self._agent.note( + f"Requesting plan approval for execution {execution_id}", + tags=["build", "approval", "request"], + ) + + try: + hax_resp = await self._hax_post("/api/v1/requests", json=hax_request_body) + except Exception as exc: + await callback.stop() + logger.error("Failed to create approval request on hax-sdk: %s", exc) + self._agent.note( + f"Approval request failed: {exc}", + tags=["build", "approval", "error"], + ) + return ApprovalResult( + decision="error", + feedback=f"Failed to create approval request: {exc}", + ) + + if hax_resp.status_code >= 400: + await callback.stop() + error_detail = hax_resp.text[:500] + logger.error( + "hax-sdk returned %d: %s", hax_resp.status_code, error_detail + ) + self._agent.note( + f"hax-sdk returned HTTP {hax_resp.status_code}", + tags=["build", "approval", "error"], + ) + return ApprovalResult( + decision="error", + feedback=f"hax-sdk HTTP {hax_resp.status_code}: {error_detail}", + ) + + hax_data = hax_resp.json() + request_id = hax_data.get("id", "") + request_url = hax_data.get("url", "") + + # --- Step 2: Tell CP to transition execution to "waiting" --- + try: + cp_body = { + "approval_request_id": request_id, + "approval_request_url": request_url, + "expires_in_hours": expires_in_hours, + } + cp_resp = await self._cp_post( + f"/api/v1/executions/{execution_id}/request-approval", + json=cp_body, + ) + if cp_resp.status_code >= 400: + logger.warning( + "CP request-approval returned %d: %s (non-fatal)", + cp_resp.status_code, cp_resp.text[:200], + ) + except Exception as exc: + logger.warning("Failed to notify CP of waiting state: %s (non-fatal)", exc) + + # Notify caller of request creation (for state persistence) + if on_request_created: + on_request_created(request_id, request_url) + + self._agent.note( + f"Approval requested — waiting for webhook callback at {callback_url}", + tags=["build", "approval", "waiting"], + ) + + # --- Step 3: Wait for hax-sdk webhook callback --- + timeout = expires_in_hours * 3600 + try: + result = await self._wait_for_callback( + callback, execution_id, request_id, request_url, timeout + ) + finally: + await callback.stop() + + # --- Step 4: Tell CP to resolve the approval state --- + await self._notify_cp_resolution(request_id, result) + + return result + + async def _wait_for_callback( + self, + callback: _CallbackServer, + execution_id: str, + request_id: str, + request_url: str, + timeout: float, + ) -> ApprovalResult: + """Wait for the hax-sdk webhook callback, falling back to status poll.""" + cb_data = await callback.wait(timeout) + + if cb_data is not None: + return self._parse_hax_webhook(cb_data, request_id, request_url) + + # Timeout — fall back to one status poll via CP + logger.warning("Callback timed out after %ds — falling back to status poll", timeout) + return await self._fallback_poll(execution_id, request_id, request_url) + + def _parse_hax_webhook(self, data: dict, request_id: str, request_url: str) -> ApprovalResult: + """Parse the hax-sdk webhook payload into an ApprovalResult. + + Supports two formats: + 1. hax-sdk envelope: {"type":"completed","data":{"requestId":"...","response":{"decision":"approved"}}} + 2. Flat format: {"decision":"approved","feedback":"..."} + """ + decision = "" + feedback = "" + raw_response = {} + + # Try hax-sdk envelope format + event_data = data.get("data") + if event_data and isinstance(event_data, dict): + response_obj = event_data.get("response", {}) + if isinstance(response_obj, dict): + decision = response_obj.get("decision", "") + feedback = response_obj.get("feedback", "") + raw_response = response_obj + + # Handle "expired" event type + if data.get("type") == "expired": + decision = "expired" + else: + # Flat format + decision = data.get("decision", "error") + feedback = data.get("feedback", "") + response_field = data.get("response") + if response_field: + try: + raw_response = json.loads(response_field) if isinstance(response_field, str) else response_field + if not feedback: + feedback = raw_response.get("feedback", "") + except (json.JSONDecodeError, AttributeError): + if not feedback: + feedback = str(response_field) + + if not decision: + decision = "error" + + self._agent.note( + f"Approval resolved: {decision}" + + (f" — feedback: {feedback[:200]}" if feedback else ""), + tags=["build", "approval", decision], + ) + + return ApprovalResult( + decision=decision, + feedback=feedback, + request_id=request_id, + request_url=request_url, + raw_response=raw_response, + ) + + async def _notify_cp_resolution(self, request_id: str, result: ApprovalResult) -> None: + """Notify the CP that the approval has resolved (best-effort).""" + try: + body = { + "requestId": request_id, + "decision": result.decision, + "feedback": result.feedback, + } + if result.raw_response: + body["response"] = json.dumps(result.raw_response) + + resp = await self._cp_post("/api/v1/webhooks/approval-response", json=body) + if resp.status_code >= 400: + logger.warning( + "CP approval-response returned %d: %s", + resp.status_code, resp.text[:200], + ) + except Exception as exc: + logger.warning("Failed to notify CP of approval resolution: %s", exc) + + async def _fallback_poll( + self, + execution_id: str, + request_id: str, + request_url: str, + ) -> ApprovalResult: + """Single status poll as fallback when callback doesn't arrive.""" + try: + resp = await self._cp_get( + f"/api/v1/executions/{execution_id}/approval-status", + ) + if resp.status_code < 400: + data = resp.json() + status = data.get("status", "unknown") + if status != "pending": + return ApprovalResult( + decision=status, + feedback="", + request_id=request_id, + request_url=request_url, + ) + except Exception as exc: + logger.warning("Fallback poll failed: %s", exc) + + return ApprovalResult( + decision="expired", + feedback="Approval timed out without response", + request_id=request_id, + request_url=request_url, + ) + + async def wait_for_approval( + self, + execution_id: str, + request_id: str, + request_url: str = "", + timeout: float | None = None, + ) -> ApprovalResult: + """Resume waiting for an approval (e.g. after crash recovery). + + Starts a callback server and waits for the result. + Falls back to polling if the callback doesn't arrive. + """ + callback = _CallbackServer() + await callback.start() + + effective_timeout = timeout or self.DEFAULT_TIMEOUT + + try: + result = await self._wait_for_callback( + callback, execution_id, request_id, request_url, effective_timeout + ) + finally: + await callback.stop() + + # Notify CP of resolution + await self._notify_cp_resolution(request_id, result) + + return result + + async def get_approval_status(self, execution_id: str) -> dict: + """One-shot poll of approval status (no blocking/retry).""" + resp = await self._cp_get( + f"/api/v1/executions/{execution_id}/approval-status", + ) + resp.raise_for_status() + return resp.json() + + # ------------------------------------------------------------------ # + # HTTP helpers # + # ------------------------------------------------------------------ # + + def _cp_headers(self) -> dict[str, str]: + headers: dict[str, str] = {"Content-Type": "application/json"} + if self._cp_api_key: + headers["X-API-Key"] = self._cp_api_key + return headers + + def _hax_headers(self) -> dict[str, str]: + headers: dict[str, str] = {"Content-Type": "application/json"} + if self._hax_api_key: + headers["Authorization"] = f"Bearer {self._hax_api_key}" + return headers + + async def _cp_post(self, path: str, **kwargs) -> httpx.Response: + url = self._cp_base_url + path + async with httpx.AsyncClient(timeout=30.0) as client: + return await client.post(url, headers=self._cp_headers(), **kwargs) + + async def _cp_get(self, path: str, **kwargs) -> httpx.Response: + url = self._cp_base_url + path + async with httpx.AsyncClient(timeout=30.0) as client: + return await client.get(url, headers=self._cp_headers(), **kwargs) + + async def _hax_post(self, path: str, **kwargs) -> httpx.Response: + url = self._hax_sdk_url + path + async with httpx.AsyncClient(timeout=30.0) as client: + return await client.post(url, headers=self._hax_headers(), **kwargs) diff --git a/swe_af/execution/schemas.py b/swe_af/execution/schemas.py index a12fa59..d7d6e23 100644 --- a/swe_af/execution/schemas.py +++ b/swe_af/execution/schemas.py @@ -602,6 +602,7 @@ class BuildConfig(BaseModel): models: dict[str, str] | None = None max_review_iterations: int = 2 + max_plan_revision_iterations: int = 2 max_retries_per_issue: int = 2 max_replans: int = 2 enable_replanning: bool = True @@ -625,6 +626,11 @@ class BuildConfig(BaseModel): max_concurrent_issues: int = 3 # max parallel issues per level (0 = unlimited) level_failure_abort_threshold: float = 0.8 # abort DAG when >= this fraction of a level fails + # Approval workflow — automatically enabled when the control plane has a + # hax-sdk API key configured. No explicit flag needed; SWE-AF always + # attempts approval and gracefully skips when the CP returns "not configured". + approval_expires_in_hours: int = 72 # how long before approval request expires + @model_validator(mode="before") @classmethod def _validate_v2_keys(cls, data: Any) -> Any: diff --git a/swe_af/fast/app.py b/swe_af/fast/app.py index 62aaeaf..c1f2641 100644 --- a/swe_af/fast/app.py +++ b/swe_af/fast/app.py @@ -12,6 +12,10 @@ import os import re +from dotenv import load_dotenv + +load_dotenv() + from agentfield import Agent from swe_af.execution.envelope import unwrap_call_result as _unwrap from swe_af.fast import fast_router