Skip to content

feat(serve): expose env server / worker stats on /metrics Prometheus endpoint#1415

Open
mvanhorn wants to merge 1 commit into
PrimeIntellect-ai:mainfrom
mvanhorn:fix/1188-prometheus-metrics-env-server
Open

feat(serve): expose env server / worker stats on /metrics Prometheus endpoint#1415
mvanhorn wants to merge 1 commit into
PrimeIntellect-ai:mainfrom
mvanhorn:fix/1188-prometheus-metrics-env-server

Conversation

@mvanhorn
Copy link
Copy Markdown

@mvanhorn mvanhorn commented May 19, 2026

Summary

verifiers/serve/server/metrics.py (new)

"""Prometheus text-format renderer + asyncio HTTP server for env server stats."""

import asyncio
import logging
from typing import TYPE_CHECKING

if TYPE_CHECKING:
    from verifiers.serve.server.env_router import EnvRouter

logger = logging.getLogger(__name__)


# Quantile attribute names read from a lag-stats object, in emission order.
_LAG_QUANTILES = ("p50", "p95", "p99")


def _escape_label_value(value: object) -> str:
    """Escape a value for use inside a double-quoted Prometheus label."""
    # The text exposition format requires backslash, double-quote and line
    # feed to be escaped inside label values; backslash must go first so the
    # escapes added for the other two are not themselves re-escaped.
    return (
        str(value)
        .replace("\\", "\\\\")
        .replace('"', '\\"')
        .replace("\n", "\\n")
    )


def _lag_sample_lines(worker_label: str, lag) -> list[str]:
    """Build one loop-lag sample line per quantile for an already-escaped label."""
    samples: list[str] = []
    for quantile in _LAG_QUANTILES:
        # A missing attribute or a None/zero value is reported as 0.0 so the
        # sample value is always a number.
        value = getattr(lag, quantile, 0.0) or 0.0
        samples.append(
            f'verifiers_env_loop_lag_seconds{{worker_id="{worker_label}",'
            f'quantile="{quantile}"}} {value}'
        )
    return samples


def render_prometheus_text(router_stats, *, env_id: str, version: str) -> str:
    """Render a router stats snapshot as Prometheus text exposition format.

    Args:
        router_stats: Snapshot object exposing ``active_tasks``,
            ``num_workers``, ``lag`` (with ``p50``/``p95``/``p99``) and
            ``workers``, a mapping of worker id to a per-worker stats object
            (or ``None``) that itself exposes ``active_tasks`` and ``lag``.
        env_id: Value of the ``env_id`` label on the info gauge.
        version: Value of the ``version`` label on the info gauge.

    Returns:
        The rendered metrics, one line per HELP/TYPE/sample, terminated by a
        trailing newline.
    """
    # Sort once so both per-worker sections list workers in the same order.
    workers = sorted(router_stats.workers.items())
    lines: list[str] = []

    # info gauge
    lines.append("# HELP verifiers_env_server_info Build/identity labels.")
    lines.append("# TYPE verifiers_env_server_info gauge")
    lines.append(
        f'verifiers_env_server_info{{env_id="{_escape_label_value(env_id)}",'
        f'version="{_escape_label_value(version)}"}} 1'
    )

    # active tasks (router aggregate)
    lines.append("# HELP verifiers_env_active_tasks Total active rollouts across workers.")
    lines.append("# TYPE verifiers_env_active_tasks gauge")
    lines.append(f"verifiers_env_active_tasks {router_stats.active_tasks}")

    # workers total
    lines.append("# HELP verifiers_env_workers_total Configured worker count.")
    lines.append("# TYPE verifiers_env_workers_total gauge")
    lines.append(f"verifiers_env_workers_total {router_stats.num_workers}")

    # per-worker active tasks; a worker without stats is reported as idle
    lines.append("# HELP verifiers_env_worker_active_tasks Active rollouts per worker.")
    lines.append("# TYPE verifiers_env_worker_active_tasks gauge")
    for wid, worker in workers:
        active = worker.active_tasks if worker is not None else 0
        lines.append(
            f'verifiers_env_worker_active_tasks{{worker_id="{_escape_label_value(wid)}"}} '
            f"{active}"
        )

    # loop lag quantiles: the router first, then every worker that has stats
    lines.append("# HELP verifiers_env_loop_lag_seconds Asyncio event loop lag (seconds).")
    lines.append("# TYPE verifiers_env_loop_lag_seconds gauge")
    lines.extend(_lag_sample_lines("router", router_stats.lag))
    for wid, worker in workers:
        if worker is None:
            continue
        lines.extend(_lag_sample_lines(_escape_label_value(wid), worker.lag))

    return "\n".join(lines) + "\n"


