Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 36 additions & 24 deletions apps/memos-local-plugin/adapters/hermes/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,22 +8,25 @@

from __future__ import annotations

import contextlib
import json
import logging
import os
import re
import sys
import threading

from pathlib import Path
from typing import Any, Dict, List
from typing import Any


# Add this directory to sys.path so sibling modules (config, bridge_client, …) resolve
_PLUGIN_DIR = Path(__file__).resolve().parent
if str(_PLUGIN_DIR) not in sys.path:
sys.path.insert(0, str(_PLUGIN_DIR))

from agent.memory_provider import MemoryProvider
from tools.registry import tool_error
from agent.memory_provider import MemoryProvider # noqa: E402
from tools.registry import tool_error # noqa: E402


logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -51,10 +54,10 @@
re.compile(r'^\s*\{["\']?ok["\']?\s*:\s*true\s*\}\s*$', re.IGNORECASE),
re.compile(r'^\s*\{["\']?success["\']?\s*:\s*true\s*\}\s*$', re.IGNORECASE),
re.compile(r'^\s*\{["\']?status["\']?\s*:\s*["\']?ok["\']?\s*\}\s*$', re.IGNORECASE),
re.compile(r'^Operation interrupted:', re.IGNORECASE),
re.compile(r'^Error:', re.IGNORECASE),
re.compile(r'waiting for model response.*elapsed', re.IGNORECASE),
re.compile(r'^\s*$'),
re.compile(r"^Operation interrupted:", re.IGNORECASE),
re.compile(r"^Error:", re.IGNORECASE),
re.compile(r"waiting for model response.*elapsed", re.IGNORECASE),
re.compile(r"^\s*$"),
]

_MIN_CONTENT_LENGTH = 6
Expand All @@ -74,7 +77,10 @@ def _is_trivial(text: str) -> bool:
keys = {k.lower() for k in obj}
if keys <= {"ok", "success", "status", "result", "error", "message"}:
vals = list(obj.values())
if all(isinstance(v, (bool, type(None))) or (isinstance(v, str) and len(v) < 20) for v in vals):
if all(
isinstance(v, (bool, type(None))) or (isinstance(v, str) and len(v) < 20)
for v in vals
):
return True
except (json.JSONDecodeError, TypeError):
pass
Expand All @@ -100,6 +106,7 @@ def name(self) -> str:
def is_available(self) -> bool:
try:
from config import find_bridge_script

find_bridge_script()
return True
except Exception:
Expand All @@ -109,6 +116,7 @@ def initialize(self, session_id: str, **kwargs) -> None:
self._session_id = session_id

from daemon_manager import ensure_daemon

