Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 32 additions & 12 deletions agents/s08_background_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ def __init__(self):
self.tasks = {} # task_id -> {status, result, command}
self._notification_queue = [] # completed task results
self._lock = threading.Lock()
self._notifications_ready = threading.Condition(self._lock)

def run(self, command: str) -> str:
"""Start a background thread, return task_id immediately."""
Expand All @@ -77,15 +78,16 @@ def _execute(self, task_id: str, command: str):
except Exception as e:
output = f"Error: {e}"
status = "error"
self.tasks[task_id]["status"] = status
self.tasks[task_id]["result"] = output or "(no output)"
with self._lock:
with self._notifications_ready:
self.tasks[task_id]["status"] = status
self.tasks[task_id]["result"] = output or "(no output)"
self._notification_queue.append({
"task_id": task_id,
"status": status,
"command": command[:80],
"result": (output or "(no output)")[:500],
})
self._notifications_ready.notify_all()

def check(self, task_id: str = None) -> str:
"""Check status of one task or list all."""
Expand All @@ -106,6 +108,17 @@ def drain_notifications(self) -> list:
self._notification_queue.clear()
return notifs

def wait_for_notifications(self) -> list:
"""Block until a background task finishes or no tasks remain running."""
with self._notifications_ready:
while not self._notification_queue and any(
task["status"] == "running" for task in self.tasks.values()
):
self._notifications_ready.wait()
notifs = list(self._notification_queue)
self._notification_queue.clear()
return notifs


BG = BackgroundManager()

Expand Down Expand Up @@ -184,23 +197,30 @@ def run_edit(path: str, old_text: str, new_text: str) -> str:
]


def inject_background_results(messages: list, notifs: list):
if not notifs:
return
notif_text = "\n".join(
f"[bg:{n['task_id']}] {n['status']}: {n['result']}" for n in notifs
)
messages.append({"role": "user", "content": f"<background-results>\n{notif_text}\n</background-results>"})
messages.append({"role": "assistant", "content": "Noted background results."})


def agent_loop(messages: list):
while True:
# Drain background notifications and inject as system message before LLM call
notifs = BG.drain_notifications()
if notifs and messages:
notif_text = "\n".join(
f"[bg:{n['task_id']}] {n['status']}: {n['result']}" for n in notifs
)
messages.append({"role": "user", "content": f"<background-results>\n{notif_text}\n</background-results>"})
messages.append({"role": "assistant", "content": "Noted background results."})
inject_background_results(messages, BG.drain_notifications())
response = client.messages.create(
model=MODEL, system=SYSTEM, messages=messages,
tools=TOOLS, max_tokens=8000,
)
messages.append({"role": "assistant", "content": response.content})
if response.stop_reason != "tool_use":
return
late_notifs = BG.wait_for_notifications()
if not late_notifs:
return
inject_background_results(messages, late_notifs)
continue
results = []
for block in response.content:
if block.type == "tool_use":
Expand Down
136 changes: 136 additions & 0 deletions tests/test_s08_background_tasks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
import importlib
import os
import sys
import threading
import time
import types
import unittest
from pathlib import Path
from types import SimpleNamespace


REPO_ROOT = Path(__file__).resolve().parents[1]
if str(REPO_ROOT) not in sys.path:
sys.path.insert(0, str(REPO_ROOT))

os.environ.setdefault("MODEL_ID", "test-model")

if "dotenv" not in sys.modules:
dotenv = types.ModuleType("dotenv")
dotenv.load_dotenv = lambda override=True: None
sys.modules["dotenv"] = dotenv

if "anthropic" not in sys.modules:
anthropic = types.ModuleType("anthropic")

class Anthropic: # pragma: no cover - import shim only
def __init__(self, *args, **kwargs):
self.messages = SimpleNamespace(create=lambda **kwargs: None)

anthropic.Anthropic = Anthropic
sys.modules["anthropic"] = anthropic


s08 = importlib.import_module("agents.s08_background_tasks")


class FakeMessagesAPI:
def __init__(self, responses):
self._responses = list(responses)
self.calls = []

def create(self, **kwargs):
self.calls.append(kwargs)
return self._responses.pop(0)


class BackgroundTasksTests(unittest.TestCase):
def setUp(self):
self.original_client = s08.client
self.original_bg = s08.BG

def tearDown(self):
s08.client = self.original_client
s08.BG = self.original_bg

def test_wait_for_notifications_unblocks_when_task_finishes(self):
bg = s08.BackgroundManager()
bg.tasks["task-1"] = {"status": "running", "result": None, "command": "demo"}

def complete_task():
time.sleep(0.05)
with bg._notifications_ready:
bg.tasks["task-1"]["status"] = "completed"
bg.tasks["task-1"]["result"] = "done"
bg._notification_queue.append(
{
"task_id": "task-1",
"status": "completed",
"command": "demo",
"result": "done",
}
)
bg._notifications_ready.notify_all()

threading.Thread(target=complete_task, daemon=True).start()
notifs = bg.wait_for_notifications()

self.assertEqual(
notifs,
[
{
"task_id": "task-1",
"status": "completed",
"command": "demo",
"result": "done",
}
],
)

def test_agent_loop_resumes_after_background_completion(self):
bg = s08.BackgroundManager()
bg.tasks["task-1"] = {"status": "running", "result": None, "command": "demo"}
s08.BG = bg

responses = [
SimpleNamespace(stop_reason="end_turn", content=[SimpleNamespace(type="text", text="running")]),
SimpleNamespace(stop_reason="end_turn", content=[SimpleNamespace(type="text", text="done")]),
]
fake_messages = FakeMessagesAPI(responses)
s08.client = SimpleNamespace(messages=fake_messages)

def complete_task():
time.sleep(0.05)
with bg._notifications_ready:
bg.tasks["task-1"]["status"] = "completed"
bg.tasks["task-1"]["result"] = "done"
bg._notification_queue.append(
{
"task_id": "task-1",
"status": "completed",
"command": "demo",
"result": "done",
}
)
bg._notifications_ready.notify_all()

threading.Thread(target=complete_task, daemon=True).start()
messages = [{"role": "user", "content": "run task in background"}]

s08.agent_loop(messages)

self.assertEqual(len(fake_messages.calls), 2)
second_call_messages = fake_messages.calls[1]["messages"]
self.assertTrue(
any(
msg["role"] == "user"
and isinstance(msg["content"], str)
and "<background-results>" in msg["content"]
and "[bg:task-1] completed: done" in msg["content"]
for msg in second_call_messages
)
)


if __name__ == "__main__":
unittest.main()
8 changes: 8 additions & 0 deletions tests/test_unit.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
import sys
import unittest


if __name__ == "__main__":
suite = unittest.defaultTestLoader.discover("tests", pattern="test_*.py")
result = unittest.TextTestRunner(verbosity=2).run(suite)
sys.exit(0 if result.wasSuccessful() else 1)