class MetricsServer:
    """Asyncio HTTP server serving /metrics in Prometheus text format.

    Only ``GET /metrics`` (optionally with a query string) is answered with
    the rendered metrics; every other request gets an empty 404. Each
    connection serves a single request and is then closed.
    """

    # Upper bound on how long a client may take to send its request line and
    # headers, so a stalled client cannot hold a connection open forever.
    _REQUEST_READ_TIMEOUT_SECONDS = 5.0

    def __init__(
        self,
        router: "EnvRouter",
        *,
        env_id: str,
        version: str,
        port: int,
        host: str = "0.0.0.0",
    ):
        """Store the configuration; no socket is opened until ``start()``.

        Args:
            router: Object whose ``stats`` attribute is rendered on each scrape.
            env_id: Value of the ``env_id`` label on the info gauge.
            version: Value of the ``version`` label on the info gauge.
            port: TCP port to listen on.
            host: Interface to bind to; defaults to all interfaces.
        """
        self.router = router
        self.env_id = env_id
        self.version = version
        self.port = port
        self.host = host
        self._server: asyncio.AbstractServer | None = None

    async def start(self) -> None:
        """Start listening on the configured host and port."""
        self._server = await asyncio.start_server(self._handle, self.host, self.port)
        logger.info(f"Metrics server listening on http://{self.host}:{self.port}/metrics")

    async def close(self) -> None:
        """Stop listening; safe to call when not started or already closed."""
        # Clear the reference first so a repeated close() is a no-op.
        server, self._server = self._server, None
        if server is not None:
            server.close()
            await server.wait_closed()

    @staticmethod
    def _is_metrics_request(request_line: bytes) -> bool:
        """Return True when the request line is a GET for exactly /metrics."""
        parts = request_line.split()
        if len(parts) < 2 or parts[0] != b"GET":
            return False
        # Ignore any query string; a prefix match would also serve /metricsfoo.
        path = parts[1].split(b"?", 1)[0]
        return path == b"/metrics"

    @staticmethod
    async def _read_request(reader: asyncio.StreamReader) -> bytes:
        """Read the request line, consume the headers, and return the request line."""
        request_line = await reader.readline()
        # Drain remaining headers up to the blank line (or EOF).
        while True:
            line = await reader.readline()
            if line in (b"\r\n", b"\n", b""):
                break
        return request_line

    async def _handle(
        self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter
    ) -> None:
        """Serve one HTTP request on the connection, then close it."""
        try:
            try:
                request_line = await asyncio.wait_for(
                    self._read_request(reader),
                    timeout=self._REQUEST_READ_TIMEOUT_SECONDS,
                )
            except asyncio.TimeoutError:
                # Client never finished its request; drop the connection.
                return
            if self._is_metrics_request(request_line):
                body = render_prometheus_text(
                    self.router.stats, env_id=self.env_id, version=self.version
                ).encode("utf-8")
                response = (
                    b"HTTP/1.1 200 OK\r\n"
                    b"Content-Type: text/plain; version=0.0.4; charset=utf-8\r\n"
                    + f"Content-Length: {len(body)}\r\n".encode()
                    + b"Connection: close\r\n\r\n"
                    + body
                )
            else:
                response = (
                    b"HTTP/1.1 404 Not Found\r\n"
                    b"Content-Length: 0\r\n"
                    b"Connection: close\r\n\r\n"
                )
            writer.write(response)
            await writer.drain()
        except ConnectionError:
            # The peer went away mid-request; nothing useful to report.
            logger.debug("metrics client disconnected before the response was sent")
        except Exception:
            logger.exception("metrics server request handling failed")
        finally:
            writer.close()
            try:
                await writer.wait_closed()
            except Exception:
                # Best-effort close: the response has already been handled.
                logger.debug("metrics connection close failed", exc_info=True)

verifiers/serve/server/env_router.py

