Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 5 additions & 6 deletions .claude/tools/xpia/hooks/pre_tool_use.py
Original file line number Diff line number Diff line change
Expand Up @@ -203,13 +203,13 @@ def main():
"""Main hook execution.

Claude Code PreToolUse hook protocol:
- Input: JSON with toolUse.name and toolUse.input
- Input: JSON on stdin with top-level keys:
tool_name, tool_input, session_id, cwd, hook_event_name, etc.
- Output: {} to allow, {"permissionDecision": "deny", "message": "..."} to block
- Exit 0 always (hook doesn't control exit code, output controls behavior)
"""
try:
# Parse input from Claude Code
# Input format: JSON with toolUse object containing name and input
input_data = {}
if len(sys.argv) > 1:
# Command line argument
Expand All @@ -220,10 +220,9 @@ def main():
if input_line:
input_data = json.loads(input_line)

# Extract tool information using correct Claude Code protocol
tool_use = input_data.get("toolUse", {})
tool_name = tool_use.get("name", "unknown")
parameters = tool_use.get("input", {})
# Claude Code sends top-level tool_name and tool_input
tool_name = input_data.get("tool_name", "")
parameters = input_data.get("tool_input", {})

# Process the validation
result = process_tool_use_request(tool_name, parameters)
Expand Down
135 changes: 135 additions & 0 deletions .claude/tools/xpia/hooks/pre_tool_use_rust.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
#!/usr/bin/env python3
"""
XPIA PreToolUse Hook — Rust-backed.

Calls the xpia-defend Rust binary via subprocess to validate bash commands
before execution. This replaces the Python regex-based validation with
guaranteed-linear Rust regex matching.

Protocol (Claude Code PreToolUse):
Input: JSON on stdin with top-level keys:
tool_name, tool_input, session_id, cwd, hook_event_name, etc.
Output: {} to allow, {"permissionDecision": "deny", "message": "..."} to block
Exit: Always 0 (output controls behavior, not exit code)
"""

import json
import os
import sys
from pathlib import Path

# Find project root: Claude Code provides CWD in the hook input,
# and also sets the process CWD to the project directory.
# Walk up from CWD first, then fall back to __file__ parent chain.
def _find_project_root(cwd_override: str | None = None) -> Path | None:
starts = []
if cwd_override:
starts.append(Path(cwd_override))
starts.append(Path.cwd())
starts.append(Path(__file__).resolve())
for start in starts:
for candidate in [start] + list(start.parents):
if (candidate / "pyproject.toml").exists() and (candidate / "src").is_dir():
return candidate
return None

def _log_event(event_type: str, data: dict) -> None:
"""Log to XPIA security log."""
log_dir = Path.home() / ".claude" / "logs" / "xpia"
log_dir.mkdir(parents=True, exist_ok=True)
log_file = log_dir / f"rust_security_{__import__('datetime').datetime.now().strftime('%Y%m%d')}.log"
entry = {
"timestamp": __import__("datetime").datetime.now().isoformat(),
"event_type": event_type,
"backend": "rust",
"data": data,
}
try:
with open(log_file, "a") as f:
f.write(json.dumps(entry) + "\n")
except Exception:
pass # Never fail tool execution over logging


def _allow() -> None:
print(json.dumps({}))
sys.exit(0)


def _deny(message: str) -> None:
print(json.dumps({"permissionDecision": "deny", "message": message}))
sys.exit(0)


def main():
try:
# Parse input from Claude Code
input_data = {}
if len(sys.argv) > 1:
input_data = json.loads(sys.argv[1])
else:
stdin_text = sys.stdin.read().strip()
if stdin_text:
input_data = json.loads(stdin_text)

# Claude Code sends top-level tool_name and tool_input
tool_name = input_data.get("tool_name", "")
tool_input = input_data.get("tool_input", {})

# Only validate Bash tool
if tool_name != "Bash":
_allow()

command = tool_input.get("command", "")
if not command:
_allow()

# Resolve project root using CWD from hook input
cwd_override = input_data.get("cwd")
project_root = _find_project_root(cwd_override)
if project_root is None:
_deny("🚫 XPIA: Cannot find project root — blocking (fail-closed).")

sys.path.insert(0, str(project_root / "src"))
try:
from amplihack.security.rust_xpia import validate_bash_command, is_available
except ImportError:
_deny("🚫 XPIA: Cannot import rust_xpia bridge — blocking (fail-closed).")
return # unreachable, _deny calls sys.exit

if not is_available():
_log_event("rust_unavailable", {"command": command[:100]})
_deny(
"🚫 XPIA Security: Rust defense binary (xpia-defend) not found. "
"All bash commands blocked until binary is installed."
)

# Call Rust binary via subprocess bridge
result = validate_bash_command(command)

_log_event("pre_tool_validation", {
"command": command[:100],
"is_valid": result.is_valid,
"risk_level": result.risk_level,
"threats": len(result.threats),
"session_id": input_data.get("session_id", "unknown"),
})

if result.should_block or not result.is_valid:
threat_descs = [t.get("description", "unknown") for t in result.threats[:3]]
_deny(
f"🚫 XPIA Security Block (Rust): Command blocked — {result.risk_level} risk\n"
f"Threats: {', '.join(threat_descs)}\n"
f"Recommendations: {', '.join(result.recommendations[:2])}"
)
else:
_allow()

except Exception as e:
# Fail-closed: block on ANY error
_log_event("hook_error", {"error": str(e)})
_deny(f"🚫 XPIA Security: Hook error (fail-closed): {e}")


if __name__ == "__main__":
main()
21 changes: 21 additions & 0 deletions src/amplihack/launcher/copilot.py
Original file line number Diff line number Diff line change
Expand Up @@ -981,6 +981,27 @@ def launch_copilot(args: list[str] | None = None, interactive: bool = True) -> i
# Register awesome-copilot marketplace extensions (best-effort, silent on failure)
register_awesome_copilot_marketplace()

# Ensure XPIA defender binary is installed (security-critical, fail-closed)
try:
from ..security.xpia_install import ensure_xpia_binary

binary_path = ensure_xpia_binary()
print(f"✓ XPIA security defender ready ({binary_path})")
except ImportError:
# Module not available — installer not yet integrated, warn but continue
import logging

logging.getLogger(__name__).warning("XPIA installer module not found")
print("⚠ XPIA defender installer not available (module missing)")
except Exception as e:
# Installation failed — warn loudly but don't block startup.
# The pre-tool-use hook will enforce fail-closed at validation time.
import logging

logging.getLogger(__name__).error("XPIA defender binary install failed: %s", e)
print(f"⚠ XPIA defender not installed: {e}")
print(" Security validation will block tool use until xpia-defend is available.")

# Prompt to re-enable power-steering if disabled (#2544)
try:
from ..power_steering.re_enable_prompt import prompt_re_enable_if_disabled
Expand Down
Loading
Loading