From 4691c880abd315d0884cce30fe855dff5e38efca Mon Sep 17 00:00:00 2001 From: Shamim Rehman Date: Thu, 4 Jun 2026 13:28:34 -0400 Subject: [PATCH] Index Forge incidents for listing and stats --- forge_cli/cli.py | 34 ++- forge_cli/incident_store.py | 404 ++++++++++++++++++++++++++++++++--- forge_cli/mcp_server.py | 59 ++--- tests/test_incident_store.py | 78 +++++++ 4 files changed, 492 insertions(+), 83 deletions(-) diff --git a/forge_cli/cli.py b/forge_cli/cli.py index b29b310..0bfd76b 100644 --- a/forge_cli/cli.py +++ b/forge_cli/cli.py @@ -30,9 +30,10 @@ find_incident_path, generate_id, get_all_incidents, + list_incident_summaries, list_incidents, load_incident, - save_incident, + save_generated_incident, ) from forge_cli.models import ( CAPABILITY_AREA_VALUES, @@ -358,7 +359,7 @@ def log( raise typer.Exit(0) try: - filepath = save_incident(incident, cfg.incidents_dir) + filepath = save_generated_incident(incident, cfg.incidents_dir) except DuplicateIncidentError as e: print_error(str(e)) raise typer.Exit(1) @@ -620,24 +621,17 @@ def stats( print_error(str(e)) raise typer.Exit(1) - incidents = get_all_incidents(cfg.incidents_dir) - - if project: - incidents = [i for i in incidents if i.project == project] - if severity: - incidents = [i for i in incidents if i.severity == severity] - if since: - incidents = [i for i in incidents if i.timestamp >= since] - if issue_class: - incidents = [i for i in incidents if i.issue_class == issue_class] - if capability_area: - incidents = [i for i in incidents if i.capability_area == capability_area] - if lifecycle_stage: - incidents = [i for i in incidents if i.lifecycle_stage == lifecycle_stage] - if workflow_archetype: - incidents = [i for i in incidents if i.workflow_archetype == workflow_archetype] - if blocked_use_class: - incidents = [i for i in incidents if i.blocked_use_class == blocked_use_class] + incidents = list_incident_summaries( + cfg.incidents_dir, + project=project, + severity=severity, + since=since, + issue_class=issue_class, + capability_area=capability_area, + lifecycle_stage=lifecycle_stage, + workflow_archetype=workflow_archetype, + blocked_use_class=blocked_use_class, + ) display_stats(incidents) diff --git a/forge_cli/incident_store.py b/forge_cli/incident_store.py index 355f738..cc18edb 100644 --- a/forge_cli/incident_store.py +++ b/forge_cli/incident_store.py @@ -1,6 +1,11 @@ from __future__ import annotations +from collections import Counter +from contextlib import contextmanager +from dataclasses import dataclass from datetime import date, datetime +import fcntl +import json import os from pathlib import Path import tempfile @@ -43,6 +48,99 @@ class AmbiguousIncidentLookupError(LookupError): """Raised when a suffix lookup matches multiple incidents.""" +INCIDENT_INDEX_VERSION = 1 +INCIDENT_INDEX_FILENAME = ".forge_incident_index.json" +INCIDENT_INDEX_LOCK_FILENAME = ".forge_incident_index.lock" + + +@dataclass(frozen=True) +class IncidentIndexEntry: + id: str + timestamp: str + project: str + severity: str + failure_type: str + platform: str + tags: list[str] + path: str + issue_class: str = "" + capability_area: str = "" + lifecycle_stage: str = "" + workflow_archetype: str = "" + blocked_use_class: str = "" + + @classmethod + def from_dict(cls, data: dict) -> IncidentIndexEntry: + return cls( + id=str(data.get("id", "")), + timestamp=str(data.get("timestamp", "")), + project=str(data.get("project", "")), + severity=str(data.get("severity", "")), + failure_type=str(data.get("failure_type", "")), + platform=str(data.get("platform", "")), + tags=list(data.get("tags", []) or []), + path=str(data.get("path", "")), + issue_class=str(data.get("issue_class", "")), + capability_area=str(data.get("capability_area", "")), + lifecycle_stage=str(data.get("lifecycle_stage", "")), + workflow_archetype=str(data.get("workflow_archetype", "")), + blocked_use_class=str(data.get("blocked_use_class", "")), + ) + + def to_dict(self) -> dict: + return { + "id": self.id, + "timestamp": self.timestamp, + "project": self.project, + "severity": self.severity, + "failure_type": self.failure_type, + "platform": self.platform, + "tags": self.tags, + "path": self.path, + "issue_class": self.issue_class, + "capability_area": self.capability_area, + "lifecycle_stage": self.lifecycle_stage, + "workflow_archetype": self.workflow_archetype, + "blocked_use_class": self.blocked_use_class, + } + + def to_incident_summary(self) -> Incident: + return Incident( + id=self.id, + timestamp=self.timestamp, + reported_by="", + project=self.project, + agent="", + platform=self.platform, + severity=self.severity, + failure_type=self.failure_type, + expected_behavior="", + actual_behavior="", + context="", + root_cause="", + immediate_fix="", + systemic_takeaway="", + tags=list(self.tags), + issue_class=self.issue_class, + capability_area=self.capability_area, + lifecycle_stage=self.lifecycle_stage, + workflow_archetype=self.workflow_archetype, + blocked_use_class=self.blocked_use_class, + ) + + +@dataclass(frozen=True) +class IncidentStats: + total: int + by_severity: dict[str, int] + by_type: dict[str, int] + by_project: dict[str, int] + by_platform: dict[str, int] + by_issue_class: dict[str, int] + by_capability_area: dict[str, int] + by_tag: dict[str, int] + + def generate_id(incidents_dir: Path, incident_date: date | None = None) -> str: """Generate the next incident ID for the given date (YYYY-MM-DD-NNN).""" if incident_date is None: @@ -97,9 +195,31 @@ def save_incident(incident: Incident, incidents_dir: Path) -> Path: if tmp_path is not None and tmp_path.exists(): tmp_path.unlink() + _upsert_index_entry(incidents_dir, _entry_from_incident(incident, filepath, incidents_dir)) return filepath +def save_generated_incident( + incident: Incident, + incidents_dir: Path, + *, + max_attempts: int = 1000, +) -> Path: + """Assign a dated incident ID and save, retrying on concurrent duplicates.""" + incident_date = datetime.fromisoformat( + incident.timestamp.replace("Z", "+00:00") + ).date() + for _ in range(max_attempts): + incident.id = generate_id(incidents_dir, incident_date) + try: + return save_incident(incident, incidents_dir) + except DuplicateIncidentError: + continue + raise DuplicateIncidentError( + f"Could not allocate unique incident id for date {incident_date.isoformat()}" + ) + + def load_incident(path: Path) -> Incident: """Load a single incident from a YAML file.""" with open(path) as f: @@ -121,38 +241,27 @@ def list_incidents( limit: int = 10, ) -> list[Incident]: """List incidents with optional filtering, most recent first.""" - all_files = sorted(incidents_dir.rglob("*.yml"), reverse=True) - - # Exclude template-like files (e.g., .gitkeep won't match *.yml) + entries = _filter_index_entries( + _index_entries(incidents_dir), + project=project, + severity=severity, + since=since, + tag=tag, + issue_class=issue_class, + capability_area=capability_area, + lifecycle_stage=lifecycle_stage, + workflow_archetype=workflow_archetype, + blocked_use_class=blocked_use_class, + limit=limit, + ) incidents: list[Incident] = [] - for filepath in all_files: + for entry in entries: + filepath = incidents_dir / entry.path try: incident = load_incident(filepath) except Exception: continue - - if project and incident.project != project: - continue - if severity and incident.severity != severity: - continue - if since and incident.timestamp < since: - continue - if tag and tag not in incident.tags: - continue - if issue_class and incident.issue_class != issue_class: - continue - if capability_area and incident.capability_area != capability_area: - continue - if lifecycle_stage and incident.lifecycle_stage != lifecycle_stage: - continue - if workflow_archetype and incident.workflow_archetype != workflow_archetype: - continue - if blocked_use_class and incident.blocked_use_class != blocked_use_class: - continue - incidents.append(incident) - if len(incidents) >= limit: - break return incidents @@ -166,7 +275,12 @@ def find_incident_path(incidents_dir: Path, incident_id: str) -> Path | None: if exact_path.exists(): return exact_path - matches = sorted(filepath for filepath in incidents_dir.rglob("*.yml") if filepath.stem.endswith(incident_id)) + matches = [ + incidents_dir / entry.path + for entry in _index_entries(incidents_dir) + if entry.id.endswith(incident_id) + ] + matches = sorted(path for path in matches if path.exists()) if len(matches) == 1: return matches[0] if len(matches) > 1: @@ -197,3 +311,239 @@ def get_all_incidents(incidents_dir: Path) -> list[Incident]: except Exception: continue return incidents + + +def list_incident_summaries( + incidents_dir: Path, + project: str | None = None, + severity: str | None = None, + since: str | None = None, + issue_class: str | None = None, + capability_area: str | None = None, + lifecycle_stage: str | None = None, + workflow_archetype: str | None = None, + blocked_use_class: str | None = None, +) -> list[Incident]: + """Return index-backed summary incidents for aggregate displays.""" + return [ + entry.to_incident_summary() + for entry in _filter_index_entries( + _index_entries(incidents_dir), + project=project, + severity=severity, + since=since, + issue_class=issue_class, + capability_area=capability_area, + lifecycle_stage=lifecycle_stage, + workflow_archetype=workflow_archetype, + blocked_use_class=blocked_use_class, + limit=None, + ) + ] + + +def get_incident_stats( + incidents_dir: Path, + project: str | None = None, + severity: str | None = None, + issue_class: str | None = None, + capability_area: str | None = None, +) -> IncidentStats: + """Aggregate incident stats from the compact incident index.""" + entries = _filter_index_entries( + _index_entries(incidents_dir), + project=project, + severity=severity, + issue_class=issue_class, + capability_area=capability_area, + limit=None, + ) + return IncidentStats( + total=len(entries), + by_severity=_sorted_counter(entry.severity for entry in entries if entry.severity), + by_type=_sorted_counter(entry.failure_type for entry in entries if entry.failure_type), + by_project=_sorted_counter(entry.project for entry in entries if entry.project), + by_platform=_sorted_counter(entry.platform for entry in entries if entry.platform), + by_issue_class=_sorted_counter( + entry.issue_class for entry in entries if entry.issue_class + ), + by_capability_area=_sorted_counter( + entry.capability_area for entry in entries if entry.capability_area + ), + by_tag=_sorted_counter(tag for entry in entries for tag in entry.tags), + ) + + +def rebuild_incident_index(incidents_dir: Path) -> list[IncidentIndexEntry]: + with _locked_index(incidents_dir): + return _rebuild_incident_index_unlocked(incidents_dir) + + +def _rebuild_incident_index_unlocked(incidents_dir: Path) -> list[IncidentIndexEntry]: + entries: list[IncidentIndexEntry] = [] + for filepath in sorted(incidents_dir.rglob("*.yml")): + try: + incident = load_incident(filepath) + except Exception: + continue + entries.append(_entry_from_incident(incident, filepath, incidents_dir)) + entries = _sort_index_entries(entries) + _write_index(incidents_dir, entries) + return entries + + +def _index_entries(incidents_dir: Path) -> list[IncidentIndexEntry]: + loaded = _load_index(incidents_dir) + if loaded is not None: + return loaded + return rebuild_incident_index(incidents_dir) + + +def _index_path(incidents_dir: Path) -> Path: + return incidents_dir / INCIDENT_INDEX_FILENAME + + +def _load_index(incidents_dir: Path) -> list[IncidentIndexEntry] | None: + path = _index_path(incidents_dir) + if not path.exists(): + return None + try: + payload = json.loads(path.read_text(encoding="utf-8")) + if payload.get("version") != INCIDENT_INDEX_VERSION: + return None + entries = [IncidentIndexEntry.from_dict(item) for item in payload.get("entries", [])] + except Exception: + return None + return _sort_index_entries(entries) + + +def _write_index(incidents_dir: Path, entries: list[IncidentIndexEntry]) -> None: + incidents_dir.mkdir(parents=True, exist_ok=True) + payload = { + "version": INCIDENT_INDEX_VERSION, + "entries": [entry.to_dict() for entry in _sort_index_entries(entries)], + } + tmp_path: Path | None = None + try: + with tempfile.NamedTemporaryFile( + "w", + dir=incidents_dir, + prefix=f".{INCIDENT_INDEX_FILENAME}.", + suffix=".tmp", + delete=False, + encoding="utf-8", + ) as f: + tmp_path = Path(f.name) + json.dump(payload, f, indent=2, sort_keys=True) + f.write("\n") + f.flush() + os.fsync(f.fileno()) + os.replace(tmp_path, _index_path(incidents_dir)) + _fsync_directory(incidents_dir) + finally: + if tmp_path is not None and tmp_path.exists(): + tmp_path.unlink() + + +def _upsert_index_entry(incidents_dir: Path, entry: IncidentIndexEntry) -> None: + with _locked_index(incidents_dir): + entries = _load_index(incidents_dir) + if entries is None: + entries = _rebuild_incident_index_unlocked(incidents_dir) + entries = [existing for existing in entries if existing.id != entry.id] + entries.append(entry) + _write_index(incidents_dir, entries) + + +def _entry_from_incident( + incident: Incident, filepath: Path, incidents_dir: Path +) -> IncidentIndexEntry: + return IncidentIndexEntry( + id=incident.id, + timestamp=incident.timestamp, + project=incident.project, + severity=incident.severity, + failure_type=incident.failure_type, + platform=incident.platform, + tags=list(incident.tags), + path=str(filepath.relative_to(incidents_dir)), + issue_class=incident.issue_class, + capability_area=incident.capability_area, + lifecycle_stage=incident.lifecycle_stage, + workflow_archetype=incident.workflow_archetype, + blocked_use_class=incident.blocked_use_class, + ) + + +def _filter_index_entries( + entries: list[IncidentIndexEntry], + *, + project: str | None = None, + severity: str | None = None, + since: str | None = None, + tag: str | None = None, + issue_class: str | None = None, + capability_area: str | None = None, + lifecycle_stage: str | None = None, + workflow_archetype: str | None = None, + blocked_use_class: str | None = None, + limit: int | None = 10, +) -> list[IncidentIndexEntry]: + filtered: list[IncidentIndexEntry] = [] + for entry in _sort_index_entries(entries): + if project and entry.project != project: + continue + if severity and entry.severity != severity: + continue + if since and entry.timestamp < since: + continue + if tag and tag not in entry.tags: + continue + if issue_class and entry.issue_class != issue_class: + continue + if capability_area and entry.capability_area != capability_area: + continue + if lifecycle_stage and entry.lifecycle_stage != lifecycle_stage: + continue + if workflow_archetype and entry.workflow_archetype != workflow_archetype: + continue + if blocked_use_class and entry.blocked_use_class != blocked_use_class: + continue + filtered.append(entry) + if limit is not None and len(filtered) >= limit: + break + return filtered + + +def _sort_index_entries(entries: list[IncidentIndexEntry]) -> list[IncidentIndexEntry]: + return sorted(entries, key=lambda entry: (entry.timestamp, entry.id), reverse=True) + + +def _sorted_counter(values) -> dict[str, int]: + counter = Counter(values) + return dict(counter.most_common()) + + +def _fsync_directory(path: Path) -> None: + try: + directory_fd = os.open(path, os.O_RDONLY) + except OSError: + return + try: + os.fsync(directory_fd) + except OSError: + return + finally: + os.close(directory_fd) + + +@contextmanager +def _locked_index(incidents_dir: Path): + incidents_dir.mkdir(parents=True, exist_ok=True) + lock_path = incidents_dir / INCIDENT_INDEX_LOCK_FILENAME + with lock_path.open("a+", encoding="utf-8") as lock_file: + fcntl.flock(lock_file.fileno(), fcntl.LOCK_EX) + try: + yield + finally: + fcntl.flock(lock_file.fileno(), fcntl.LOCK_UN) diff --git a/forge_cli/mcp_server.py b/forge_cli/mcp_server.py index 1ba24c0..0c483ff 100644 --- a/forge_cli/mcp_server.py +++ b/forge_cli/mcp_server.py @@ -14,9 +14,9 @@ DuplicateIncidentError, find_incident, generate_id, - get_all_incidents, + get_incident_stats, list_incidents, - save_incident, + save_generated_incident, ) from forge_cli.models import ( CAPABILITY_AREA_VALUES, @@ -234,10 +234,10 @@ def forge_log( return str(e) try: - filepath = save_incident(incident, cfg.incidents_dir) + filepath = save_generated_incident(incident, cfg.incidents_dir) except DuplicateIncidentError as e: return str(e) - return f"Incident logged: {incident_id}\nSaved to: {filepath}" + return f"Incident logged: {incident.id}\nSaved to: {filepath}" @mcp.tool() @@ -349,62 +349,49 @@ def forge_stats( issue_class: Filter by structured issue class capability_area: Filter by capability area """ - from collections import Counter - cfg = load_config() - incidents = get_all_incidents(cfg.incidents_dir) - - if project: - incidents = [i for i in incidents if i.project == project] - if severity: - incidents = [i for i in incidents if i.severity == severity] - if issue_class: - incidents = [i for i in incidents if i.issue_class == issue_class] - if capability_area: - incidents = [i for i in incidents if i.capability_area == capability_area] + stats = get_incident_stats( + cfg.incidents_dir, + project=project or None, + severity=severity or None, + issue_class=issue_class or None, + capability_area=capability_area or None, + ) - if not incidents: + if not stats.total: return "No incidents found." - by_severity = Counter(i.severity for i in incidents) - by_type = Counter(i.failure_type for i in incidents) - by_project = Counter(i.project for i in incidents) - by_platform = Counter(i.platform for i in incidents if i.platform) - by_issue_class = Counter(i.issue_class for i in incidents if i.issue_class) - by_capability_area = Counter(i.capability_area for i in incidents if i.capability_area) - all_tags = Counter(tag for i in incidents for tag in i.tags) - - lines = [f"Total incidents: {len(incidents)}", ""] + lines = [f"Total incidents: {stats.total}", ""] lines.append("By Severity:") - for sev, count in by_severity.most_common(): + for sev, count in stats.by_severity.items(): lines.append(f" {sev}: {count}") lines.append("\nBy Project:") - for proj, count in by_project.most_common(): + for proj, count in stats.by_project.items(): lines.append(f" {proj}: {count}") lines.append("\nBy Failure Type:") - for ft, count in by_type.most_common(): + for ft, count in stats.by_type.items(): lines.append(f" {ft}: {count}") lines.append("\nBy Platform:") - for plat, count in by_platform.most_common(): + for plat, count in stats.by_platform.items(): lines.append(f" {plat}: {count}") - if by_issue_class: + if stats.by_issue_class: lines.append("\nBy Issue Class:") - for value, count in by_issue_class.most_common(): + for value, count in stats.by_issue_class.items(): lines.append(f" {value}: {count}") - if by_capability_area: + if stats.by_capability_area: lines.append("\nBy Capability Area:") - for value, count in by_capability_area.most_common(): + for value, count in stats.by_capability_area.items(): lines.append(f" {value}: {count}") - if all_tags: + if stats.by_tag: lines.append("\nTop Tags:") - for tag, count in all_tags.most_common(10): + for tag, count in list(stats.by_tag.items())[:10]: lines.append(f" {tag}: {count}") return "\n".join(lines) diff --git a/tests/test_incident_store.py b/tests/test_incident_store.py index a88ce0b..9db07c6 100644 --- a/tests/test_incident_store.py +++ b/tests/test_incident_store.py @@ -7,8 +7,10 @@ find_incident, find_incident_path, generate_id, + get_incident_stats, list_incidents, load_incident, + save_generated_incident, save_incident, ) from forge_cli.models import Incident @@ -195,6 +197,82 @@ def test_list_incidents_filters_structured_axes(tmp_incidents_dir, sample_data): assert [incident.id for incident in result] == ["2026-03-04-001"] +def test_list_incidents_uses_index_without_tree_scan( + tmp_incidents_dir, sample_data, monkeypatch +): + first = sample_data.copy() + first["project"] = "mila" + second = sample_data.copy() + second["id"] = "2026-03-04-002" + second["project"] = "aegis" + save_incident(Incident.from_dict(first), tmp_incidents_dir) + save_incident(Incident.from_dict(second), tmp_incidents_dir) + + def fail_rglob(self, pattern): + raise AssertionError("list_incidents should use the compact incident index") + + monkeypatch.setattr(Path, "rglob", fail_rglob) + + result = list_incidents(tmp_incidents_dir, project="mila") + + assert [incident.id for incident in result] == ["2026-03-04-001"] + + +def test_find_incident_suffix_uses_index_without_tree_scan( + tmp_incidents_dir, sample_data, monkeypatch +): + incident = Incident.from_dict(sample_data) + save_incident(incident, tmp_incidents_dir) + + def fail_rglob(self, pattern): + raise AssertionError("find_incident_path should use the compact incident index") + + monkeypatch.setattr(Path, "rglob", fail_rglob) + + assert find_incident_path(tmp_incidents_dir, "001") == ( + tmp_incidents_dir / "2026-03" / "2026-03-04-001.yml" + ) + + +def test_get_incident_stats_uses_index_without_loading_yaml( + tmp_incidents_dir, sample_data, monkeypatch +): + first = sample_data.copy() + first["project"] = "mila" + second = sample_data.copy() + second["id"] = "2026-03-04-002" + second["project"] = "aegis" + second["severity"] = "safety-critical" + save_incident(Incident.from_dict(first), tmp_incidents_dir) + save_incident(Incident.from_dict(second), tmp_incidents_dir) + + def fail_load(path): + raise AssertionError("stats should use the compact incident index") + + monkeypatch.setattr("forge_cli.incident_store.load_incident", fail_load) + + stats = get_incident_stats(tmp_incidents_dir) + + assert stats.total == 2 + assert stats.by_project == {"aegis": 1, "mila": 1} + assert stats.by_severity == {"functional": 1, "safety-critical": 1} + + +def test_save_generated_incident_retries_on_duplicate_id(tmp_incidents_dir, sample_data): + save_incident(Incident.from_dict(sample_data), tmp_incidents_dir) + + data = sample_data.copy() + data["id"] = "" + data["actual_behavior"] = "Second incident for the same day." + incident = Incident.from_dict(data) + + filepath = save_generated_incident(incident, tmp_incidents_dir) + + assert incident.id == "2026-03-04-002" + assert filepath.name == "2026-03-04-002.yml" + assert filepath.exists() + + def test_save_incident_rejects_duplicate_id(tmp_incidents_dir, sample_data): incident = Incident.from_dict(sample_data) save_incident(incident, tmp_incidents_dir)