diff --git a/agents/s08_background_tasks.py b/agents/s08_background_tasks.py index 77a992eaf..459c1c121 100644 --- a/agents/s08_background_tasks.py +++ b/agents/s08_background_tasks.py @@ -51,6 +51,7 @@ def __init__(self): self.tasks = {} # task_id -> {status, result, command} self._notification_queue = [] # completed task results self._lock = threading.Lock() + self._notifications_ready = threading.Condition(self._lock) def run(self, command: str) -> str: """Start a background thread, return task_id immediately.""" @@ -77,15 +78,16 @@ def _execute(self, task_id: str, command: str): except Exception as e: output = f"Error: {e}" status = "error" - self.tasks[task_id]["status"] = status - self.tasks[task_id]["result"] = output or "(no output)" - with self._lock: + with self._notifications_ready: + self.tasks[task_id]["status"] = status + self.tasks[task_id]["result"] = output or "(no output)" self._notification_queue.append({ "task_id": task_id, "status": status, "command": command[:80], "result": (output or "(no output)")[:500], }) + self._notifications_ready.notify_all() def check(self, task_id: str = None) -> str: """Check status of one task or list all.""" @@ -106,6 +108,17 @@ def drain_notifications(self) -> list: self._notification_queue.clear() return notifs + def wait_for_notifications(self) -> list: + """Block until a background task finishes or no tasks remain running.""" + with self._notifications_ready: + while not self._notification_queue and any( + task["status"] == "running" for task in self.tasks.values() + ): + self._notifications_ready.wait() + notifs = list(self._notification_queue) + self._notification_queue.clear() + return notifs + BG = BackgroundManager() @@ -184,23 +197,30 @@ def run_edit(path: str, old_text: str, new_text: str) -> str: ] +def inject_background_results(messages: list, notifs: list): + if not notifs: + return + notif_text = "\n".join( + f"[bg:{n['task_id']}] {n['status']}: {n['result']}" for n in notifs + ) + messages.append({"role": "user", "content": f"\n{notif_text}\n"}) + messages.append({"role": "assistant", "content": "Noted background results."}) + + def agent_loop(messages: list): while True: - # Drain background notifications and inject as system message before LLM call - notifs = BG.drain_notifications() - if notifs and messages: - notif_text = "\n".join( - f"[bg:{n['task_id']}] {n['status']}: {n['result']}" for n in notifs - ) - messages.append({"role": "user", "content": f"\n{notif_text}\n"}) - messages.append({"role": "assistant", "content": "Noted background results."}) + inject_background_results(messages, BG.drain_notifications()) response = client.messages.create( model=MODEL, system=SYSTEM, messages=messages, tools=TOOLS, max_tokens=8000, ) messages.append({"role": "assistant", "content": response.content}) if response.stop_reason != "tool_use": - return + late_notifs = BG.wait_for_notifications() + if not late_notifs: + return + inject_background_results(messages, late_notifs) + continue results = [] for block in response.content: if block.type == "tool_use": diff --git a/tests/test_s08_background_tasks.py b/tests/test_s08_background_tasks.py new file mode 100644 index 000000000..3f8febb91 --- /dev/null +++ b/tests/test_s08_background_tasks.py @@ -0,0 +1,136 @@ +import importlib +import os +import sys +import threading +import time +import types +import unittest +from pathlib import Path +from types import SimpleNamespace + + +REPO_ROOT = Path(__file__).resolve().parents[1] +if str(REPO_ROOT) not in sys.path: + sys.path.insert(0, str(REPO_ROOT)) + +os.environ.setdefault("MODEL_ID", "test-model") + +if "dotenv" not in sys.modules: + dotenv = types.ModuleType("dotenv") + dotenv.load_dotenv = lambda override=True: None + sys.modules["dotenv"] = dotenv + +if "anthropic" not in sys.modules: + anthropic = types.ModuleType("anthropic") + + class Anthropic: # pragma: no cover - import shim only + def __init__(self, *args, **kwargs): + self.messages = SimpleNamespace(create=lambda **kwargs: None) + + anthropic.Anthropic = Anthropic + sys.modules["anthropic"] = anthropic + + +s08 = importlib.import_module("agents.s08_background_tasks") + + +class FakeMessagesAPI: + def __init__(self, responses): + self._responses = list(responses) + self.calls = [] + + def create(self, **kwargs): + self.calls.append(kwargs) + return self._responses.pop(0) + + +class BackgroundTasksTests(unittest.TestCase): + def setUp(self): + self.original_client = s08.client + self.original_bg = s08.BG + + def tearDown(self): + s08.client = self.original_client + s08.BG = self.original_bg + + def test_wait_for_notifications_unblocks_when_task_finishes(self): + bg = s08.BackgroundManager() + bg.tasks["task-1"] = {"status": "running", "result": None, "command": "demo"} + + def complete_task(): + time.sleep(0.05) + with bg._notifications_ready: + bg.tasks["task-1"]["status"] = "completed" + bg.tasks["task-1"]["result"] = "done" + bg._notification_queue.append( + { + "task_id": "task-1", + "status": "completed", + "command": "demo", + "result": "done", + } + ) + bg._notifications_ready.notify_all() + + threading.Thread(target=complete_task, daemon=True).start() + notifs = bg.wait_for_notifications() + + self.assertEqual( + notifs, + [ + { + "task_id": "task-1", + "status": "completed", + "command": "demo", + "result": "done", + } + ], + ) + + def test_agent_loop_resumes_after_background_completion(self): + bg = s08.BackgroundManager() + bg.tasks["task-1"] = {"status": "running", "result": None, "command": "demo"} + s08.BG = bg + + responses = [ + SimpleNamespace(stop_reason="end_turn", content=[SimpleNamespace(type="text", text="running")]), + SimpleNamespace(stop_reason="end_turn", content=[SimpleNamespace(type="text", text="done")]), + ] + fake_messages = FakeMessagesAPI(responses) + s08.client = SimpleNamespace(messages=fake_messages) + + def complete_task(): + time.sleep(0.05) + with bg._notifications_ready: + bg.tasks["task-1"]["status"] = "completed" + bg.tasks["task-1"]["result"] = "done" + bg._notification_queue.append( + { + "task_id": "task-1", + "status": "completed", + "command": "demo", + "result": "done", + } + ) + bg._notifications_ready.notify_all() + + threading.Thread(target=complete_task, daemon=True).start() + messages = [{"role": "user", "content": "run task in background"}] + + s08.agent_loop(messages) + + self.assertEqual(len(fake_messages.calls), 2) + second_call_messages = fake_messages.calls[1]["messages"] + self.assertTrue( + any( + msg["role"] == "user" + and isinstance(msg["content"], str) + and "" in msg["content"] + and "[bg:task-1] completed: done" in msg["content"] + for msg in second_call_messages + ) + ) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/test_unit.py b/tests/test_unit.py new file mode 100644 index 000000000..e01a569d9 --- /dev/null +++ b/tests/test_unit.py @@ -0,0 +1,8 @@ +import sys +import unittest + + +if __name__ == "__main__": + suite = unittest.defaultTestLoader.discover("tests", pattern="test_*.py") + result = unittest.TextTestRunner(verbosity=2).run(suite) + sys.exit(0 if result.wasSuccessful() else 1)