diff --git a/tests/test_metrics_server.py b/tests/test_metrics_server.py new file mode 100644 index 000000000..881e6d367 --- /dev/null +++ b/tests/test_metrics_server.py @@ -0,0 +1,76 @@ +import asyncio +import http.client + +import pytest + +from verifiers.serve.server.env_router import EnvRouterStats +from verifiers.serve.server.env_worker import EnvWorkerStats +from verifiers.serve.server.metrics import MetricsServer, render_prometheus_text + + +def test_render_prometheus_text_smoke(): + stats = EnvRouterStats( + workers={ + 0: EnvWorkerStats(worker_id=0, timestamp=0.0, active_tasks=3), + 1: EnvWorkerStats(worker_id=1, timestamp=0.0, active_tasks=5), + } + ) + + body = render_prometheus_text(stats, env_id="math-python", version="0.1.15") + + assert "verifiers_env_active_tasks 8" in body + assert "verifiers_env_workers_total 2" in body + assert 'verifiers_env_worker_active_tasks{worker_id="0"} 3' in body + assert 'verifiers_env_worker_active_tasks{worker_id="1"} 5' in body + assert 'verifiers_env_server_info{env_id="math-python",version="0.1.15"} 1' in body + assert 'verifiers_env_loop_lag_seconds{worker_id="0",quantile="p95"} 0.0' in body + assert body.endswith("\n") + + +@pytest.mark.asyncio +async def test_metrics_server_serves_metrics(unused_tcp_port): + class StubRouter: + stats = EnvRouterStats( + workers={0: EnvWorkerStats(worker_id=0, timestamp=0.0, active_tasks=2)} + ) + + server = MetricsServer( + StubRouter(), env_id="test", version="dev", port=unused_tcp_port + ) + await server.start() + try: + + def fetch(): + conn = http.client.HTTPConnection("127.0.0.1", unused_tcp_port, timeout=2) + conn.request("GET", "/metrics") + response = conn.getresponse() + return response.status, response.read().decode() + + status, body = await asyncio.get_running_loop().run_in_executor(None, fetch) + + assert status == 200 + assert "verifiers_env_active_tasks 2" in body + assert "verifiers_env_workers_total 1" in body + finally: + await server.close() + + +@pytest.mark.asyncio +async def test_metrics_server_404_on_other_path(unused_tcp_port): + class StubRouter: + stats = EnvRouterStats(workers={}) + + server = MetricsServer(StubRouter(), env_id="t", version="x", port=unused_tcp_port) + await server.start() + try: + + def fetch(): + conn = http.client.HTTPConnection("127.0.0.1", unused_tcp_port, timeout=2) + conn.request("GET", "/") + return conn.getresponse().status + + status = await asyncio.get_running_loop().run_in_executor(None, fetch) + + assert status == 404 + finally: + await server.close() diff --git a/verifiers/envs/environment.py b/verifiers/envs/environment.py index ed379f086..79ca4f39c 100644 --- a/verifiers/envs/environment.py +++ b/verifiers/envs/environment.py @@ -1308,6 +1308,7 @@ async def start_server( log_level: str | None = None, log_dir: str | None = None, console_logging: bool = True, + metrics_port: int | None = None, # health check configs health_check_interval: float = 1.0, # 1s startup_timeout: float = 600.0, # 10m @@ -1349,6 +1350,7 @@ async def start_server( kwargs=dict( address=address, num_workers=num_workers, + metrics_port=metrics_port, death_pipe=death_pipe_reader, ), daemon=False, diff --git a/verifiers/scripts/eval.py b/verifiers/scripts/eval.py index 0c3c07828..fd151e4db 100644 --- a/verifiers/scripts/eval.py +++ b/verifiers/scripts/eval.py @@ -483,6 +483,12 @@ def build_parser() -> argparse.ArgumentParser: default="auto", help='Number of env server worker processes ("auto" = concurrency // 256, or an integer)', ) + parser.add_argument( + "--metrics-port", + type=int, + default=None, + help="Expose env server Prometheus metrics at http://0.0.0.0:/metrics", + ) parser.add_argument( "--abbreviated-summary", "-A", @@ -813,6 +819,7 @@ def build_eval_config(raw: dict) -> EvalConfig: max_retries=raw.get("max_retries", 0), num_workers=raw.get("num_workers", "auto"), disable_env_server=raw.get("disable_env_server", False), + metrics_port=raw.get("metrics_port"), verbose=raw.get("verbose", False), disable_tui=raw.get("disable_tui", False), state_columns=raw.get("state_columns", []), diff --git a/verifiers/serve/server/env_router.py b/verifiers/serve/server/env_router.py index a1dd5a4a7..65353ec32 100644 --- a/verifiers/serve/server/env_router.py +++ b/verifiers/serve/server/env_router.py @@ -158,6 +158,14 @@ def active_requests(self) -> dict[bytes, ActiveRequestInfo]: for rid, info in handle.active_requests.items() } + @property + def stats(self) -> EnvRouterStats: + """Current router and worker stats snapshot.""" + return EnvRouterStats( + lag=EventLoopLagStats.from_monitor(self.lag_monitor), + workers={wid: w.stats for wid, w in sorted(self.workers.items())}, + ) + def get_worker_name(self, worker_id: int) -> str: """Get the name of an env worker.""" return f"{self.env_id}-{worker_id}" @@ -396,11 +404,7 @@ async def check_workers(self) -> None: def log_stats(self) -> None: """Log server lag + per-worker stats.""" - stats = EnvRouterStats( - lag=EventLoopLagStats.from_monitor(self.lag_monitor), - workers={wid: w.stats for wid, w in sorted(self.workers.items())}, - ) - self.logger.info(stats) + self.logger.info(self.stats) async def close(self) -> None: """Close all router resources.""" diff --git a/verifiers/serve/server/env_server.py b/verifiers/serve/server/env_server.py index 14f397938..9c3a6437c 100644 --- a/verifiers/serve/server/env_server.py +++ b/verifiers/serve/server/env_server.py @@ -15,6 +15,7 @@ import verifiers as vf from verifiers.serve.server.env_router import EnvRouter +from verifiers.serve.server.metrics import MetricsServer from verifiers.utils.process_utils import monitor_death_pipe, set_proc_title @@ -39,10 +40,14 @@ def __init__( num_workers: int = 1, worker_heartbeat_timeout: float = 30.0, stats_log_interval: float = 10.0, + metrics_port: int | None = None, death_pipe: Connection | None = None, ): set_proc_title("EnvServer") + self.env_id = env_id self.death_pipe = death_pipe + self.metrics_port = metrics_port + self.metrics_server: MetricsServer | None = None logger_kwargs: dict[str, Any] = { "console_logging": console_logging, @@ -99,11 +104,21 @@ def signal_handler(sig, frame): signal.signal(signal.SIGINT, signal_handler) try: + if self.metrics_port is not None: + self.metrics_server = MetricsServer( + self.router, + env_id=self.env_id, + version=vf.__version__, + port=self.metrics_port, + ) + await self.metrics_server.start() await self.serve(stop_event=stop_event) finally: # Ignore signals during cleanup to avoid interrupting teardown. signal.signal(signal.SIGTERM, signal.SIG_IGN) signal.signal(signal.SIGINT, signal.SIG_IGN) + if self.metrics_server is not None: + await self.metrics_server.close() await self.router.close() await self.close() diff --git a/verifiers/serve/server/metrics.py b/verifiers/serve/server/metrics.py new file mode 100644 index 000000000..17b88c40b --- /dev/null +++ b/verifiers/serve/server/metrics.py @@ -0,0 +1,135 @@ +"""Prometheus text-format metrics for env server stats.""" + +import asyncio +import logging +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + from verifiers.serve.server.env_router import EnvRouter, EnvRouterStats + +from verifiers.utils.async_utils import EventLoopLagStats + +logger = logging.getLogger(__name__) + + +def _escape_label_value(value: str) -> str: + return value.replace("\\", "\\\\").replace("\n", "\\n").replace('"', '\\"') + + +def _lag_quantiles(lag: EventLoopLagStats) -> dict[str, float]: + return { + "p50": lag.median, + "p95": lag.p95, + "p99": lag.p99, + } + + +def render_prometheus_text( + router_stats: "EnvRouterStats", *, env_id: str, version: str +) -> str: + """Render an EnvRouterStats snapshot as Prometheus text exposition format.""" + escaped_env_id = _escape_label_value(env_id) + escaped_version = _escape_label_value(version) + lines: list[str] = [ + "# HELP verifiers_env_server_info Env server build and identity labels.", + "# TYPE verifiers_env_server_info gauge", + ( + "verifiers_env_server_info" + f'{{env_id="{escaped_env_id}",version="{escaped_version}"}} 1' + ), + "# HELP verifiers_env_active_tasks Total active rollouts across workers.", + "# TYPE verifiers_env_active_tasks gauge", + f"verifiers_env_active_tasks {router_stats.active_tasks}", + "# HELP verifiers_env_workers_total Configured worker count.", + "# TYPE verifiers_env_workers_total gauge", + f"verifiers_env_workers_total {router_stats.num_workers}", + "# HELP verifiers_env_worker_active_tasks Active rollouts per worker.", + "# TYPE verifiers_env_worker_active_tasks gauge", + ] + + for worker_id, worker_stats in sorted(router_stats.workers.items()): + active_tasks = worker_stats.active_tasks if worker_stats is not None else 0 + lines.append( + f'verifiers_env_worker_active_tasks{{worker_id="{worker_id}"}} {active_tasks}' + ) + + lines.extend( + [ + "# HELP verifiers_env_loop_lag_seconds Asyncio event loop lag in seconds.", + "# TYPE verifiers_env_loop_lag_seconds gauge", + ] + ) + for quantile, value in _lag_quantiles(router_stats.lag).items(): + lines.append( + "verifiers_env_loop_lag_seconds" + f'{{worker_id="router",quantile="{quantile}"}} {value}' + ) + for worker_id, worker_stats in sorted(router_stats.workers.items()): + if worker_stats is None: + continue + for quantile, value in _lag_quantiles(worker_stats.lag).items(): + lines.append( + "verifiers_env_loop_lag_seconds" + f'{{worker_id="{worker_id}",quantile="{quantile}"}} {value}' + ) + + return "\n".join(lines) + "\n" + + +class MetricsServer: + """Asyncio HTTP server serving /metrics in Prometheus text format.""" + + def __init__( + self, router: "EnvRouter", *, env_id: str, version: str, port: int + ) -> None: + self.router = router + self.env_id = env_id + self.version = version + self.port = port + self.server: asyncio.AbstractServer | None = None + + async def start(self) -> None: + self.server = await asyncio.start_server(self.handle, "0.0.0.0", self.port) + logger.info(f"Metrics server listening on http://0.0.0.0:{self.port}/metrics") + + async def close(self) -> None: + if self.server is None: + return + self.server.close() + await self.server.wait_closed() + self.server = None + + async def handle( + self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter + ) -> None: + try: + request_line = await reader.readline() + while True: + line = await reader.readline() + if line in (b"\r\n", b"\n", b""): + break + + parts = request_line.split() + if len(parts) < 2 or parts[0] != b"GET" or parts[1] != b"/metrics": + writer.write(b"HTTP/1.1 404 Not Found\r\nContent-Length: 0\r\n\r\n") + else: + body = render_prometheus_text( + self.router.stats, + env_id=self.env_id, + version=self.version, + ).encode("utf-8") + headers = ( + b"HTTP/1.1 200 OK\r\n" + b"Content-Type: text/plain; version=0.0.4; charset=utf-8\r\n" + + f"Content-Length: {len(body)}\r\n\r\n".encode() + ) + writer.write(headers + body) + await writer.drain() + except Exception: + logger.exception("Metrics server request handling failed") + finally: + writer.close() + try: + await writer.wait_closed() + except Exception: + pass diff --git a/verifiers/types.py b/verifiers/types.py index 4d8e965f9..017f9e3fb 100644 --- a/verifiers/types.py +++ b/verifiers/types.py @@ -1126,6 +1126,7 @@ class EvalConfig(BaseModel): extra_env_kwargs: dict = {} max_retries: int = 0 disable_env_server: bool = False + metrics_port: int | None = None # logging verbose: bool = False disable_tui: bool = False diff --git a/verifiers/utils/async_utils.py b/verifiers/utils/async_utils.py index 1c96363b7..dc4adfda0 100644 --- a/verifiers/utils/async_utils.py +++ b/verifiers/utils/async_utils.py @@ -115,6 +115,7 @@ class EventLoopLagStats(BaseModel): mean: float = 0.0 median: float = 0.0 p90: float = 0.0 + p95: float = 0.0 p99: float = 0.0 max: float = 0.0 n: int = 0 @@ -127,7 +128,8 @@ def __str__(self) -> str: return ( f"min={print_time(self.min)} mean={print_time(self.mean)} " f"median={print_time(self.median)} p90={print_time(self.p90)} " - f"p99={print_time(self.p99)} max={print_time(self.max)} (n={self.n})" + f"p95={print_time(self.p95)} p99={print_time(self.p99)} " + f"max={print_time(self.max)} (n={self.n})" ) @classmethod @@ -142,6 +144,7 @@ def from_monitor(cls, monitor: EventLoopLagMonitor) -> "EventLoopLagStats": mean=float(arr.mean()), median=float(np.median(arr)), p90=float(np.percentile(arr, 90)), + p95=float(np.percentile(arr, 95)), p99=float(np.percentile(arr, 99)), max=float(arr.max()), n=n, diff --git a/verifiers/utils/eval_utils.py b/verifiers/utils/eval_utils.py index 5e7c8651d..9364553f4 100644 --- a/verifiers/utils/eval_utils.py +++ b/verifiers/utils/eval_utils.py @@ -567,6 +567,7 @@ def load_toml_config( "max_retries", "num_workers", "disable_env_server", + "metrics_port", "timeout", # logging "verbose", @@ -987,6 +988,7 @@ async def run_evaluation( log_level=get_log_level(config.verbose), log_dir=log_dir, console_logging=config.disable_tui, + metrics_port=config.metrics_port, ) if on_log_file is not None: from verifiers.serve import EnvServer