try:
info = ensure_daemon()
logger.info(
Expand All @@ -121,6 +129,7 @@ def initialize(self, session_id: str, **kwargs) -> None:
logger.warning("Failed to start MemTensor daemon: %s", e)

from bridge_client import MemosCoreBridge

try:
self._bridge = MemosCoreBridge()
logger.info("MemTensor bridge connected")
Expand Down Expand Up @@ -175,6 +184,7 @@ def _run():
if pending and self._bridge:
try:
from config import OWNER

user_content, assistant_content, sid = pending
messages = []
if user_content:
Expand All @@ -201,9 +211,7 @@ def _do_recall(self, query: str) -> str:
parts: list[str] = []

try:
search_resp = self._bridge.search(
query, max_results=5, min_score=0.4, owner=OWNER
)
search_resp = self._bridge.search(query, max_results=5, min_score=0.4, owner=OWNER)
hits = search_resp.get("hits") or search_resp.get("memories") or []
for h in hits:
text = h.get("original_excerpt") or h.get("content") or h.get("summary", "")
Expand All @@ -225,9 +233,7 @@ def _do_recall(self, query: str) -> str:

return "\n".join(parts)

def sync_turn(
self, user_content: str, assistant_content: str, *, session_id: str = ""
) -> None:
def sync_turn(self, user_content: str, assistant_content: str, *, session_id: str = "") -> None:
"""Queue turn data for deferred ingest.

Hermes calls sync_all() BEFORE queue_prefetch_all(), so ingesting
Expand All @@ -239,8 +245,11 @@ def sync_turn(
if not self._bridge:
return
if _is_trivial(user_content) and _is_trivial(assistant_content):
logger.debug("sync_turn: skipping trivial turn (user=%r, assistant=%r)",
user_content[:80] if user_content else "", assistant_content[:80] if assistant_content else "")
logger.debug(
"sync_turn: skipping trivial turn (user=%r, assistant=%r)",
user_content[:80] if user_content else "",
assistant_content[:80] if assistant_content else "",
)
return
if _is_trivial(user_content):
user_content = ""
Expand All @@ -249,10 +258,10 @@ def sync_turn(
sid = session_id or self._session_id or "default"
self._pending_ingest = (user_content, assistant_content, sid)

def get_tool_schemas(self) -> List[Dict[str, Any]]:
def get_tool_schemas(self) -> list[dict[str, Any]]:
return [MEMORY_SEARCH_SCHEMA]

def handle_tool_call(self, tool_name: str, args: Dict[str, Any], **kwargs) -> str:
def handle_tool_call(self, tool_name: str, args: dict[str, Any], **kwargs) -> str:
if tool_name != "memory_search":
return tool_error(f"Unknown tool: {tool_name}")

Expand All @@ -265,6 +274,7 @@ def handle_tool_call(self, tool_name: str, args: Dict[str, Any], **kwargs) -> st

try:
from config import OWNER

resp = self._bridge.search(query, max_results=8, owner=OWNER)
hits = resp.get("hits") or resp.get("memories") or []
if not hits:
Expand All @@ -285,6 +295,7 @@ def on_memory_write(self, action: str, target: str, content: str) -> None:
return

from config import OWNER

label = "user_profile" if target == "user" else "memory"
messages = [
{"role": "system", "content": f"[{label}] {content}"},
Expand All @@ -298,14 +309,16 @@ def _write():
owner=OWNER,
)
self._bridge.flush()
logger.info("MemTensor on_memory_write: %s %s (%d chars)", action, target, len(content))
logger.info(
"MemTensor on_memory_write: %s %s (%d chars)", action, target, len(content)
)
except Exception as e:
logger.warning("MemTensor on_memory_write failed: %s", e)

t = threading.Thread(target=_write, daemon=True, name="memtensor-memory-write")
t.start()

def on_session_end(self, messages: List[Dict[str, Any]]) -> None:
def on_session_end(self, messages: list[dict[str, Any]]) -> None:
if not self._bridge:
return
# Flush any deferred ingest that hasn't been picked up by queue_prefetch
Expand All @@ -314,6 +327,7 @@ def on_session_end(self, messages: List[Dict[str, Any]]) -> None:
if pending:
try:
from config import OWNER

user_content, assistant_content, sid = pending
msgs = []
if user_content:
Expand All @@ -334,10 +348,8 @@ def shutdown(self) -> None:
if t and t.is_alive():
t.join(timeout=5.0)
if self._bridge:
try:
with contextlib.suppress(Exception):
self._bridge.shutdown()
except Exception:
pass
self._bridge = None


Expand Down
32 changes: 20 additions & 12 deletions apps/memos-local-plugin/adapters/hermes/bridge_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,18 @@

from __future__ import annotations

import contextlib
import json
import logging
import os
import socket
import subprocess
import threading

from typing import Any

from config import find_bridge_script, get_bridge_config, get_daemon_port, OWNER, _get_plugin_root
from config import OWNER, _get_plugin_root, find_bridge_script, get_bridge_config, get_daemon_port


logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -45,10 +48,8 @@ def send(self, data: str) -> str:

def close(self) -> None:
if self._sock:
try:
with contextlib.suppress(OSError):
self._sock.close()
except OSError:
pass
self._sock = None


Expand Down Expand Up @@ -133,20 +134,27 @@ def call(self, method: str, params: dict[str, Any] | None = None) -> Any:
raise RuntimeError(f"Bridge error: {resp['error']}")
return resp.get("result", {})

def search(self, query: str, max_results: int = 6, min_score: float = 0.45, owner: str = OWNER) -> dict:
return self.call("search", {
"query": query,
"maxResults": max_results,
"minScore": min_score,
"owner": owner,
})
def search(
self, query: str, max_results: int = 6, min_score: float = 0.45, owner: str = OWNER
) -> dict:
return self.call(
"search",
{
"query": query,
"maxResults": max_results,
"minScore": min_score,
"owner": owner,
},
)

def ingest(self, messages: list[dict], session_id: str = "default", owner: str = OWNER) -> None:
params: dict[str, Any] = {"messages": messages, "sessionId": session_id, "owner": owner}
self.call("ingest", params)

def build_prompt(self, query: str, max_results: int = 6, owner: str = OWNER) -> dict:
return self.call("build_prompt", {"query": query, "maxResults": max_results, "owner": owner})
return self.call(
"build_prompt", {"query": query, "maxResults": max_results, "owner": owner}
)

def flush(self) -> None:
self.call("flush")
Expand Down
5 changes: 4 additions & 1 deletion apps/memos-local-plugin/adapters/hermes/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@

import json
import os

from pathlib import Path


DAEMON_PORT = 18992
VIEWER_PORT = 18901
OWNER = "hermes"
Expand Down Expand Up @@ -139,6 +141,7 @@ def _resolve_tsx(plugin_root: Path) -> str:
if local_tsx.exists():
return str(local_tsx)
import shutil

global_tsx = shutil.which("tsx")
if global_tsx:
return global_tsx
Expand Down Expand Up @@ -170,7 +173,7 @@ def find_bridge_script() -> list[str]:
return ["node", str(candidate)]
tsx = _resolve_tsx(candidate.parent)
if " " in tsx:
return tsx.split() + [str(candidate)]
return [*tsx.split(), str(candidate)]
return [tsx, str(candidate)]

raise FileNotFoundError(
Expand Down
Loading
Loading