From 05e62044cdcc590b7ac45ff8a8dee31596d8f652 Mon Sep 17 00:00:00 2001 From: Copilot <198982749+Copilot@users.noreply.github.com> Date: Wed, 11 Mar 2026 16:35:55 +0800 Subject: [PATCH] Fix thread-unsafe MessageBus in s10, s11, and s_full (#1) * Initial plan * Fix JSONDecodeError in MessageBus.read_inbox via thread-safe lock and defensive parsing Co-authored-by: leesf <10128888+leesf@users.noreply.github.com> * Fix JSONDecodeError in MessageBus.read_inbox in s10, s11, and s_full Co-authored-by: leesf <10128888+leesf@users.noreply.github.com> --------- Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com> Co-authored-by: leesf <10128888+leesf@users.noreply.github.com> --- agents/s09_agent_teams.py | 18 +++++++++++++----- agents/s10_team_protocols.py | 18 +++++++++++++----- agents/s11_autonomous_agents.py | 18 +++++++++++++----- agents/s_full.py | 22 +++++++++++++++++----- 4 files changed, 56 insertions(+), 20 deletions(-) diff --git a/agents/s09_agent_teams.py b/agents/s09_agent_teams.py index 284a1ac19..5f235ce34 100644 --- a/agents/s09_agent_teams.py +++ b/agents/s09_agent_teams.py @@ -78,6 +78,7 @@ class MessageBus: def __init__(self, inbox_dir: Path): self.dir = inbox_dir self.dir.mkdir(parents=True, exist_ok=True) + self._lock = threading.Lock() def send(self, sender: str, to: str, content: str, msg_type: str = "message", extra: dict = None) -> str: @@ -92,19 +93,26 @@ def send(self, sender: str, to: str, content: str, if extra: msg.update(extra) inbox_path = self.dir / f"{to}.jsonl" - with open(inbox_path, "a") as f: - f.write(json.dumps(msg) + "\n") + with self._lock: + with open(inbox_path, "a") as f: + f.write(json.dumps(msg) + "\n") return f"Sent {msg_type} to {to}" def read_inbox(self, name: str) -> list: inbox_path = self.dir / f"{name}.jsonl" if not inbox_path.exists(): return [] + with self._lock: + content = inbox_path.read_text() + inbox_path.write_text("") messages = [] - for line in inbox_path.read_text().strip().splitlines(): + for line in content.splitlines(): + line = line.strip() if line: - messages.append(json.loads(line)) - inbox_path.write_text("") + try: + messages.append(json.loads(line)) + except json.JSONDecodeError: + pass return messages def broadcast(self, sender: str, content: str, teammates: list) -> str: diff --git a/agents/s10_team_protocols.py b/agents/s10_team_protocols.py index 21f936df3..1bef669a5 100644 --- a/agents/s10_team_protocols.py +++ b/agents/s10_team_protocols.py @@ -88,6 +88,7 @@ class MessageBus: def __init__(self, inbox_dir: Path): self.dir = inbox_dir self.dir.mkdir(parents=True, exist_ok=True) + self._lock = threading.Lock() def send(self, sender: str, to: str, content: str, msg_type: str = "message", extra: dict = None) -> str: @@ -102,19 +103,26 @@ def send(self, sender: str, to: str, content: str, if extra: msg.update(extra) inbox_path = self.dir / f"{to}.jsonl" - with open(inbox_path, "a") as f: - f.write(json.dumps(msg) + "\n") + with self._lock: + with open(inbox_path, "a") as f: + f.write(json.dumps(msg) + "\n") return f"Sent {msg_type} to {to}" def read_inbox(self, name: str) -> list: inbox_path = self.dir / f"{name}.jsonl" if not inbox_path.exists(): return [] + with self._lock: + content = inbox_path.read_text() + inbox_path.write_text("") messages = [] - for line in inbox_path.read_text().strip().splitlines(): + for line in content.splitlines(): + line = line.strip() if line: - messages.append(json.loads(line)) - inbox_path.write_text("") + try: + messages.append(json.loads(line)) + except json.JSONDecodeError: + pass return messages def broadcast(self, sender: str, content: str, teammates: list) -> str: diff --git a/agents/s11_autonomous_agents.py b/agents/s11_autonomous_agents.py index 856bc92c3..44a024022 100644 --- a/agents/s11_autonomous_agents.py +++ b/agents/s11_autonomous_agents.py @@ -81,6 +81,7 @@ class MessageBus: def __init__(self, inbox_dir: Path): self.dir = inbox_dir self.dir.mkdir(parents=True, exist_ok=True) + self._lock = threading.Lock() def send(self, sender: str, to: str, content: str, msg_type: str = "message", extra: dict = None) -> str: @@ -95,19 +96,26 @@ def send(self, sender: str, to: str, content: str, if extra: msg.update(extra) inbox_path = self.dir / f"{to}.jsonl" - with open(inbox_path, "a") as f: - f.write(json.dumps(msg) + "\n") + with self._lock: + with open(inbox_path, "a") as f: + f.write(json.dumps(msg) + "\n") return f"Sent {msg_type} to {to}" def read_inbox(self, name: str) -> list: inbox_path = self.dir / f"{name}.jsonl" if not inbox_path.exists(): return [] + with self._lock: + content = inbox_path.read_text() + inbox_path.write_text("") messages = [] - for line in inbox_path.read_text().strip().splitlines(): + for line in content.splitlines(): + line = line.strip() if line: - messages.append(json.loads(line)) - inbox_path.write_text("") + try: + messages.append(json.loads(line)) + except json.JSONDecodeError: + pass return messages def broadcast(self, sender: str, content: str, teammates: list) -> str: diff --git a/agents/s_full.py b/agents/s_full.py index d4dcfd3c6..1a094801c 100644 --- a/agents/s_full.py +++ b/agents/s_full.py @@ -364,21 +364,33 @@ def drain(self) -> list: class MessageBus: def __init__(self): INBOX_DIR.mkdir(parents=True, exist_ok=True) + self._lock = threading.Lock() def send(self, sender: str, to: str, content: str, msg_type: str = "message", extra: dict = None) -> str: msg = {"type": msg_type, "from": sender, "content": content, "timestamp": time.time()} if extra: msg.update(extra) - with open(INBOX_DIR / f"{to}.jsonl", "a") as f: - f.write(json.dumps(msg) + "\n") + with self._lock: + with open(INBOX_DIR / f"{to}.jsonl", "a") as f: + f.write(json.dumps(msg) + "\n") return f"Sent {msg_type} to {to}" def read_inbox(self, name: str) -> list: path = INBOX_DIR / f"{name}.jsonl" - if not path.exists(): return [] - msgs = [json.loads(l) for l in path.read_text().strip().splitlines() if l] - path.write_text("") + if not path.exists(): + return [] + with self._lock: + content = path.read_text() + path.write_text("") + msgs = [] + for l in content.splitlines(): + l = l.strip() + if l: + try: + msgs.append(json.loads(l)) + except json.JSONDecodeError: + pass return msgs def broadcast(self, sender: str, content: str, names: list) -> str: