diff --git a/apps/memos-local-plugin/adapters/hermes/__init__.py b/apps/memos-local-plugin/adapters/hermes/__init__.py index fcb8ad58..4e2f2d51 100644 --- a/apps/memos-local-plugin/adapters/hermes/__init__.py +++ b/apps/memos-local-plugin/adapters/hermes/__init__.py @@ -8,22 +8,25 @@ from __future__ import annotations +import contextlib import json import logging -import os import re import sys import threading + from pathlib import Path -from typing import Any, Dict, List +from typing import Any + # Add this directory to sys.path so sibling modules (config, bridge_client, …) resolve _PLUGIN_DIR = Path(__file__).resolve().parent if str(_PLUGIN_DIR) not in sys.path: sys.path.insert(0, str(_PLUGIN_DIR)) -from agent.memory_provider import MemoryProvider -from tools.registry import tool_error +from agent.memory_provider import MemoryProvider # noqa: E402 +from tools.registry import tool_error # noqa: E402 + logger = logging.getLogger(__name__) @@ -51,10 +54,10 @@ re.compile(r'^\s*\{["\']?ok["\']?\s*:\s*true\s*\}\s*$', re.IGNORECASE), re.compile(r'^\s*\{["\']?success["\']?\s*:\s*true\s*\}\s*$', re.IGNORECASE), re.compile(r'^\s*\{["\']?status["\']?\s*:\s*["\']?ok["\']?\s*\}\s*$', re.IGNORECASE), - re.compile(r'^Operation interrupted:', re.IGNORECASE), - re.compile(r'^Error:', re.IGNORECASE), - re.compile(r'waiting for model response.*elapsed', re.IGNORECASE), - re.compile(r'^\s*$'), + re.compile(r"^Operation interrupted:", re.IGNORECASE), + re.compile(r"^Error:", re.IGNORECASE), + re.compile(r"waiting for model response.*elapsed", re.IGNORECASE), + re.compile(r"^\s*$"), ] _MIN_CONTENT_LENGTH = 6 @@ -74,7 +77,10 @@ def _is_trivial(text: str) -> bool: keys = {k.lower() for k in obj} if keys <= {"ok", "success", "status", "result", "error", "message"}: vals = list(obj.values()) - if all(isinstance(v, (bool, type(None))) or (isinstance(v, str) and len(v) < 20) for v in vals): + if all( + isinstance(v, (bool, type(None))) or (isinstance(v, str) and len(v) < 20) + for v in vals + ): return True except (json.JSONDecodeError, TypeError): pass @@ -100,6 +106,7 @@ def name(self) -> str: def is_available(self) -> bool: try: from config import find_bridge_script + find_bridge_script() return True except Exception: @@ -109,6 +116,7 @@ def initialize(self, session_id: str, **kwargs) -> None: self._session_id = session_id from daemon_manager import ensure_daemon + try: info = ensure_daemon() logger.info( @@ -121,6 +129,7 @@ def initialize(self, session_id: str, **kwargs) -> None: logger.warning("Failed to start MemTensor daemon: %s", e) from bridge_client import MemosCoreBridge + try: self._bridge = MemosCoreBridge() logger.info("MemTensor bridge connected") @@ -175,6 +184,7 @@ def _run(): if pending and self._bridge: try: from config import OWNER + user_content, assistant_content, sid = pending messages = [] if user_content: @@ -201,9 +211,7 @@ def _do_recall(self, query: str) -> str: parts: list[str] = [] try: - search_resp = self._bridge.search( - query, max_results=5, min_score=0.4, owner=OWNER - ) + search_resp = self._bridge.search(query, max_results=5, min_score=0.4, owner=OWNER) hits = search_resp.get("hits") or search_resp.get("memories") or [] for h in hits: text = h.get("original_excerpt") or h.get("content") or h.get("summary", "") @@ -225,9 +233,7 @@ def _do_recall(self, query: str) -> str: return "\n".join(parts) - def sync_turn( - self, user_content: str, assistant_content: str, *, session_id: str = "" - ) -> None: + def sync_turn(self, user_content: str, assistant_content: str, *, session_id: str = "") -> None: """Queue turn data for deferred ingest. Hermes calls sync_all() BEFORE queue_prefetch_all(), so ingesting @@ -239,8 +245,11 @@ def sync_turn( if not self._bridge: return if _is_trivial(user_content) and _is_trivial(assistant_content): - logger.debug("sync_turn: skipping trivial turn (user=%r, assistant=%r)", - user_content[:80] if user_content else "", assistant_content[:80] if assistant_content else "") + logger.debug( + "sync_turn: skipping trivial turn (user=%r, assistant=%r)", + user_content[:80] if user_content else "", + assistant_content[:80] if assistant_content else "", + ) return if _is_trivial(user_content): user_content = "" @@ -249,10 +258,10 @@ def sync_turn( sid = session_id or self._session_id or "default" self._pending_ingest = (user_content, assistant_content, sid) - def get_tool_schemas(self) -> List[Dict[str, Any]]: + def get_tool_schemas(self) -> list[dict[str, Any]]: return [MEMORY_SEARCH_SCHEMA] - def handle_tool_call(self, tool_name: str, args: Dict[str, Any], **kwargs) -> str: + def handle_tool_call(self, tool_name: str, args: dict[str, Any], **kwargs) -> str: if tool_name != "memory_search": return tool_error(f"Unknown tool: {tool_name}") @@ -265,6 +274,7 @@ def handle_tool_call(self, tool_name: str, args: Dict[str, Any], **kwargs) -> st try: from config import OWNER + resp = self._bridge.search(query, max_results=8, owner=OWNER) hits = resp.get("hits") or resp.get("memories") or [] if not hits: @@ -285,6 +295,7 @@ def on_memory_write(self, action: str, target: str, content: str) -> None: return from config import OWNER + label = "user_profile" if target == "user" else "memory" messages = [ {"role": "system", "content": f"[{label}] {content}"}, @@ -298,14 +309,16 @@ def _write(): owner=OWNER, ) self._bridge.flush() - logger.info("MemTensor on_memory_write: %s %s (%d chars)", action, target, len(content)) + logger.info( + "MemTensor on_memory_write: %s %s (%d chars)", action, target, len(content) + ) except Exception as e: logger.warning("MemTensor on_memory_write failed: %s", e) t = threading.Thread(target=_write, daemon=True, name="memtensor-memory-write") t.start() - def on_session_end(self, messages: List[Dict[str, Any]]) -> None: + def on_session_end(self, messages: list[dict[str, Any]]) -> None: if not self._bridge: return # Flush any deferred ingest that hasn't been picked up by queue_prefetch @@ -314,6 +327,7 @@ def on_session_end(self, messages: List[Dict[str, Any]]) -> None: if pending: try: from config import OWNER + user_content, assistant_content, sid = pending msgs = [] if user_content: @@ -334,10 +348,8 @@ def shutdown(self) -> None: if t and t.is_alive(): t.join(timeout=5.0) if self._bridge: - try: + with contextlib.suppress(Exception): self._bridge.shutdown() - except Exception: - pass self._bridge = None diff --git a/apps/memos-local-plugin/adapters/hermes/bridge_client.py b/apps/memos-local-plugin/adapters/hermes/bridge_client.py index 47ca0b01..426f40c8 100644 --- a/apps/memos-local-plugin/adapters/hermes/bridge_client.py +++ b/apps/memos-local-plugin/adapters/hermes/bridge_client.py @@ -7,15 +7,18 @@ from __future__ import annotations +import contextlib import json import logging import os import socket import subprocess import threading + from typing import Any -from config import find_bridge_script, get_bridge_config, get_daemon_port, OWNER, _get_plugin_root +from config import OWNER, _get_plugin_root, find_bridge_script, get_bridge_config, get_daemon_port + logger = logging.getLogger(__name__) @@ -45,10 +48,8 @@ def send(self, data: str) -> str: def close(self) -> None: if self._sock: - try: + with contextlib.suppress(OSError): self._sock.close() - except OSError: - pass self._sock = None @@ -133,20 +134,27 @@ def call(self, method: str, params: dict[str, Any] | None = None) -> Any: raise RuntimeError(f"Bridge error: {resp['error']}") return resp.get("result", {}) - def search(self, query: str, max_results: int = 6, min_score: float = 0.45, owner: str = OWNER) -> dict: - return self.call("search", { - "query": query, - "maxResults": max_results, - "minScore": min_score, - "owner": owner, - }) + def search( + self, query: str, max_results: int = 6, min_score: float = 0.45, owner: str = OWNER + ) -> dict: + return self.call( + "search", + { + "query": query, + "maxResults": max_results, + "minScore": min_score, + "owner": owner, + }, + ) def ingest(self, messages: list[dict], session_id: str = "default", owner: str = OWNER) -> None: params: dict[str, Any] = {"messages": messages, "sessionId": session_id, "owner": owner} self.call("ingest", params) def build_prompt(self, query: str, max_results: int = 6, owner: str = OWNER) -> dict: - return self.call("build_prompt", {"query": query, "maxResults": max_results, "owner": owner}) + return self.call( + "build_prompt", {"query": query, "maxResults": max_results, "owner": owner} + ) def flush(self) -> None: self.call("flush") diff --git a/apps/memos-local-plugin/adapters/hermes/config.py b/apps/memos-local-plugin/adapters/hermes/config.py index 5613111c..f1a2a8b4 100644 --- a/apps/memos-local-plugin/adapters/hermes/config.py +++ b/apps/memos-local-plugin/adapters/hermes/config.py @@ -4,8 +4,10 @@ import json import os + from pathlib import Path + DAEMON_PORT = 18992 VIEWER_PORT = 18901 OWNER = "hermes" @@ -139,6 +141,7 @@ def _resolve_tsx(plugin_root: Path) -> str: if local_tsx.exists(): return str(local_tsx) import shutil + global_tsx = shutil.which("tsx") if global_tsx: return global_tsx @@ -170,7 +173,7 @@ def find_bridge_script() -> list[str]: return ["node", str(candidate)] tsx = _resolve_tsx(candidate.parent) if " " in tsx: - return tsx.split() + [str(candidate)] + return [*tsx.split(), str(candidate)] return [tsx, str(candidate)] raise FileNotFoundError( diff --git a/apps/memos-local-plugin/adapters/hermes/daemon_manager.py b/apps/memos-local-plugin/adapters/hermes/daemon_manager.py index c922c5d3..1ac11da5 100644 --- a/apps/memos-local-plugin/adapters/hermes/daemon_manager.py +++ b/apps/memos-local-plugin/adapters/hermes/daemon_manager.py @@ -6,6 +6,7 @@ from __future__ import annotations +import contextlib import json import logging import os @@ -13,18 +14,20 @@ import socket import subprocess import time + from typing import Any from config import ( DAEMON_PORT, VIEWER_PORT, + _get_plugin_root, find_bridge_script, get_bridge_config, get_daemon_dir, get_daemon_port, - _get_plugin_root, ) + logger = logging.getLogger(__name__) @@ -83,10 +86,8 @@ def _cleanup_pid_files() -> None: for name in ("bridge.pid", "bridge.port", "viewer.url"): f = daemon_dir / name if f.exists(): - try: + with contextlib.suppress(OSError): f.unlink() - except OSError: - pass def start_daemon( @@ -105,10 +106,8 @@ def start_daemon( pid = 0 pf = daemon_dir / "bridge.pid" if pf.exists(): - try: + with contextlib.suppress(ValueError, OSError): pid = int(pf.read_text().strip()) - except (ValueError, OSError): - pass logger.info("Daemon already responsive on port %d (shared)", port) return { "daemonPort": port, @@ -125,57 +124,55 @@ def start_daemon( env["OPENCLAW_STATE_DIR"] = str(get_daemon_dir().parent) log_dir = get_daemon_dir() - log_file = open(log_dir / "bridge.log", "a") logger.info("Starting daemon: %s", " ".join(bridge_cmd)) - proc = subprocess.Popen( - bridge_cmd, - stdin=subprocess.DEVNULL, - stdout=subprocess.PIPE, - stderr=log_file, - env=env, - cwd=str(_get_plugin_root()), - start_new_session=True, - ) - - deadline = time.monotonic() + timeout - info: dict[str, Any] = {} - - import select - while time.monotonic() < deadline: - if proc.poll() is not None: - log_file.close() - stderr_out = "" - try: - stderr_out = (log_dir / "bridge.log").read_text()[-2000:] - except OSError: - pass - raise RuntimeError( - f"Daemon exited immediately with code {proc.returncode}.\nlog: {stderr_out}" - ) - - if proc.stdout and select.select([proc.stdout], [], [], 1.0)[0]: - line = proc.stdout.readline().decode("utf-8").strip() - if line: - try: - info = json.loads(line) - break - except json.JSONDecodeError: - logger.debug("Non-JSON stdout line from daemon: %s", line) - - if not info: - log_file.close() - raise RuntimeError("Daemon did not produce startup info within timeout") + with open(log_dir / "bridge.log", "a") as log_file: + proc = subprocess.Popen( + bridge_cmd, + stdin=subprocess.DEVNULL, + stdout=subprocess.PIPE, + stderr=log_file, + env=env, + cwd=str(_get_plugin_root()), + start_new_session=True, + ) + + deadline = time.monotonic() + timeout + info: dict[str, Any] = {} + + import select + + while time.monotonic() < deadline: + if proc.poll() is not None: + stderr_out = "" + with contextlib.suppress(OSError): + stderr_out = (log_dir / "bridge.log").read_text()[-2000:] + raise RuntimeError( + f"Daemon exited immediately with code {proc.returncode}.\nlog: {stderr_out}" + ) + + if proc.stdout and select.select([proc.stdout], [], [], 1.0)[0]: + line = proc.stdout.readline().decode("utf-8").strip() + if line: + try: + info = json.loads(line) + break + except json.JSONDecodeError: + logger.debug("Non-JSON stdout line from daemon: %s", line) + + if not info: + raise RuntimeError("Daemon did not produce startup info within timeout") if proc.stdout: proc.stdout.close() - log_file.close() info["already_running"] = False logger.info( "Daemon started: pid=%s, port=%s, viewer=%s", - info.get("pid"), info.get("daemonPort"), info.get("viewerUrl"), + info.get("pid"), + info.get("daemonPort"), + info.get("viewerUrl"), ) return info diff --git a/apps/memos-local-plugin/adapters/openharness/scripts/bridge_client.py b/apps/memos-local-plugin/adapters/openharness/scripts/bridge_client.py index ce2b8165..d620606f 100644 --- a/apps/memos-local-plugin/adapters/openharness/scripts/bridge_client.py +++ b/apps/memos-local-plugin/adapters/openharness/scripts/bridge_client.py @@ -7,15 +7,18 @@ from __future__ import annotations +import contextlib import json import logging import os import socket import subprocess import threading + from typing import Any -from config import find_bridge_script, get_bridge_config, get_daemon_port, _get_plugin_root +from config import _get_plugin_root, find_bridge_script, get_bridge_config, get_daemon_port + logger = logging.getLogger(__name__) @@ -47,10 +50,8 @@ def send(self, data: str) -> str: def close(self) -> None: if self._sock: - try: + with contextlib.suppress(OSError): self._sock.close() - except OSError: - pass self._sock = None @@ -139,22 +140,31 @@ def call(self, method: str, params: dict[str, Any] | None = None) -> Any: raise RuntimeError(f"Bridge error: {resp['error']}") return resp.get("result", {}) - def search(self, query: str, max_results: int = 6, min_score: float = 0.45, owner: str = "openharness") -> dict: - return self.call("search", { - "query": query, - "maxResults": max_results, - "minScore": min_score, - "owner": owner, - }) + def search( + self, query: str, max_results: int = 6, min_score: float = 0.45, owner: str = "openharness" + ) -> dict: + return self.call( + "search", + { + "query": query, + "maxResults": max_results, + "minScore": min_score, + "owner": owner, + }, + ) - def ingest(self, messages: list[dict], session_id: str = "default", owner: str | None = None) -> None: + def ingest( + self, messages: list[dict], session_id: str = "default", owner: str | None = None + ) -> None: params: dict[str, Any] = {"messages": messages, "sessionId": session_id} if owner: params["owner"] = owner self.call("ingest", params) def build_prompt(self, query: str, max_results: int = 6, owner: str = "openharness") -> dict: - return self.call("build_prompt", {"query": query, "maxResults": max_results, "owner": owner}) + return self.call( + "build_prompt", {"query": query, "maxResults": max_results, "owner": owner} + ) def flush(self) -> None: self.call("flush") diff --git a/apps/memos-local-plugin/adapters/openharness/scripts/config.py b/apps/memos-local-plugin/adapters/openharness/scripts/config.py index 69476c1b..e12a53e6 100644 --- a/apps/memos-local-plugin/adapters/openharness/scripts/config.py +++ b/apps/memos-local-plugin/adapters/openharness/scripts/config.py @@ -4,9 +4,11 @@ import json import os + from hashlib import sha1 from pathlib import Path + # Default ports DAEMON_PORT = 18990 VIEWER_PORT = 18899 @@ -165,6 +167,7 @@ def _resolve_tsx(plugin_root: Path) -> str: if local_tsx.exists(): return str(local_tsx) import shutil + global_tsx = shutil.which("tsx") if global_tsx: return global_tsx @@ -197,7 +200,7 @@ def find_bridge_script() -> list[str]: return ["node", str(candidate)] tsx = _resolve_tsx(candidate.parent) if " " in tsx: - return tsx.split() + [str(candidate)] + return [*tsx.split(), str(candidate)] return [tsx, str(candidate)] raise FileNotFoundError( diff --git a/apps/memos-local-plugin/adapters/openharness/scripts/daemon_manager.py b/apps/memos-local-plugin/adapters/openharness/scripts/daemon_manager.py index 7b3c55ba..dae0e675 100644 --- a/apps/memos-local-plugin/adapters/openharness/scripts/daemon_manager.py +++ b/apps/memos-local-plugin/adapters/openharness/scripts/daemon_manager.py @@ -7,6 +7,7 @@ from __future__ import annotations +import contextlib import json import logging import os @@ -14,19 +15,20 @@ import socket import subprocess import time -from pathlib import Path + from typing import Any from config import ( DAEMON_PORT, VIEWER_PORT, + _get_plugin_root, find_bridge_script, get_bridge_config, get_daemon_dir, get_daemon_port, - _get_plugin_root, ) + logger = logging.getLogger(__name__) @@ -84,10 +86,8 @@ def _cleanup_pid_files() -> None: for name in ("bridge.pid", "bridge.port", "viewer.url"): f = daemon_dir / name if f.exists(): - try: + with contextlib.suppress(OSError): f.unlink() - except OSError: - pass def start_daemon( @@ -109,10 +109,8 @@ def start_daemon( pid = 0 pf = daemon_dir / "bridge.pid" if pf.exists(): - try: + with contextlib.suppress(ValueError, OSError): pid = int(pf.read_text().strip()) - except (ValueError, OSError): - pass return { "daemonPort": port, "viewerUrl": viewer_url, @@ -128,62 +126,56 @@ def start_daemon( # Isolate viewer: prevent migration scan from showing OpenClaw data env["OPENCLAW_STATE_DIR"] = str(get_daemon_dir().parent) - # Redirect stderr to a log file so closing the pipe doesn't EPIPE the daemon log_dir = get_daemon_dir() - log_file = open(log_dir / "bridge.log", "a") logger.info("Starting daemon: %s", " ".join(bridge_cmd)) - proc = subprocess.Popen( - bridge_cmd, - stdin=subprocess.DEVNULL, - stdout=subprocess.PIPE, - stderr=log_file, - env=env, - cwd=str(_get_plugin_root()), - start_new_session=True, - ) + with open(log_dir / "bridge.log", "a") as log_file: + proc = subprocess.Popen( + bridge_cmd, + stdin=subprocess.DEVNULL, + stdout=subprocess.PIPE, + stderr=log_file, + env=env, + cwd=str(_get_plugin_root()), + start_new_session=True, + ) + + deadline = time.monotonic() + timeout + info: dict[str, Any] = {} + + import select + + while time.monotonic() < deadline: + if proc.poll() is not None: + stderr_out = "" + with contextlib.suppress(OSError): + stderr_out = (log_dir / "bridge.log").read_text()[-2000:] + raise RuntimeError( + f"Daemon exited immediately with code {proc.returncode}.\nlog: {stderr_out}" + ) + + if proc.stdout and select.select([proc.stdout], [], [], 1.0)[0]: + line = proc.stdout.readline().decode("utf-8").strip() + if line: + try: + info = json.loads(line) + break + except json.JSONDecodeError: + logger.debug("Non-JSON stdout line from daemon: %s", line) + + if not info: + raise RuntimeError("Daemon did not produce startup info within timeout") - # Wait for the daemon to print its startup JSON line to stdout - deadline = time.monotonic() + timeout - info: dict[str, Any] = {} - - import select - while time.monotonic() < deadline: - if proc.poll() is not None: - log_file.close() - # Read the log for error context - stderr_out = "" - try: - stderr_out = (log_dir / "bridge.log").read_text()[-2000:] - except OSError: - pass - raise RuntimeError( - f"Daemon exited immediately with code {proc.returncode}.\nlog: {stderr_out}" - ) - - if proc.stdout and select.select([proc.stdout], [], [], 1.0)[0]: - line = proc.stdout.readline().decode("utf-8").strip() - if line: - try: - info = json.loads(line) - break - except json.JSONDecodeError: - logger.debug("Non-JSON stdout line from daemon: %s", line) - - if not info: - log_file.close() - raise RuntimeError("Daemon did not produce startup info within timeout") - - # Close our handle to stdout; stderr goes to the log file which stays open if proc.stdout: proc.stdout.close() - log_file.close() info["already_running"] = False logger.info( "Daemon started: pid=%s, port=%s, viewer=%s", - info.get("pid"), info.get("daemonPort"), info.get("viewerUrl"), + info.get("pid"), + info.get("daemonPort"), + info.get("viewerUrl"), ) return info