Expose the live snapshot via a public stats property so MetricsServer can read it without poking private state. Currently the router builds an EnvRouterStats only inside log_stats() — refactor to a @property (no functional change to the periodic logger; log_stats() calls self.stats).

@property
def stats(self) -> EnvRouterStats:
    """Build a fresh ``EnvRouterStats`` from the lag monitor and each worker.

    Every access reads ``self.lag_monitor.stats`` and the ``stats`` of each
    entry in ``self.workers`` and packs them into a new snapshot object.
    """
    lag_snapshot = self.lag_monitor.stats
    per_worker = {}
    for worker_id, worker in self.workers.items():
        per_worker[worker_id] = worker.stats
    return EnvRouterStats(lag=lag_snapshot, workers=per_worker)

verifiers/serve/server/env_server.py

Add metrics_port: int | None = None to EnvServer.__init__. In run_server(), if metrics_port is not None, instantiate a MetricsServer and await its start() once the router has booted; during shutdown, await metrics_server.close() alongside the existing await self.router.close().

Env server CLI entry point (verifiers/scripts/serve.py if it exists; otherwise the module where the env-server CLI lives — see verifiers/cli/commands/)

Add --metrics-port argparse flag plumbed through to EnvServer(metrics_port=...). Default None.

Why this matters

Issue #1188 (filed by @mikasenghaas, MEMBER) asks for "Prometheus metrics logging from env server/worker." The body is empty; the title carries the full ask. The env server already maintains rich per-worker stats (EnvWorkerStats: worker_id, timestamp, active_tasks, event-loop lag) and aggregate router stats (EnvRouterStats), and the router logs them on a stats_log_interval (default 10s). What is missing is a scrape-friendly transport: a Prometheus-format text endpoint that operators can wire into existing monitoring infra. The ZMQ ROUTER transport is the only client-facing surface today, which is opaque to standard exporters.

Acceptance:

  • A new --metrics-port flag (and matching metrics_port config) on the env server CLI exposes a Prometheus text endpoint on http://0.0.0.0:<port>/metrics.
  • The endpoint emits the metrics derived from EnvRouterStats + EnvWorkerStats: verifiers_env_active_tasks, verifiers_env_workers_total, verifiers_env_worker_active_tasks{worker_id="N"}, verifiers_env_loop_lag_seconds{worker_id="N",quantile="p50|p95|p99"}, plus a verifiers_env_server_info{env_id=...,version=...} gauge.
  • Default behavior is unchanged when --metrics-port is omitted (no HTTP server started).
  • New unit test verifies a scrape returns valid Prometheus text format and includes the expected metric names.

Testing

tests/test_metrics_server.py (new)

import asyncio
import http.client

import pytest

from verifiers.serve.server.env_router import EnvRouterStats
from verifiers.serve.server.env_worker import EnvWorkerStats
from verifiers.serve.server.metrics import MetricsServer, render_prometheus_text


def test_render_prometheus_text_smoke():
    """Render a two-worker snapshot and check the expected sample lines appear."""
    active_by_worker = {0: 3, 1: 5}
    snapshot = EnvRouterStats(
        workers={
            wid: EnvWorkerStats(worker_id=wid, timestamp=0.0, active_tasks=count)
            for wid, count in active_by_worker.items()
        }
    )
    rendered = render_prometheus_text(
        snapshot, env_id="math-python", version="0.1.15"
    )

    expected_samples = (
        "verifiers_env_active_tasks 8",
        'verifiers_env_worker_active_tasks{worker_id="0"} 3',
        'verifiers_env_worker_active_tasks{worker_id="1"} 5',
        'verifiers_env_server_info{env_id="math-python",version="0.1.15"} 1',
    )
    for sample in expected_samples:
        assert sample in rendered

    # text format must end in newline
    assert rendered.endswith("\n")


@pytest.mark.asyncio
async def test_metrics_server_serves_metrics(unused_tcp_port):
    """A GET on /metrics returns 200 and the rendered metrics body."""

    class StubRouter:
        stats = EnvRouterStats(
            workers={0: EnvWorkerStats(worker_id=0, timestamp=0.0, active_tasks=2)}
        )

    server = MetricsServer(
        StubRouter(), env_id="test", version="dev", port=unused_tcp_port
    )
    await server.start()
    try:
        # plain stdlib HTTP client in a thread to avoid asyncio HTTP client dep
        def fetch():
            conn = http.client.HTTPConnection("127.0.0.1", unused_tcp_port, timeout=2)
            try:
                conn.request("GET", "/metrics")
                r = conn.getresponse()
                return r.status, r.read().decode()
            finally:
                # Always release the client socket, even if the request fails.
                conn.close()

        status, body = await asyncio.to_thread(fetch)
        assert status == 200
        assert "verifiers_env_active_tasks 2" in body
    finally:
        await server.close()


