Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 63 additions & 0 deletions scripts/test_pf_core_bundle.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
#!/usr/bin/env bash
# Unit tests for PF-Core bundle verification (Phase 7 PR-1).
set -euo pipefail

ROOT="$(cd "$(dirname "$0")/.." && pwd)"
PF_CORE_ROOT="${PF_CORE_ROOT:-$ROOT/../provability-fabric-core}"
OUT="${TMPDIR:-/tmp}/pip-pf-core-bundle-test-$$"
OBS="$PF_CORE_ROOT/pf-core/examples/valid/mcp_sidecar_observation.json"
VERSION="$(cat "$PF_CORE_ROOT/pf-core/VERSION" 2>/dev/null || echo "0.6.0")"

cleanup() { rm -rf "$OUT"; }
trap cleanup EXIT

if [ ! -f "$OBS" ]; then
echo "SKIP: provability-fabric-core not found at $PF_CORE_ROOT"
exit 0
fi

PYTHON="${PYTHON:-python3}"
export PYTHONPATH="$PF_CORE_ROOT/pf-core/validator"
$PYTHON -m pip install -q -e "$PF_CORE_ROOT/pf-core/validator" jsonschema referencing 2>/dev/null || true

$PYTHON -m pf_core.cli core emit-artifacts \
--schemas "$PF_CORE_ROOT/pf-core/schemas" \
--file "$OBS" \
--out-dir "$OUT"

$PYTHON "$ROOT/scripts/verify_pf_core_bundle.py" \
--bundle-dir "$OUT" \
--pf-core-version "$VERSION" \
--schemas "$PF_CORE_ROOT/pf-core/schemas"

lake exe verify_bundle -- --bundle-dir "$OUT" --pf-core-version "$VERSION"

# Tampered trace_hash must fail
cp -r "$OUT" "${OUT}-bad"
$PYTHON - <<'PY' "${OUT}-bad/trace.json"
import json, sys
path = sys.argv[1]
data = json.load(open(path, encoding="utf-8"))
data["trace_hash"] = "f" * 64
json.dump(data, open(path, "w", encoding="utf-8"), indent=2, sort_keys=True)
PY
if $PYTHON "$ROOT/scripts/verify_pf_core_bundle.py" --bundle-dir "${OUT}-bad" --pf-core-version "$VERSION"; then
echo "expected tampered bundle to fail"
exit 1
fi

# unsafe certificate must fail
cp -r "$OUT" "${OUT}-unsafe"
$PYTHON - <<'PY' "${OUT}-unsafe/certificate.json"
import json, sys
path = sys.argv[1]
data = json.load(open(path, encoding="utf-8"))
data["safe"] = False
json.dump(data, open(path, "w", encoding="utf-8"), indent=2, sort_keys=True)
PY
if $PYTHON "$ROOT/scripts/verify_pf_core_bundle.py" --bundle-dir "${OUT}-unsafe" --pf-core-version "$VERSION"; then
echo "expected unsafe certificate to fail"
exit 1
fi

echo "OK: PF-Core bundle verification tests passed"
174 changes: 174 additions & 0 deletions scripts/verify_pf_core_bundle.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
#!/usr/bin/env python3
"""Verify PF-Core emit-artifacts five-file bundle layout (Phase 7 PR-1)."""

from __future__ import annotations

import argparse
import json
import sys
from pathlib import Path
from typing import Any, Mapping

REQUIRED_FILES = (
"runtime_observation.json",
"event.json",
"trace.json",
"certificate.json",
"audit.jsonl",
)

GENESIS = "0" * 64


def _load_json(path: Path) -> dict[str, Any]:
return json.loads(path.read_text(encoding="utf-8"))


def _normalize_hash(value: str) -> str:
text = str(value).strip().lower()
if text.startswith("sha256:"):
text = text[7:]
if len(text) != 64 or any(c not in "0123456789abcdef" for c in text):
raise ValueError(f"invalid hash: {value!r}")
return text


def _verify_structure(bundle_dir: Path) -> list[str]:
errors: list[str] = []
for name in REQUIRED_FILES:
if not (bundle_dir / name).is_file():
errors.append(f"missing required file: {name}")
if errors:
return errors

cert = _load_json(bundle_dir / "certificate.json")
trace = _load_json(bundle_dir / "trace.json")
event = _load_json(bundle_dir / "event.json")
obs = _load_json(bundle_dir / "runtime_observation.json")

