diff --git a/scripts/test_pf_core_bundle.sh b/scripts/test_pf_core_bundle.sh new file mode 100644 index 0000000..a62d7af --- /dev/null +++ b/scripts/test_pf_core_bundle.sh @@ -0,0 +1,63 @@ +#!/usr/bin/env bash +# Unit tests for PF-Core bundle verification (Phase 7 PR-1). +set -euo pipefail + +ROOT="$(cd "$(dirname "$0")/.." && pwd)" +PF_CORE_ROOT="${PF_CORE_ROOT:-$ROOT/../provability-fabric-core}" +OUT="${TMPDIR:-/tmp}/pip-pf-core-bundle-test-$$" +OBS="$PF_CORE_ROOT/pf-core/examples/valid/mcp_sidecar_observation.json" +VERSION="$(cat "$PF_CORE_ROOT/pf-core/VERSION" 2>/dev/null || echo "0.6.0")" + +cleanup() { rm -rf "$OUT"; } +trap cleanup EXIT + +if [ ! -f "$OBS" ]; then + echo "SKIP: provability-fabric-core not found at $PF_CORE_ROOT" + exit 0 +fi + +PYTHON="${PYTHON:-python3}" +export PYTHONPATH="$PF_CORE_ROOT/pf-core/validator" +$PYTHON -m pip install -q -e "$PF_CORE_ROOT/pf-core/validator" jsonschema referencing 2>/dev/null || true + +$PYTHON -m pf_core.cli core emit-artifacts \ + --schemas "$PF_CORE_ROOT/pf-core/schemas" \ + --file "$OBS" \ + --out-dir "$OUT" + +$PYTHON "$ROOT/scripts/verify_pf_core_bundle.py" \ + --bundle-dir "$OUT" \ + --pf-core-version "$VERSION" \ + --schemas "$PF_CORE_ROOT/pf-core/schemas" + +lake exe verify_bundle -- --bundle-dir "$OUT" --pf-core-version "$VERSION" + +# Tampered trace_hash must fail +cp -r "$OUT" "${OUT}-bad" +$PYTHON - <<'PY' "${OUT}-bad/trace.json" +import json, sys +path = sys.argv[1] +data = json.load(open(path, encoding="utf-8")) +data["trace_hash"] = "f" * 64 +json.dump(data, open(path, "w", encoding="utf-8"), indent=2, sort_keys=True) +PY +if $PYTHON "$ROOT/scripts/verify_pf_core_bundle.py" --bundle-dir "${OUT}-bad" --pf-core-version "$VERSION"; then + echo "expected tampered bundle to fail" + exit 1 +fi + +# unsafe certificate must fail +cp -r "$OUT" "${OUT}-unsafe" +$PYTHON - <<'PY' "${OUT}-unsafe/certificate.json" +import json, sys +path = sys.argv[1] +data = json.load(open(path, encoding="utf-8")) +data["safe"] = False +json.dump(data, open(path, "w", encoding="utf-8"), indent=2, sort_keys=True) +PY +if $PYTHON "$ROOT/scripts/verify_pf_core_bundle.py" --bundle-dir "${OUT}-unsafe" --pf-core-version "$VERSION"; then + echo "expected unsafe certificate to fail" + exit 1 +fi + +echo "OK: PF-Core bundle verification tests passed" diff --git a/scripts/verify_pf_core_bundle.py b/scripts/verify_pf_core_bundle.py new file mode 100644 index 0000000..abfe209 --- /dev/null +++ b/scripts/verify_pf_core_bundle.py @@ -0,0 +1,174 @@ +#!/usr/bin/env python3 +"""Verify PF-Core emit-artifacts five-file bundle layout (Phase 7 PR-1).""" + +from __future__ import annotations + +import argparse +import json +import sys +from pathlib import Path +from typing import Any, Mapping + +REQUIRED_FILES = ( + "runtime_observation.json", + "event.json", + "trace.json", + "certificate.json", + "audit.jsonl", +) + +GENESIS = "0" * 64 + + +def _load_json(path: Path) -> dict[str, Any]: + return json.loads(path.read_text(encoding="utf-8")) + + +def _normalize_hash(value: str) -> str: + text = str(value).strip().lower() + if text.startswith("sha256:"): + text = text[7:] + if len(text) != 64 or any(c not in "0123456789abcdef" for c in text): + raise ValueError(f"invalid hash: {value!r}") + return text + + +def _verify_structure(bundle_dir: Path) -> list[str]: + errors: list[str] = [] + for name in REQUIRED_FILES: + if not (bundle_dir / name).is_file(): + errors.append(f"missing required file: {name}") + if errors: + return errors + + cert = _load_json(bundle_dir / "certificate.json") + trace = _load_json(bundle_dir / "trace.json") + event = _load_json(bundle_dir / "event.json") + obs = _load_json(bundle_dir / "runtime_observation.json") + + if cert.get("safe") is not True: + errors.append("certificate.safe must be true for passing bundles") + + cert_trace = _normalize_hash(str(cert.get("trace_hash", ""))) + trace_hash = _normalize_hash(str(trace.get("trace_hash", ""))) + if cert_trace != trace_hash: + errors.append( + f"certificate.trace_hash ({cert_trace}) != trace.trace_hash ({trace_hash})" + ) + + events = trace.get("events") + if not isinstance(events, list) or not events: + errors.append("trace.events must be a non-empty list") + return errors + + prev = GENESIS + for idx, ev in enumerate(events): + if not isinstance(ev, Mapping): + errors.append(f"events[{idx}] must be an object") + continue + prev_field = _normalize_hash(str(ev.get("previous_event_hash", ""))) + if prev_field != prev: + errors.append( + f"events[{idx}].previous_event_hash mismatch: expected {prev}, got {prev_field}" + ) + event_hash = _normalize_hash(str(ev.get("event_hash", ""))) + prev = event_hash + + if events: + top_event_hash = _normalize_hash(str(event.get("event_hash", ""))) + last_trace_hash = _normalize_hash(str(events[-1].get("event_hash", ""))) + if top_event_hash != last_trace_hash: + errors.append("event.json event_hash must match last trace event") + + audit_lines = [ + line.strip() + for line in (bundle_dir / "audit.jsonl").read_text(encoding="utf-8").splitlines() + if line.strip() + ] + if not audit_lines: + errors.append("audit.jsonl must contain at least one line") + else: + audit = json.loads(audit_lines[0]) + audit_trace = _normalize_hash(str(audit.get("trace_hash", ""))) + if audit_trace != trace_hash: + errors.append("audit.jsonl trace_hash must match trace.trace_hash") + audit_event = _normalize_hash(str(audit.get("event_hash", ""))) + event_hash = _normalize_hash(str(event.get("event_hash", ""))) + if audit_event != event_hash: + errors.append("audit.jsonl event_hash must match event.json") + + if obs.get("schema_version") != "pf-core.runtime_observation.v1": + errors.append("runtime_observation.json schema_version must be v1") + + return errors + + +def _verify_with_pf_core(bundle_dir: Path, schemas_dir: Path | None) -> list[str]: + try: + from pf_core.hash_chain import validate_trace_hashes + from pf_core.schemas import load_registry, validate_object + except ImportError: + return [] + + errors: list[str] = [] + if schemas_dir is None: + return errors + + registry = load_registry(schemas_dir) + for name in ("runtime_observation.json", "event.json", "trace.json", "certificate.json"): + obj = _load_json(bundle_dir / name) + try: + validate_object(obj, registry) + except Exception as exc: # noqa: BLE001 - aggregate validation errors + errors.append(f"{name}: schema validation failed: {exc}") + + trace = _load_json(bundle_dir / "trace.json") + try: + validate_trace_hashes(trace) + except Exception as exc: # noqa: BLE001 + errors.append(f"trace hash chain recomputation failed: {exc}") + + return errors + + +def verify_bundle( + bundle_dir: Path, + *, + pf_core_version: str | None = None, + schemas_dir: Path | None = None, +) -> tuple[bool, list[str]]: + errors = _verify_structure(bundle_dir) + if pf_core_version: + version_file = bundle_dir / "VERSION" + if version_file.is_file(): + if version_file.read_text(encoding="utf-8").strip() != pf_core_version: + errors.append(f"bundle VERSION mismatch (expected {pf_core_version})") + + if schemas_dir and schemas_dir.is_dir(): + errors.extend(_verify_with_pf_core(bundle_dir, schemas_dir)) + + return (len(errors) == 0, errors) + + +def main(argv: list[str] | None = None) -> int: + parser = argparse.ArgumentParser(description="Verify PF-Core artifact bundle") + parser.add_argument("--bundle-dir", type=Path, required=True) + parser.add_argument("--pf-core-version", default=None) + parser.add_argument("--schemas", type=Path, default=None) + args = parser.parse_args(argv) + + ok, errors = verify_bundle( + args.bundle_dir, + pf_core_version=args.pf_core_version, + schemas_dir=args.schemas, + ) + if ok: + print(f"verify_bundle: PASS ({args.bundle_dir})") + return 0 + for err in errors: + print(f"verify_bundle: FAIL — {err}", file=sys.stderr) + return 1 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/src/PostIncidentProofs/Bundle/PFCore.lean b/src/PostIncidentProofs/Bundle/PFCore.lean new file mode 100644 index 0000000..222628c --- /dev/null +++ b/src/PostIncidentProofs/Bundle/PFCore.lean @@ -0,0 +1,77 @@ +import Lean.Data.Json + +namespace PostIncidentProofs.Bundle.PFCore + +/-- Required PF-Core emit-artifacts files (five-file layout). -/ +def requiredArtifactFiles : List String := [ + "runtime_observation.json", + "event.json", + "trace.json", + "certificate.json", + "audit.jsonl" +] + +structure PFCoreArtifacts where + bundleDir : String + pfCoreVersion : Option String + deriving Repr + +def parseArgs (args : List String) : Option PFCoreArtifacts := Id.run do + let mut bundleDir? : Option String := none + let mut version? : Option String := none + let mut i := 0 + while h : i < args.length do + let arg := args[i] + if arg == "--bundle-dir" then + if h' : i + 1 < args.length then + bundleDir? := some args[i + 1] + i := i + 2 + continue + else + return none + if arg == "--pf-core-version" then + if h' : i + 1 < args.length then + version? := some args[i + 1] + i := i + 2 + continue + else + return none + if bundleDir?.isNone && !arg.startsWith "-" then + bundleDir? := some arg + i := i + 1 + match bundleDir? with + | none => none + | some dir => some { bundleDir := dir, pfCoreVersion := version? } + +def bundleFilesPresent (bundleDir : String) : IO Bool := do + for name in requiredArtifactFiles do + let path := s!"{bundleDir}/{name}" + if !(← System.FilePath.pathExists path) then + return false + return true + +def runPythonVerifier (artifacts : PFCoreArtifacts) : IO UInt32 := do + let script := "scripts/verify_pf_core_bundle.py" + if !(← System.FilePath.pathExists script) then + IO.println "verify_bundle: missing scripts/verify_pf_core_bundle.py" + return 1 + let mut procArgs : Array String := #["--bundle-dir", artifacts.bundleDir] + match artifacts.pfCoreVersion with + | some v => procArgs := procArgs ++ #["--pf-core-version", v] + | none => pure () + let proc ← IO.Process.spawn { + cmd := "python3" + args := #[script] ++ procArgs + stdout := IO.Process.Stdio.inherited + stderr := IO.Process.Stdio.inherited + } + let exitCode ← proc.wait + return exitCode.toUInt32 + +def verifyPFCoreBundle (artifacts : PFCoreArtifacts) : IO UInt32 := do + if !(← bundleFilesPresent artifacts.bundleDir) then + IO.println s!"verify_bundle: missing PF-Core artifact files under {artifacts.bundleDir}" + return 1 + runPythonVerifier artifacts + +end PostIncidentProofs.Bundle.PFCore diff --git a/src/VerifyBundle.lean b/src/VerifyBundle.lean index 56f4f28..0e98cd0 100644 --- a/src/VerifyBundle.lean +++ b/src/VerifyBundle.lean @@ -1,13 +1,19 @@ import PostIncidentProofs.Bundle.Builder +import PostIncidentProofs.Bundle.PFCore import PostIncidentProofs.Utils.Time def main (args : List String) : IO UInt32 := do - match args with - | [] => - IO.println "usage: verify_bundle " - pure 1 - | _ => - let bundle := PostIncidentProofs.Bundle.Builder.create_bundle [] ["spec"] { start := 0, stop := 86400 } - let ok := PostIncidentProofs.Bundle.Builder.verify_bundle bundle - IO.println s!"verify_bundle: {if ok then "PASS" else "FAIL"}" - pure (if ok then 0 else 1) + match PostIncidentProofs.Bundle.PFCore.parseArgs args with + | some pfArtifacts => + PostIncidentProofs.Bundle.PFCore.verifyPFCoreBundle pfArtifacts + | none => + match args with + | [] => + IO.println "usage: verify_bundle --bundle-dir [--pf-core-version ]" + IO.println " verify_bundle " + pure 1 + | _ => + let bundle := PostIncidentProofs.Bundle.Builder.create_bundle [] ["spec"] { start := 0, stop := 86400 } + let ok := PostIncidentProofs.Bundle.Builder.verify_bundle bundle + IO.println s!"verify_bundle: {if ok then "PASS" else "FAIL"}" + pure (if ok then 0 else 1)