Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 76 additions & 0 deletions tests/test_metrics_server.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
import asyncio
import http.client

import pytest

from verifiers.serve.server.env_router import EnvRouterStats
from verifiers.serve.server.env_worker import EnvWorkerStats
from verifiers.serve.server.metrics import MetricsServer, render_prometheus_text


def test_render_prometheus_text_smoke():
stats = EnvRouterStats(
workers={
0: EnvWorkerStats(worker_id=0, timestamp=0.0, active_tasks=3),
1: EnvWorkerStats(worker_id=1, timestamp=0.0, active_tasks=5),
}
)

body = render_prometheus_text(stats, env_id="math-python", version="0.1.15")

assert "verifiers_env_active_tasks 8" in body
assert "verifiers_env_workers_total 2" in body
assert 'verifiers_env_worker_active_tasks{worker_id="0"} 3' in body
assert 'verifiers_env_worker_active_tasks{worker_id="1"} 5' in body
assert 'verifiers_env_server_info{env_id="math-python",version="0.1.15"} 1' in body
assert 'verifiers_env_loop_lag_seconds{worker_id="0",quantile="p95"} 0.0' in body
assert body.endswith("\n")


@pytest.mark.asyncio
async def test_metrics_server_serves_metrics(unused_tcp_port):
class StubRouter:
stats = EnvRouterStats(
workers={0: EnvWorkerStats(worker_id=0, timestamp=0.0, active_tasks=2)}
)

server = MetricsServer(
StubRouter(), env_id="test", version="dev", port=unused_tcp_port
)
await server.start()
try:

def fetch():
conn = http.client.HTTPConnection("127.0.0.1", unused_tcp_port, timeout=2)
conn.request("GET", "/metrics")
response = conn.getresponse()
return response.status, response.read().decode()

status, body = await asyncio.get_running_loop().run_in_executor(None, fetch)

assert status == 200
assert "verifiers_env_active_tasks 2" in body
assert "verifiers_env_workers_total 1" in body
finally:
await server.close()


@pytest.mark.asyncio
async def test_metrics_server_404_on_other_path(unused_tcp_port):
class StubRouter:
stats = EnvRouterStats(workers={})

server = MetricsServer(StubRouter(), env_id="t", version="x", port=unused_tcp_port)
await server.start()
try:

def fetch():
conn = http.client.HTTPConnection("127.0.0.1", unused_tcp_port, timeout=2)
conn.request("GET", "/")
return conn.getresponse().status

status = await asyncio.get_running_loop().run_in_executor(None, fetch)