@pytest.mark.asyncio
async def test_metrics_server_404_on_other_path(unused_tcp_port):
    """A GET on a path other than /metrics returns 404."""

    class StubRouter:
        stats = EnvRouterStats(workers={})

    server = MetricsServer(StubRouter(), env_id="t", version="x", port=unused_tcp_port)
    await server.start()
    try:
        # Blocking stdlib client, run in a worker thread so the server's event
        # loop stays free to answer it.
        def fetch():
            conn = http.client.HTTPConnection("127.0.0.1", unused_tcp_port, timeout=2)
            try:
                conn.request("GET", "/")
                return conn.getresponse().status
            finally:
                # Always release the client socket, even if the request fails.
                conn.close()

        status = await asyncio.to_thread(fetch)
        assert status == 404
    finally:
        await server.close()

Run with uv run pytest tests/test_metrics_server.py -v.

Fixes #1188

AI was used for assistance.


Note

Medium Risk
Adds a new network-facing HTTP listener and threads it through env server startup/shutdown and eval configuration, so misconfiguration (port conflicts/exposure) or handler bugs could impact availability, though default behavior remains unchanged when disabled.

Overview
Adds an optional asyncio HTTP MetricsServer (verifiers/serve/server/metrics.py) that serves Prometheus text metrics at /metrics, including env identity labels, aggregate/per-worker active tasks, and router/worker event-loop lag quantiles.

Plumbs metrics_port through env server lifecycle: Environment.start_server(...) forwards the port to the server process, EnvServer conditionally starts/stops the metrics server, and EnvRouter now exposes a stats snapshot property consumed by the endpoint (and reused by existing periodic logging).

Extends evaluation configuration/CLI to accept --metrics-port and persist it in EvalConfig, and updates loop-lag stats to compute/report p95; adds unit tests validating Prometheus rendering and HTTP behavior (200 on /metrics, 404 otherwise).

Reviewed by Cursor Bugbot for commit df7e644. Bugbot is set up for automated code reviews on this repo. Configure here.

Note

Expose env server and worker stats on a Prometheus /metrics endpoint

  • Adds metrics.py with render_prometheus_text and MetricsServer: an asyncio HTTP server that serves Prometheus text at /metrics (404 for all other paths), including active tasks, worker count, and event loop lag quantiles (p50, p95, p99).
  • Adds a stats property to EnvRouter that returns a structured EnvRouterStats snapshot used by both the metrics endpoint and existing log output.
  • Threads metrics_port through EvalConfig, the eval CLI (--metrics-port), TOML config loading, Environment.start_server, and EnvServer so the endpoint is started automatically when the port is configured.
  • Adds p95 to EventLoopLagStats to support the new quantile reporting.
  • Risk: the metrics server binds an additional TCP port per env server process; no auth or rate limiting is applied.

Macroscope summarized df7e644.

Copy link
Copy Markdown

@cursor cursor Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cursor Bugbot has reviewed your changes and found 1 potential issue.

Fix All in Cursor

❌ Bugbot Autofix is OFF. To automatically fix reported issues with cloud agents, enable autofix in the Cursor dashboard.

Reviewed by Cursor Bugbot for commit df7e644. Configure here.

log_level=get_log_level(config.verbose),
log_dir=log_dir,
console_logging=config.disable_tui,
metrics_port=config.metrics_port,
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OpenEnv start_server rejects metrics_port

High Severity

run_evaluation always passes metrics_port into start_server, but OpenEnvEnv.start_server does not declare that keyword and does not forward it to Environment.start_server. Evaluations that use OpenEnvEnv with the env server enabled raise TypeError on startup, even when metrics are disabled.

Fix in Cursor Fix in Web

Reviewed by Cursor Bugbot for commit df7e644. Configure here.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Prometheus metrics logging from env server/worker

1 participant