diff --git a/apps/backend/agent_graphiti_test.py b/apps/backend/agent_graphiti_test.py
new file mode 100644
index 0000000000..3148abd3c1
--- /dev/null
+++ b/apps/backend/agent_graphiti_test.py
@@ -0,0 +1,44 @@
+import asyncio
+import os
+from pathlib import Path
+from datetime import datetime, timezone
+from integrations.graphiti.memory import GraphitiMemory
+
+spec_dir = Path.home() / ".auto-claude" / "agent_style_spec"
+spec_dir.mkdir(parents=True, exist_ok=True)
+project_dir_env = os.getenv("PROJECT_DIR")
+# Prefer a portable PROJECT_DIR override; otherwise fall back to the repository root
+# (…/apps/backend/agent_graphiti_test.py -> parents[2] == repo root).
+project_dir = (
+    Path(project_dir_env).expanduser().resolve()
+    if project_dir_env
+    else Path(__file__).resolve().parents[2]
+)
+project_dir.mkdir(parents=True, exist_ok=True)
+
+msg = f"agent-style write {datetime.now(timezone.utc).isoformat()}"
+
+async def main():
+    mem = GraphitiMemory(spec_dir=spec_dir, project_dir=project_dir)
+    print("is_enabled:", getattr(mem, "is_enabled", None))
+
+    # Write (Agent-style) – with a timeout so it doesn't hang forever
+    ok = await asyncio.wait_for(
+        mem.save_session_insights(
+            session_num=int(datetime.now(timezone.utc).timestamp()),
+            insights={"type":"agent_style_test","message":msg,"timestamp":datetime.now(timezone.utc).isoformat()},
+        ),
+        timeout=120,
+    )
+    print("write_ok:", ok)
+
+    # Read (Agent-style)
+    hits = await asyncio.wait_for(mem.get_relevant_context(query=msg, num_results=20), timeout=60)
+    print("hits:", len(hits) if hits else 0)
+    if hits:
+        print("first_hit:", hits[0])
+
+    await mem.close()
+    print("spec_dir_used:", spec_dir)
+
+asyncio.run(main())
diff --git a/apps/backend/agents/memory_manager.py b/apps/backend/agents/memory_manager.py
index 3ff8d951b2..85512a4144 100644
--- a/apps/backend/agents/memory_manager.py
+++ b/apps/backend/agents/memory_manager.py
@@ -114,7 +114,9 @@ async def get_graphiti_context(
         return None
 
     try:
-        from graphiti_memory import GraphitiMemory
+        # Prefer the stable in-package import. Some deployments (e.g. app bundles)
+        # don't ship a top-level `graphiti_memory` shim.
+        from integrations.graphiti.memory import GraphitiMemory
 
         # Create memory manager
         memory = GraphitiMemory(spec_dir, project_dir)
@@ -286,7 +288,9 @@ async def save_session_memory(
         debug("memory", "Attempting PRIMARY storage: Graphiti")
 
         try:
-            from graphiti_memory import GraphitiMemory
+            # Prefer the stable in-package import. Some deployments (e.g. app bundles)
+            # don't ship a top-level `graphiti_memory` shim.
+            from integrations.graphiti.memory import GraphitiMemory
 
             memory = GraphitiMemory(spec_dir, project_dir)
diff --git a/apps/backend/file_lock.py b/apps/backend/file_lock.py
new file mode 100644
index 0000000000..18a6506997
--- /dev/null
+++ b/apps/backend/file_lock.py
@@ -0,0 +1,369 @@
+"""
+File Locking Utilities (Single Source of Truth)
+===============================================
+
+Thread-safe and process-safe file-locking utilities, used both by the GitHub runner
+and by the Graphiti/memory queries.
+
+Important:
+- This module is the **only** implementation.
+- `apps/backend/runners/github/file_lock.py` re-exports from here (backward compat),
+  so no duplicated logic has to be maintained.
+
+Mechanism:
+- Unix: `fcntl.flock()`
+- Windows: `msvcrt.locking()`
+"""
+
+from __future__ import annotations
+
+import asyncio
+import json
+import os
+import tempfile
+import time
+import warnings
+from collections.abc import Callable
+from contextlib import asynccontextmanager, contextmanager
+from pathlib import Path
+from typing import Any
+
+_IS_WINDOWS = os.name == "nt"
+_WINDOWS_LOCK_SIZE = 1024 * 1024
+
+try:
+    import fcntl  # type: ignore
+except ImportError:  # pragma: no cover
+    fcntl = None
+
+try:
+    import msvcrt  # type: ignore
+except ImportError:  # pragma: no cover
+    msvcrt = None
+
+
+def _try_lock(fd: int, exclusive: bool) -> None:
+    if _IS_WINDOWS:
+        if msvcrt is None:
+            raise FileLockError("msvcrt is required for file locking on Windows")
+        if not exclusive:
+            warnings.warn(
+                "Shared file locks are not supported on Windows; using exclusive lock",
+                RuntimeWarning,
+                stacklevel=3,
+            )
+        msvcrt.locking(fd, msvcrt.LK_NBLCK, _WINDOWS_LOCK_SIZE)
+        return
+
+    if fcntl is None:
+        raise FileLockError("fcntl is required for file locking on non-Windows platforms")
+
+    lock_mode = fcntl.LOCK_EX if exclusive else fcntl.LOCK_SH
+    fcntl.flock(fd, lock_mode | fcntl.LOCK_NB)
+
+
+def _unlock(fd: int) -> None:
+    if _IS_WINDOWS:
+        if msvcrt is None:
+            warnings.warn(
+                "msvcrt unavailable; cannot unlock file descriptor",
+                RuntimeWarning,
+                stacklevel=3,
+            )
+            return
+        msvcrt.locking(fd, msvcrt.LK_UNLCK, _WINDOWS_LOCK_SIZE)
+        return
+
+    if fcntl is None:
+        warnings.warn(
+            "fcntl unavailable; cannot unlock file descriptor",
+            RuntimeWarning,
+            stacklevel=3,
+        )
+        return
+    fcntl.flock(fd, fcntl.LOCK_UN)
+
+
+class FileLockError(Exception):
+    """Raised when file locking operations fail."""
+
+
+class FileLockTimeout(FileLockError):
+    """Raised when lock acquisition times out."""
+
+
+class FileLock:
+    """
+    Cross-process file lock using platform-specific locking (fcntl.flock on Unix,
+    msvcrt.locking on Windows).
+
+    Supports both sync and async context managers for flexible usage.
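For orientation, this is how the lock is typically used from both sync and async code. This is an illustrative sketch against the FileLock API defined in this file, not code from the PR; the state-file path is made up, and it assumes apps/backend is on sys.path so that `file_lock` is importable.

import asyncio
from file_lock import FileLock, FileLockTimeout

STATE_FILE = "/tmp/auto-claude-demo/state.json"  # hypothetical path, for illustration only

def update_state_sync() -> None:
    # Synchronous critical section: the sibling state.json.lock file is held
    # for the duration of the with-block, excluding other processes.
    try:
        with FileLock(STATE_FILE, timeout=5.0):
            ...  # read/modify/write STATE_FILE safely here
    except FileLockTimeout:
        print("another process held the lock for too long")

async def update_state_async() -> None:
    # Async usage: acquisition runs in a thread pool, so the event loop is not blocked.
    async with FileLock(STATE_FILE, timeout=5.0):
        ...  # same critical section, usable from async code

update_state_sync()
asyncio.run(update_state_async())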
+ + Args: + filepath: Path to file to lock (lock file: sibling `*.lock`) + timeout: Maximum seconds to wait for lock (default: 10.0) + exclusive: Whether to use exclusive lock (default: True) + """ + + def __init__( + self, + filepath: str | Path, + timeout: float = 10.0, + exclusive: bool = True, + ): + self.filepath = Path(filepath) + self.timeout = timeout + self.exclusive = exclusive + self._lock_file: Path | None = None + self._fd: int | None = None + + def _get_lock_file(self) -> Path: + """Get lock file path (separate .lock file).""" + return self.filepath.parent / f"{self.filepath.name}.lock" + + def _acquire_lock(self) -> None: + """Acquire the file lock (blocking with timeout).""" + self._lock_file = self._get_lock_file() + self._lock_file.parent.mkdir(parents=True, exist_ok=True) + + try: + # Open lock file + self._fd = os.open(str(self._lock_file), os.O_CREAT | os.O_RDWR) + + # Try to acquire lock with timeout + start_time = time.time() + + while True: + try: + # Non-blocking lock attempt + _try_lock(self._fd, self.exclusive) + return # Lock acquired + except (BlockingIOError, OSError) as e: + # Lock held by another process + elapsed = time.time() - start_time + if elapsed >= self.timeout: + raise FileLockTimeout( + f"Failed to acquire lock on {self.filepath} within {self.timeout}s" + ) from e + + # Wait a bit before retrying + time.sleep(0.01) + except Exception: + # Ensure file descriptor is always closed on any failure during acquisition + if self._fd is not None: + try: + os.close(self._fd) + except Exception: + pass + finally: + self._fd = None + raise + + def _release_lock(self) -> None: + """Release the file lock.""" + if self._fd is not None: + try: + _unlock(self._fd) + os.close(self._fd) + except Exception: + pass # Best effort cleanup + finally: + self._fd = None + + # Clean up lock file + if self._lock_file and self._lock_file.exists(): + try: + self._lock_file.unlink() + except Exception: + pass # Best effort cleanup + + def __enter__(self): + """Synchronous context manager entry.""" + self._acquire_lock() + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + """Synchronous context manager exit.""" + self._release_lock() + return False + + async def __aenter__(self): + """Async context manager entry.""" + # Run blocking lock acquisition in thread pool + await asyncio.get_running_loop().run_in_executor(None, self._acquire_lock) + return self + + async def __aexit__(self, exc_type, exc_val, exc_tb): + """Async context manager exit.""" + await asyncio.get_running_loop().run_in_executor(None, self._release_lock) + return False + + +@contextmanager +def atomic_write(filepath: str | Path, mode: str = "w"): + """ + Atomic file write using temp file and rename. + + Writes to a temp file first, then atomically replaces target file + using os.replace() which is atomic on POSIX systems. + """ + filepath = Path(filepath) + filepath.parent.mkdir(parents=True, exist_ok=True) + + # Create temp file in same directory for atomic rename + fd, tmp_path = tempfile.mkstemp( + dir=filepath.parent, prefix=f".{filepath.name}.tmp.", suffix="" + ) + + try: + with os.fdopen(fd, mode) as f: + yield f + os.replace(tmp_path, filepath) + except Exception: + try: + os.unlink(tmp_path) + except Exception: + pass + raise + + +@asynccontextmanager +async def locked_write( + filepath: str | Path, timeout: float = 10.0, mode: str = "w" +) -> Any: + """ + Async context manager combining file locking and atomic writes. 
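A short usage sketch for the locked write/read helpers follows. It is not taken from the PR; the queue-file path is hypothetical, and the import again assumes apps/backend is on sys.path.

import asyncio
import json
from file_lock import locked_write, locked_json_read

QUEUE_FILE = "/tmp/auto-claude-demo/queue.json"  # hypothetical path, for illustration only

async def main() -> None:
    # Writer: exclusive lock plus temp-file-and-replace, so a crash mid-write
    # can never leave a truncated queue.json behind.
    async with locked_write(QUEUE_FILE, timeout=5.0) as f:
        json.dump({"pending": ["task-1", "task-2"]}, f, indent=2)

    # Reader: shared lock, so several readers may run concurrently
    # (falls back to an exclusive lock on Windows).
    data = await locked_json_read(QUEUE_FILE, timeout=5.0)
    print(data["pending"])

asyncio.run(main())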
+ + Acquires exclusive lock, writes to temp file, atomically replaces target. + """ + filepath = Path(filepath) + + lock = FileLock(filepath, timeout=timeout, exclusive=True) + await lock.__aenter__() + + try: + fd, tmp_path = await asyncio.get_running_loop().run_in_executor( + None, + lambda: tempfile.mkstemp( + dir=filepath.parent, prefix=f".{filepath.name}.tmp.", suffix="" + ), + ) + + try: + f = os.fdopen(fd, mode) + try: + yield f + finally: + f.close() + + await asyncio.get_running_loop().run_in_executor( + None, os.replace, tmp_path, filepath + ) + except Exception: + try: + await asyncio.get_running_loop().run_in_executor(None, os.unlink, tmp_path) + except Exception: + pass + raise + finally: + await lock.__aexit__(None, None, None) + + +@asynccontextmanager +async def locked_read(filepath: str | Path, timeout: float = 10.0) -> Any: + """ + Async context manager for locked file reading. + + Acquires shared lock for reading (multiple concurrent readers allowed). + """ + filepath = Path(filepath) + if not filepath.exists(): + raise FileNotFoundError(f"File not found: {filepath}") + + lock = FileLock(filepath, timeout=timeout, exclusive=False) + await lock.__aenter__() + + try: + with open(filepath) as f: + yield f + finally: + await lock.__aexit__(None, None, None) + + +async def locked_json_write( + filepath: str | Path, data: Any, timeout: float = 10.0, indent: int = 2 +) -> None: + """Write JSON with locking + atomicity.""" + async with locked_write(filepath, timeout=timeout) as f: + json.dump(data, f, indent=indent) + + +async def locked_json_read(filepath: str | Path, timeout: float = 10.0) -> Any: + """Read JSON with locking.""" + async with locked_read(filepath, timeout=timeout) as f: + return json.load(f) + + +async def locked_json_update( + filepath: str | Path, + updater: Callable[[Any], Any], + timeout: float = 10.0, + indent: int = 2, +) -> Any: + """ + Atomic read-modify-write update for JSON files. 
+ """ + filepath = Path(filepath) + + lock = FileLock(filepath, timeout=timeout, exclusive=True) + await lock.__aenter__() + + try: + def _read_json(): + if filepath.exists(): + with open(filepath) as f: + return json.load(f) + return None + + data = await asyncio.get_running_loop().run_in_executor(None, _read_json) + updated_data = updater(data) + + fd, tmp_path = await asyncio.get_running_loop().run_in_executor( + None, + lambda: tempfile.mkstemp( + dir=filepath.parent, prefix=f".{filepath.name}.tmp.", suffix="" + ), + ) + + try: + with os.fdopen(fd, "w") as f: + json.dump(updated_data, f, indent=indent) + + await asyncio.get_running_loop().run_in_executor( + None, os.replace, tmp_path, filepath + ) + except Exception: + try: + await asyncio.get_running_loop().run_in_executor(None, os.unlink, tmp_path) + except Exception: + pass + raise + + return updated_data + finally: + await lock.__aexit__(None, None, None) + + +__all__ = [ + "FileLock", + "FileLockError", + "FileLockTimeout", + "atomic_write", + "locked_write", + "locked_read", + "locked_json_write", + "locked_json_read", + "locked_json_update", +] + + diff --git a/apps/backend/graphiti_memory.py b/apps/backend/graphiti_memory.py new file mode 100644 index 0000000000..b1a65cf166 --- /dev/null +++ b/apps/backend/graphiti_memory.py @@ -0,0 +1,5 @@ +"""Backward compatibility shim - import from integrations.graphiti.memory instead.""" + +from integrations.graphiti.memory import * # noqa: F403 + + diff --git a/apps/backend/integrations/graphiti/queries_pkg/client.py b/apps/backend/integrations/graphiti/queries_pkg/client.py index c1961484ac..7173907609 100644 --- a/apps/backend/integrations/graphiti/queries_pkg/client.py +++ b/apps/backend/integrations/graphiti/queries_pkg/client.py @@ -5,6 +5,7 @@ Uses LadybugDB as the embedded graph database (no Docker required, Python 3.12+). """ +import asyncio import logging import sys from datetime import datetime, timezone @@ -72,6 +73,8 @@ def __init__(self, config: GraphitiConfig): self._llm_client = None self._embedder = None self._initialized = False + # Guards against concurrent initialize() calls racing and doing duplicate work. + self._init_lock = asyncio.Lock() @property def graphiti(self): @@ -83,9 +86,9 @@ def is_initialized(self) -> bool: """Check if client is initialized.""" return self._initialized - async def initialize(self, state: GraphitiState | None = None) -> bool: + async def _initialize_unlocked(self, state: GraphitiState | None = None) -> bool: """ - Initialize the Graphiti client with configured providers. + Initialize the Graphiti client with configured providers (caller must synchronize). Args: state: Optional GraphitiState for tracking initialization status @@ -93,12 +96,10 @@ async def initialize(self, state: GraphitiState | None = None) -> bool: Returns: True if initialization succeeded """ - if self._initialized: - return True - try: # Import Graphiti core from graphiti_core import Graphiti + from graphiti_core.cross_encoder.client import CrossEncoderClient # Import our provider factory from graphiti_providers import ( @@ -133,6 +134,22 @@ async def initialize(self, state: GraphitiState | None = None) -> bool: logger.warning(f"Embedder provider configuration error: {e}") return False + # graphiti-core defaults to OpenAI's reranker when cross_encoder=None, + # which hard-requires an OpenAI API key. If the user isn't using OpenAI, + # pass a safe no-op cross-encoder to prevent accidental OpenAI init. 
+ cross_encoder = None + llm_provider = (self.config.llm_provider or "").lower() + embedder_provider = (self.config.embedder_provider or "").lower() + if llm_provider != "openai" and embedder_provider != "openai": + class _NoOpCrossEncoder(CrossEncoderClient): + async def rank( + self, query: str, passages: list[str] + ) -> list[tuple[str, float]]: + # Keep ordering stable; assign equal scores. + return [(p, 1.0) for p in passages] + + cross_encoder = _NoOpCrossEncoder() + # Apply LadybugDB monkeypatch to use it via graphiti's KuzuDriver if not _apply_ladybug_monkeypatch(): logger.error( @@ -172,20 +189,26 @@ async def initialize(self, state: GraphitiState | None = None) -> bool: graph_driver=self._driver, llm_client=self._llm_client, embedder=self._embedder, + cross_encoder=cross_encoder, ) - # Build indices (first time only) - if not state or not state.indices_built: - logger.info("Building Graphiti indices and constraints...") - await self._graphiti.build_indices_and_constraints() - - if state: - state.indices_built = True - state.initialized = True - state.database = self.config.database - state.created_at = datetime.now(timezone.utc).isoformat() - state.llm_provider = self.config.llm_provider - state.embedder_provider = self.config.embedder_provider + # Build indices and constraints. + # + # Even if our per-spec state says indices were built, the underlying DB + # may have been deleted/recreated (common in dev) which would make FTS + # indexes missing and break saves/searches with Binder errors. + # The operation is designed to be idempotent (or to skip "already exists"), + # so we run it unconditionally for robustness. + logger.info("Ensuring Graphiti indices and constraints exist...") + await self._graphiti.build_indices_and_constraints() + + if state: + state.indices_built = True + state.initialized = True + state.database = self.config.database + state.created_at = datetime.now(timezone.utc).isoformat() + state.llm_provider = self.config.llm_provider + state.embedder_provider = self.config.embedder_provider self._initialized = True logger.info( @@ -205,6 +228,29 @@ async def initialize(self, state: GraphitiState | None = None) -> bool: logger.warning(f"Failed to initialize Graphiti client: {e}") return False + async def initialize(self, state: GraphitiState | None = None) -> bool: + """ + Initialize the Graphiti client with configured providers. + + This method is concurrency-safe: concurrent callers will synchronize so that + initialization runs at most once. + + Args: + state: Optional GraphitiState for tracking initialization status + + Returns: + True if initialization succeeded + """ + # Fast-path: common case when already initialized. + if self._initialized: + return True + + async with self._init_lock: + # Double-check after acquiring lock (another task may have initialized). + if self._initialized: + return True + return await self._initialize_unlocked(state) + async def close(self) -> None: """ Close the Graphiti client and clean up connections. 
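The initialize()/_initialize_unlocked() split above follows the usual async double-checked-locking shape: a lock-free fast path, then a re-check after acquiring the lock. A self-contained sketch of the same pattern (class and names are illustrative, not from the codebase):

import asyncio

class LazyResource:
    """Minimal sketch of the async double-checked-locking pattern used above."""

    def __init__(self) -> None:
        self._initialized = False
        self._init_lock = asyncio.Lock()

    async def initialize(self) -> bool:
        # Fast path: skip the lock once initialization has already happened.
        if self._initialized:
            return True
        async with self._init_lock:
            # Re-check under the lock: another task may have won the race.
            if self._initialized:
                return True
            await asyncio.sleep(0.1)  # stand-in for the real, expensive setup
            self._initialized = True
            return True

async def main() -> None:
    res = LazyResource()
    # Ten concurrent callers; the expensive setup still runs exactly once.
    results = await asyncio.gather(*(res.initialize() for _ in range(10)))
    print(all(results))

asyncio.run(main())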
diff --git a/apps/backend/integrations/graphiti/queries_pkg/queries.py b/apps/backend/integrations/graphiti/queries_pkg/queries.py index 3ec5db707f..13fa4806db 100644 --- a/apps/backend/integrations/graphiti/queries_pkg/queries.py +++ b/apps/backend/integrations/graphiti/queries_pkg/queries.py @@ -6,8 +6,11 @@ import json import logging +import os from datetime import datetime, timezone +from file_lock import FileLock + from .schema import ( EPISODE_TYPE_CODEBASE_DISCOVERY, EPISODE_TYPE_GOTCHA, @@ -40,6 +43,20 @@ def __init__(self, client, group_id: str, spec_context_id: str): self.group_id = group_id self.spec_context_id = spec_context_id + def _lock_timeout_seconds(self) -> float: + try: + return float(os.environ.get("AUTO_CLAUDE_MEMORY_LOCK_TIMEOUT", "10")) + except Exception: + return 10.0 + + def _db_lock_path(self): + # Lock on the DB path (file/dir) used by the driver + return self.client.config.get_db_path() + + async def _add_episode_locked(self, **kwargs) -> None: + async with FileLock(self._db_lock_path(), timeout=self._lock_timeout_seconds()): + await self.client.graphiti.add_episode(**kwargs) + async def add_session_insight( self, session_num: int, @@ -66,7 +83,7 @@ async def add_session_insight( **insights, } - await self.client.graphiti.add_episode( + await self._add_episode_locked( name=f"session_{session_num:03d}_{self.spec_context_id}", episode_body=json.dumps(episode_content), source=EpisodeType.text, @@ -110,7 +127,7 @@ async def add_codebase_discoveries( "files": discoveries, } - await self.client.graphiti.add_episode( + await self._add_episode_locked( name=f"codebase_discovery_{datetime.now(timezone.utc).strftime('%Y%m%d_%H%M%S')}", episode_body=json.dumps(episode_content), source=EpisodeType.text, @@ -146,7 +163,7 @@ async def add_pattern(self, pattern: str) -> bool: "pattern": pattern, } - await self.client.graphiti.add_episode( + await self._add_episode_locked( name=f"pattern_{datetime.now(timezone.utc).strftime('%Y%m%d_%H%M%S')}", episode_body=json.dumps(episode_content), source=EpisodeType.text, @@ -182,7 +199,7 @@ async def add_gotcha(self, gotcha: str) -> bool: "gotcha": gotcha, } - await self.client.graphiti.add_episode( + await self._add_episode_locked( name=f"gotcha_{datetime.now(timezone.utc).strftime('%Y%m%d_%H%M%S')}", episode_body=json.dumps(episode_content), source=EpisodeType.text, @@ -230,7 +247,7 @@ async def add_task_outcome( **(metadata or {}), } - await self.client.graphiti.add_episode( + await self._add_episode_locked( name=f"task_outcome_{task_id}_{datetime.now(timezone.utc).strftime('%Y%m%d_%H%M%S')}", episode_body=json.dumps(episode_content), source=EpisodeType.text, @@ -281,7 +298,7 @@ async def add_structured_insights(self, insights: dict) -> bool: "gotchas": file_insight.get("gotchas", []), } - await self.client.graphiti.add_episode( + await self._add_episode_locked( name=f"file_insight_{file_insight.get('path', 'unknown').replace('/', '_')}", episode_body=json.dumps(episode_content), source=EpisodeType.text, @@ -324,7 +341,7 @@ async def add_structured_insights(self, insights: dict) -> bool: "example": example, } - await self.client.graphiti.add_episode( + await self._add_episode_locked( name=f"pattern_{datetime.now(timezone.utc).strftime('%Y%m%d_%H%M%S%f')}", episode_body=json.dumps(episode_content), source=EpisodeType.text, @@ -365,7 +382,7 @@ async def add_structured_insights(self, insights: dict) -> bool: "solution": solution, } - await self.client.graphiti.add_episode( + await self._add_episode_locked( 
name=f"gotcha_{datetime.now(timezone.utc).strftime('%Y%m%d_%H%M%S%f')}", episode_body=json.dumps(episode_content), source=EpisodeType.text, @@ -402,7 +419,7 @@ async def add_structured_insights(self, insights: dict) -> bool: "changed_files": insights.get("changed_files", []), } - await self.client.graphiti.add_episode( + await self._add_episode_locked( name=f"task_outcome_{subtask_id}_{datetime.now(timezone.utc).strftime('%Y%m%d_%H%M%S')}", episode_body=json.dumps(episode_content), source=EpisodeType.text, @@ -435,7 +452,7 @@ async def add_structured_insights(self, insights: dict) -> bool: "success": insights.get("success", False), } - await self.client.graphiti.add_episode( + await self._add_episode_locked( name=f"recommendations_{insights.get('subtask_id', 'unknown')}", episode_body=json.dumps(episode_content), source=EpisodeType.text, diff --git a/apps/backend/memory/graphiti_helpers.py b/apps/backend/memory/graphiti_helpers.py index c74eb92a88..c7dcca45a9 100644 --- a/apps/backend/memory/graphiti_helpers.py +++ b/apps/backend/memory/graphiti_helpers.py @@ -49,7 +49,9 @@ def get_graphiti_memory(spec_dir: Path, project_dir: Path | None = None): return None try: - from graphiti_memory import GraphitiMemory + # Prefer the stable in-package import. Some deployments (e.g. app bundles) + # don't ship a top-level `graphiti_memory` shim. + from integrations.graphiti.memory import GraphitiMemory if project_dir is None: project_dir = spec_dir.parent.parent diff --git a/apps/backend/query_memory.py b/apps/backend/query_memory.py index c16f82d943..af6468de65 100644 --- a/apps/backend/query_memory.py +++ b/apps/backend/query_memory.py @@ -25,6 +25,8 @@ from datetime import datetime from pathlib import Path +from file_lock import FileLock, FileLockTimeout + # Apply LadybugDB monkeypatch BEFORE any graphiti imports def apply_monkeypatch(): @@ -61,6 +63,34 @@ def serialize_value(val): return val +def result_to_rows(result) -> list[dict]: + """ + Convert a kuzu/LadybugDB query result into a list of dictionaries. + + Avoids requiring optional pandas dependency (get_as_df()) which is not + guaranteed to be present in the bundled Python environment. + """ + try: + rows = list(result.rows_as_dict()) + # Normalize any non-serializable values + return [ + {k: serialize_value(v) for k, v in (row or {}).items()} for row in rows + ] + except Exception: + # Best-effort fallback + rows = [] + try: + while result.has_next(): + row = result.get_next() + if isinstance(row, dict): + rows.append({k: serialize_value(v) for k, v in row.items()}) + else: + rows.append({"value": serialize_value(row)}) + except Exception: + pass + return rows + + def output_json(success: bool, data=None, error: str = None): """Output JSON result to stdout and exit.""" result = {"success": success} @@ -79,8 +109,24 @@ def output_error(message: str): output_json(False, error=message) -def get_db_connection(db_path: str, database: str): - """Get a database connection.""" +def _get_db_full_path(db_path: str, database: str) -> Path: + # Expand ~ for UI-provided paths + return Path(db_path).expanduser() / database + + +def _lock_timeout_seconds() -> float: + try: + return float(os.environ.get("AUTO_CLAUDE_MEMORY_LOCK_TIMEOUT", "10")) + except Exception: + return 10.0 + + +def get_db_connection(db_path: str, database: str, *, create_if_missing: bool = False): + """Get a database connection. + + If create_if_missing is True, this will create/open the database and + best-effort initialize the schema required for basic queries. 
+ """ try: # Try to import kuzu (might be real_ladybug via monkeypatch or native) try: @@ -88,12 +134,33 @@ def get_db_connection(db_path: str, database: str): except ImportError: import real_ladybug as kuzu - full_path = Path(db_path) / database + full_path = _get_db_full_path(db_path, database) if not full_path.exists(): - return None, f"Database not found at {full_path}" + if not create_if_missing: + return None, f"Database not found at {full_path}" + full_path.parent.mkdir(parents=True, exist_ok=True) db = kuzu.Database(str(full_path)) conn = kuzu.Connection(db) + + # Best-effort schema init when creating/opening fresh DB. + if create_if_missing: + try: + conn.execute("INSTALL fts") + except Exception: + pass + try: + conn.execute("LOAD EXTENSION fts") + except Exception: + pass + try: + from graphiti_core.driver.kuzu_driver import SCHEMA_QUERIES + + conn.execute(SCHEMA_QUERIES) + except Exception: + # Still allow DB to be used for whatever queries are possible + pass + return conn, None except Exception as e: return None, str(e) @@ -132,18 +199,22 @@ def cmd_get_status(args): continue databases.append(item.name) - # Try to connect and verify - conn, error = get_db_connection(str(db_path), database) + # Try to connect and verify (non-destructive: do NOT create DB here) + conn, error = get_db_connection(str(db_path), database, create_if_missing=False) connected = conn is not None if connected: try: # Test query - result = conn.execute("RETURN 1 as test") - _ = result.get_as_df() + conn.execute("RETURN 1 as test") except Exception as e: connected = False error = str(e) + finally: + try: + conn.close() + except Exception: + pass output_json( True, @@ -166,58 +237,72 @@ def cmd_get_memories(args): output_error("Neither kuzu nor LadybugDB is installed") return - conn, error = get_db_connection(args.db_path, args.database) - if not conn: - output_error(error or "Failed to connect to database") - return - + full_path = _get_db_full_path(args.db_path, args.database) try: - limit = args.limit or 20 - - # Query episodic nodes with parameterized query - query = """ - MATCH (e:Episodic) - RETURN e.uuid as uuid, e.name as name, e.created_at as created_at, - e.content as content, e.source_description as description, - e.group_id as group_id - ORDER BY e.created_at DESC - LIMIT $limit - """ - - result = conn.execute(query, parameters={"limit": limit}) - df = result.get_as_df() - - memories = [] - for _, row in df.iterrows(): - memory = { - "id": row.get("uuid") or row.get("name", "unknown"), - "name": row.get("name", ""), - "type": infer_episode_type(row.get("name", ""), row.get("content", "")), - "timestamp": row.get("created_at") or datetime.now().isoformat(), - "content": row.get("content") - or row.get("description") - or row.get("name", ""), - "description": row.get("description", ""), - "group_id": row.get("group_id", ""), - } + with FileLock(full_path, timeout=_lock_timeout_seconds()): + conn, error = get_db_connection( + args.db_path, args.database, create_if_missing=True + ) + if not conn: + output_error(error or "Failed to connect to database") + return + + try: + limit = args.limit or 20 + + # Query episodic nodes with parameterized query + query = """ + MATCH (e:Episodic) + RETURN e.uuid as uuid, e.name as name, e.created_at as created_at, + e.content as content, e.source_description as description, + e.group_id as group_id + ORDER BY e.created_at DESC + LIMIT $limit + """ + + result = conn.execute(query, parameters={"limit": limit}) + rows = result_to_rows(result) + + memories = [] + for 
row in rows: + memory = { + "id": row.get("uuid") or row.get("name", "unknown"), + "name": row.get("name", ""), + "type": infer_episode_type( + row.get("name", ""), row.get("content", "") + ), + "timestamp": row.get("created_at") or datetime.now().isoformat(), + "content": row.get("content") + or row.get("description") + or row.get("name", ""), + "description": row.get("description", ""), + "group_id": row.get("group_id", ""), + } - # Extract session number if present - session_num = extract_session_number(row.get("name", "")) - if session_num: - memory["session_number"] = session_num + # Extract session number if present + session_num = extract_session_number(row.get("name", "")) + if session_num: + memory["session_number"] = session_num - memories.append(memory) + memories.append(memory) - output_json(True, data={"memories": memories, "count": len(memories)}) + output_json(True, data={"memories": memories, "count": len(memories)}) - except Exception as e: - # Table might not exist yet - if "Episodic" in str(e) and ( - "not exist" in str(e).lower() or "cannot" in str(e).lower() - ): - output_json(True, data={"memories": [], "count": 0}) - else: - output_error(f"Query failed: {e}") + except Exception as e: + # Table might not exist yet + if "Episodic" in str(e) and ( + "not exist" in str(e).lower() or "cannot" in str(e).lower() + ): + output_json(True, data={"memories": [], "count": 0}) + else: + output_error(f"Query failed: {e}") + finally: + try: + conn.close() + except Exception: + pass + except FileLockTimeout as e: + output_error(str(e)) def cmd_search(args): @@ -226,66 +311,86 @@ def cmd_search(args): output_error("Neither kuzu nor LadybugDB is installed") return - conn, error = get_db_connection(args.db_path, args.database) - if not conn: - output_error(error or "Failed to connect to database") - return - + full_path = _get_db_full_path(args.db_path, args.database) try: - limit = args.limit or 20 - search_query = args.query.lower() - - # Search in episodic nodes using CONTAINS with parameterized query - query = """ - MATCH (e:Episodic) - WHERE toLower(e.name) CONTAINS $search_query - OR toLower(e.content) CONTAINS $search_query - OR toLower(e.source_description) CONTAINS $search_query - RETURN e.uuid as uuid, e.name as name, e.created_at as created_at, - e.content as content, e.source_description as description, - e.group_id as group_id - ORDER BY e.created_at DESC - LIMIT $limit - """ - - result = conn.execute( - query, parameters={"search_query": search_query, "limit": limit} - ) - df = result.get_as_df() - - memories = [] - for _, row in df.iterrows(): - memory = { - "id": row.get("uuid") or row.get("name", "unknown"), - "name": row.get("name", ""), - "type": infer_episode_type(row.get("name", ""), row.get("content", "")), - "timestamp": row.get("created_at") or datetime.now().isoformat(), - "content": row.get("content") - or row.get("description") - or row.get("name", ""), - "description": row.get("description", ""), - "group_id": row.get("group_id", ""), - "score": 1.0, # Keyword match score - } - - session_num = extract_session_number(row.get("name", "")) - if session_num: - memory["session_number"] = session_num - - memories.append(memory) - - output_json( - True, - data={"memories": memories, "count": len(memories), "query": args.query}, - ) + with FileLock(full_path, timeout=_lock_timeout_seconds()): + conn, error = get_db_connection( + args.db_path, args.database, create_if_missing=True + ) + if not conn: + output_error(error or "Failed to connect to database") + return + 
+ try: + limit = args.limit or 20 + search_query = args.query.lower() + + # Search in episodic nodes using CONTAINS with parameterized query + query = """ + MATCH (e:Episodic) + WHERE toLower(e.name) CONTAINS $search_query + OR toLower(e.content) CONTAINS $search_query + OR toLower(e.source_description) CONTAINS $search_query + RETURN e.uuid as uuid, e.name as name, e.created_at as created_at, + e.content as content, e.source_description as description, + e.group_id as group_id + ORDER BY e.created_at DESC + LIMIT $limit + """ + + result = conn.execute( + query, parameters={"search_query": search_query, "limit": limit} + ) + rows = result_to_rows(result) + + memories = [] + for row in rows: + memory = { + "id": row.get("uuid") or row.get("name", "unknown"), + "name": row.get("name", ""), + "type": infer_episode_type( + row.get("name", ""), row.get("content", "") + ), + "timestamp": row.get("created_at") or datetime.now().isoformat(), + "content": row.get("content") + or row.get("description") + or row.get("name", ""), + "description": row.get("description", ""), + "group_id": row.get("group_id", ""), + "score": 1.0, # Keyword match score + } - except Exception as e: - if "Episodic" in str(e) and ( - "not exist" in str(e).lower() or "cannot" in str(e).lower() - ): - output_json(True, data={"memories": [], "count": 0, "query": args.query}) - else: - output_error(f"Search failed: {e}") + session_num = extract_session_number(row.get("name", "")) + if session_num: + memory["session_number"] = session_num + + memories.append(memory) + + output_json( + True, + data={ + "memories": memories, + "count": len(memories), + "query": args.query, + }, + ) + + except Exception as e: + if "Episodic" in str(e) and ( + "not exist" in str(e).lower() or "cannot" in str(e).lower() + ): + output_json( + True, data={"memories": [], "count": 0, "query": args.query} + ) + else: + output_error(f"Search failed: {e}") + finally: + try: + conn.close() + except Exception: + pass + except FileLockTimeout as e: + output_error(str(e)) def cmd_semantic_search(args): @@ -304,14 +409,16 @@ def cmd_semantic_search(args): # No embedder configured, fall back to keyword search return cmd_search(args) - # Try semantic search + # Try semantic search (serialize access with DB lock) try: - result = asyncio.run(_async_semantic_search(args)) - if result.get("success"): - output_json(True, data=result.get("data")) - else: - # Semantic search failed, fall back to keyword search - return cmd_search(args) + full_path = _get_db_full_path(args.db_path, args.database) + with FileLock(full_path, timeout=_lock_timeout_seconds()): + result = asyncio.run(_async_semantic_search(args)) + if result.get("success"): + output_json(True, data=result.get("data")) + else: + # Semantic search failed, fall back to keyword search + return cmd_search(args) except Exception as e: # Any error, fall back to keyword search sys.stderr.write(f"Semantic search failed, falling back to keyword: {e}\n") @@ -443,49 +550,61 @@ def cmd_get_entities(args): output_error("Neither kuzu nor LadybugDB is installed") return - conn, error = get_db_connection(args.db_path, args.database) - if not conn: - output_error(error or "Failed to connect to database") - return - + full_path = _get_db_full_path(args.db_path, args.database) try: - limit = args.limit or 20 - - # Query entity nodes with parameterized query - query = """ - MATCH (e:Entity) - RETURN e.uuid as uuid, e.name as name, e.summary as summary, - e.created_at as created_at - ORDER BY e.created_at DESC - LIMIT $limit - """ 
- - result = conn.execute(query, parameters={"limit": limit}) - df = result.get_as_df() - - entities = [] - for _, row in df.iterrows(): - if not row.get("summary"): - continue - - entity = { - "id": row.get("uuid") or row.get("name", "unknown"), - "name": row.get("name", ""), - "type": infer_entity_type(row.get("name", "")), - "timestamp": row.get("created_at") or datetime.now().isoformat(), - "content": row.get("summary", ""), - } - entities.append(entity) + with FileLock(full_path, timeout=_lock_timeout_seconds()): + conn, error = get_db_connection( + args.db_path, args.database, create_if_missing=True + ) + if not conn: + output_error(error or "Failed to connect to database") + return + + try: + limit = args.limit or 20 + + # Query entity nodes with parameterized query + query = """ + MATCH (e:Entity) + RETURN e.uuid as uuid, e.name as name, e.summary as summary, + e.created_at as created_at + ORDER BY e.created_at DESC + LIMIT $limit + """ + + result = conn.execute(query, parameters={"limit": limit}) + rows = result_to_rows(result) + + entities = [] + for row in rows: + if not row.get("summary"): + continue + + entity = { + "id": row.get("uuid") or row.get("name", "unknown"), + "name": row.get("name", ""), + "type": infer_entity_type(row.get("name", "")), + "timestamp": row.get("created_at") or datetime.now().isoformat(), + "content": row.get("summary", ""), + } + entities.append(entity) - output_json(True, data={"entities": entities, "count": len(entities)}) + output_json(True, data={"entities": entities, "count": len(entities)}) - except Exception as e: - if "Entity" in str(e) and ( - "not exist" in str(e).lower() or "cannot" in str(e).lower() - ): - output_json(True, data={"entities": [], "count": 0}) - else: - output_error(f"Query failed: {e}") + except Exception as e: + if "Entity" in str(e) and ( + "not exist" in str(e).lower() or "cannot" in str(e).lower() + ): + output_json(True, data={"entities": [], "count": 0}) + else: + output_error(f"Query failed: {e}") + finally: + try: + conn.close() + except Exception: + pass + except FileLockTimeout as e: + output_error(str(e)) def infer_episode_type(name: str, content: str = "") -> str: diff --git a/apps/backend/runners/github/file_lock.py b/apps/backend/runners/github/file_lock.py index 065d2028e0..891132547a 100644 --- a/apps/backend/runners/github/file_lock.py +++ b/apps/backend/runners/github/file_lock.py @@ -1,481 +1,38 @@ """ -File Locking for Concurrent Operations -===================================== +Backward-Compatibility Re-Exports / Rückwärtskompatible Re-Exports +================================================================= -Thread-safe and process-safe file locking utilities for GitHub automation. -Uses fcntl.flock() on Unix systems and msvcrt.locking() on Windows for proper -cross-process locking. - -Example Usage: - # Simple file locking - async with FileLock("path/to/file.json", timeout=5.0): - # Do work with locked file - pass - - # Atomic write with locking - async with locked_write("path/to/file.json", timeout=5.0) as f: - json.dump(data, f) +Die Implementierung wurde nach `apps/backend/file_lock.py` verschoben, um +Code-Duplikation zu vermeiden. Dieser Modulpfad bleibt bestehen, damit bestehende +Imports im GitHub Runner nicht brechen. +The implementation was moved to `apps/backend/file_lock.py` to avoid code +duplication. This module path remains in place so existing imports in the GitHub +Runner do not break. 
""" from __future__ import annotations -import asyncio -import json -import os -import tempfile -import time -import warnings -from collections.abc import Callable -from contextlib import asynccontextmanager, contextmanager -from pathlib import Path -from typing import Any - -_IS_WINDOWS = os.name == "nt" -_WINDOWS_LOCK_SIZE = 1024 * 1024 - -try: - import fcntl # type: ignore -except ImportError: # pragma: no cover - fcntl = None - -try: - import msvcrt # type: ignore -except ImportError: # pragma: no cover - msvcrt = None - - -def _try_lock(fd: int, exclusive: bool) -> None: - if _IS_WINDOWS: - if msvcrt is None: - raise FileLockError("msvcrt is required for file locking on Windows") - if not exclusive: - warnings.warn( - "Shared file locks are not supported on Windows; using exclusive lock", - RuntimeWarning, - stacklevel=3, - ) - msvcrt.locking(fd, msvcrt.LK_NBLCK, _WINDOWS_LOCK_SIZE) - return - - if fcntl is None: - raise FileLockError( - "fcntl is required for file locking on non-Windows platforms" - ) - - lock_mode = fcntl.LOCK_EX if exclusive else fcntl.LOCK_SH - fcntl.flock(fd, lock_mode | fcntl.LOCK_NB) - - -def _unlock(fd: int) -> None: - if _IS_WINDOWS: - if msvcrt is None: - warnings.warn( - "msvcrt unavailable; cannot unlock file descriptor", - RuntimeWarning, - stacklevel=3, - ) - return - msvcrt.locking(fd, msvcrt.LK_UNLCK, _WINDOWS_LOCK_SIZE) - return - - if fcntl is None: - warnings.warn( - "fcntl unavailable; cannot unlock file descriptor", - RuntimeWarning, - stacklevel=3, - ) - return - fcntl.flock(fd, fcntl.LOCK_UN) - - -class FileLockError(Exception): - """Raised when file locking operations fail.""" - - pass - - -class FileLockTimeout(FileLockError): - """Raised when lock acquisition times out.""" - - pass - - -class FileLock: - """ - Cross-process file lock using platform-specific locking (fcntl.flock on Unix, - msvcrt.locking on Windows). - - Supports both sync and async context managers for flexible usage. 
- - Args: - filepath: Path to file to lock (will be created if needed) - timeout: Maximum seconds to wait for lock (default: 5.0) - exclusive: Whether to use exclusive lock (default: True) - - Example: - # Synchronous usage - with FileLock("/path/to/file.json"): - # File is locked - pass - - # Asynchronous usage - async with FileLock("/path/to/file.json"): - # File is locked - pass - """ - - def __init__( - self, - filepath: str | Path, - timeout: float = 5.0, - exclusive: bool = True, - ): - self.filepath = Path(filepath) - self.timeout = timeout - self.exclusive = exclusive - self._lock_file: Path | None = None - self._fd: int | None = None - - def _get_lock_file(self) -> Path: - """Get lock file path (separate .lock file).""" - return self.filepath.parent / f"{self.filepath.name}.lock" - - def _acquire_lock(self) -> None: - """Acquire the file lock (blocking with timeout).""" - self._lock_file = self._get_lock_file() - self._lock_file.parent.mkdir(parents=True, exist_ok=True) - - # Open lock file - self._fd = os.open(str(self._lock_file), os.O_CREAT | os.O_RDWR) - - # Try to acquire lock with timeout - start_time = time.time() - - while True: - try: - # Non-blocking lock attempt - _try_lock(self._fd, self.exclusive) - return # Lock acquired - except (BlockingIOError, OSError): - # Lock held by another process - elapsed = time.time() - start_time - if elapsed >= self.timeout: - os.close(self._fd) - self._fd = None - raise FileLockTimeout( - f"Failed to acquire lock on {self.filepath} within " - f"{self.timeout}s" - ) - - # Wait a bit before retrying - time.sleep(0.01) - - def _release_lock(self) -> None: - """Release the file lock.""" - if self._fd is not None: - try: - _unlock(self._fd) - os.close(self._fd) - except Exception: - pass # Best effort cleanup - finally: - self._fd = None - - # Clean up lock file - if self._lock_file and self._lock_file.exists(): - try: - self._lock_file.unlink() - except Exception: - pass # Best effort cleanup - - def __enter__(self): - """Synchronous context manager entry.""" - self._acquire_lock() - return self - - def __exit__(self, exc_type, exc_val, exc_tb): - """Synchronous context manager exit.""" - self._release_lock() - return False - - async def __aenter__(self): - """Async context manager entry.""" - # Run blocking lock acquisition in thread pool - await asyncio.get_running_loop().run_in_executor(None, self._acquire_lock) - return self - - async def __aexit__(self, exc_type, exc_val, exc_tb): - """Async context manager exit.""" - await asyncio.get_running_loop().run_in_executor(None, self._release_lock) - return False - - -@contextmanager -def atomic_write(filepath: str | Path, mode: str = "w"): - """ - Atomic file write using temp file and rename. - - Writes to .tmp file first, then atomically replaces target file - using os.replace() which is atomic on POSIX systems. 
- - Args: - filepath: Target file path - mode: File open mode (default: "w") - - Example: - with atomic_write("/path/to/file.json") as f: - json.dump(data, f) - """ - filepath = Path(filepath) - filepath.parent.mkdir(parents=True, exist_ok=True) - - # Create temp file in same directory for atomic rename - fd, tmp_path = tempfile.mkstemp( - dir=filepath.parent, prefix=f".{filepath.name}.tmp.", suffix="" - ) - - try: - # Open temp file with requested mode - with os.fdopen(fd, mode) as f: - yield f - - # Atomic replace - succeeds or fails completely - os.replace(tmp_path, filepath) - - except Exception: - # Clean up temp file on error - try: - os.unlink(tmp_path) - except Exception: - pass - raise - - -@asynccontextmanager -async def locked_write( - filepath: str | Path, timeout: float = 5.0, mode: str = "w" -) -> Any: - """ - Async context manager combining file locking and atomic writes. - - Acquires exclusive lock, writes to temp file, atomically replaces target. - This is the recommended way to safely write shared state files. - - Args: - filepath: Target file path - timeout: Lock timeout in seconds (default: 5.0) - mode: File open mode (default: "w") - - Example: - async with locked_write("/path/to/file.json", timeout=5.0) as f: - json.dump(data, f, indent=2) - - Raises: - FileLockTimeout: If lock cannot be acquired within timeout - """ - filepath = Path(filepath) - - # Acquire lock - lock = FileLock(filepath, timeout=timeout, exclusive=True) - await lock.__aenter__() - - try: - # Atomic write in thread pool (since it uses sync file I/O) - fd, tmp_path = await asyncio.get_running_loop().run_in_executor( - None, - lambda: tempfile.mkstemp( - dir=filepath.parent, prefix=f".{filepath.name}.tmp.", suffix="" - ), - ) - - try: - # Open temp file and yield to caller - f = os.fdopen(fd, mode) - try: - yield f - finally: - f.close() - - # Atomic replace - await asyncio.get_running_loop().run_in_executor( - None, os.replace, tmp_path, filepath - ) - - except Exception: - # Clean up temp file on error - try: - await asyncio.get_running_loop().run_in_executor( - None, os.unlink, tmp_path - ) - except Exception: - pass - raise - - finally: - # Release lock - await lock.__aexit__(None, None, None) - - -@asynccontextmanager -async def locked_read(filepath: str | Path, timeout: float = 5.0) -> Any: - """ - Async context manager for locked file reading. - - Acquires shared lock for reading, allowing multiple concurrent readers - but blocking writers. - - Args: - filepath: File path to read - timeout: Lock timeout in seconds (default: 5.0) - - Example: - async with locked_read("/path/to/file.json", timeout=5.0) as f: - data = json.load(f) - - Raises: - FileLockTimeout: If lock cannot be acquired within timeout - FileNotFoundError: If file doesn't exist - """ - filepath = Path(filepath) - - if not filepath.exists(): - raise FileNotFoundError(f"File not found: {filepath}") - - # Acquire shared lock (allows multiple readers) - lock = FileLock(filepath, timeout=timeout, exclusive=False) - await lock.__aenter__() - - try: - # Open file for reading - with open(filepath) as f: - yield f - finally: - # Release lock - await lock.__aexit__(None, None, None) - - -async def locked_json_write( - filepath: str | Path, data: Any, timeout: float = 5.0, indent: int = 2 -) -> None: - """ - Helper function for writing JSON with locking and atomicity. 
- - Args: - filepath: Target file path - data: Data to serialize as JSON - timeout: Lock timeout in seconds (default: 5.0) - indent: JSON indentation (default: 2) - - Example: - await locked_json_write("/path/to/file.json", {"key": "value"}) - - Raises: - FileLockTimeout: If lock cannot be acquired within timeout - """ - async with locked_write(filepath, timeout=timeout) as f: - json.dump(data, f, indent=indent) - - -async def locked_json_read(filepath: str | Path, timeout: float = 5.0) -> Any: - """ - Helper function for reading JSON with locking. - - Args: - filepath: File path to read - timeout: Lock timeout in seconds (default: 5.0) - - Returns: - Parsed JSON data - - Example: - data = await locked_json_read("/path/to/file.json") - - Raises: - FileLockTimeout: If lock cannot be acquired within timeout - FileNotFoundError: If file doesn't exist - json.JSONDecodeError: If file contains invalid JSON - """ - async with locked_read(filepath, timeout=timeout) as f: - return json.load(f) - - -async def locked_json_update( - filepath: str | Path, - updater: Callable[[Any], Any], - timeout: float = 5.0, - indent: int = 2, -) -> Any: - """ - Helper for atomic read-modify-write of JSON files. - - Acquires exclusive lock, reads current data, applies updater function, - writes updated data atomically. - - Args: - filepath: File path to update - updater: Function that takes current data and returns updated data - timeout: Lock timeout in seconds (default: 5.0) - indent: JSON indentation (default: 2) - - Returns: - Updated data - - Example: - def add_item(data): - data["items"].append({"new": "item"}) - return data - - updated = await locked_json_update("/path/to/file.json", add_item) - - Raises: - FileLockTimeout: If lock cannot be acquired within timeout - """ - filepath = Path(filepath) - - # Acquire exclusive lock - lock = FileLock(filepath, timeout=timeout, exclusive=True) - await lock.__aenter__() - - try: - # Read current data - def _read_json(): - if filepath.exists(): - with open(filepath) as f: - return json.load(f) - return None - - data = await asyncio.get_running_loop().run_in_executor(None, _read_json) - - # Apply update function - updated_data = updater(data) - - # Write atomically - fd, tmp_path = await asyncio.get_running_loop().run_in_executor( - None, - lambda: tempfile.mkstemp( - dir=filepath.parent, prefix=f".{filepath.name}.tmp.", suffix="" - ), - ) - - try: - with os.fdopen(fd, "w") as f: - json.dump(updated_data, f, indent=indent) - - await asyncio.get_running_loop().run_in_executor( - None, os.replace, tmp_path, filepath - ) - - except Exception: - try: - await asyncio.get_running_loop().run_in_executor( - None, os.unlink, tmp_path - ) - except Exception: - pass - raise - - return updated_data - - finally: - await lock.__aexit__(None, None, None) +from ...file_lock import ( # noqa: F401 + FileLock, + FileLockError, + FileLockTimeout, + atomic_write, + locked_json_read, + locked_json_update, + locked_json_write, + locked_read, + locked_write, +) + +__all__ = [ + "FileLock", + "FileLockError", + "FileLockTimeout", + "atomic_write", + "locked_write", + "locked_read", + "locked_json_write", + "locked_json_read", + "locked_json_update", +] diff --git a/apps/frontend/src/main/agent/agent-process.ts b/apps/frontend/src/main/agent/agent-process.ts index 553f2e2907..45e8aed0bb 100644 --- a/apps/frontend/src/main/agent/agent-process.ts +++ b/apps/frontend/src/main/agent/agent-process.ts @@ -1,16 +1,21 @@ -import { spawn } from 'child_process'; -import path from 'path'; -import { 
existsSync, readFileSync } from 'fs'; -import { app } from 'electron'; -import { EventEmitter } from 'events'; -import { AgentState } from './agent-state'; -import { AgentEvents } from './agent-events'; -import { ProcessType, ExecutionProgressData } from './types'; -import { detectRateLimit, createSDKRateLimitInfo, getProfileEnv, detectAuthFailure } from '../rate-limit-detector'; -import { projectStore } from '../project-store'; -import { getClaudeProfileManager } from '../claude-profile-manager'; -import { parsePythonCommand, validatePythonPath } from '../python-detector'; -import { pythonEnvManager, getConfiguredPythonPath } from '../python-env-manager'; +import { spawn } from "child_process"; +import path from "path"; +import { existsSync, readFileSync } from "fs"; +import { app } from "electron"; +import { EventEmitter } from "events"; +import { AgentState } from "./agent-state"; +import { AgentEvents } from "./agent-events"; +import { ProcessType, ExecutionProgressData } from "./types"; +import { + detectRateLimit, + createSDKRateLimitInfo, + getProfileEnv, + detectAuthFailure, +} from "../rate-limit-detector"; +import { projectStore } from "../project-store"; +import { getClaudeProfileManager } from "../claude-profile-manager"; +import { parsePythonCommand, validatePythonPath } from "../python-detector"; +import { pythonEnvManager, getConfiguredPythonPath } from "../python-env-manager"; /** * Process spawning and lifecycle management @@ -22,7 +27,7 @@ export class AgentProcessManager { // Python path will be configured by pythonEnvManager after venv is ready // Use null to indicate not yet configured - getPythonPath() will use fallback private _pythonPath: string | null = null; - private autoBuildSourcePath: string = ''; + private autoBuildSourcePath: string = ""; constructor(state: AgentState, events: AgentEvents, emitter: EventEmitter) { this.state = state; @@ -36,8 +41,12 @@ export class AgentProcessManager { if (validation.valid) { this._pythonPath = validation.sanitizedPath || pythonPath; } else { - console.error(`[AgentProcess] Invalid Python path rejected: ${validation.reason}`); - console.error(`[AgentProcess] Falling back to getConfiguredPythonPath()`); + console.error( + `[AgentProcess] Invalid Python path rejected: ${validation.reason}` + ); + console.error( + `[AgentProcess] Falling back to getConfiguredPythonPath()` + ); // Don't set _pythonPath - let getPythonPath() use getConfiguredPythonPath() fallback } } @@ -54,26 +63,70 @@ export class AgentProcessManager { ...process.env, ...extraEnv, ...profileEnv, - PYTHONUNBUFFERED: '1', - PYTHONIOENCODING: 'utf-8', - PYTHONUTF8: '1' + PYTHONUNBUFFERED: "1", + PYTHONIOENCODING: "utf-8", + PYTHONUTF8: "1", } as NodeJS.ProcessEnv; } + /** + * Load environment variables from a .env file (simple KEY=VALUE parser). + * This is intentionally lightweight (no dotenv dependency in the Electron main bundle path). 
+ */ + private loadEnvFile(envPath: string): Record { + if (!existsSync(envPath)) return {}; + try { + const envContent = readFileSync(envPath, "utf-8"); + const envVars: Record = {}; + + // Handle both Unix (\n) and Windows (\r\n) line endings + for (const line of envContent.split(/\r?\n/)) { + const trimmed = line.trim(); + // Skip comments and empty lines + if (!trimmed || trimmed.startsWith("#")) { + continue; + } + + const eqIndex = trimmed.indexOf("="); + if (eqIndex > 0) { + const key = trimmed.substring(0, eqIndex).trim(); + let value = trimmed.substring(eqIndex + 1).trim(); + + // Remove quotes if present + if ( + (value.startsWith('"') && value.endsWith('"')) || + (value.startsWith("'") && value.endsWith("'")) + ) { + value = value.slice(1, -1); + } + + envVars[key] = value; + } + } + + return envVars; + } catch { + return {}; + } + } + private handleProcessFailure( taskId: string, allOutput: string, processType: ProcessType ): boolean { - console.log('[AgentProcess] Checking for rate limit in output (last 500 chars):', allOutput.slice(-500)); + console.log( + "[AgentProcess] Checking for rate limit in output (last 500 chars):", + allOutput.slice(-500) + ); const rateLimitDetection = detectRateLimit(allOutput); - console.log('[AgentProcess] Rate limit detection result:', { + console.log("[AgentProcess] Rate limit detection result:", { isRateLimited: rateLimitDetection.isRateLimited, resetTime: rateLimitDetection.resetTime, limitType: rateLimitDetection.limitType, profileId: rateLimitDetection.profileId, - suggestedProfile: rateLimitDetection.suggestedProfile + suggestedProfile: rateLimitDetection.suggestedProfile, }); if (rateLimitDetection.isRateLimited) { @@ -84,10 +137,15 @@ export class AgentProcessManager { ); if (wasHandled) return true; - const source = processType === 'spec-creation' ? 'roadmap' : 'task'; - const rateLimitInfo = createSDKRateLimitInfo(source, rateLimitDetection, { taskId }); - console.log('[AgentProcess] Emitting sdk-rate-limit event (manual):', rateLimitInfo); - this.emitter.emit('sdk-rate-limit', rateLimitInfo); + const source = processType === "spec-creation" ? "roadmap" : "task"; + const rateLimitInfo = createSDKRateLimitInfo(source, rateLimitDetection, { + taskId, + }); + console.log( + "[AgentProcess] Emitting sdk-rate-limit event (manual):", + rateLimitInfo + ); + this.emitter.emit("sdk-rate-limit", rateLimitInfo); return true; } @@ -102,63 +160,97 @@ export class AgentProcessManager { const profileManager = getClaudeProfileManager(); const autoSwitchSettings = profileManager.getAutoSwitchSettings(); - console.log('[AgentProcess] Auto-switch settings:', { + console.log("[AgentProcess] Auto-switch settings:", { enabled: autoSwitchSettings.enabled, autoSwitchOnRateLimit: autoSwitchSettings.autoSwitchOnRateLimit, - proactiveSwapEnabled: autoSwitchSettings.proactiveSwapEnabled + proactiveSwapEnabled: autoSwitchSettings.proactiveSwapEnabled, }); - if (!autoSwitchSettings.enabled || !autoSwitchSettings.autoSwitchOnRateLimit) { - console.log('[AgentProcess] Auto-switch disabled - showing manual modal'); + if ( + !autoSwitchSettings.enabled || + !autoSwitchSettings.autoSwitchOnRateLimit + ) { + console.log("[AgentProcess] Auto-switch disabled - showing manual modal"); return false; } const currentProfileId = rateLimitDetection.profileId; - const bestProfile = profileManager.getBestAvailableProfile(currentProfileId); - - console.log('[AgentProcess] Best available profile:', bestProfile ? 
{ - id: bestProfile.id, - name: bestProfile.name - } : 'NONE'); + const bestProfile = + profileManager.getBestAvailableProfile(currentProfileId); + + console.log( + "[AgentProcess] Best available profile:", + bestProfile + ? { + id: bestProfile.id, + name: bestProfile.name, + } + : "NONE" + ); if (!bestProfile) { - console.log('[AgentProcess] No alternative profile available - falling back to manual modal'); + console.log( + "[AgentProcess] No alternative profile available - falling back to manual modal" + ); return false; } - console.log('[AgentProcess] AUTO-SWAP: Switching from', currentProfileId, 'to', bestProfile.id); + console.log( + "[AgentProcess] AUTO-SWAP: Switching from", + currentProfileId, + "to", + bestProfile.id + ); profileManager.setActiveProfile(bestProfile.id); - const source = processType === 'spec-creation' ? 'roadmap' : 'task'; - const rateLimitInfo = createSDKRateLimitInfo(source, rateLimitDetection, { taskId }); + const source = processType === "spec-creation" ? "roadmap" : "task"; + const rateLimitInfo = createSDKRateLimitInfo(source, rateLimitDetection, { + taskId, + }); rateLimitInfo.wasAutoSwapped = true; - rateLimitInfo.swappedToProfile = { id: bestProfile.id, name: bestProfile.name }; - rateLimitInfo.swapReason = 'reactive'; - - console.log('[AgentProcess] Emitting sdk-rate-limit event (auto-swapped):', rateLimitInfo); - this.emitter.emit('sdk-rate-limit', rateLimitInfo); - - console.log('[AgentProcess] Emitting auto-swap-restart-task event for task:', taskId); - this.emitter.emit('auto-swap-restart-task', taskId, bestProfile.id); + rateLimitInfo.swappedToProfile = { + id: bestProfile.id, + name: bestProfile.name, + }; + rateLimitInfo.swapReason = "reactive"; + + console.log( + "[AgentProcess] Emitting sdk-rate-limit event (auto-swapped):", + rateLimitInfo + ); + this.emitter.emit("sdk-rate-limit", rateLimitInfo); + + console.log( + "[AgentProcess] Emitting auto-swap-restart-task event for task:", + taskId + ); + this.emitter.emit("auto-swap-restart-task", taskId, bestProfile.id); return true; } private handleAuthFailure(taskId: string, allOutput: string): boolean { - console.log('[AgentProcess] No rate limit detected - checking for auth failure'); + console.log( + "[AgentProcess] No rate limit detected - checking for auth failure" + ); const authFailureDetection = detectAuthFailure(allOutput); if (authFailureDetection.isAuthFailure) { - console.log('[AgentProcess] Auth failure detected:', authFailureDetection); - this.emitter.emit('auth-failure', taskId, { + console.log( + "[AgentProcess] Auth failure detected:", + authFailureDetection + ); + this.emitter.emit("auth-failure", taskId, { profileId: authFailureDetection.profileId, failureType: authFailureDetection.failureType, message: authFailureDetection.message, - originalError: authFailureDetection.originalError + originalError: authFailureDetection.originalError, }); return true; } - console.log('[AgentProcess] Process failed but no rate limit or auth failure detected'); + console.log( + "[AgentProcess] Process failed but no rate limit or auth failure detected" + ); return false; } @@ -172,6 +264,24 @@ export class AgentProcessManager { if (this._pythonPath) { return this._pythonPath; } + + // Dev quality-of-life: Prefer the repo backend venv if available. + // This avoids accidentally running against an older bundled backend that + // may not include recent shim modules (e.g. graphiti_memory). 
+ if (!app.isPackaged) { + const autoBuildSource = this.getAutoBuildSourcePath(); + if (autoBuildSource) { + const venvPython = + process.platform === "win32" + ? path.join(autoBuildSource, ".venv", "Scripts", "python.exe") + : path.join(autoBuildSource, ".venv", "bin", "python"); + const validation = validatePythonPath(venvPython); + if (validation.valid) { + return validation.sanitizedPath || venvPython; + } + } + } + // Otherwise use the global configured path (venv if ready, else bundled/system) return getConfiguredPythonPath(); } @@ -182,7 +292,9 @@ export class AgentProcessManager { getAutoBuildSourcePath(): string | null { // Use runners/spec_runner.py as the validation marker - this is the file actually needed const validatePath = (p: string): boolean => { - return existsSync(p) && existsSync(path.join(p, 'runners', 'spec_runner.py')); + return ( + existsSync(p) && existsSync(path.join(p, "runners", "spec_runner.py")) + ); }; // If manually configured AND valid, use that @@ -192,16 +304,23 @@ export class AgentProcessManager { // Auto-detect from app location (configured path was invalid or not set) const possiblePaths = [ - // Dev mode: from dist/main -> ../../backend (apps/frontend/out/main -> apps/backend) - path.resolve(__dirname, '..', '..', '..', 'backend'), + // Dev mode: from dist/main -> ../../../backend (apps/frontend/out/main -> apps/backend) + // out/main -> out -> frontend -> apps -> backend + path.resolve(__dirname, "..", "..", "..", "..", "backend"), + // Dev mode (alternative cwd): if cwd is apps/frontend, ../backend is apps/backend + path.resolve(process.cwd(), "..", "backend"), + // Dev mode (repo root): apps/backend + path.resolve(process.cwd(), "apps", "backend"), // Alternative: from app root -> apps/backend - path.resolve(app.getAppPath(), '..', 'backend'), - // If running from repo root with apps structure - path.resolve(process.cwd(), 'apps', 'backend') + path.resolve(app.getAppPath(), "..", "backend"), ]; for (const p of possiblePaths) { if (validatePath(p)) { + // Helpful breadcrumb in dev when debugging backend selection. 
+ if (!app.isPackaged) { + console.warn("[AgentProcess] Auto-build source selected:", p); + } return p; } } @@ -221,8 +340,9 @@ export class AgentProcessManager { if (project?.settings) { // Graphiti MCP integration if (project.settings.graphitiMcpEnabled) { - const graphitiUrl = project.settings.graphitiMcpUrl || 'http://localhost:8000/mcp/'; - env['GRAPHITI_MCP_URL'] = graphitiUrl; + const graphitiUrl = + project.settings.graphitiMcpUrl || "http://localhost:8000/mcp/"; + env["GRAPHITI_MCP_URL"] = graphitiUrl; } // CLAUDE.md integration (enabled by default) @@ -231,7 +351,14 @@ export class AgentProcessManager { } } - return env; + // Project-scoped .env (what users expect to configure per-project) + // Example: /projects/snapper-desktop/.auto-claude/.env + const projectAutoBuildDir = project?.autoBuildPath || ".auto-claude"; + const projectEnvPath = path.join(projectPath, projectAutoBuildDir, ".env"); + const projectEnv = this.loadEnvFile(projectEnvPath); + + // Merge: file-based env first, then explicit computed env overrides + return { ...projectEnv, ...env }; } /** @@ -247,42 +374,8 @@ export class AgentProcessManager { return {}; } - const envPath = path.join(projectPath, project.autoBuildPath, '.env'); - if (!existsSync(envPath)) { - return {}; - } - - try { - const envContent = readFileSync(envPath, 'utf-8'); - const envVars: Record = {}; - - // Handle both Unix (\n) and Windows (\r\n) line endings - for (const line of envContent.split(/\r?\n/)) { - const trimmed = line.trim(); - // Skip comments and empty lines - if (!trimmed || trimmed.startsWith('#')) { - continue; - } - - const eqIndex = trimmed.indexOf('='); - if (eqIndex > 0) { - const key = trimmed.substring(0, eqIndex).trim(); - let value = trimmed.substring(eqIndex + 1).trim(); - - // Remove quotes if present - if ((value.startsWith('"') && value.endsWith('"')) || - (value.startsWith("'") && value.endsWith("'"))) { - value = value.slice(1, -1); - } - - envVars[key] = value; - } - } - - return envVars; - } catch { - return {}; - } + const envPath = path.join(projectPath, project.autoBuildPath, ".env"); + return this.loadEnvFile(envPath); } /** @@ -294,42 +387,8 @@ export class AgentProcessManager { return {}; } - const envPath = path.join(autoBuildSource, '.env'); - if (!existsSync(envPath)) { - return {}; - } - - try { - const envContent = readFileSync(envPath, 'utf-8'); - const envVars: Record = {}; - - // Handle both Unix (\n) and Windows (\r\n) line endings - for (const line of envContent.split(/\r?\n/)) { - const trimmed = line.trim(); - // Skip comments and empty lines - if (!trimmed || trimmed.startsWith('#')) { - continue; - } - - const eqIndex = trimmed.indexOf('='); - if (eqIndex > 0) { - const key = trimmed.substring(0, eqIndex).trim(); - let value = trimmed.substring(eqIndex + 1).trim(); - - // Remove quotes if present - if ((value.startsWith('"') && value.endsWith('"')) || - (value.startsWith("'") && value.endsWith("'"))) { - value = value.slice(1, -1); - } - - envVars[key] = value; - } - } - - return envVars; - } catch { - return {}; - } + const envPath = path.join(autoBuildSource, ".env"); + return this.loadEnvFile(envPath); } spawnProcess( @@ -337,9 +396,9 @@ export class AgentProcessManager { cwd: string, args: string[], extraEnv: Record = {}, - processType: ProcessType = 'task-execution' + processType: ProcessType = "task-execution" ): void { - const isSpecRunner = processType === 'spec-creation'; + const isSpecRunner = processType === "spec-creation"; this.killProcess(taskId); const spawnId = 
this.state.generateSpawnId(); @@ -349,50 +408,72 @@ export class AgentProcessManager { const pythonEnv = pythonEnvManager.getPythonEnv(); // Parse Python command to handle space-separated commands like "py -3" - const [pythonCommand, pythonBaseArgs] = parsePythonCommand(this.getPythonPath()); + const [pythonCommand, pythonBaseArgs] = parsePythonCommand( + this.getPythonPath() + ); const childProcess = spawn(pythonCommand, [...pythonBaseArgs, ...args], { cwd, env: { ...env, // Already includes process.env, extraEnv, profileEnv, PYTHONUNBUFFERED, PYTHONUTF8 - ...pythonEnv // Include Python environment (PYTHONPATH for bundled packages) - } + ...pythonEnv, // Include Python environment (PYTHONPATH for bundled packages) + }, }); this.state.addProcess(taskId, { taskId, process: childProcess, startedAt: new Date(), - spawnId + spawnId, }); - let currentPhase: ExecutionProgressData['phase'] = isSpecRunner ? 'planning' : 'planning'; + let currentPhase: ExecutionProgressData["phase"] = isSpecRunner + ? "planning" + : "planning"; let phaseProgress = 0; let currentSubtask: string | undefined; let lastMessage: string | undefined; - let allOutput = ''; - let stdoutBuffer = ''; - let stderrBuffer = ''; + let allOutput = ""; + let stdoutBuffer = ""; + let stderrBuffer = ""; let sequenceNumber = 0; - this.emitter.emit('execution-progress', taskId, { + this.emitter.emit("execution-progress", taskId, { phase: currentPhase, phaseProgress: 0, overallProgress: this.events.calculateOverallProgress(currentPhase, 0), - message: isSpecRunner ? 'Starting spec creation...' : 'Starting build process...', - sequenceNumber: ++sequenceNumber + message: isSpecRunner + ? "Starting spec creation..." + : "Starting build process...", + sequenceNumber: ++sequenceNumber, }); - const isDebug = ['true', '1', 'yes', 'on'].includes(process.env.DEBUG?.toLowerCase() ?? ''); + const isDebug = ["true", "1", "yes", "on"].includes( + process.env.DEBUG?.toLowerCase() ?? "" + ); + // Mirror ALL Python subprocess output to Electron main console (so it shows up in `npm run dev`) + // This is intentionally separate from DEBUG to keep normal logs clean. + const mirrorPythonLogs = ["true", "1", "yes", "on"].includes( + (process.env.AUTO_CLAUDE_PYTHON_CONSOLE_LOGS ?? 
"").toLowerCase() + ); const processLog = (line: string) => { allOutput = (allOutput + line).slice(-10000); - const hasMarker = line.includes('__EXEC_PHASE__'); + const hasMarker = line.includes("__EXEC_PHASE__"); if (isDebug && hasMarker) { - console.log(`[PhaseDebug:${taskId}] Found marker in line: "${line.substring(0, 200)}"`); + console.log( + `[PhaseDebug:${taskId}] Found marker in line: "${line.substring( + 0, + 200 + )}"` + ); } - const phaseUpdate = this.events.parseExecutionPhase(line, currentPhase, isSpecRunner); + const phaseUpdate = this.events.parseExecutionPhase( + line, + currentPhase, + isSpecRunner + ); if (isDebug && hasMarker) { console.log(`[PhaseDebug:${taskId}] Parse result:`, phaseUpdate); @@ -402,7 +483,9 @@ export class AgentProcessManager { const phaseChanged = phaseUpdate.phase !== currentPhase; if (isDebug) { - console.log(`[PhaseDebug:${taskId}] Phase update: ${currentPhase} -> ${phaseUpdate.phase} (changed: ${phaseChanged})`); + console.log( + `[PhaseDebug:${taskId}] Phase update: ${currentPhase} -> ${phaseUpdate.phase} (changed: ${phaseChanged})` + ); } currentPhase = phaseUpdate.phase; @@ -420,41 +503,68 @@ export class AgentProcessManager { phaseProgress = Math.min(90, phaseProgress + 5); } - const overallProgress = this.events.calculateOverallProgress(currentPhase, phaseProgress); + const overallProgress = this.events.calculateOverallProgress( + currentPhase, + phaseProgress + ); if (isDebug) { - console.log(`[PhaseDebug:${taskId}] Emitting execution-progress:`, { phase: currentPhase, phaseProgress, overallProgress }); + console.log(`[PhaseDebug:${taskId}] Emitting execution-progress:`, { + phase: currentPhase, + phaseProgress, + overallProgress, + }); } - this.emitter.emit('execution-progress', taskId, { + this.emitter.emit("execution-progress", taskId, { phase: currentPhase, phaseProgress, overallProgress, currentSubtask, message: lastMessage, - sequenceNumber: ++sequenceNumber + sequenceNumber: ++sequenceNumber, }); } }; - const processBufferedOutput = (buffer: string, newData: string): string => { - if (isDebug && newData.includes('__EXEC_PHASE__')) { - console.log(`[PhaseDebug:${taskId}] Raw chunk with marker (${newData.length} bytes): "${newData.substring(0, 300)}"`); - console.log(`[PhaseDebug:${taskId}] Current buffer before append (${buffer.length} bytes): "${buffer.substring(0, 100)}"`); + const processBufferedOutput = ( + buffer: string, + newData: string, + stream: "stdout" | "stderr" + ): string => { + if (isDebug && newData.includes("__EXEC_PHASE__")) { + console.log( + `[PhaseDebug:${taskId}] Raw chunk with marker (${ + newData.length + } bytes): "${newData.substring(0, 300)}"` + ); + console.log( + `[PhaseDebug:${taskId}] Current buffer before append (${ + buffer.length + } bytes): "${buffer.substring(0, 100)}"` + ); } buffer += newData; - const lines = buffer.split('\n'); - const remaining = lines.pop() || ''; - - if (isDebug && newData.includes('__EXEC_PHASE__')) { - console.log(`[PhaseDebug:${taskId}] Split into ${lines.length} complete lines, remaining buffer: "${remaining.substring(0, 100)}"`); + const lines = buffer.split("\n"); + const remaining = lines.pop() || ""; + + if (isDebug && newData.includes("__EXEC_PHASE__")) { + console.log( + `[PhaseDebug:${taskId}] Split into ${ + lines.length + } complete lines, remaining buffer: "${remaining.substring(0, 100)}"` + ); } for (const line of lines) { if (line.trim()) { - this.emitter.emit('log', taskId, line + '\n'); + this.emitter.emit("log", taskId, line + "\n"); processLog(line); + if 
(mirrorPythonLogs) { + // Prefix so it's easy to grep in the `npm run dev` output + console.warn(`[Py:${taskId}:${stream}] ${line}`); + } if (isDebug) { console.log(`[Agent:${taskId}] ${line}`); } @@ -464,21 +574,29 @@ export class AgentProcessManager { return remaining; }; - childProcess.stdout?.on('data', (data: Buffer) => { - stdoutBuffer = processBufferedOutput(stdoutBuffer, data.toString('utf8')); + childProcess.stdout?.on("data", (data: Buffer) => { + stdoutBuffer = processBufferedOutput( + stdoutBuffer, + data.toString("utf8"), + "stdout" + ); }); - childProcess.stderr?.on('data', (data: Buffer) => { - stderrBuffer = processBufferedOutput(stderrBuffer, data.toString('utf8')); + childProcess.stderr?.on("data", (data: Buffer) => { + stderrBuffer = processBufferedOutput( + stderrBuffer, + data.toString("utf8"), + "stderr" + ); }); - childProcess.on('exit', (code: number | null) => { + childProcess.on("exit", (code: number | null) => { if (stdoutBuffer.trim()) { - this.emitter.emit('log', taskId, stdoutBuffer + '\n'); + this.emitter.emit("log", taskId, stdoutBuffer + "\n"); processLog(stdoutBuffer); } if (stderrBuffer.trim()) { - this.emitter.emit('log', taskId, stderrBuffer + '\n'); + this.emitter.emit("log", taskId, stderrBuffer + "\n"); processLog(stderrBuffer); } @@ -490,41 +608,57 @@ export class AgentProcessManager { } if (code !== 0) { - console.log('[AgentProcess] Process failed with code:', code, 'for task:', taskId); - const wasHandled = this.handleProcessFailure(taskId, allOutput, processType); + console.log( + "[AgentProcess] Process failed with code:", + code, + "for task:", + taskId + ); + const wasHandled = this.handleProcessFailure( + taskId, + allOutput, + processType + ); if (wasHandled) { - this.emitter.emit('exit', taskId, code, processType); + this.emitter.emit("exit", taskId, code, processType); return; } } - if (code !== 0 && currentPhase !== 'complete' && currentPhase !== 'failed') { - this.emitter.emit('execution-progress', taskId, { - phase: 'failed', + if ( + code !== 0 && + currentPhase !== "complete" && + currentPhase !== "failed" + ) { + this.emitter.emit("execution-progress", taskId, { + phase: "failed", phaseProgress: 0, - overallProgress: this.events.calculateOverallProgress(currentPhase, phaseProgress), + overallProgress: this.events.calculateOverallProgress( + currentPhase, + phaseProgress + ), message: `Process exited with code ${code}`, - sequenceNumber: ++sequenceNumber + sequenceNumber: ++sequenceNumber, }); } - this.emitter.emit('exit', taskId, code, processType); + this.emitter.emit("exit", taskId, code, processType); }); // Handle process error - childProcess.on('error', (err: Error) => { - console.error('[AgentProcess] Process error:', err.message); + childProcess.on("error", (err: Error) => { + console.error("[AgentProcess] Process error:", err.message); this.state.deleteProcess(taskId); - this.emitter.emit('execution-progress', taskId, { - phase: 'failed', + this.emitter.emit("execution-progress", taskId, { + phase: "failed", phaseProgress: 0, overallProgress: 0, message: `Error: ${err.message}`, - sequenceNumber: ++sequenceNumber + sequenceNumber: ++sequenceNumber, }); - this.emitter.emit('error', taskId, err.message); + this.emitter.emit("error", taskId, err.message); }); } @@ -539,12 +673,12 @@ export class AgentProcessManager { this.state.markSpawnAsKilled(agentProcess.spawnId); // Send SIGTERM first for graceful shutdown - agentProcess.process.kill('SIGTERM'); + agentProcess.process.kill("SIGTERM"); // Force kill after timeout 
setTimeout(() => { if (!agentProcess.process.killed) { - agentProcess.process.kill('SIGKILL'); + agentProcess.process.kill("SIGKILL"); } }, 5000); diff --git a/apps/frontend/src/main/index.ts b/apps/frontend/src/main/index.ts index 7cd856a0fe..0f5eddbd1f 100644 --- a/apps/frontend/src/main/index.ts +++ b/apps/frontend/src/main/index.ts @@ -31,17 +31,17 @@ function getIconPath(): string { // In dev mode, __dirname is out/main, so we go up to project root then into resources // In production, resources are in the app's resources folder const resourcesPath = is.dev - ? join(__dirname, '../../resources') + ? join(__dirname, "../../resources") : join(process.resourcesPath); let iconName: string; - if (process.platform === 'darwin') { + if (process.platform === "darwin") { // Use PNG in dev mode (works better), ICNS in production - iconName = is.dev ? 'icon-256.png' : 'icon.icns'; - } else if (process.platform === 'win32') { - iconName = 'icon.ico'; + iconName = is.dev ? "icon-256.png" : "icon.icns"; + } else if (process.platform === "win32") { + iconName = "icon.ico"; } else { - iconName = 'icon.png'; + iconName = "icon.png"; } const iconPath = join(resourcesPath, iconName); @@ -62,61 +62,61 @@ function createWindow(): void { minHeight: 700, show: false, autoHideMenuBar: true, - titleBarStyle: 'hiddenInset', + titleBarStyle: "hiddenInset", trafficLightPosition: { x: 15, y: 10 }, icon: getIconPath(), webPreferences: { - preload: join(__dirname, '../preload/index.mjs'), + preload: join(__dirname, "../preload/index.mjs"), sandbox: false, contextIsolation: true, nodeIntegration: false, - backgroundThrottling: false // Prevent terminal lag when window loses focus - } + backgroundThrottling: false, // Prevent terminal lag when window loses focus + }, }); // Show window when ready to avoid visual flash - mainWindow.on('ready-to-show', () => { + mainWindow.on("ready-to-show", () => { mainWindow?.show(); }); // Handle external links mainWindow.webContents.setWindowOpenHandler((details) => { shell.openExternal(details.url); - return { action: 'deny' }; + return { action: "deny" }; }); // Load the renderer - if (is.dev && process.env['ELECTRON_RENDERER_URL']) { - mainWindow.loadURL(process.env['ELECTRON_RENDERER_URL']); + if (is.dev && process.env["ELECTRON_RENDERER_URL"]) { + mainWindow.loadURL(process.env["ELECTRON_RENDERER_URL"]); } else { - mainWindow.loadFile(join(__dirname, '../renderer/index.html')); + mainWindow.loadFile(join(__dirname, "../renderer/index.html")); } // Open DevTools in development if (is.dev) { - mainWindow.webContents.openDevTools({ mode: 'right' }); + mainWindow.webContents.openDevTools({ mode: "right" }); } // Clean up on close - mainWindow.on('closed', () => { + mainWindow.on("closed", () => { mainWindow = null; }); } // Set app name before ready (for dock tooltip on macOS in dev mode) -app.setName('Auto Claude'); -if (process.platform === 'darwin') { +app.setName("Auto Claude"); +if (process.platform === "darwin") { // Force the name to appear in dock on macOS - app.name = 'Auto Claude'; + app.name = "Auto Claude"; } // Initialize the application app.whenReady().then(() => { // Set app user model id for Windows - electronApp.setAppUserModelId('com.autoclaude.ui'); + electronApp.setAppUserModelId("com.autoclaude.ui"); // Set dock icon on macOS - if (process.platform === 'darwin') { + if (process.platform === "darwin") { const iconPath = getIconPath(); try { const icon = nativeImage.createFromPath(iconPath); @@ -124,13 +124,13 @@ app.whenReady().then(() => { 
app.dock?.setIcon(icon); } } catch (e) { - console.warn('Could not set dock icon:', e); + console.warn("Could not set dock icon:", e); } } // Default open or close DevTools by F12 in development // and ignore CommandOrControl + R in production. - app.on('browser-window-created', (_, window) => { + app.on("browser-window-created", (_, window) => { optimizer.watchWindowShortcuts(window); }); @@ -197,6 +197,50 @@ app.whenReady().then(() => { } } + // Dev safeguard: It's very easy to have settings.autoBuildPath pointing at the + // installed /Applications/Auto-Claude.app bundle. That makes `npm run dev` + // run against the bundled backend, ignoring repo code changes. + // + // Default behavior in dev: ignore packaged autoBuildPath and let auto-detection + // pick the repo backend. To force using the packaged backend in dev, set: + // AUTO_CLAUDE_DEV_USE_PACKAGED_AUTOBUILD=1 + // NOTE: Use `!app.isPackaged` instead of `is.dev`. In some electron-vite + // setups `is.dev` can be false even though we're running in development. + const debugSourcePath = ['true', '1', 'yes', 'on'].includes( + (process.env.AUTO_CLAUDE_DEBUG_SOURCE_PATH ?? '').toLowerCase() + ); + const devUsePackagedOverride = ['true', '1', 'yes', 'on'].includes( + (process.env.AUTO_CLAUDE_DEV_USE_PACKAGED_AUTOBUILD ?? '').toLowerCase() + ); + + if (debugSourcePath) { + console.warn('[main] AutoBuildPath decision (startup)', { + appIsPackaged: app.isPackaged, + settingsAutoBuildPath: settings.autoBuildPath, + validatedAutoBuildPath: validAutoBuildPath, + devUsePackagedOverride + }); + } + + if (!app.isPackaged && validAutoBuildPath && !devUsePackagedOverride) { + const looksLikeMacAppBundle = + validAutoBuildPath.includes('/Applications/Auto-Claude.app/') || + validAutoBuildPath.includes('/Applications/Auto Claude.app/'); + if (debugSourcePath) { + console.warn('[main] AutoBuildPath decision (dev filter)', { + validatedAutoBuildPath: validAutoBuildPath, + looksLikeMacAppBundle + }); + } + if (looksLikeMacAppBundle) { + console.warn( + '[main] Dev mode: ignoring settings.autoBuildPath pointing at installed app bundle; using auto-detection instead:', + validAutoBuildPath + ); + validAutoBuildPath = undefined; + } + } + if (settings.pythonPath || validAutoBuildPath) { console.warn('[main] Configuring AgentManager with settings:', { pythonPath: settings.pythonPath, @@ -217,7 +261,12 @@ app.whenReady().then(() => { terminalManager = new TerminalManager(() => mainWindow); // Setup IPC handlers (pass pythonEnvManager for Python path management) - setupIpcHandlers(agentManager, terminalManager, () => mainWindow, pythonEnvManager); + setupIpcHandlers( + agentManager, + terminalManager, + () => mainWindow, + pythonEnvManager + ); // Create window createWindow(); @@ -230,41 +279,45 @@ app.whenReady().then(() => { // Start the usage monitor const usageMonitor = getUsageMonitor(); usageMonitor.start(); - console.warn('[main] Usage monitor initialized and started'); + console.warn("[main] Usage monitor initialized and started"); // Log debug mode status - const isDebugMode = process.env.DEBUG === 'true'; + const isDebugMode = process.env.DEBUG === "true"; if (isDebugMode) { - console.warn('[main] ========================================'); - console.warn('[main] DEBUG MODE ENABLED (DEBUG=true)'); - console.warn('[main] ========================================'); + console.warn("[main] ========================================"); + console.warn("[main] DEBUG MODE ENABLED (DEBUG=true)"); + console.warn("[main] 
========================================"); } // Initialize app auto-updater (only in production, or when DEBUG_UPDATER is set) - const forceUpdater = process.env.DEBUG_UPDATER === 'true'; + const forceUpdater = process.env.DEBUG_UPDATER === "true"; if (app.isPackaged || forceUpdater) { // Load settings to get beta updates preference const settings = loadSettingsSync(); const betaUpdates = settings.betaUpdates ?? false; initializeAppUpdater(mainWindow, betaUpdates); - console.warn('[main] App auto-updater initialized'); - console.warn(`[main] Beta updates: ${betaUpdates ? 'enabled' : 'disabled'}`); + console.warn("[main] App auto-updater initialized"); + console.warn( + `[main] Beta updates: ${betaUpdates ? "enabled" : "disabled"}` + ); if (forceUpdater && !app.isPackaged) { - console.warn('[main] Updater forced in dev mode via DEBUG_UPDATER=true'); - console.warn('[main] Note: Updates won\'t actually work in dev mode'); + console.warn( + "[main] Updater forced in dev mode via DEBUG_UPDATER=true" + ); + console.warn("[main] Note: Updates won't actually work in dev mode"); } } else { - console.warn('[main] ========================================'); - console.warn('[main] App auto-updater DISABLED (development mode)'); - console.warn('[main] To test updater logging, set DEBUG_UPDATER=true'); - console.warn('[main] Note: Actual updates only work in packaged builds'); - console.warn('[main] ========================================'); + console.warn("[main] ========================================"); + console.warn("[main] App auto-updater DISABLED (development mode)"); + console.warn("[main] To test updater logging, set DEBUG_UPDATER=true"); + console.warn("[main] Note: Actual updates only work in packaged builds"); + console.warn("[main] ========================================"); } } // macOS: re-create window when dock icon is clicked - app.on('activate', () => { + app.on("activate", () => { if (BrowserWindow.getAllWindows().length === 0) { createWindow(); } @@ -272,18 +325,18 @@ app.whenReady().then(() => { }); // Quit when all windows are closed (except on macOS) -app.on('window-all-closed', () => { - if (process.platform !== 'darwin') { +app.on("window-all-closed", () => { + if (process.platform !== "darwin") { app.quit(); } }); // Cleanup before quit -app.on('before-quit', async () => { +app.on("before-quit", async () => { // Stop usage monitor const usageMonitor = getUsageMonitor(); usageMonitor.stop(); - console.warn('[main] Usage monitor stopped'); + console.warn("[main] Usage monitor stopped"); // Kill all running agent processes if (agentManager) { diff --git a/apps/frontend/src/main/ipc-handlers/agent-events-handlers.ts b/apps/frontend/src/main/ipc-handlers/agent-events-handlers.ts index 21986aa2d9..43f3d49e0b 100644 --- a/apps/frontend/src/main/ipc-handlers/agent-events-handlers.ts +++ b/apps/frontend/src/main/ipc-handlers/agent-events-handlers.ts @@ -88,6 +88,14 @@ export function registerAgenteventsHandlers( if (code === 0) { notificationService.notifyReviewNeeded(taskTitle, project.id, taskId); + // Ensure the UI doesn't get stuck in 'in_progress' if the final + // execution-progress marker ("complete") wasn't emitted or parsed + // before process exit. 
+ mainWindow.webContents.send( + IPC_CHANNELS.TASK_STATUS_CHANGE, + taskId, + 'human_review' as TaskStatus + ); } else { notificationService.notifyTaskFailed(taskTitle, project.id, taskId); mainWindow.webContents.send( diff --git a/apps/frontend/src/main/updater/path-resolver.ts b/apps/frontend/src/main/updater/path-resolver.ts index 6c149a5b5a..d8990249bc 100644 --- a/apps/frontend/src/main/updater/path-resolver.ts +++ b/apps/frontend/src/main/updater/path-resolver.ts @@ -61,6 +61,48 @@ export function getEffectiveSourcePath(): string { if (existsSync(settingsPath)) { const settings = JSON.parse(readFileSync(settingsPath, 'utf-8')); if (settings.autoBuildPath && existsSync(settings.autoBuildPath)) { + const debugSourcePath = ['true', '1', 'yes', 'on'].includes( + (process.env.AUTO_CLAUDE_DEBUG_SOURCE_PATH ?? '').toLowerCase() + ); + const devUsePackagedOverride = ['true', '1', 'yes', 'on'].includes( + (process.env.AUTO_CLAUDE_DEV_USE_PACKAGED_AUTOBUILD ?? '').toLowerCase() + ); + if (debugSourcePath) { + console.warn('[path-resolver] EffectiveSourcePath decision', { + appIsPackaged: app.isPackaged, + settingsAutoBuildPath: settings.autoBuildPath, + devUsePackagedOverride + }); + } + + // Dev safeguard: avoid accidentally using the installed /Applications app bundle + // as backend source when running `npm run dev`. + // + // To force using the packaged backend path in dev, set: + // AUTO_CLAUDE_DEV_USE_PACKAGED_AUTOBUILD=1 + if ( + !app.isPackaged && + !devUsePackagedOverride + ) { + const looksLikeMacAppBundle = + settings.autoBuildPath.includes('/Applications/Auto-Claude.app/') || + settings.autoBuildPath.includes('/Applications/Auto Claude.app/'); + if (debugSourcePath) { + console.warn('[path-resolver] Dev filter', { + looksLikeMacAppBundle, + settingsAutoBuildPath: settings.autoBuildPath + }); + } + if (looksLikeMacAppBundle) { + console.warn( + '[path-resolver] Dev mode: ignoring settings.autoBuildPath pointing at installed app bundle; using repo auto-detection instead:', + settings.autoBuildPath + ); + // Fall through to bundled source detection for dev (repo backend). + return getBundledSourcePath(); + } + } + // Validate it's a proper backend source (must have runners/spec_runner.py) const markerPath = path.join(settings.autoBuildPath, 'runners', 'spec_runner.py'); if (existsSync(markerPath)) { diff --git a/tests/test_file_lock.py b/tests/test_file_lock.py new file mode 100644 index 0000000000..ffeb947e1e --- /dev/null +++ b/tests/test_file_lock.py @@ -0,0 +1,33 @@ +import pytest + + +def test_acquire_lock_closes_fd_on_filelockerror(tmp_path, monkeypatch): + """ + Regression test: if lock acquisition raises FileLockError, the opened file + descriptor must be closed to avoid leaks. 
+ """ + import file_lock + + target = tmp_path / "data.json" + lock = file_lock.FileLock(target, timeout=0.01, exclusive=True) + + closed_fds: list[int] = [] + original_close = file_lock.os.close + + def tracking_close(fd: int) -> None: + closed_fds.append(fd) + original_close(fd) + + def boom(_fd: int, _exclusive: bool) -> None: + raise file_lock.FileLockError("simulated failure") + + monkeypatch.setattr(file_lock.os, "close", tracking_close) + monkeypatch.setattr(file_lock, "_try_lock", boom) + + with pytest.raises(file_lock.FileLockError): + lock._acquire_lock() + + assert lock._fd is None + assert len(closed_fds) == 1 + + diff --git a/tests/test_graphiti_client_initialize_lock.py b/tests/test_graphiti_client_initialize_lock.py new file mode 100644 index 0000000000..99fee7a397 --- /dev/null +++ b/tests/test_graphiti_client_initialize_lock.py @@ -0,0 +1,58 @@ +import asyncio +from unittest.mock import MagicMock + +import pytest + +from integrations.graphiti.queries_pkg.client import GraphitiClient + + +class _TestClient(GraphitiClient): + def __init__(self): + super().__init__(config=MagicMock()) + self.unlocked_calls = 0 + + async def _initialize_unlocked(self, state=None) -> bool: # type: ignore[override] + self.unlocked_calls += 1 + # Simulate expensive async initialization to amplify races. + await asyncio.sleep(0.05) + self._initialized = True + return True + + +class _FailOnceClient(GraphitiClient): + def __init__(self): + super().__init__(config=MagicMock()) + self.unlocked_calls = 0 + + async def _initialize_unlocked(self, state=None) -> bool: # type: ignore[override] + self.unlocked_calls += 1 + if self.unlocked_calls == 1: + # Fail without marking initialized (should allow retry). + return False + self._initialized = True + return True + + +@pytest.mark.asyncio +async def test_initialize_concurrent_calls_only_run_once(): + client = _TestClient() + results = await asyncio.gather(*[client.initialize() for _ in range(25)]) + assert all(results) + assert client.unlocked_calls == 1 + + +@pytest.mark.asyncio +async def test_initialize_can_retry_after_failure(): + client = _FailOnceClient() + + first = await client.initialize() + assert first is False + assert client.is_initialized is False + assert client.unlocked_calls == 1 + + second = await client.initialize() + assert second is True + assert client.is_initialized is True + assert client.unlocked_calls == 2 + +