if cert.get("safe") is not True:
errors.append("certificate.safe must be true for passing bundles")

cert_trace = _normalize_hash(str(cert.get("trace_hash", "")))
trace_hash = _normalize_hash(str(trace.get("trace_hash", "")))
if cert_trace != trace_hash:
errors.append(
f"certificate.trace_hash ({cert_trace}) != trace.trace_hash ({trace_hash})"
)

events = trace.get("events")
if not isinstance(events, list) or not events:
errors.append("trace.events must be a non-empty list")
return errors

prev = GENESIS
for idx, ev in enumerate(events):
if not isinstance(ev, Mapping):
errors.append(f"events[{idx}] must be an object")
continue
prev_field = _normalize_hash(str(ev.get("previous_event_hash", "")))
if prev_field != prev:
errors.append(
f"events[{idx}].previous_event_hash mismatch: expected {prev}, got {prev_field}"
)
event_hash = _normalize_hash(str(ev.get("event_hash", "")))
prev = event_hash

if events:
top_event_hash = _normalize_hash(str(event.get("event_hash", "")))
last_trace_hash = _normalize_hash(str(events[-1].get("event_hash", "")))
if top_event_hash != last_trace_hash:
errors.append("event.json event_hash must match last trace event")

audit_lines = [
line.strip()
for line in (bundle_dir / "audit.jsonl").read_text(encoding="utf-8").splitlines()
if line.strip()
]
if not audit_lines:
errors.append("audit.jsonl must contain at least one line")
else:
audit = json.loads(audit_lines[0])
audit_trace = _normalize_hash(str(audit.get("trace_hash", "")))
if audit_trace != trace_hash:
errors.append("audit.jsonl trace_hash must match trace.trace_hash")
audit_event = _normalize_hash(str(audit.get("event_hash", "")))
event_hash = _normalize_hash(str(event.get("event_hash", "")))
if audit_event != event_hash:
errors.append("audit.jsonl event_hash must match event.json")

if obs.get("schema_version") != "pf-core.runtime_observation.v1":
errors.append("runtime_observation.json schema_version must be v1")

return errors


def _verify_with_pf_core(bundle_dir: Path, schemas_dir: Path | None) -> list[str]:
try:
from pf_core.hash_chain import validate_trace_hashes
from pf_core.schemas import load_registry, validate_object
except ImportError:
return []

errors: list[str] = []
if schemas_dir is None:
return errors

registry = load_registry(schemas_dir)
for name in ("runtime_observation.json", "event.json", "trace.json", "certificate.json"):
obj = _load_json(bundle_dir / name)
try:
validate_object(obj, registry)
except Exception as exc: # noqa: BLE001 - aggregate validation errors
errors.append(f"{name}: schema validation failed: {exc}")

trace = _load_json(bundle_dir / "trace.json")
try:
validate_trace_hashes(trace)
except Exception as exc: # noqa: BLE001
errors.append(f"trace hash chain recomputation failed: {exc}")

return errors


def verify_bundle(
bundle_dir: Path,
*,
pf_core_version: str | None = None,
schemas_dir: Path | None = None,
) -> tuple[bool, list[str]]:
errors = _verify_structure(bundle_dir)
if pf_core_version:
version_file = bundle_dir / "VERSION"
if version_file.is_file():
if version_file.read_text(encoding="utf-8").strip() != pf_core_version:
errors.append(f"bundle VERSION mismatch (expected {pf_core_version})")

if schemas_dir and schemas_dir.is_dir():
errors.extend(_verify_with_pf_core(bundle_dir, schemas_dir))

return (len(errors) == 0, errors)


def main(argv: list[str] | None = None) -> int:
parser = argparse.ArgumentParser(description="Verify PF-Core artifact bundle")
parser.add_argument("--bundle-dir", type=Path, required=True)
parser.add_argument("--pf-core-version", default=None)
parser.add_argument("--schemas", type=Path, default=None)
args = parser.parse_args(argv)

ok, errors = verify_bundle(
args.bundle_dir,
pf_core_version=args.pf_core_version,
schemas_dir=args.schemas,
)
if ok:
print(f"verify_bundle: PASS ({args.bundle_dir})")
return 0
for err in errors:
print(f"verify_bundle: FAIL — {err}", file=sys.stderr)
return 1


