diff --git a/agents/s09_agent_teams.py b/agents/s09_agent_teams.py index 284a1ac19..5f235ce34 100644 --- a/agents/s09_agent_teams.py +++ b/agents/s09_agent_teams.py @@ -78,6 +78,7 @@ class MessageBus: def __init__(self, inbox_dir: Path): self.dir = inbox_dir self.dir.mkdir(parents=True, exist_ok=True) + self._lock = threading.Lock() def send(self, sender: str, to: str, content: str, msg_type: str = "message", extra: dict = None) -> str: @@ -92,19 +93,26 @@ def send(self, sender: str, to: str, content: str, if extra: msg.update(extra) inbox_path = self.dir / f"{to}.jsonl" - with open(inbox_path, "a") as f: - f.write(json.dumps(msg) + "\n") + with self._lock: + with open(inbox_path, "a") as f: + f.write(json.dumps(msg) + "\n") return f"Sent {msg_type} to {to}" def read_inbox(self, name: str) -> list: inbox_path = self.dir / f"{name}.jsonl" if not inbox_path.exists(): return [] + with self._lock: + content = inbox_path.read_text() + inbox_path.write_text("") messages = [] - for line in inbox_path.read_text().strip().splitlines(): + for line in content.splitlines(): + line = line.strip() if line: - messages.append(json.loads(line)) - inbox_path.write_text("") + try: + messages.append(json.loads(line)) + except json.JSONDecodeError: + pass return messages def broadcast(self, sender: str, content: str, teammates: list) -> str: diff --git a/agents/s10_team_protocols.py b/agents/s10_team_protocols.py index 21f936df3..1bef669a5 100644 --- a/agents/s10_team_protocols.py +++ b/agents/s10_team_protocols.py @@ -88,6 +88,7 @@ class MessageBus: def __init__(self, inbox_dir: Path): self.dir = inbox_dir self.dir.mkdir(parents=True, exist_ok=True) + self._lock = threading.Lock() def send(self, sender: str, to: str, content: str, msg_type: str = "message", extra: dict = None) -> str: @@ -102,19 +103,26 @@ def send(self, sender: str, to: str, content: str, if extra: msg.update(extra) inbox_path = self.dir / f"{to}.jsonl" - with open(inbox_path, "a") as f: - f.write(json.dumps(msg) + "\n") + with self._lock: + with open(inbox_path, "a") as f: + f.write(json.dumps(msg) + "\n") return f"Sent {msg_type} to {to}" def read_inbox(self, name: str) -> list: inbox_path = self.dir / f"{name}.jsonl" if not inbox_path.exists(): return [] + with self._lock: + content = inbox_path.read_text() + inbox_path.write_text("") messages = [] - for line in inbox_path.read_text().strip().splitlines(): + for line in content.splitlines(): + line = line.strip() if line: - messages.append(json.loads(line)) - inbox_path.write_text("") + try: + messages.append(json.loads(line)) + except json.JSONDecodeError: + pass return messages def broadcast(self, sender: str, content: str, teammates: list) -> str: diff --git a/agents/s11_autonomous_agents.py b/agents/s11_autonomous_agents.py index 856bc92c3..44a024022 100644 --- a/agents/s11_autonomous_agents.py +++ b/agents/s11_autonomous_agents.py @@ -81,6 +81,7 @@ class MessageBus: def __init__(self, inbox_dir: Path): self.dir = inbox_dir self.dir.mkdir(parents=True, exist_ok=True) + self._lock = threading.Lock() def send(self, sender: str, to: str, content: str, msg_type: str = "message", extra: dict = None) -> str: @@ -95,19 +96,26 @@ def send(self, sender: str, to: str, content: str, if extra: msg.update(extra) inbox_path = self.dir / f"{to}.jsonl" - with open(inbox_path, "a") as f: - f.write(json.dumps(msg) + "\n") + with self._lock: + with open(inbox_path, "a") as f: + f.write(json.dumps(msg) + "\n") return f"Sent {msg_type} to {to}" def read_inbox(self, name: str) -> list: inbox_path = self.dir / f"{name}.jsonl" if not inbox_path.exists(): return [] + with self._lock: + content = inbox_path.read_text() + inbox_path.write_text("") messages = [] - for line in inbox_path.read_text().strip().splitlines(): + for line in content.splitlines(): + line = line.strip() if line: - messages.append(json.loads(line)) - inbox_path.write_text("") + try: + messages.append(json.loads(line)) + except json.JSONDecodeError: + pass return messages def broadcast(self, sender: str, content: str, teammates: list) -> str: diff --git a/agents/s_full.py b/agents/s_full.py index d4dcfd3c6..1a094801c 100644 --- a/agents/s_full.py +++ b/agents/s_full.py @@ -364,21 +364,33 @@ def drain(self) -> list: class MessageBus: def __init__(self): INBOX_DIR.mkdir(parents=True, exist_ok=True) + self._lock = threading.Lock() def send(self, sender: str, to: str, content: str, msg_type: str = "message", extra: dict = None) -> str: msg = {"type": msg_type, "from": sender, "content": content, "timestamp": time.time()} if extra: msg.update(extra) - with open(INBOX_DIR / f"{to}.jsonl", "a") as f: - f.write(json.dumps(msg) + "\n") + with self._lock: + with open(INBOX_DIR / f"{to}.jsonl", "a") as f: + f.write(json.dumps(msg) + "\n") return f"Sent {msg_type} to {to}" def read_inbox(self, name: str) -> list: path = INBOX_DIR / f"{name}.jsonl" - if not path.exists(): return [] - msgs = [json.loads(l) for l in path.read_text().strip().splitlines() if l] - path.write_text("") + if not path.exists(): + return [] + with self._lock: + content = path.read_text() + path.write_text("") + msgs = [] + for l in content.splitlines(): + l = l.strip() + if l: + try: + msgs.append(json.loads(l)) + except json.JSONDecodeError: + pass return msgs def broadcast(self, sender: str, content: str, names: list) -> str: