Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions docker-compose.local.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,13 @@ services:
build:
context: .
dockerfile: Dockerfile
env_file: .env
environment:
- AGENTFIELD_SERVER=http://host.docker.internal:8080
- NODE_ID=swe-planner
- PORT=8003
# Callback URL for control plane to reach this agent
- AGENT_CALLBACK_URL=http://localhost:8003
- ANTHROPIC_API_KEY=${ANTHROPIC_API_KEY}
- CLAUDE_CODE_OAUTH_TOKEN=${CLAUDE_CODE_OAUTH_TOKEN}
- GH_TOKEN=${GH_TOKEN}
ports:
- "8003:8003"
volumes:
Expand Down
3 changes: 3 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ requires-python = ">=3.12"
dependencies = [
"agentfield>=0.1.9",
"pydantic>=2.0",
"httpx>=0.25",
"aiohttp>=3.9",
"python-dotenv>=1.0",
# Compatibility pin: newer SDK builds have surfaced
# "Unknown message type: rate_limit_event" during streaming.
"claude-agent-sdk==0.1.20",
Expand Down
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,6 @@
# Install: python -m pip install -r requirements.txt

agentfield>=0.1.9
hax-sdk>=0.1.0
pydantic>=2.0
claude-agent-sdk==0.1.20
333 changes: 333 additions & 0 deletions swe_af/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,16 @@
import subprocess
import uuid

from dotenv import load_dotenv

load_dotenv() # Load .env file so HAX_API_KEY (and other vars) are available

from swe_af.reasoners import router
from swe_af.reasoners.pipeline import _assign_sequence_numbers, _compute_levels, _validate_file_conflicts
from swe_af.reasoners.schemas import PlanResult, ReviewResult

from agentfield import Agent
from agentfield.client import ApprovalResult
from swe_af.execution.envelope import unwrap_call_result as _unwrap
from swe_af.execution.schemas import (
BuildConfig,
Expand Down Expand Up @@ -169,6 +174,61 @@ def _run() -> str:
)


def _format_plan_for_approval(
plan_result: dict,
) -> tuple[str, str, str, list[dict]]:
"""Format plan_result into template-ready fields.

Returns (plan_summary, prd_markdown, architecture_markdown, issues_for_template).
"""
plan_summary = plan_result.get("rationale", "")
prd_data = plan_result.get("prd", {})
arch_data = plan_result.get("architecture", {})

# Format PRD as markdown
prd_md_parts: list[str] = []
if prd_data.get("validated_description"):
prd_md_parts.append(f"## Description\n{prd_data['validated_description']}")
if prd_data.get("must_have"):
prd_md_parts.append("## Must Have\n" + "\n".join(f"- {item}" for item in prd_data["must_have"]))
if prd_data.get("nice_to_have"):
prd_md_parts.append("## Nice to Have\n" + "\n".join(f"- {item}" for item in prd_data["nice_to_have"]))
if prd_data.get("acceptance_criteria"):
prd_md_parts.append("## Acceptance Criteria\n" + "\n".join(f"- {item}" for item in prd_data["acceptance_criteria"]))
prd_markdown = "\n\n".join(prd_md_parts) if prd_md_parts else ""

# Format architecture as markdown
arch_md_parts: list[str] = []
if arch_data.get("summary"):
arch_md_parts.append(f"## Summary\n{arch_data['summary']}")
if arch_data.get("components"):
arch_md_parts.append("## Components")
for comp in arch_data["components"]:
arch_md_parts.append(f"### {comp.get('name', 'Component')}\n{comp.get('responsibility', '')}")
if comp.get("touches_files"):
arch_md_parts.append("Files: " + ", ".join(f"`{f}`" for f in comp["touches_files"]))
if arch_data.get("decisions"):
arch_md_parts.append("## Key Decisions")
for dec in arch_data["decisions"]:
arch_md_parts.append(f"- **{dec.get('decision', '')}**: {dec.get('rationale', '')}")
architecture_markdown = "\n\n".join(arch_md_parts) if arch_md_parts else ""

# Format issues for the template
issues_for_template = []
for issue in plan_result.get("issues", []):
issues_for_template.append({
"name": issue.get("name", ""),
"title": issue.get("title", ""),
"description": issue.get("description", ""),
"dependsOn": issue.get("depends_on", []),
"filesToModify": issue.get("files_to_modify", []),
"filesToCreate": issue.get("files_to_create", []),
"acceptanceCriteria": issue.get("acceptance_criteria", []),
})

return plan_summary, prd_markdown, architecture_markdown, issues_for_template


@app.reasoner()
async def build(
goal: str,
Expand Down Expand Up @@ -419,6 +479,229 @@ async def build(
tags=["build", "git_init", "error"],
)

# 1.5 APPROVAL CHECKPOINT — pause for human plan review if configured
# Automatically enabled when HAX_API_KEY is set in the environment.
# SWE-AF calls hax-sdk directly; the CP only manages execution state.
# Supports iterative revision: reviewer can "request changes" and the agent
# will re-plan (Architect → TechLead → SprintPlanner) with the feedback.
_hax_api_key = os.environ.get("HAX_API_KEY", "").strip()
execution_id = app.ctx.execution_id if app.ctx else ""
_approval_enabled = bool(_hax_api_key)
app.note(
f"Approval check: HAX_API_KEY={'set' if _approval_enabled else 'NOT SET'}, "
f"execution_id={'present' if execution_id else 'MISSING'}",
tags=["build", "approval", "check"],
)
if _approval_enabled and execution_id:
import json as _json
from hax import HaxClient

hax_client = HaxClient(
api_key=_hax_api_key,
base_url=os.environ.get("HAX_SDK_URL", "http://localhost:3000") + "/api/v1",
)
cp_base_url = (app.agentfield_server or "http://localhost:8080").rstrip("/")
approval_state_path = os.path.join(abs_artifacts_dir, "approval_state.json")
os.makedirs(os.path.dirname(approval_state_path), exist_ok=True)
revision_history: list[dict] = []

for revision_iter in range(cfg.max_plan_revision_iterations + 1):
app.note(
f"Phase 1.5: Requesting plan approval (iteration {revision_iter})",
tags=["build", "approval"],
)

plan_summary, prd_markdown, architecture_markdown, issues_for_template = (
_format_plan_for_approval(plan_result)
)

# Create the approval request on hax-sdk
title = "SWE-AF Plan Review"
if revision_iter > 0:
title = f"SWE-AF Plan Review (Revision {revision_iter})"

hax_payload = {
"planSummary": plan_summary,
"issues": issues_for_template,
"architecture": architecture_markdown,
"prd": prd_markdown,
"metadata": {
"repoUrl": cfg.repo_url,
"goalDescription": goal,
"agentNodeId": NODE_ID,
"executionId": execution_id,
},
"revisionNumber": revision_iter,
"revisionHistory": revision_history,
}

hax_create_kwargs: dict = {
"type": "plan-review-v2",
"title": title,
"description": "Review the proposed implementation plan before execution begins",
"payload": hax_payload,
"webhook_url": f"{cp_base_url}/api/v1/webhooks/approval-response",
"expires_in_seconds": cfg.approval_expires_in_hours * 3600,
}
approval_user_id = os.environ.get("AGENTFIELD_APPROVAL_USER_ID", "")
if approval_user_id:
hax_create_kwargs["user_id"] = approval_user_id

hax_request = hax_client.create_request(**hax_create_kwargs)

# Save pending state
with open(approval_state_path, "w") as _fp:
_json.dump({
"decision": "pending",
"feedback": "",
"request_id": hax_request.id,
"request_url": hax_request.url,
"revision_number": revision_iter,
}, _fp, indent=2)

# Pause execution — SDK transitions CP to "waiting" and waits for
# the webhook callback (hax-sdk → CP → agent /webhooks/approval)
approval_result = await app.pause(
approval_request_id=hax_request.id,
approval_request_url=hax_request.url,
expires_in_hours=cfg.approval_expires_in_hours,
)

# Persist decision
with open(approval_state_path, "w") as _f:
_json.dump({
"decision": approval_result.decision,
"feedback": approval_result.feedback,
"request_id": approval_result.approval_request_id,
"request_url": hax_request.url,
"revision_number": revision_iter,
"revision_history": revision_history,
}, _f, indent=2)

if approval_result.approved:
app.note(
"Plan approved — proceeding to execution",
tags=["build", "approval", "approved"],
)
break

if approval_result.changes_requested:
# Check if we've exhausted revision attempts
if revision_iter >= cfg.max_plan_revision_iterations:
app.note(
f"Max plan revision iterations ({cfg.max_plan_revision_iterations}) reached",
tags=["build", "approval", "exhausted"],
)
return BuildResult(
plan_result=plan_result,
dag_state={},
success=False,
summary=f"Plan revision limit reached after {revision_iter + 1} iterations",
).model_dump()

revision_history.append({
"iteration": revision_iter,
"feedback": approval_result.feedback,
})

app.note(
f"Changes requested (iteration {revision_iter}): "
f"{approval_result.feedback[:200]}",
tags=["build", "approval", "request_changes"],
)

# Re-plan: Architect → Tech Lead review loop → Sprint Planner
# (skip PM — the PRD/scope doesn't change from reviewer feedback)
app.note(
f"Re-planning with human feedback (revision {revision_iter + 1})",
tags=["build", "replan"],
)

arch = _unwrap(await app.call(
f"{NODE_ID}.run_architect",
prd=plan_result.get("prd", {}),
repo_path=repo_path,
artifacts_dir=artifacts_dir,
feedback=approval_result.feedback,
model=resolved["architect_model"],
permission_mode=cfg.permission_mode,
ai_provider=cfg.ai_provider,
workspace_manifest=manifest.model_dump() if manifest else None,
), "run_architect (human revision)")

review = None
for tl_iter in range(cfg.max_review_iterations + 1):
review = _unwrap(await app.call(
f"{NODE_ID}.run_tech_lead",
prd=plan_result.get("prd", {}),
repo_path=repo_path,
artifacts_dir=artifacts_dir,
revision_number=tl_iter,
model=resolved["tech_lead_model"],
permission_mode=cfg.permission_mode,
ai_provider=cfg.ai_provider,
workspace_manifest=manifest.model_dump() if manifest else None,
), "run_tech_lead")
if review["approved"]:
break
if tl_iter < cfg.max_review_iterations:
arch = _unwrap(await app.call(
f"{NODE_ID}.run_architect",
prd=plan_result.get("prd", {}),
repo_path=repo_path,
artifacts_dir=artifacts_dir,
feedback=review["feedback"],
model=resolved["architect_model"],
permission_mode=cfg.permission_mode,
ai_provider=cfg.ai_provider,
workspace_manifest=manifest.model_dump() if manifest else None,
), "run_architect (tech lead revision)")

# Force-approve if tech lead iterations exhausted
if review and not review["approved"]:
review = ReviewResult(
approved=True,
feedback=review["feedback"],
scope_issues=review.get("scope_issues", []),
complexity_assessment=review.get("complexity_assessment", "appropriate"),
summary=review["summary"] + " [auto-approved after max iterations]",
).model_dump()

sprint_result = _unwrap(await app.call(
f"{NODE_ID}.run_sprint_planner",
prd=plan_result.get("prd", {}),
architecture=arch,
repo_path=repo_path,
artifacts_dir=artifacts_dir,
model=resolved["sprint_planner_model"],
permission_mode=cfg.permission_mode,
ai_provider=cfg.ai_provider,
workspace_manifest=manifest.model_dump() if manifest else None,
), "run_sprint_planner (revision)")

# Update plan_result with revised architecture + issues
plan_result = {
**plan_result,
"architecture": arch,
"review": review,
"issues": sprint_result["issues"],
"rationale": sprint_result["rationale"],
}
continue

# Terminal rejection or expired/error
reason = approval_result.feedback or approval_result.decision
app.note(
f"Plan {approval_result.decision} by human reviewer: {reason}",
tags=["build", "approval", approval_result.decision],
)
return BuildResult(
plan_result=plan_result,
dag_state={},
success=False,
summary=f"Plan {approval_result.decision}: {reason}",
).model_dump()

# 2. EXECUTE
exec_config = cfg.to_execution_config_dict()

Expand Down Expand Up @@ -1069,6 +1352,56 @@ async def resume_build(
"rationale": checkpoint.get("original_plan_summary", ""),
}

# Check for pending approval state — if process crashed while waiting,
# resume polling instead of re-requesting approval.
approval_state_path = os.path.join(base, "approval_state.json")
if os.path.exists(approval_state_path):
with open(approval_state_path, "r") as f:
approval_state = json.load(f)

prev_decision = approval_state.get("decision", "")

if prev_decision == "approved":
app.note(
"Approval already granted in previous run — skipping",
tags=["build", "resume", "approval", "skipped"],
)
elif prev_decision in ("rejected", "expired", "error"):
app.note(
f"Previous approval was {prev_decision} — cannot resume",
tags=["build", "resume", "approval", prev_decision],
)
return {
"success": False,
"error": f"Plan was {prev_decision} in previous run",
}
elif prev_decision == "pending" and approval_state.get("request_id"):
# Process crashed while waiting — resume via SDK wait_for_resume
app.note(
"Resuming approval wait from previous run",
tags=["build", "resume", "approval", "waiting"],
)
execution_id = app.ctx.execution_id if app.ctx else ""
if execution_id:
result = await app.wait_for_resume(
approval_request_id=approval_state["request_id"],
execution_id=execution_id,
)
# Update saved state
with open(approval_state_path, "w") as f:
json.dump({
"decision": result.decision,
"feedback": result.feedback,
"request_id": result.approval_request_id,
"request_url": approval_state.get("request_url", ""),
}, f, indent=2)

if not result.approved:
return {
"success": False,
"error": f"Plan {result.decision}: {result.feedback}",
}

app.note("Resuming build from checkpoint", tags=["build", "resume"])

result = await app.call(
Expand Down
Loading
Loading