assert status == 404
finally:
await server.close()
2 changes: 2 additions & 0 deletions verifiers/envs/environment.py
Original file line number Diff line number Diff line change
Expand Up @@ -1308,6 +1308,7 @@ async def start_server(
log_level: str | None = None,
log_dir: str | None = None,
console_logging: bool = True,
metrics_port: int | None = None,
# health check configs
health_check_interval: float = 1.0, # 1s
startup_timeout: float = 600.0, # 10m
Expand Down Expand Up @@ -1349,6 +1350,7 @@ async def start_server(
kwargs=dict(
address=address,
num_workers=num_workers,
metrics_port=metrics_port,
death_pipe=death_pipe_reader,
),
daemon=False,
Expand Down
7 changes: 7 additions & 0 deletions verifiers/scripts/eval.py
Original file line number Diff line number Diff line change
Expand Up @@ -483,6 +483,12 @@ def build_parser() -> argparse.ArgumentParser:
default="auto",
help='Number of env server worker processes ("auto" = concurrency // 256, or an integer)',
)
parser.add_argument(
"--metrics-port",
type=int,
default=None,
help="Expose env server Prometheus metrics at http://0.0.0.0:<port>/metrics",
)
parser.add_argument(
"--abbreviated-summary",
"-A",
Expand Down Expand Up @@ -813,6 +819,7 @@ def build_eval_config(raw: dict) -> EvalConfig:
max_retries=raw.get("max_retries", 0),
num_workers=raw.get("num_workers", "auto"),
disable_env_server=raw.get("disable_env_server", False),
metrics_port=raw.get("metrics_port"),
verbose=raw.get("verbose", False),
disable_tui=raw.get("disable_tui", False),
state_columns=raw.get("state_columns", []),
Expand Down
14 changes: 9 additions & 5 deletions verifiers/serve/server/env_router.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,14 @@ def active_requests(self) -> dict[bytes, ActiveRequestInfo]:
for rid, info in handle.active_requests.items()
}

@property
def stats(self) -> EnvRouterStats:
"""Current router and worker stats snapshot."""
return EnvRouterStats(
lag=EventLoopLagStats.from_monitor(self.lag_monitor),
workers={wid: w.stats for wid, w in sorted(self.workers.items())},
)

def get_worker_name(self, worker_id: int) -> str:
"""Get the name of an env worker."""
return f"{self.env_id}-{worker_id}"
Expand Down Expand Up @@ -396,11 +404,7 @@ async def check_workers(self) -> None:

def log_stats(self) -> None:
"""Log server lag + per-worker stats."""
stats = EnvRouterStats(
lag=EventLoopLagStats.from_monitor(self.lag_monitor),
workers={wid: w.stats for wid, w in sorted(self.workers.items())},
)
self.logger.info(stats)
self.logger.info(self.stats)

async def close(self) -> None:
"""Close all router resources."""
Expand Down
15 changes: 15 additions & 0 deletions verifiers/serve/server/env_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import verifiers as vf
from verifiers.serve.server.env_router import EnvRouter
from verifiers.serve.server.metrics import MetricsServer
from verifiers.utils.process_utils import monitor_death_pipe, set_proc_title


Expand All @@ -39,10 +40,14 @@ def __init__(
num_workers: int = 1,
worker_heartbeat_timeout: float = 30.0,
stats_log_interval: float = 10.0,
metrics_port: int | None = None,
death_pipe: Connection | None = None,
):
set_proc_title("EnvServer")
self.env_id = env_id
self.death_pipe = death_pipe
self.metrics_port = metrics_port
self.metrics_server: MetricsServer | None = None

logger_kwargs: dict[str, Any] = {
"console_logging": console_logging,
Expand Down Expand Up @@ -99,11 +104,21 @@ def signal_handler(sig, frame):
signal.signal(signal.SIGINT, signal_handler)

try:
if self.metrics_port is not None:
self.metrics_server = MetricsServer(
self.router,
env_id=self.env_id,
version=vf.__version__,
port=self.metrics_port,
)
await self.metrics_server.start()
await self.serve(stop_event=stop_event)
finally:
# Ignore signals during cleanup to avoid interrupting teardown.
signal.signal(signal.SIGTERM, signal.SIG_IGN)
signal.signal(signal.SIGINT, signal.SIG_IGN)
if self.metrics_server is not None:
await self.metrics_server.close()
await self.router.close()
await self.close()

Expand Down
135 changes: 135 additions & 0 deletions verifiers/serve/server/metrics.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
"""Prometheus text-format metrics for env server stats."""

import asyncio
import logging
from typing import TYPE_CHECKING

if TYPE_CHECKING:
from verifiers.serve.server.env_router import EnvRouter, EnvRouterStats

from verifiers.utils.async_utils import EventLoopLagStats

logger = logging.getLogger(__name__)


def _escape_label_value(value: str) -> str:
return value.replace("\\", "\\\\").replace("\n", "\\n").replace('"', '\\"')


def _lag_quantiles(lag: EventLoopLagStats) -> dict[str, float]:
return {
"p50": lag.median,
"p95": lag.p95,
"p99": lag.p99,
}


def render_prometheus_text(
router_stats: "EnvRouterStats", *, env_id: str, version: str
) -> str:
"""Render an EnvRouterStats snapshot as Prometheus text exposition format."""
escaped_env_id = _escape_label_value(env_id)
escaped_version = _escape_label_value(version)
lines: list[str] = [
"# HELP verifiers_env_server_info Env server build and identity labels.",
"# TYPE verifiers_env_server_info gauge",
(
"verifiers_env_server_info"
f'{{env_id="{escaped_env_id}",version="{escaped_version}"}} 1'
),
"# HELP verifiers_env_active_tasks Total active rollouts across workers.",
"# TYPE verifiers_env_active_tasks gauge",
f"verifiers_env_active_tasks {router_stats.active_tasks}",
"# HELP verifiers_env_workers_total Configured worker count.",
"# TYPE verifiers_env_workers_total gauge",
f"verifiers_env_workers_total {router_stats.num_workers}",
"# HELP verifiers_env_worker_active_tasks Active rollouts per worker.",
"# TYPE verifiers_env_worker_active_tasks gauge",
]

for worker_id, worker_stats in sorted(router_stats.workers.items()):
active_tasks = worker_stats.active_tasks if worker_stats is not None else 0
lines.append(
f'verifiers_env_worker_active_tasks{{worker_id="{worker_id}"}} {active_tasks}'
)

lines.extend(
[
"# HELP verifiers_env_loop_lag_seconds Asyncio event loop lag in seconds.",
"# TYPE verifiers_env_loop_lag_seconds gauge",
]
)
for quantile, value in _lag_quantiles(router_stats.lag).items():
lines.append(
"verifiers_env_loop_lag_seconds"
f'{{worker_id="router",quantile="{quantile}"}} {value}'
)
for worker_id, worker_stats in sorted(router_stats.workers.items()):
if worker_stats is None:
continue
for quantile, value in _lag_quantiles(worker_stats.lag).items():
lines.append(
"verifiers_env_loop_lag_seconds"
f'{{worker_id="{worker_id}",quantile="{quantile}"}} {value}'
)

return "\n".join(lines) + "\n"


class MetricsServer:
"""Asyncio HTTP server serving /metrics in Prometheus text format."""

def __init__(
self, router: "EnvRouter", *, env_id: str, version: str, port: int
) -> None:
self.router = router
self.env_id = env_id
self.version = version
self.port = port
self.server: asyncio.AbstractServer | None = None

async def start(self) -> None:
self.server = await asyncio.start_server(self.handle, "0.0.0.0", self.port)
logger.info(f"Metrics server listening on http://0.0.0.0:{self.port}/metrics")

async def close(self) -> None:
if self.server is None:
return
self.server.close()
await self.server.wait_closed()
self.server = None

async def handle(
self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter
) -> None:
try:
request_line = await reader.readline()
while True:
line = await reader.readline()
if line in (b"\r\n", b"\n", b""):
break

parts = request_line.split()
if len(parts) < 2 or parts[0] != b"GET" or parts[1] != b"/metrics":
writer.write(b"HTTP/1.1 404 Not Found\r\nContent-Length: 0\r\n\r\n")
else:
body = render_prometheus_text(
self.router.stats,
env_id=self.env_id,
version=self.version,
).encode("utf-8")
headers = (
b"HTTP/1.1 200 OK\r\n"
b"Content-Type: text/plain; version=0.0.4; charset=utf-8\r\n"
+ f"Content-Length: {len(body)}\r\n\r\n".encode()
)
writer.write(headers + body)
await writer.drain()
except Exception:
logger.exception("Metrics server request handling failed")
finally:
writer.close()
try:
await writer.wait_closed()
except Exception:
pass
1 change: 1 addition & 0 deletions verifiers/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -1126,6 +1126,7 @@ class EvalConfig(BaseModel):
extra_env_kwargs: dict = {}
max_retries: int = 0
disable_env_server: bool = False
metrics_port: int | None = None
# logging
verbose: bool = False
disable_tui: bool = False
Expand Down
5 changes: 4 additions & 1 deletion verifiers/utils/async_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ class EventLoopLagStats(BaseModel):
mean: float = 0.0
median: float = 0.0
p90: float = 0.0
p95: float = 0.0
p99: float = 0.0
max: float = 0.0
n: int = 0
Expand All @@ -127,7 +128,8 @@ def __str__(self) -> str:
return (
f"min={print_time(self.min)} mean={print_time(self.mean)} "
f"median={print_time(self.median)} p90={print_time(self.p90)} "
f"p99={print_time(self.p99)} max={print_time(self.max)} (n={self.n})"
f"p95={print_time(self.p95)} p99={print_time(self.p99)} "
f"max={print_time(self.max)} (n={self.n})"
)

@classmethod
Expand All @@ -142,6 +144,7 @@ def from_monitor(cls, monitor: EventLoopLagMonitor) -> "EventLoopLagStats":
mean=float(arr.mean()),
median=float(np.median(arr)),
p90=float(np.percentile(arr, 90)),
p95=float(np.percentile(arr, 95)),
p99=float(np.percentile(arr, 99)),
max=float(arr.max()),
n=n,
Expand Down
2 changes: 2 additions & 0 deletions verifiers/utils/eval_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -567,6 +567,7 @@ def load_toml_config(
"max_retries",
"num_workers",
"disable_env_server",
"metrics_port",
"timeout",
# logging
"verbose",
Expand Down Expand Up @@ -987,6 +988,7 @@ async def run_evaluation(
log_level=get_log_level(config.verbose),
log_dir=log_dir,
console_logging=config.disable_tui,
metrics_port=config.metrics_port,
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OpenEnv start_server rejects metrics_port

High Severity

run_evaluation always passes metrics_port into start_server, but OpenEnvEnv.start_server does not declare that keyword and does not forward it to Environment.start_server. Evaluations that use OpenEnvEnv with the env server enabled raise TypeError on startup, even when metrics are disabled.

Fix in Cursor Fix in Web

Reviewed by Cursor Bugbot for commit df7e644. Configure here.

)
if on_log_file is not None:
from verifiers.serve import EnvServer
Expand Down