diff --git a/ghost-ai-scanner/agent/install/scan_authorize_fetch.py.frag b/ghost-ai-scanner/agent/install/scan_authorize_fetch.py.frag new file mode 100644 index 0000000..13f127b --- /dev/null +++ b/ghost-ai-scanner/agent/install/scan_authorize_fetch.py.frag @@ -0,0 +1,48 @@ +# ============================================================= +# FRAGMENT: scan_authorize_fetch.py.frag +# VERSION: 1.0.0 +# UPDATED: 2026-05-11 +# OWNER: Giggso Inc (Ravi Venugopal) +# PURPOSE: At scan start, fetch this user's S3 authorized-provider +# list (written by the dashboard's [Authorize] button) and +# merge it into AUTH_LIST. Providers in the list are filtered +# out by every scan_*() emitter via _is_authorized() — so +# authorised tools never reach the dashboard, ending the +# noise loop at source. +# Storage layout: +# s3:///config/authorized/{email_safe}.json +# Fetched via the presigned GET URL configured in +# ~/.patronai/config.json under "authorized_list_url". +# (Server's url_refresh_loop mints this alongside the +# existing upload URL — extend when wiring this in.) +# AUDIT LOG: +# v1.0.0 2026-05-11 Initial. +# ============================================================= + +def _fetch_remote_authorized() -> list: + """Best-effort: pull the per-user authorized list from S3. + Returns a list of provider strings; empty on any failure so the + scan still runs with whatever local AUTH_LIST already had.""" + url = _cfg.get("authorized_list_url", "").strip() + if not url: + return [] + try: + import urllib.request + # 5s timeout — scans must not stall on a slow / dead S3 endpoint. + req = urllib.request.Request(url, headers={"User-Agent": "patronai-agent"}) + with urllib.request.urlopen(req, timeout=5) as resp: + doc = json.loads(resp.read().decode()) + providers = doc.get("providers", []) + if isinstance(providers, list): + return [str(p).strip().lower() for p in providers if p] + except Exception: + # Silent — agent must never block a scan on a remote-config failure. + return [] + return [] + + +# Merge remote list into AUTH_LIST. Local file remains the ground truth +# for offline operation; remote entries are additive. +_remote_auth = _fetch_remote_authorized() +if _remote_auth: + AUTH_LIST = sorted(set(AUTH_LIST) | set(_remote_auth)) diff --git a/ghost-ai-scanner/dashboard/ui/ai_posture_card.py b/ghost-ai-scanner/dashboard/ui/ai_posture_card.py new file mode 100644 index 0000000..d783e74 --- /dev/null +++ b/ghost-ai-scanner/dashboard/ui/ai_posture_card.py @@ -0,0 +1,108 @@ +# ============================================================= +# FILE: dashboard/ui/ai_posture_card.py +# VERSION: 1.0.0 +# UPDATED: 2026-05-11 +# OWNER: Giggso Inc (Ravi Venugopal) +# PURPOSE: Single aggregated card replacing the numeric KPI row at +# the top of the Inventory / Exec views. +# One risk score, one band colour, one "what needs action" +# breakdown. Drives the shift from "events log" UX to +# "decision surface" UX. +# DEPENDS: streamlit, scoring.risk_score +# AUDIT LOG: +# v1.0.0 2026-05-11 Initial. +# ============================================================= + +import os +import sys + +import streamlit as st + +sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "..", "src")) + +from scoring.risk_score import risk_score, risk_band, posture_breakdown # noqa: E402 + + +_CATEGORY_LABEL = { + "process": "AI processes running", + "package": "AI packages installed", + "ide_plugin": "IDE plugins detected", + "browser": "AI service browser hits", + "container_image": "Container images", + "container_log_signal": "Container traffic / key signals", + "shell_history": "Past shell commands", + "mcp_server": "MCP servers configured", + "agent_workflow": "Agent workflows (n8n / Flowise / langflow)", + "agent_scheduled": "Scheduled agents (cron / launchd)", + "tool_registration": "@tool decorators in code", + "vector_db": "Local vector DBs", +} + + +def _band_colour(band: str) -> str: + return { + "CRITICAL": "#cf222e", + "HIGH": "#bc4c00", + "MEDIUM": "#9a6700", + "LOW": "#1f6feb", + "CLEAN": "#1a7f37", + }.get(band, "#57606A") + + +def render_ai_posture(rows: list, device_label: str = "this fleet") -> None: + """Render the aggregated AI Posture card. + `rows` must be the COMPACTED rows (findings_current view) — one + per signature, with severity/category/occurrences/last_seen. + Falls back gracefully if older raw-finding rows are passed.""" + score = risk_score(rows) + band = risk_band(score) + bdown = posture_breakdown(rows) + open_categories = sum(1 for v in bdown.values() if v["count"] > 0) + + st.markdown( + f"
" + f"
" + f"
" + f"AI POSTURE — {device_label}
" + f"
" + f"RISK SCORE: {score} / 100  ·  {band}
" + f"
", + unsafe_allow_html=True, + ) + + if open_categories == 0: + st.markdown( + "
✓ No open AI findings. Posture is clean.
", + unsafe_allow_html=True, + ) + return + + # Render one row per non-empty category, sorted by severity then count. + sev_rank = {"CRITICAL": 4, "HIGH": 3, "MEDIUM": 2, "LOW": 1} + items = sorted( + bdown.items(), + key=lambda kv: (-sev_rank.get(kv[1]["max_severity"], 0), + -kv[1]["count"]), + ) + rows_html = [] + for cat, info in items: + if info["count"] == 0: + continue + label = _CATEGORY_LABEL.get(cat, cat.replace("_", " ").title()) + sev = info["max_severity"] + sev_clr = _band_colour(sev) + rows_html.append( + f"
" + f"
" + f"{info['count']} {label}
" + f"
max sev: {sev}
" + f"
" + ) + st.markdown("".join(rows_html) + "", unsafe_allow_html=True) diff --git a/ghost-ai-scanner/dashboard/ui/category_grouped_risks.py b/ghost-ai-scanner/dashboard/ui/category_grouped_risks.py new file mode 100644 index 0000000..dcc4952 --- /dev/null +++ b/ghost-ai-scanner/dashboard/ui/category_grouped_risks.py @@ -0,0 +1,102 @@ +# ============================================================= +# FILE: dashboard/ui/category_grouped_risks.py +# VERSION: 1.0.0 +# UPDATED: 2026-05-11 +# OWNER: Giggso Inc (Ravi Venugopal) +# PURPOSE: Collapsible category-grouped view of open findings. +# Replaces the row-soup. Each category is a parent row with +# count + max-severity + last-seen. Expand to see per-signature +# children. Bulk actions per category: Authorize, Suppress, +# Show cleanup hint. +# DEPENDS: streamlit, services.authorize, cleanup_hints +# AUDIT LOG: +# v1.0.0 2026-05-11 Initial. +# ============================================================= + +import os +import sys +from collections import defaultdict + +import streamlit as st + +sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "..", "src")) + +from cleanup_hints import cleanup_hint # noqa: E402 +from services.authorize import authorize # noqa: E402 + + +_SEV_RANK = {"CRITICAL": 4, "HIGH": 3, "MEDIUM": 2, "LOW": 1} + + +def _max_sev(rows: list) -> str: + out = "LOW" + for r in rows: + s = (r.get("severity") or "LOW").upper() + if _SEV_RANK.get(s, 0) > _SEV_RANK.get(out, 0): + out = s + return out + + +def _group_by_category(rows: list) -> dict: + out: dict = defaultdict(list) + for r in rows: + if r.get("status") == "resolved": + continue + out[r.get("category") or "unknown"].append(r) + return out + + +def render_grouped_risks(rows: list, store=None, owner_email: str = "") -> None: + """One section per category. Click to expand → per-signature rows. + `store` is required for the Authorize button to write to S3; + if None, button is hidden (read-only mode).""" + groups = _group_by_category(rows) + if not groups: + st.info("No open findings. Clean posture.") + return + + st.markdown( + '
OPEN FINDINGS — GROUPED
', + unsafe_allow_html=True, + ) + # Render sorted by severity then count. + items = sorted( + groups.items(), + key=lambda kv: (-_SEV_RANK.get(_max_sev(kv[1]), 0), -len(kv[1])), + ) + for cat, cat_rows in items: + max_sev = _max_sev(cat_rows) + last_seen = max(r.get("last_seen") or "" for r in cat_rows) + header = (f"{cat.replace('_', ' ').title()} — " + f"{len(cat_rows)} signature(s) · max sev {max_sev} · " + f"last seen {last_seen[:19] or '—'}") + with st.expander(header, expanded=False): + providers = sorted({r.get("provider", "") for r in cat_rows}) + for r in cat_rows[:50]: + pname = r.get("provider", "") + occ = r.get("occurrences", 1) + fseen = (r.get("first_seen") or "")[:19] + lseen = (r.get("last_seen") or "")[:19] + hint = cleanup_hint(cat, r.get("os_name", "")) + st.markdown( + f"
" + f"{pname} · {occ} occurrence(s) · " + f"{fseen} → {lseen}
" + f"💡 {hint}" + f"
", + unsafe_allow_html=True, + ) + # Bulk Authorize for this category (only if store + email available) + if store and owner_email: + btn_key = f"auth_cat_{cat}_{owner_email}" + if st.button( + f"✓ Authorize all {len(providers)} {cat.replace('_',' ')} provider(s) for {owner_email}", + key=btn_key, + ): + total = authorize(store, owner_email, providers) + st.success( + f"Authorized {len(providers)} provider(s). " + f"User's allow-list now has {total} entries. " + "Agent picks up on next scan." + ) diff --git a/ghost-ai-scanner/dashboard/ui/manager_tab_actions.py b/ghost-ai-scanner/dashboard/ui/manager_tab_actions.py index 69ee11e..f6b2b60 100644 --- a/ghost-ai-scanner/dashboard/ui/manager_tab_actions.py +++ b/ghost-ai-scanner/dashboard/ui/manager_tab_actions.py @@ -1,22 +1,22 @@ # ============================================================= # FILE: dashboard/ui/manager_tab_actions.py -# VERSION: 2.0.0 -# UPDATED: 2026-05-02 +# VERSION: 2.1.0 +# UPDATED: 2026-05-11 # OWNER: Giggso Inc # PURPOSE: Action helpers for the Manager Risks tab. -# mark_resolved — writes RESOLVED outcome back to S3 findings store. -# escalate — POSTs events to Trinity via dispatcher. -# send_alert_email — thin shim: delegates to notify.email.send_alert. +# mark_resolved — writes RESOLVED outcome back to S3. +# escalate — POSTs to Trinity via dispatcher. +# send_alert_email — thin shim → notify.email.send_alert. +# authorize_for_user — appends to per-user authorized list +# on S3; agent picks up on next scan. # All functions are pure — no Streamlit calls inside. -# DEPENDS: requests, alerter.dispatcher, notify.email +# DEPENDS: requests, alerter.dispatcher, notify.email, services.authorize # AUDIT LOG: # v1.0.0 2026-04-19 Initial — split from manager_tab_risks.py -# v2.0.0 2026-05-02 send_alert_email body collapsed to a one-line -# call into notify.email.send_alert. The duplicated -# boto3 SES path here used PATRONAI_FROM_EMAIL + -# skipped recipient verification, both inconsistent -# with the welcome / OTP paths. notify.email is -# now the single SES call site for the codebase. +# v2.0.0 2026-05-02 send_alert_email → notify.email single call site. +# v2.1.0 2026-05-11 Add authorize_for_user — closes the noise loop +# by teaching the agent which tools the operator +# has approved for a given user. # ============================================================= import logging @@ -89,3 +89,15 @@ def send_alert_email(events: list, recipients: str) -> bool: recipient list. Thin shim — actual SES work lives in notify.email.""" from notify.email import send_alert return send_alert(recipients=recipients, events=events) + + +def authorize_for_user(store, email: str, events: list) -> int: + """Append every distinct provider in `events` to the user's + authorized list on S3. Agent fetches the list at next scan and + filters those providers from emission — closing the noise loop + at source. Idempotent; returns the new total entry count. + Server-side `findings_compact` then auto-resolves the open + findings within the stale-window cycle.""" + from services.authorize import authorize # local — avoid hard dep on streamlit entry + providers = sorted({e.get("provider", "") for e in events if e.get("provider")}) + return authorize(store, email, providers) diff --git a/ghost-ai-scanner/dashboard/ui/manager_tab_inventory.py b/ghost-ai-scanner/dashboard/ui/manager_tab_inventory.py index da953ca..3dc24b4 100644 --- a/ghost-ai-scanner/dashboard/ui/manager_tab_inventory.py +++ b/ghost-ai-scanner/dashboard/ui/manager_tab_inventory.py @@ -28,6 +28,7 @@ from .filtered_table import search_box, apply_search_dicts from .clickable_metric import clickable_metric, static_metric from .drill_panel import render_drill_panel +from .ai_posture_card import render_ai_posture _PANEL = "mgr_inventory" @@ -46,7 +47,7 @@ def _owner_of(e: dict) -> str: def render_inventory(events: list) -> None: - """Asset summary KPIs, endpoint-protection banner, and asset table.""" + """AI Posture card (headline) → KPI cards → asset table.""" q = search_box("inventory", placeholder="search owner / IP / MAC …") if q: events = apply_search_dicts(events, q) @@ -54,6 +55,13 @@ def render_inventory(events: list) -> None: keys = [_asset_key(e) for e in events] unique_keys = list(dict.fromkeys(keys)) + # Headline — aggregated AI Posture card. Single risk score + + # per-category breakdown replaces the count-of-everything KPI noise. + # When compacted findings_current rows are available they're used; + # otherwise we degrade gracefully to raw events. + device_label = unique_keys[0] if len(unique_keys) == 1 else f"{len(unique_keys)} devices" + render_ai_posture(events, device_label=device_label) + # KPIs count DISTINCT DEVICES, not event rows. A laptop emitting a # scan every 30 min must show as 1 device + N scan events, never # as N "endpoints". The pre-2.2.0 sum() over events.asset_type was diff --git a/ghost-ai-scanner/dashboard/ui/manager_tab_risks.py b/ghost-ai-scanner/dashboard/ui/manager_tab_risks.py index 67f2bbc..2c9b40d 100644 --- a/ghost-ai-scanner/dashboard/ui/manager_tab_risks.py +++ b/ghost-ai-scanner/dashboard/ui/manager_tab_risks.py @@ -19,10 +19,11 @@ import pandas as pd import streamlit as st -from .helpers import sev_badge -from .manager_tab_actions import mark_resolved, escalate, send_alert_email -from .time_fmt import fmt as fmt_time -from .filtered_table import filtered_table +from .helpers import sev_badge +from .manager_tab_actions import mark_resolved, escalate, send_alert_email +from .time_fmt import fmt as fmt_time +from .filtered_table import filtered_table +from .category_grouped_risks import render_grouped_risks sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "..", "src")) @@ -32,9 +33,21 @@ _SEV_ORDER = {"CRITICAL": 0, "HIGH": 1, "MEDIUM": 2} -def render_risks(events: list) -> None: - """Selectable alert table + Mark Resolved / Escalate / Email actions.""" - alerts = sorted( # .get() guards missing severity on endpoint events +def render_risks(events: list, store=None, owner_email: str = "") -> None: + """Two views in one tab: + 1) Grouped view (default) — collapsible category cards with + bulk-authorize on each category. Reads compacted rows. + 2) Flat alert list — legacy table with Mark/Escalate/Email. + Operator can switch between them via a single toggle.""" + grouped = st.toggle("Grouped view (recommended)", value=True, + key="risks_grouped_toggle") + if grouped: + render_grouped_risks(events, store=store, owner_email=owner_email) + st.markdown("
", unsafe_allow_html=True) + st.caption("Switch off Grouped view above to see the flat alert table.") + return + + alerts = sorted( [e for e in events if e.get("severity") in _SEV_ORDER], key=lambda x: (_SEV_ORDER.get(x.get("severity", ""), 9), x.get("timestamp", "")), diff --git a/ghost-ai-scanner/src/cleanup_hints.py b/ghost-ai-scanner/src/cleanup_hints.py new file mode 100644 index 0000000..cc77948 --- /dev/null +++ b/ghost-ai-scanner/src/cleanup_hints.py @@ -0,0 +1,97 @@ +# ============================================================= +# FILE: src/cleanup_hints.py +# VERSION: 1.0.0 +# UPDATED: 2026-05-11 +# OWNER: Giggso Inc (Ravi Venugopal) +# PURPOSE: Per-category cleanup-suggestion mapping. +# Returns a HUMAN-READABLE hint operators can copy-paste to +# remove an AI tool from a device. The agent NEVER executes +# these — that's a deliberate security boundary. The hint is +# rendered next to the finding so the operator can decide. +# DEPENDS: (stdlib only) +# AUDIT LOG: +# v1.0.0 2026-05-11 Initial. +# ============================================================= + +# Hint resolved from (category, os_name) → string. None means "no +# automatic suggestion — operator's call." OS is best-effort; falls +# back to a cross-platform hint when device OS is unknown. +_HINTS = { + "process": { + "darwin": "Quit the app + remove from /Applications/. " + "macOS Login Items: System Settings → General → Login Items.", + "linux": "killall ; check ~/.config/autostart/ and " + "systemd --user list-unit-files for restart triggers.", + "windows": "Task Manager → End task; remove from Startup tab.", + "*": "Stop the running process and remove from auto-start.", + }, + "package": { + "*": "Uninstall from the relevant package manager: " + "pip uninstall | npm uninstall -g | " + "brew uninstall ", + }, + "ide_plugin": { + "*": "VS Code/Cursor: code --uninstall-extension . " + "JetBrains: Preferences → Plugins → Uninstall.", + }, + "mcp_server": { + "darwin": "Edit ~/Library/Application Support/Claude/" + "claude_desktop_config.json — remove the entry under " + "`mcpServers`. Restart Claude Desktop.", + "windows": "Edit %APPDATA%\\Claude\\claude_desktop_config.json — " + "remove the entry. Restart Claude Desktop.", + "linux": "Edit ~/.config/Claude/claude_desktop_config.json — " + "remove the entry. Restart Claude Desktop.", + "*": "Remove the entry from your MCP host config and " + "restart the host (Claude Desktop / Cursor / Continue).", + }, + "agent_workflow": { + "*": "Delete or move the workflow file out of the watched path " + "(or stop the orchestrator: e.g. `pm2 stop flowise`).", + }, + "agent_scheduled": { + "darwin": "launchctl unload ~/Library/LaunchAgents/; " + "rm ~/Library/LaunchAgents/", + "linux": "crontab -e and remove the line, or " + "systemctl --user disable .", + "windows": "schtasks /Delete /TN ", + "*": "Disable the cron/launchd/scheduled-task that " + "triggers this workflow.", + }, + "vector_db": { + "*": "Locate the file/folder via the `path_safe` field and " + "`rm -rf `. Sensitive: vector DBs can contain " + "embeddings derived from your code/docs.", + }, + "browser": { + "*": "No on-device action — close the tab or block the domain " + "via your proxy / browser extension allowlist.", + }, + "container_image": { + "*": "docker rmi (and stop any container running it).", + }, + "container_log_signal": { + "*": "Inspect the container; rotate any API key that leaked " + "into logs.", + }, + "shell_history": { + "*": "Past command — usually informational. If it leaked a " + "secret, rotate it.", + }, + "tool_registration": { + "*": "Code-level @tool decorator — review the repo and remove " + "if not authorised.", + }, +} + + +def cleanup_hint(category: str, os_name: str = "") -> str: + """Return a human-readable cleanup suggestion for a finding. + `os_name` is optional — falls back to the cross-platform hint. + Returns empty string when no hint is known.""" + cat = (category or "").lower() + os_ = (os_name or "").lower() + bucket = _HINTS.get(cat) + if not bucket: + return "" + return bucket.get(os_) or bucket.get("*") or "" diff --git a/ghost-ai-scanner/src/scoring/__init__.py b/ghost-ai-scanner/src/scoring/__init__.py new file mode 100644 index 0000000..72008b5 --- /dev/null +++ b/ghost-ai-scanner/src/scoring/__init__.py @@ -0,0 +1 @@ +# Scoring package — risk scoring + aggregated posture metrics. diff --git a/ghost-ai-scanner/src/scoring/risk_score.py b/ghost-ai-scanner/src/scoring/risk_score.py new file mode 100644 index 0000000..4c10942 --- /dev/null +++ b/ghost-ai-scanner/src/scoring/risk_score.py @@ -0,0 +1,103 @@ +# ============================================================= +# FILE: src/scoring/risk_score.py +# VERSION: 1.0.0 +# UPDATED: 2026-05-11 +# OWNER: Giggso Inc (Ravi Venugopal) +# PURPOSE: Weighted risk-score calculation over a set of findings. +# Drives the aggregated AI Posture card (one number instead +# of a row-soup of severities). Pure functions — no I/O — +# so unit-testable without S3 / Streamlit / Polars. +# DEPENDS: (stdlib only) +# AUDIT LOG: +# v1.0.0 2026-05-11 Initial. Ships with feat/dashboard-posture. +# ============================================================= + +from collections import defaultdict +from typing import Iterable + +# Per-severity weight contributed by ONE distinct signature. Tuned so a +# single CRITICAL finding (in a high-priority category like "process") +# crosses the CRITICAL band threshold of 75 on its own: +# 50 * 1.5 (process multiplier) = 75 → red. Locked in +# test_one_critical_drives_red. +_SEV_WEIGHT = { + "CRITICAL": 50, + "HIGH": 12, + "MEDIUM": 4, + "LOW": 1, +} + +# Category multipliers. A running process is more urgent than a stale +# shell history line even at the same severity. Applied after the +# severity weight. +_CATEGORY_MULT = { + "process": 1.5, + "mcp_server": 1.4, + "agent_workflow": 1.3, + "agent_scheduled": 1.3, + "browser": 1.1, + "container_log_signal": 1.2, + "vector_db": 1.0, + "package": 0.9, + "ide_plugin": 0.9, + "container_image": 0.8, + "tool_registration": 0.7, + "shell_history": 0.5, +} + +# Cap — anything above this clamps to RED (100). Empirically a device +# with ~6 unauthorised running tools sits around 90; cap protects the +# UI from showing 4-digit "risk". +_SCORE_CAP = 100 + + +def _row_weight(row: dict) -> float: + """Score contribution from one compacted finding row.""" + if row.get("status") == "resolved": + return 0.0 + sev = (row.get("severity") or "LOW").upper() + cat = (row.get("category") or "").lower() + base = _SEV_WEIGHT.get(sev, 1) + mult = _CATEGORY_MULT.get(cat, 1.0) + # Occurrences: log-dampened — a thing that re-appears 100 times is + # not 100× as bad as seen once, but is worse than seen once. + occ = int(row.get("occurrences") or 1) + occ_factor = 1 + min(0.5, 0.05 * (occ - 1)) + return base * mult * occ_factor + + +def risk_score(rows: Iterable[dict]) -> int: + """Aggregate risk score 0-100 for a set of compacted finding rows. + Rows expected to come from findings_current (one per signature).""" + total = sum(_row_weight(r) for r in rows) + return int(min(_SCORE_CAP, round(total))) + + +def risk_band(score: int) -> str: + """Human label for a 0-100 score — drives card colour.""" + if score >= 75: return "CRITICAL" + if score >= 40: return "HIGH" + if score >= 15: return "MEDIUM" + if score > 0: return "LOW" + return "CLEAN" + + +def posture_breakdown(rows: Iterable[dict]) -> dict: + """Group OPEN signatures by category for the posture card. + Returns {category: {count, max_severity, last_seen}}.""" + out: dict = defaultdict(lambda: {"count": 0, "max_severity": "LOW", + "last_seen": ""}) + sev_rank = {"CRITICAL": 4, "HIGH": 3, "MEDIUM": 2, "LOW": 1} + for r in rows: + if r.get("status") == "resolved": + continue + cat = r.get("category") or "unknown" + slot = out[cat] + slot["count"] += 1 + sev = (r.get("severity") or "LOW").upper() + if sev_rank.get(sev, 0) > sev_rank.get(slot["max_severity"], 0): + slot["max_severity"] = sev + ls = r.get("last_seen") or "" + if ls > slot["last_seen"]: + slot["last_seen"] = ls + return dict(out) diff --git a/ghost-ai-scanner/src/services/__init__.py b/ghost-ai-scanner/src/services/__init__.py new file mode 100644 index 0000000..b73b7cc --- /dev/null +++ b/ghost-ai-scanner/src/services/__init__.py @@ -0,0 +1 @@ +# Services package — orchestration helpers that span store/notify/etc. diff --git a/ghost-ai-scanner/src/services/authorize.py b/ghost-ai-scanner/src/services/authorize.py new file mode 100644 index 0000000..1195ac5 --- /dev/null +++ b/ghost-ai-scanner/src/services/authorize.py @@ -0,0 +1,104 @@ +# ============================================================= +# FILE: src/services/authorize.py +# VERSION: 1.0.0 +# UPDATED: 2026-05-11 +# OWNER: Giggso Inc (Ravi Venugopal) +# PURPOSE: Write per-user authorized-provider lists to S3 so the hook +# agent can fetch them on next scan and stop emitting noise +# for tools the operator has explicitly approved. +# Storage layout: +# s3:///config/authorized/{email_safe}.json +# Body: +# {"version": 1, "updated_at": "...", "providers": [...]} +# Companion scan_authorize_fetch.py.frag pulls this at scan +# time. Findings whose provider is in the list never reach +# the dashboard. +# DEPENDS: store.base_store (any blob store) +# AUDIT LOG: +# v1.0.0 2026-05-11 Initial. +# ============================================================= + +import json +import logging +import re +from datetime import datetime, timezone + +log = logging.getLogger("marauder-scan.services.authorize") + +# Same character set as we use elsewhere for filesystem-safe email. +_EMAIL_SAFE_RE = re.compile(r"[^a-zA-Z0-9._-]") + + +def _safe_email(email: str) -> str: + """Filesystem-safe email — replaces @ + special chars with _.""" + return _EMAIL_SAFE_RE.sub("_", (email or "").strip().lower()) + + +def _key_for(email: str) -> str: + """S3 key for one user's authorized list.""" + return f"config/authorized/{_safe_email(email)}.json" + + +def load_authorized(store, email: str) -> dict: + """Read the current authorized list for a user. + Returns {"version": 1, "updated_at": "...", "providers": [...]} + or a fresh empty doc if none exists yet.""" + if not email: + return {"version": 1, "providers": [], "updated_at": ""} + try: + raw = store.findings._get(_key_for(email)) + if not raw: + return {"version": 1, "providers": [], "updated_at": ""} + doc = json.loads(raw.decode()) + # Tolerate older shapes — coerce to canonical. + return { + "version": int(doc.get("version", 1)), + "providers": sorted(set(doc.get("providers", []))), + "updated_at": doc.get("updated_at", ""), + } + except Exception as exc: + log.error("load_authorized failed for %s: %s", email, exc) + return {"version": 1, "providers": [], "updated_at": ""} + + +def authorize(store, email: str, providers: list) -> int: + """Add `providers` to the user's authorized list. Idempotent — + duplicates collapse. Returns the new total count after merge.""" + if not email or not providers: + return 0 + doc = load_authorized(store, email) + merged = sorted(set(doc["providers"]) | set(str(p) for p in providers if p)) + new_doc = { + "version": 1, + "providers": merged, + "updated_at": datetime.now(timezone.utc).isoformat(), + } + body = json.dumps(new_doc, indent=2).encode() + try: + store.findings._put(_key_for(email), body, "application/json") + log.info("authorize: %s now has %d entries (added %s)", + email, len(merged), providers) + except Exception as exc: + log.error("authorize put failed for %s: %s", email, exc) + return len(doc["providers"]) + return len(merged) + + +def revoke(store, email: str, providers: list) -> int: + """Remove `providers` from the user's authorized list. + Returns the new total count after removal.""" + if not email or not providers: + return 0 + doc = load_authorized(store, email) + remaining = sorted(set(doc["providers"]) - set(str(p) for p in providers if p)) + new_doc = { + "version": 1, + "providers": remaining, + "updated_at": datetime.now(timezone.utc).isoformat(), + } + body = json.dumps(new_doc, indent=2).encode() + try: + store.findings._put(_key_for(email), body, "application/json") + except Exception as exc: + log.error("revoke put failed for %s: %s", email, exc) + return len(remaining) diff --git a/ghost-ai-scanner/tests/unit/test_authorize_service.py b/ghost-ai-scanner/tests/unit/test_authorize_service.py new file mode 100644 index 0000000..2ed03cc --- /dev/null +++ b/ghost-ai-scanner/tests/unit/test_authorize_service.py @@ -0,0 +1,113 @@ +# ============================================================= +# FILE: tests/unit/test_authorize_service.py +# VERSION: 1.0.0 +# UPDATED: 2026-05-11 +# OWNER: Giggso Inc +# PURPOSE: Lock the authorize service — once a user authorises a +# tool, the agent must see it forever (unless revoked). +# ============================================================= + +import json +import sys +from pathlib import Path + +_ROOT = Path(__file__).resolve().parents[2] +sys.path.insert(0, str(_ROOT / "src")) + +from services.authorize import ( + authorize, revoke, load_authorized, _safe_email, _key_for, +) + + +class _StubFindings: + """Minimal stub for the FindingsStore — only _get and _put used.""" + def __init__(self): + self._blob = {} + def _get(self, key): + return self._blob.get(key) + def _put(self, key, body, ctype): + self._blob[key] = body + return True + + +class _StubStore: + def __init__(self): + self.findings = _StubFindings() + + +def test_safe_email_strips_at_and_special_chars(): + assert _safe_email("Ravi@Giggso.COM") == "ravi_giggso.com" + assert _safe_email("a+b@c.io") == "a_b_c.io" + + +def test_key_per_user_isolated(): + a = _key_for("a@x.com") + b = _key_for("b@x.com") + assert a != b + assert a.startswith("config/authorized/") + + +def test_authorize_creates_new_doc(): + s = _StubStore() + n = authorize(s, "ravi@giggso.com", ["cursor", "flowise"]) + assert n == 2 + doc = load_authorized(s, "ravi@giggso.com") + assert sorted(doc["providers"]) == ["cursor", "flowise"] + assert doc["updated_at"] + + +def test_authorize_is_idempotent(): + s = _StubStore() + authorize(s, "ravi@giggso.com", ["cursor"]) + authorize(s, "ravi@giggso.com", ["cursor"]) + authorize(s, "ravi@giggso.com", ["cursor"]) + assert len(load_authorized(s, "ravi@giggso.com")["providers"]) == 1 + + +def test_authorize_merges_lists(): + s = _StubStore() + authorize(s, "ravi@giggso.com", ["cursor"]) + authorize(s, "ravi@giggso.com", ["flowise", "ollama"]) + providers = load_authorized(s, "ravi@giggso.com")["providers"] + assert set(providers) == {"cursor", "flowise", "ollama"} + + +def test_revoke_removes_entries(): + s = _StubStore() + authorize(s, "ravi@giggso.com", ["cursor", "flowise"]) + revoke(s, "ravi@giggso.com", ["cursor"]) + providers = load_authorized(s, "ravi@giggso.com")["providers"] + assert providers == ["flowise"] + + +def test_empty_inputs_are_noop(): + s = _StubStore() + assert authorize(s, "", ["cursor"]) == 0 + assert authorize(s, "ravi@giggso.com", []) == 0 + + +def test_per_user_isolation(): + """Authorising for user A must not touch user B's list.""" + s = _StubStore() + authorize(s, "a@giggso.com", ["cursor"]) + authorize(s, "b@giggso.com", ["flowise"]) + a = load_authorized(s, "a@giggso.com")["providers"] + b = load_authorized(s, "b@giggso.com")["providers"] + assert "flowise" not in a + assert "cursor" not in b + + +def test_load_authorized_handles_garbage_gracefully(): + s = _StubStore() + s.findings._blob[_key_for("x@y.com")] = b"not-json-at-all" + doc = load_authorized(s, "x@y.com") + assert doc["providers"] == [] # graceful fallback + + +def test_load_authorized_canonicalises_legacy_shapes(): + s = _StubStore() + s.findings._blob[_key_for("x@y.com")] = json.dumps( + {"providers": ["b", "a", "b"]} + ).encode() + doc = load_authorized(s, "x@y.com") + assert doc["providers"] == ["a", "b"] # sorted + unique diff --git a/ghost-ai-scanner/tests/unit/test_cleanup_hints.py b/ghost-ai-scanner/tests/unit/test_cleanup_hints.py new file mode 100644 index 0000000..2040d46 --- /dev/null +++ b/ghost-ai-scanner/tests/unit/test_cleanup_hints.py @@ -0,0 +1,60 @@ +# ============================================================= +# FILE: tests/unit/test_cleanup_hints.py +# VERSION: 1.0.0 +# UPDATED: 2026-05-11 +# OWNER: Giggso Inc +# PURPOSE: Lock the cleanup-hint contract — every supported category +# must produce a non-empty suggestion. Operators read these +# on every dashboard row. +# ============================================================= + +import sys +from pathlib import Path + +import pytest + +_ROOT = Path(__file__).resolve().parents[2] +sys.path.insert(0, str(_ROOT / "src")) + +from cleanup_hints import cleanup_hint + + +# Every category referenced by agent_explode._FINDING_SEVERITY must +# have a non-empty default ("*") hint. Adding a new category to the +# agent without a matching hint regresses this test → forces an update. +_SUPPORTED_CATEGORIES = [ + "browser", "process", "container_log_signal", "package", + "ide_plugin", "container_image", "shell_history", + "mcp_server", "agent_workflow", "agent_scheduled", + "tool_registration", "vector_db", +] + + +@pytest.mark.parametrize("cat", _SUPPORTED_CATEGORIES) +def test_every_category_has_default_hint(cat): + h = cleanup_hint(cat) + assert h, f"category {cat!r} has no cleanup hint — add one" + + +def test_os_specific_hint_used_when_available(): + mac = cleanup_hint("mcp_server", os_name="darwin") + lin = cleanup_hint("mcp_server", os_name="linux") + win = cleanup_hint("mcp_server", os_name="windows") + assert "Library/Application Support" in mac + assert ".config/Claude" in lin + assert "%APPDATA%" in win + assert mac != lin != win + + +def test_unknown_category_returns_empty_string(): + assert cleanup_hint("not_a_category") == "" + assert cleanup_hint("") == "" + + +def test_unknown_os_falls_back_to_wildcard(): + h = cleanup_hint("process", os_name="bsd") + assert h # falls back to "*" + + +def test_case_insensitive_inputs(): + assert cleanup_hint("PROCESS", os_name="DARWIN") diff --git a/ghost-ai-scanner/tests/unit/test_risk_score.py b/ghost-ai-scanner/tests/unit/test_risk_score.py new file mode 100644 index 0000000..0a585e3 --- /dev/null +++ b/ghost-ai-scanner/tests/unit/test_risk_score.py @@ -0,0 +1,108 @@ +# ============================================================= +# FILE: tests/unit/test_risk_score.py +# VERSION: 1.0.0 +# UPDATED: 2026-05-11 +# OWNER: Giggso Inc +# PURPOSE: Lock the risk scoring contract — drives the AI Posture +# card. If these numbers drift, the headline UX drifts. +# ============================================================= + +import sys +from pathlib import Path + +_ROOT = Path(__file__).resolve().parents[2] +sys.path.insert(0, str(_ROOT / "src")) + +from scoring.risk_score import ( + risk_score, risk_band, posture_breakdown, +) + + +def _row(sev="HIGH", cat="process", occ=1, status="open"): + return {"severity": sev, "category": cat, "occurrences": occ, + "status": status} + + +def test_empty_input_is_clean(): + assert risk_score([]) == 0 + assert risk_band(0) == "CLEAN" + + +def test_resolved_rows_do_not_score(): + rows = [_row(sev="CRITICAL", status="resolved")] + assert risk_score(rows) == 0 + + +def test_single_high_process_contributes(): + s = risk_score([_row(sev="HIGH", cat="process")]) + assert s > 0 + assert s < 75 # one HIGH shouldn't pin red + + +def test_one_critical_drives_red(): + """A single CRITICAL finding alone must push the device to CRITICAL band.""" + s = risk_score([_row(sev="CRITICAL", cat="process")]) + assert risk_band(s) == "CRITICAL", f"score={s} expected CRITICAL band" + + +def test_score_caps_at_100(): + rows = [_row(sev="CRITICAL", cat="process") for _ in range(50)] + assert risk_score(rows) == 100 + + +def test_category_multiplier_applied(): + """A running process scores higher than a stale shell-history line.""" + proc = risk_score([_row(sev="HIGH", cat="process")]) + hist = risk_score([_row(sev="HIGH", cat="shell_history")]) + assert proc > hist + + +def test_occurrences_dampened(): + """Seen 100 times is worse than seen once but not 100×.""" + once = risk_score([_row(sev="HIGH", cat="process", occ=1)]) + many = risk_score([_row(sev="HIGH", cat="process", occ=100)]) + assert many > once + assert many < once * 10 + + +def test_band_thresholds(): + assert risk_band(0) == "CLEAN" + assert risk_band(5) == "LOW" + assert risk_band(20) == "MEDIUM" + assert risk_band(50) == "HIGH" + assert risk_band(80) == "CRITICAL" + assert risk_band(100) == "CRITICAL" + + +# ── posture_breakdown ────────────────────────────────────────── + +def test_posture_breakdown_groups_by_category(): + rows = [ + _row(cat="process", sev="HIGH"), + _row(cat="process", sev="CRITICAL"), + _row(cat="vector_db", sev="MEDIUM"), + ] + b = posture_breakdown(rows) + assert b["process"]["count"] == 2 + assert b["process"]["max_severity"] == "CRITICAL" + assert b["vector_db"]["count"] == 1 + + +def test_posture_breakdown_skips_resolved(): + rows = [ + _row(cat="process", status="open"), + _row(cat="process", status="resolved"), + ] + b = posture_breakdown(rows) + assert b["process"]["count"] == 1 + + +def test_posture_breakdown_picks_latest_last_seen(): + rows = [ + {"category": "process", "severity": "HIGH", + "occurrences": 1, "last_seen": "2026-05-10T00:00:00"}, + {"category": "process", "severity": "HIGH", + "occurrences": 1, "last_seen": "2026-05-11T12:00:00"}, + ] + b = posture_breakdown(rows) + assert b["process"]["last_seen"] == "2026-05-11T12:00:00"