Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
100 changes: 100 additions & 0 deletions scripts/seed_phoenix_calibration.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
#!/usr/bin/env python3
"""Seed the ``glasshat-calibration`` Phoenix dataset with the spike-D YELLOW
optimism over-confidence prior, so the live ``PhoenixMcpConsultant`` (read over
MCP ``get-dataset-examples``) reproduces the deterministic table prior.

Each example is ``input={hat,criterion,bucket}`` + ``output={delta}`` — exactly
the shape ``PhoenixMcpConsultant._parse_examples`` reads. For every preset
criterion (each criterion id is seeded once, even if several presets share it)
and every evidence bucket we emit ``n`` identical examples whose delta is the
measured ``mean_delta`` (low: 1.45 × 7, mid: 0.80 × 10, high: 0.31 × 16), so the
consultant's computed mean equals the table prior's mean and its ``n`` matches.

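Run (``PHOENIX_COLLECTOR_ENDPOINT`` is accepted in place of ``PHOENIX_URL``;
``PHOENIX_API_KEY`` is optional):
    PHOENIX_URL=https://glasshat-phoenix-...run.app \
        uv run --extra phoenix python scripts/seed_phoenix_calibration.py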
"""

from __future__ import annotations

import os

# Held-out spike-D YELLOW prior, keyed by evidence bucket (mirror of
# engine._YELLOW_DELTA_BY_BUCKET). Each value is ``(mean_delta, n)``: the delta
# written into every seeded example and how many examples are emitted for that
# bucket per criterion.
_YELLOW_DELTA_BY_BUCKET: dict[str, tuple[float, int]] = {
    "low": (1.45, 7),
    "mid": (0.80, 10),
    "high": (0.31, 16),
}

# Name of the Phoenix dataset this script drops and re-creates.
_DATASET = "glasshat-calibration"


def main() -> None:
import pathlib
import sys

# uv's editable install of the PEP420 glasshat namespace isn't always picked up
# under `--extra phoenix`; add every workspace src dir so the namespace merges.
_root = pathlib.Path(__file__).resolve().parents[1]
for _p in (
"packages/rubric/src",
"packages/shared/src",
"agents/src",
"services/code-grader/src",
"services/ingest/src",
"services/pipeline-orchestrator/src",
):
_abs = str(_root / _p)
if _abs not in sys.path:
sys.path.insert(0, _abs)

from glasshat.rubric.presets import list_presets, load_preset
from phoenix.client import Client

base_url = os.environ.get("PHOENIX_URL") or os.environ.get("PHOENIX_COLLECTOR_ENDPOINT")
if not base_url:
raise SystemExit("PHOENIX_URL (or PHOENIX_COLLECTOR_ENDPOINT) must be set")
api_key = os.environ.get("PHOENIX_API_KEY") or None

inputs: list[dict[str, object]] = []
outputs: list[dict[str, object]] = []
seen: set[str] = set()
for preset_id in list_presets():
for crit in load_preset(preset_id).criteria:
if crit.id in seen: # one set of anchors per criterion id (n stays exact)
continue
seen.add(crit.id)
for bucket, (mean_delta, n) in _YELLOW_DELTA_BY_BUCKET.items():
for _ in range(n):
inputs.append({"hat": "yellow", "criterion": crit.id, "bucket": bucket})
outputs.append({"delta": mean_delta})

client = Client(base_url=base_url, api_key=api_key)

# Idempotent: drop any existing dataset of this name first so re-running
# produces a single clean dataset (the phoenix client exposes no delete, so
# use the REST endpoint directly). Re-seeding is the normal reset path.
import httpx

headers = {"authorization": f"Bearer {api_key}"} if api_key else {}
with httpx.Client(base_url=base_url, headers=headers, timeout=20.0) as http:
existing = http.get("/v1/datasets").json().get("data", [])
for d in existing:
if d.get("name") == _DATASET:
http.delete(f"/v1/datasets/{d['id']}")
print(f" removed existing dataset id={d['id']}")

ds = client.datasets.create_dataset(
Comment on lines +70 to +85

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The seeding script is currently not idempotent. If you run it more than once, client.datasets.create_dataset will raise an HTTP conflict/duplicate error because a dataset with the name glasshat-calibration already exists.

To make the script safely reusable and idempotent (e.g., for local resets or CI/CD pipelines), we should check for and delete any existing dataset with the same name before creating the new one.

    client = Client(base_url=base_url, api_key=api_key)
    for dataset in client.list_datasets():
        if dataset.name == _DATASET:
            client.delete_dataset(id=dataset.id)
            break

    ds = client.datasets.create_dataset(

name=_DATASET,
inputs=inputs,
outputs=outputs,
dataset_description=(
"spike-D YELLOW optimism over-confidence prior — delta by "
"(hat, criterion, evidence-bucket). Read live by PhoenixMcpConsultant."
),
)
print(f"created dataset '{_DATASET}' with {len(inputs)} examples")
print(f" criteria seeded: {sorted(seen)}")
print(f" dataset: {ds}")


if __name__ == "__main__":
main()
34 changes: 23 additions & 11 deletions services/pipeline-orchestrator/src/glasshat/pipeline/adk_runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,12 +111,6 @@ def build_phoenix_mcp_toolset(base_url: str, api_key: str = "") -> Any: # pragm
)


def _percentile(values: list[float], pct: float) -> float:
    """Return the nearest-rank percentile of *values*.

    ``pct`` is a fraction (``0.25`` for the 25th percentile). It is scaled onto
    the index range of the sorted values, rounded to the nearest index and
    clamped to ``[0, len(values) - 1]``, so fractions outside ``[0, 1]`` select
    the minimum or maximum. No interpolation is performed: the result is always
    one of the input values.

    Ties are resolved by the built-in ``round``, i.e. a scaled index ending in
    ``.5`` rounds to the nearest even index.

    Raises:
        IndexError: if *values* is empty.
    """
    if not values:
        # Without this guard the clamp below yields index -1 on an empty list
        # and fails with an unhelpful "list index out of range".
        raise IndexError("_percentile() requires at least one value")
    ordered = sorted(values)
    last = len(ordered) - 1
    idx = min(last, max(0, round(pct * last)))
    return ordered[idx]


class PhoenixMcpConsultant:
"""Consultant that derives calibration stats from Phoenix over MCP stdio.

Expand Down Expand Up @@ -149,7 +143,9 @@ async def _load(self) -> dict[tuple[str, str, str], list[float]]:
async def _call() -> Any:
async with stdio_client(params) as (read, write), ClientSession(read, write) as session:
await session.initialize()
return await session.call_tool("get-dataset-examples", {"dataset": self._dataset})
return await session.call_tool(
"get-dataset-examples", {"dataset_name": self._dataset}
)

# Bound the single round-trip: a hung npx/stdio session must not hang the run.
result = await asyncio.wait_for(_call(), timeout=_MCP_CALL_TIMEOUT)
Expand All @@ -167,8 +163,13 @@ async def consult(self, hat: Hat, criterion_id: str, bucket: str) -> ConsultResu
return ConsultResult(
mean_delta=statistics.mean(deltas),
n=len(deltas),
p25=_percentile(deltas, 0.25),
p75=_percentile(deltas, 0.75),
# p25/p75 are the SCORE clip bounds for apply_correction
# (clip(raw_score, p25, p75)) — NOT delta percentiles. Match the
# table/anchor convention (ConsultResult(mean_delta, n, 0.0, 10.0)) so
# the live MCP correction equals the deterministic prior instead of
# being crushed onto the ±2 cap.
p25=0.0,
p75=10.0,
)


Expand All @@ -179,6 +180,8 @@ def _parse_examples(

Reads the shape the writer emits — ``input.{hat,criterion,bucket}`` +
``output.delta`` — tolerating a flat ``{hat,criterion,bucket,delta}`` fallback.
phoenix-mcp's ``get-dataset-examples`` wraps the rows as
``{"data": {"examples": [...]}}``, so unwrap ``data`` before reading ``examples``.
"""
import json

Expand All @@ -191,7 +194,16 @@ def _parse_examples(
payload = json.loads(text)
except (ValueError, TypeError):
continue
examples = payload if isinstance(payload, list) else payload.get("examples", [])
if isinstance(payload, dict):
if isinstance(payload.get("data"), dict):
payload = payload["data"] # phoenix-mcp wraps rows under "data"
examples = payload.get("examples", [])
elif isinstance(payload, list):
examples = payload
else:
continue # scalar JSON (bool/number/null) — nothing to parse, skip
if not isinstance(examples, list):
continue
for ex in examples:
if not isinstance(ex, dict):
continue
Expand Down Expand Up @@ -251,7 +263,7 @@ async def _call() -> None:
await session.initialize()
await session.call_tool(
"add-dataset-examples",
{"dataset": self._dataset, "examples": rows},
{"dataset_name": self._dataset, "examples": rows},
)

# Bound the round-trip: a hung npx/stdio session must not hang the run.
Expand Down
8 changes: 5 additions & 3 deletions services/pipeline-orchestrator/tests/test_adk_runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,9 @@ def __init__(self, items: list[_Item]) -> None:


def _result(examples: list[dict[str, Any]]) -> _ToolResult:
    """Build a fake ``get-dataset-examples`` tool result carrying *examples*.

    The rows are JSON-encoded into a single text item of the tool result.
    """
    # Mirror phoenix-mcp@4.0.13's get-dataset-examples shape: rows are wrapped
    # under "data" (regression guard for _parse_examples' unwrap).
    payload = json.dumps({"data": {"examples": examples}})
    return _ToolResult([_Item(payload)])


@contextmanager
Expand Down Expand Up @@ -196,7 +198,7 @@ def test_consultant_fetches_dataset_once_and_filters_locally() -> None:
# Fetched ONCE despite three consults (the cache), and asked the right tool.
fetches = [c for c in recorder if c[0] == "get-dataset-examples"]
assert len(fetches) == 1
assert fetches[0][1] == {"dataset": "glasshat-calibration"}
assert fetches[0][1] == {"dataset_name": "glasshat-calibration"}


def test_writer_calls_add_dataset_examples_with_rows() -> None:
Expand All @@ -214,7 +216,7 @@ def test_writer_calls_add_dataset_examples_with_rows() -> None:
writes = [c for c in recorder if c[0] == "add-dataset-examples"]
assert len(writes) == 1
payload = writes[0][1]
assert payload["dataset"] == "glasshat-calibration"
assert payload["dataset_name"] == "glasshat-calibration"
row = payload["examples"][0]
assert row["input"] == {"hat": "yellow", "criterion": "tech", "bucket": "low"}
assert row["output"] == {"delta": 1.2}
Expand Down
Loading