if __name__ == "__main__":
raise SystemExit(main())
77 changes: 77 additions & 0 deletions src/PostIncidentProofs/Bundle/PFCore.lean
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
import Lean.Data.Json

namespace PostIncidentProofs.Bundle.PFCore

/-- Required PF-Core emit-artifacts files (five-file layout). -/
def requiredArtifactFiles : List String := [
"runtime_observation.json",
"event.json",
"trace.json",
"certificate.json",
"audit.jsonl"
]

structure PFCoreArtifacts where
bundleDir : String
pfCoreVersion : Option String
deriving Repr

def parseArgs (args : List String) : Option PFCoreArtifacts := Id.run do
let mut bundleDir? : Option String := none
let mut version? : Option String := none
let mut i := 0
while h : i < args.length do
let arg := args[i]
if arg == "--bundle-dir" then
if h' : i + 1 < args.length then
bundleDir? := some args[i + 1]
i := i + 2
continue
else
return none
if arg == "--pf-core-version" then
if h' : i + 1 < args.length then
version? := some args[i + 1]
i := i + 2
continue
else
return none
if bundleDir?.isNone && !arg.startsWith "-" then
bundleDir? := some arg
i := i + 1
match bundleDir? with
| none => none
| some dir => some { bundleDir := dir, pfCoreVersion := version? }

def bundleFilesPresent (bundleDir : String) : IO Bool := do
for name in requiredArtifactFiles do
let path := s!"{bundleDir}/{name}"
if !(← System.FilePath.pathExists path) then
return false
return true

def runPythonVerifier (artifacts : PFCoreArtifacts) : IO UInt32 := do
let script := "scripts/verify_pf_core_bundle.py"
if !(← System.FilePath.pathExists script) then
IO.println "verify_bundle: missing scripts/verify_pf_core_bundle.py"
return 1
let mut procArgs : Array String := #["--bundle-dir", artifacts.bundleDir]
match artifacts.pfCoreVersion with
| some v => procArgs := procArgs ++ #["--pf-core-version", v]
| none => pure ()
let proc ← IO.Process.spawn {
cmd := "python3"
args := #[script] ++ procArgs
stdout := IO.Process.Stdio.inherited
stderr := IO.Process.Stdio.inherited
}
let exitCode ← proc.wait
return exitCode.toUInt32

def verifyPFCoreBundle (artifacts : PFCoreArtifacts) : IO UInt32 := do
if !(← bundleFilesPresent artifacts.bundleDir) then
IO.println s!"verify_bundle: missing PF-Core artifact files under {artifacts.bundleDir}"
return 1
runPythonVerifier artifacts

end PostIncidentProofs.Bundle.PFCore
24 changes: 15 additions & 9 deletions src/VerifyBundle.lean
Original file line number Diff line number Diff line change
@@ -1,13 +1,19 @@
import PostIncidentProofs.Bundle.Builder
import PostIncidentProofs.Bundle.PFCore
import PostIncidentProofs.Utils.Time

def main (args : List String) : IO UInt32 := do
match args with
| [] =>
IO.println "usage: verify_bundle <path>"
pure 1
| _ =>
let bundle := PostIncidentProofs.Bundle.Builder.create_bundle [] ["spec"] { start := 0, stop := 86400 }
let ok := PostIncidentProofs.Bundle.Builder.verify_bundle bundle
IO.println s!"verify_bundle: {if ok then "PASS" else "FAIL"}"
pure (if ok then 0 else 1)
match PostIncidentProofs.Bundle.PFCore.parseArgs args with
| some pfArtifacts =>
PostIncidentProofs.Bundle.PFCore.verifyPFCoreBundle pfArtifacts
| none =>
match args with
| [] =>
IO.println "usage: verify_bundle --bundle-dir <dir> [--pf-core-version <ver>]"
IO.println " verify_bundle <legacy-path>"
pure 1
| _ =>
let bundle := PostIncidentProofs.Bundle.Builder.create_bundle [] ["spec"] { start := 0, stop := 86400 }
let ok := PostIncidentProofs.Bundle.Builder.verify_bundle bundle
IO.println s!"verify_bundle: {if ok then "PASS" else "FAIL"}"
pure (if ok then 0 else 1)
Loading