diff --git a/Cargo.lock b/Cargo.lock index 83b470e2..164a38da 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3105,6 +3105,16 @@ dependencies = [ "autocfg", ] +[[package]] +name = "metrics" +version = "0.24.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "89550ee9f79e88fef3119de263694973a8adb26c21d75322164fb8c493039fe2" +dependencies = [ + "portable-atomic", + "rapidhash", +] + [[package]] name = "mime" version = "0.3.17" @@ -3797,7 +3807,9 @@ dependencies = [ name = "openinfer-engine" version = "0.1.0" dependencies = [ + "metrics", "tokio", + "tracing", ] [[package]] @@ -3989,6 +4001,7 @@ dependencies = [ "tokio", "tokio-util", "tower", + "tracing", "uuid", "vllm-engine-core-client", "vllm-server", @@ -4901,6 +4914,15 @@ dependencies = [ "rand_core 0.9.5", ] +[[package]] +name = "rapidhash" +version = "4.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b5e48930979c155e2f33aa36ab3119b5ee81332beb6482199a8ecd6029b80b59" +dependencies = [ + "rustversion", +] + [[package]] name = "rav1e" version = "0.8.1" diff --git a/Cargo.toml b/Cargo.toml index 16ef9c71..ff559728 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -107,6 +107,7 @@ log = "0.4" logforth = { version = "0.29", features = ["starter-log", "layout-text", "diagnostic-task-local"] } libc = "0.2" memmap2 = "0.9" +metrics = "0.24.6" mio = "=1.1.1" mockall = "0.13.1" nvtx = "1.3.0" diff --git a/docs/index.md b/docs/index.md index f02df5ea..c74d2990 100644 --- a/docs/index.md +++ b/docs/index.md @@ -119,6 +119,7 @@ Organized by domain (model line / subsystem / playbook / lesson) instead of by l | `subsystems/frontend/simulated-inference-engine.md` | CPU-only simulated model crate for vLLM/OpenAI frontend and `vllm bench serve` validation without CUDA, real model weights, or real-model performance claims. 
| | `subsystems/frontend/cpu-profiling-baseline.md` | Frontend CPU profiling baseline using `openinfer-sim` with fixed TTFT=5ms/TPOT=12ms: 200 req / concurrency=16 shows ~150ms TTFT overhead (no dominant hotspot), heap allocation ~10%, stream polling ~7.5%, IPC ~1%; reproducible benchmark command and perf evidence documented. | | `subsystems/frontend/startup-time.md` | Qwen3-4B warm startup-to-ready 3.25s → ~1.45s: frontend tokenizer load runs concurrently with the engine load (HTTP still binds only after the engine registers), and the source safetensors mmap is kept alive to dodge ~0.4s of munmap stalling the next cudaMalloc. | +| `subsystems/frontend/telemetry-system.md` | Lightweight telemetry: Grafana/Prometheus `/metrics`, optional OTLP payload sink, opt-in request logs/traces, sparse `tracing` spans, and a small `metrics` facade at the engine boundary. | ## subsystems / correctness diff --git a/docs/subsystems/frontend/telemetry-system.md b/docs/subsystems/frontend/telemetry-system.md new file mode 100644 index 00000000..6006bf51 --- /dev/null +++ b/docs/subsystems/frontend/telemetry-system.md @@ -0,0 +1,178 @@ +# Frontend Telemetry System + +> **TL;DR:** Telemetry now has Grafana/Prometheus `/metrics`, configurable metric prefix/buckets, sparse request lifecycle `tracing` spans/events, a lightweight `metrics` facade at the shared engine boundary, optional OpenTelemetry OTLP payload sink, and opt-in structured trace buffers/log lines. +> +> **Last touched:** 2026-06 + +## Preparation + +- **Read**: + - `docs/index.md` - routes telemetry to frontend/runtime/scheduler; no existing telemetry subsystem doc. + - `docs/subsystems/frontend/cpu-profiling-baseline.md` - frontend latency needs phase timestamps, especially TTFT decomposition; perf alone cannot explain async wall time. 
+ - `docs/subsystems/scheduler/output-dispatch.md` - token dispatch already carries request-tagged events and prefill stats through the bridge; this is the narrowest place to count request lifecycle metrics. + - `docs/subsystems/runtime/runtime.md` - shared runtime should stay small; per-model execution details should not leak into the frontend. + - `docs/roadmap/direction.md` - long-term tracing is desired, but should grow from shared infrastructure, not a universal model abstraction. + - `docs/subsystems/kernels/openinfer-kernels-boundary.md` - request tracing should eventually line up with simulator/kernel ledger identities; near-term bridge payloads are the natural integration point. + - `openinfer-vllm-frontend/src/lib.rs` - router extension is already the only clean place to add frontend-owned routes. + - `openinfer-vllm-frontend/src/bridge.rs` - bridge sees `Scheduled`, `Token`, terminal events, request ids, prompt/completion counts, cache stats, and first-token emit timestamps. + - `openinfer-vllm-frontend/src/bridge/tests.rs` - demux tests can validate lifecycle accounting without GPU or sockets. + - `openinfer-engine/src/engine.rs` - shared `TokenEvent` timestamps and counts are already sufficient for request-level metrics; no engine API expansion is needed for phase 1. + - `scripts/bench_http_serving.py` and `tests/test_bench_http_serving.py` - benchmark tooling already parses `openinfer_http_trace` JSON lines. + - `openinfer-sim/src/lib.rs` and `openinfer-sim/src/main.rs` - simulator gives a CPU-only verification path through the real frontend. +- **Relevant history**: + - `docs/subsystems/frontend/cpu-profiling-baseline.md` - explicitly calls for bridge timestamps before attributing the ~145 ms TTFT gap. + - `docs/subsystems/scheduler/output-dispatch.md` - bridge demux is already the performance-sensitive shared choke point, so telemetry must be O(events) and low allocation. +- **Plan**: + 1. 
Add a small `openinfer_vllm_frontend::telemetry` module with atomics and manual Prometheus text rendering; avoid global registries on the hot path. + 2. Surface OpenInfer metrics through the existing vLLM `/metrics` route by appending them to the response in middleware, avoiding route conflicts and preserving vLLM metrics. + 3. Thread one telemetry dispatcher into `LocalEngineBridge` and update only request lifecycle points: start, abort, scheduled metadata, first token, terminal finish/error/reject. + 4. Emit structured request logs only when `OPENINFER_TELEMETRY_LOG=1`; keep benchmark-compatible `openinfer_http_trace {json}` only when `OPENINFER_HTTP_TRACE=1`. + 5. Add an opt-in trace buffer enabled by `OPENINFER_TRACE_BUFFER=N`, exposed at `/openinfer/traces`. + 6. Extend focused bridge tests to cover metrics increments and trace-compatible state shape; add frontend route tests for `/metrics` and traces. + 7. Run the smallest useful checks: `cargo fmt --check`, `cargo test --release -p openinfer-vllm-frontend --lib`, and, if build time permits, `cargo test --release -p openinfer-sim --test frontend_e2e simulated_engine_serves_openai_completions_over_http -- --nocapture`. +- **Risks / open questions**: + - The bridge can measure queue wait and first-token/request duration, but not the true model prefill/decode split without per-model scheduler timing. Ponytail choice: expose honest available fields now; add scheduler hooks later only when a benchmark needs them. + - Logs and traces can become high-volume at serving load, so they are opt-in. `/metrics` stays always available and uses only a few atomic updates per request. + - `main` lacks the worker-fatal `/health` code from the other worktree, so this task does not depend on it. + +## Execution Log + +### Step 1: Worktree and route point +- Switched the current checkout to branch `feat/telemetry-system-review` from `main`. 
+- Added `openinfer-vllm-frontend/src/telemetry.rs` with an in-process telemetry handle and Prometheus text renderer. +- Wired `openinfer-vllm-frontend/src/lib.rs` so every served frontend appends OpenInfer metrics to the existing vLLM `/metrics` response and the local bridge receives the same telemetry handle. + +### Step 2: Bridge lifecycle telemetry +- Added request lifecycle state to `openinfer-vllm-frontend/src/bridge.rs`. +- Counts are recorded once per request at terminal/abort, not globally per token. +- Structured request log construction is gated by `OPENINFER_TELEMETRY_LOG`; benchmark-compatible `openinfer_http_trace` logs remain gated by `OPENINFER_HTTP_TRACE`; default serving only pays the in-memory metrics cost. +- Added bridge tests for finished and aborted request accounting. + +### Step 3: Logs and trace buffer +- Added `OPENINFER_TELEMETRY_LOG=1` for one structured `openinfer_request_log {json}` line per terminated request. +- Kept `OPENINFER_HTTP_TRACE=1` as a compatibility path for `scripts/bench_http_serving.py`. +- Added `OPENINFER_TRACE_BUFFER=N` to retain the last N request traces in memory and expose them at `GET /openinfer/traces`. +- Trace/log JSON is built only when one of those three trace surfaces is enabled. + +### Step 4: Integration fix and verification +- First sim e2e run failed with `Overlapping method route. Handler for GET /metrics already exists`; vLLM already owns that route. +- Replaced the attempted route merge with middleware that runs only on `/metrics`, reads the existing vLLM metrics body, and appends OpenInfer metrics. +- Extended `openinfer-sim/tests/frontend_e2e.rs` to fetch real HTTP `/metrics` after non-streaming and streaming completions and assert OpenInfer counters; it also verifies `/openinfer/traces` is present and disabled by default. 
+- Checks run: + - `cargo fmt --check` + - `cargo test --release -p openinfer-vllm-frontend --lib` + - `cargo test --release -p openinfer-sim --test frontend_e2e simulated_engine_serves_openai_completions_over_http -- --nocapture` + - `git diff --check` + +### Step 5: Minimal extension point +- Exposed `openinfer_vllm_frontend::telemetry` plus root re-exports for `Telemetry`, `TelemetryOptions`, `RequestMetrics`, and `RequestOutcome`. +- Added `serve_with_telemetry` and `serve_model_with_lora_routes_and_telemetry` so embedding users can pass a configured telemetry handle instead of forking router setup. +- `Telemetry` owns the built-in metrics/log/trace-buffer configuration; enterprise extensions should subscribe to standard `tracing` spans/events rather than implement an OpenInfer-specific callback trait. + +### Step 6: Built-in OpenTelemetry and Grafana surfaces +- Treated `/metrics` as the built-in Grafana/Prometheus interface; no extra Grafana-specific server state is needed. +- Added `OpenTelemetryOptions` and `OpenTelemetrySink`, re-exported from `openinfer_vllm_frontend`. +- `OpenTelemetrySink` converts request traces into OTLP JSON and pushes them into a caller-provided bounded channel. +- No HTTP client/exporter is built into the frontend. Embedders can wire the OTLP payload to their preferred exporter/client through `TelemetryOptions.opentelemetry_sink`, or use a normal `tracing_subscriber` OpenTelemetry layer. +- The OTLP sink reserves queue capacity before building the payload, so a saturated caller queue does not keep paying JSON construction cost. +- Re-ran `cargo fmt --check`, `cargo test --release -p openinfer-vllm-frontend --lib`, `cargo test --release -p openinfer-sim --test frontend_e2e simulated_engine_serves_openai_completions_over_http -- --nocapture`, and `git diff --check`. + +### Step 7: Drop frontend reqwest dependency +- Removed `reqwest` from `openinfer-vllm-frontend`. 
+- Kept the OpenTelemetry interface as an OTLP payload sink rather than a transport opinion. +- Updated tests to cover the caller-queue sink instead of HTTP endpoint normalization. +- Re-ran `cargo fmt --check`, `cargo test --release -p openinfer-vllm-frontend --lib`, `cargo test --release -p openinfer-sim --test frontend_e2e simulated_engine_serves_openai_completions_over_http -- --nocapture`, and `git diff --check`. + +### Step 8: Reviewability split +- Split `openinfer-vllm-frontend/src/telemetry.rs` into focused submodules: + - `telemetry/otlp.rs` owns the OTLP payload sink. + - `telemetry/trace.rs` owns request trace observation, metrics conversion, and trace JSON construction. +- Simplified `bridge.rs`: `TokenEvent` handling now calls `RequestTrace::observe_event(&event)` once before the output match, so terminal metrics are not repeated across `Finished` / `Error` / `Rejected` branches. + +### Step 9: Customizable metric surface +- Kept Prometheus/OTLP JSON field strings as protocol surface: changing those per request would make exporters and dashboards harder to reason about. +- Added `TelemetryOptions.metric_prefix` so embedders can namespace metrics without forking the frontend. +- Added `TelemetryOptions.latency_buckets_ms`; buckets are filtered, sorted, and deduplicated at construction time, so request handling still only does atomic histogram updates. +- Existing `TelemetryOptions.opentelemetry_sink` remains only the built-in OTLP helper; custom export/log/trace integrations should attach standard `tracing_subscriber` layers. + +### Step 10: Intermediate dispatcher split +- Refactored request state so it carries only a lightweight `RequestTrace`, analogous to a span's local fields. +- `RequestTrace::finish` now returns a typed request record; it no longer receives or calls the telemetry dispatcher. +- `dispatch_burst` records finished request records through the single outer `Telemetry` dispatcher, keeping collection policy outside per-request state. 
+- An OpenInfer-specific layer trait was tried here, then removed in Step 11 in favor of the real `tracing` crate. + +### Step 11: Direct tracing integration +- Added `tracing` to `openinfer-vllm-frontend`. +- Removed the OpenInfer-specific `TelemetryLayer`/`TelemetryEvent` extension trait. +- `RequestTrace` now creates an `openinfer.request` span with request id, token counts, outcome, queue wait, TTFT, and duration fields. +- Request lifecycle emits standard `tracing` events from the shared engine boundary: `openinfer_request_scheduled` and `openinfer_request_finished`. +- The opt-in benchmark/log trace lines now use `tracing::info!`; the existing log bridge can still route them into the configured logger. + +### Step 12: Tracing macro and level policy +- Moved repeated span/event field lists into local macros; after Step 14 they live next to `TokenSink` in `openinfer-engine`, the shared event emitter used by all schedulers. +- Level policy: + - `DEBUG` span/event: successful request lifecycle, useful while debugging. + - `WARN` terminal event: rejected requests. + - `ERROR` terminal event: execution failures. +- High-volume success counts and token totals belong to `metrics`; the shared serving path intentionally does not emit per-token tracing events. + +### Step 13: Keep `reduce_request` pure +- Removed the `include_trace` parameter from `reduce_request`; it now only folds token events into wire output plus a terminal outcome. +- `Telemetry::finish_request` owns the decision to build optional JSON traces, keeping telemetry policy out of the bridge reducer signature. + +### Step 14: Lightweight non-frontend tracing +- Added `tracing` to `openinfer-engine`. +- `TokenSink` now owns the request span and emits lifecycle events when schedulers send existing `TokenEvent`s. +- This touches the shared event boundary once instead of adding per-model scheduler/executor hooks. 
+- Frontend `RequestTrace` no longer emits tracing spans/events; it only computes Prometheus metrics and optional JSON traces, avoiding duplicate request lifecycle events on HTTP serving. + +### Step 15: metrics facade +- Added the `metrics` crate as a workspace dependency and to `openinfer-engine`. +- `TokenSink` emits low-cardinality facade metrics at the shared event boundary: + - `openinfer_engine_requests_submitted_total` + - `openinfer_engine_requests_scheduled_total` + - `openinfer_engine_requests_finished_total{outcome=...}` + - `openinfer_engine_prompt_tokens_total` + - `openinfer_engine_cached_prompt_tokens_total` + - `openinfer_engine_completion_tokens_total` + - `openinfer_engine_queue_wait_ms` +- Per-token metrics and tracing events are intentionally omitted; request lifecycle traces stay sparse enough for production debugging. +- No `metrics` exporter was added. Embedders can install their own recorder; the frontend `/metrics` route keeps the existing atomic renderer for now. +- Cargo initially refreshed unrelated `prost`/`itertools` resolution while adding `metrics`; the lockfile was narrowed back so only `metrics`, `rapidhash`, and crate dependency edges remain in this patch. +- Re-ran `cargo test --release -p openinfer-engine --lib`, `cargo test --release -p openinfer-vllm-frontend --lib`, `cargo test --release -p openinfer-sim --test frontend_e2e simulated_engine_serves_openai_completions_over_http -- --nocapture`, `cargo fmt --check`, and `git diff --check`. + +### Step 16: sparse tracing levels +- Lowered successful request lifecycle spans/events to `DEBUG`; default `INFO` serving logs should not get one line per successful request. +- Kept rejected requests at `WARN` and execution failures at `ERROR`. +- Removed shared-path per-token tracing events entirely. Token volume is accounted through metrics counters; deep token debugging should use explicit benchmark/debug tooling, not production tracing. 
+- Re-ran `cargo test --release -p openinfer-engine --lib`, `cargo test --release -p openinfer-vllm-frontend --lib`, `cargo fmt --check`, and `git diff --check`. + +## Debrief + +- **Outcome**: + - Added lightweight frontend telemetry on the `feat/telemetry-system-review` branch from `main`. + - `/metrics` now includes OpenInfer frontend metrics without replacing vLLM's own metrics. + - The bridge records active/started/finished request counts, outcome counters, prompt/cache/completion token counters, and queue/TTFT/duration histograms. + - Optional `OPENINFER_TELEMETRY_LOG=1` emits one structured `openinfer_request_log` JSON line per terminated request. + - Optional `OPENINFER_HTTP_TRACE=1` emits one `openinfer_http_trace` JSON line per terminated request, compatible with the existing benchmark parser. + - Optional `OPENINFER_TRACE_BUFFER=N` keeps a small in-memory trace ring and exposes it at `/openinfer/traces`. + - Optional `OpenTelemetrySink` builds OTLP trace payloads and hands them to caller-owned export code. + - Custom integrations can set `TelemetryOptions.metric_prefix`, `TelemetryOptions.latency_buckets_ms`, and attach normal `tracing_subscriber` layers in their binary. + - Request tracing uses local macros to keep span/event field names stable and keep the level policy centralized. + - Successful lifecycle tracing is `DEBUG` only; `WARN/ERROR` are reserved for rejected and failed requests. + - `reduce_request` stays focused on demux/output reduction; telemetry recording happens in the outer dispatch loop. + - Shared scheduler/model tracing is centralized at `TokenSink`; no model executor hot path hooks were added. + - `metrics` facade support is available at the engine boundary without forcing a global recorder or exporter. +- **Pitfalls encountered**: + - vLLM already registers `/metrics`; adding another route panicked during sim e2e. Middleware append avoids the conflict. 
+ - Always-on logs/traces would add avoidable serialization, locking, and log volume on hot serving paths, so only counters/histograms are on by default. + - OpenTelemetry SDK initialization would fight the existing `logforth` setup; OTLP payload export avoids global subscriber ownership. + - The frontend should not pick an HTTP client just to offer telemetry integration. The OTLP payload sink and standard tracing subscribers let downstream users pick `reqwest`, `hyper`, `tonic`, an agent sidecar, or an internal client. +- **Lessons learned**: + - The frontend bridge already has enough request lifecycle data for useful serving telemetry without widening the shared engine contract. + - Integration tests should hit real HTTP `/metrics`; route-level unit tests alone missed the vLLM route collision. + - Standard `tracing` is the right extension point; an OpenInfer-specific layer trait adds API surface without buying anything the ecosystem does not already provide. + - Metric names and buckets need a small dependency-injection (configuration) surface because downstream dashboards often impose naming and histogram standards. + - Per-token tracing in the shared serving path is a production log-volume trap; keep token detail out of default tracing and rely on counters plus explicit debug tooling. + - True prefill/decode phase attribution still needs per-model scheduler timing. That belongs in a later model-side hook, not in this frontend patch. + - No known required follow-ups for the frontend baseline; scheduler phase metrics can be added later when a concrete benchmark needs them. 
diff --git a/openinfer-engine/Cargo.toml b/openinfer-engine/Cargo.toml index f1d6664f..5c37c7dc 100644 --- a/openinfer-engine/Cargo.toml +++ b/openinfer-engine/Cargo.toml @@ -5,7 +5,9 @@ version = "0.1.0" edition = "2024" [dependencies] +metrics = { workspace = true } tokio = { workspace = true, features = ["sync"] } +tracing = { workspace = true } [dev-dependencies] tokio = { workspace = true, features = ["macros", "rt"] } diff --git a/openinfer-engine/src/engine.rs b/openinfer-engine/src/engine.rs index 5867ad53..fdd6ca1b 100644 --- a/openinfer-engine/src/engine.rs +++ b/openinfer-engine/src/engine.rs @@ -182,6 +182,65 @@ pub type RequestTag = Arc; pub type TokenStreamSender = mpsc::UnboundedSender<(RequestTag, TokenEvent)>; pub type TokenStreamReceiver = mpsc::UnboundedReceiver<(RequestTag, TokenEvent)>; +// Request tracing level policy: +// - DEBUG span/event: successful request lifecycle, useful while debugging. +// - WARN terminal event: rejected requests. +// - ERROR terminal event: execution failures. +// High-volume success counts and token totals belong to `metrics`; the shared +// serving path intentionally does not emit per-token tracing events. +macro_rules! request_span { + ($request_id:expr) => { + tracing::debug_span!( + "openinfer.request", + request_id = $request_id, + queued_at_unix_s = tracing::field::Empty, + scheduled_at_unix_s = tracing::field::Empty, + terminal_at_unix_s = tracing::field::Empty, + finish_reason = tracing::field::Empty, + prompt_tokens = tracing::field::Empty, + cached_prompt_tokens = tracing::field::Empty, + completion_tokens = tracing::field::Empty, + ) + }; +} + +macro_rules! 
request_scheduled_event { + ($span:expr, $queued_at_unix_s:expr, $scheduled_at_unix_s:expr, $prompt_tokens:expr, $cached_tokens:expr) => { + tracing::debug!( + parent: $span, + queued_at_unix_s = $queued_at_unix_s, + scheduled_at_unix_s = $scheduled_at_unix_s, + prompt_tokens = $prompt_tokens as u64, + cached_prompt_tokens = $cached_tokens as u64, + "openinfer_request_scheduled" + ) + }; +} + +macro_rules! request_terminal_event { + ($level:ident, $span:expr, $terminal_at_unix_s:expr, $finish_reason:expr, $prompt_tokens:expr, $completion_tokens:expr) => { + tracing::$level!( + parent: $span, + terminal_at_unix_s = $terminal_at_unix_s, + finish_reason = $finish_reason, + prompt_tokens = $prompt_tokens as u64, + completion_tokens = $completion_tokens as u64, + "openinfer_request_finished" + ) + }; + ($level:ident, $span:expr, $terminal_at_unix_s:expr, $finish_reason:expr, $prompt_tokens:expr, $completion_tokens:expr, $message:expr) => { + tracing::$level!( + parent: $span, + terminal_at_unix_s = $terminal_at_unix_s, + finish_reason = $finish_reason, + prompt_tokens = $prompt_tokens as u64, + completion_tokens = $completion_tokens as u64, + message = $message.as_str(), + "openinfer_request_finished" + ) + }; +} + /// Per-request handle the scheduler holds to emit [`TokenEvent`]s. /// /// Drop-in for the former `UnboundedSender`: it keeps the same @@ -203,11 +262,18 @@ pub struct TokenSink { tag: RequestTag, tx: TokenStreamSender, cancelled: Arc, + span: tracing::Span, } impl TokenSink { pub fn new(tag: RequestTag, tx: TokenStreamSender, cancelled: Arc) -> Self { - Self { tag, tx, cancelled } + let span = request_span!(tag.as_ref()); + Self { + tag, + tx, + cancelled, + span, + } } /// Emit one event for this request. 
Returns `Err` (handing the event back) @@ -219,6 +285,7 @@ impl TokenSink { if self.cancelled.load(Ordering::Acquire) { return Err(mpsc::error::SendError(event)); } + self.observe_event(&event); self.tx.send((self.tag.clone(), event)).map_err(|err| { let (_, event) = err.0; mpsc::error::SendError(event) @@ -235,6 +302,155 @@ impl TokenSink { &self.tag } + fn observe_event(&self, event: &TokenEvent) { + match event { + TokenEvent::Scheduled { + queued_at_unix_s, + scheduled_at_unix_s, + prompt_tokens, + cached_tokens, + } => { + metrics::counter!("openinfer_engine_requests_scheduled_total").increment(1); + metrics::counter!("openinfer_engine_cached_prompt_tokens_total") + .increment(*cached_tokens as u64); + metrics::histogram!("openinfer_engine_queue_wait_ms") + .record((scheduled_at_unix_s - queued_at_unix_s).max(0.0) * 1_000.0); + self.span.record("queued_at_unix_s", *queued_at_unix_s); + self.span + .record("scheduled_at_unix_s", *scheduled_at_unix_s); + self.span.record("prompt_tokens", *prompt_tokens as u64); + self.span + .record("cached_prompt_tokens", *cached_tokens as u64); + request_scheduled_event!( + &self.span, + *queued_at_unix_s, + *scheduled_at_unix_s, + *prompt_tokens, + *cached_tokens + ); + } + TokenEvent::Token { .. } | TokenEvent::PromptTokens { .. 
} => {} + TokenEvent::Finished { + finish_reason, + prompt_tokens, + completion_tokens, + } => { + let terminal_at_unix_s = unix_now_s(); + let finish_reason = finish_reason_label(*finish_reason); + metrics::counter!( + "openinfer_engine_requests_finished_total", + "outcome" => finish_reason + ) + .increment(1); + metrics::counter!("openinfer_engine_prompt_tokens_total") + .increment(*prompt_tokens as u64); + metrics::counter!("openinfer_engine_completion_tokens_total") + .increment(*completion_tokens as u64); + self.record_terminal_fields( + terminal_at_unix_s, + finish_reason, + *prompt_tokens, + *completion_tokens, + ); + if finish_reason == "error" { + request_terminal_event!( + error, + &self.span, + terminal_at_unix_s, + finish_reason, + *prompt_tokens, + *completion_tokens + ); + } else { + request_terminal_event!( + debug, + &self.span, + terminal_at_unix_s, + finish_reason, + *prompt_tokens, + *completion_tokens + ); + } + } + TokenEvent::Error { + message, + prompt_tokens, + completion_tokens, + } => { + let terminal_at_unix_s = unix_now_s(); + metrics::counter!( + "openinfer_engine_requests_finished_total", + "outcome" => "error" + ) + .increment(1); + metrics::counter!("openinfer_engine_prompt_tokens_total") + .increment(*prompt_tokens as u64); + metrics::counter!("openinfer_engine_completion_tokens_total") + .increment(*completion_tokens as u64); + self.record_terminal_fields( + terminal_at_unix_s, + "error", + *prompt_tokens, + *completion_tokens, + ); + request_terminal_event!( + error, + &self.span, + terminal_at_unix_s, + "error", + *prompt_tokens, + *completion_tokens, + message + ); + } + TokenEvent::Rejected { + message, + prompt_tokens, + completion_tokens, + } => { + let terminal_at_unix_s = unix_now_s(); + metrics::counter!( + "openinfer_engine_requests_finished_total", + "outcome" => "rejected" + ) + .increment(1); + metrics::counter!("openinfer_engine_prompt_tokens_total") + .increment(*prompt_tokens as u64); + 
metrics::counter!("openinfer_engine_completion_tokens_total") + .increment(*completion_tokens as u64); + self.record_terminal_fields( + terminal_at_unix_s, + "rejected", + *prompt_tokens, + *completion_tokens, + ); + request_terminal_event!( + warn, + &self.span, + terminal_at_unix_s, + "rejected", + *prompt_tokens, + *completion_tokens, + message + ); + } + } + } + + fn record_terminal_fields( + &self, + terminal_at_unix_s: f64, + finish_reason: &'static str, + prompt_tokens: usize, + completion_tokens: usize, + ) { + self.span.record("terminal_at_unix_s", terminal_at_unix_s); + self.span.record("finish_reason", finish_reason); + self.span.record("prompt_tokens", prompt_tokens as u64); + self.span + .record("completion_tokens", completion_tokens as u64); + } + /// A sink backed by its own private channel, for direct drivers /// (benchmarks, integration tests, the simulator) that consume one /// request's events without the shared frontend demux. The returned @@ -246,6 +462,14 @@ impl TokenSink { } } +fn finish_reason_label(reason: FinishReason) -> &'static str { + match reason { + FinishReason::Length => "length", + FinishReason::Stop => "stop", + FinishReason::Error => "error", + } +} + /// Seconds since `UNIX_EPOCH` as `f64` — the clock base for `TokenEvent` /// timestamps. 
pub fn unix_now_s() -> f64 { @@ -371,7 +595,7 @@ impl EngineHandle { &self, req: GenerateRequest, ) -> std::result::Result<(), mpsc::error::SendError> { - match self.inner.submit_tx.as_ref() { + let result = match self.inner.submit_tx.as_ref() { Some(submit_tx) => submit_tx.send(req), None => match self.inner.command_tx.as_ref() { Some(command_tx) => command_tx @@ -382,7 +606,11 @@ impl EngineHandle { }), None => Err(mpsc::error::SendError(req)), }, + }; + if result.is_ok() { + metrics::counter!("openinfer_engine_requests_submitted_total").increment(1); } + result } pub fn supports_lora_control(&self) -> bool { diff --git a/openinfer-sim/tests/frontend_e2e.rs b/openinfer-sim/tests/frontend_e2e.rs index c7b95933..d2e7b36c 100644 --- a/openinfer-sim/tests/frontend_e2e.rs +++ b/openinfer-sim/tests/frontend_e2e.rs @@ -198,6 +198,8 @@ async fn simulated_engine_serves_openai_completions_over_http() -> Result<()> { assert_models_endpoint(&client, &server.base_url).await?; assert_non_streaming_completion_has_output(&client, &server.base_url).await?; assert_streaming_completion_emits_done(&client, &server.base_url).await?; + assert_metrics_endpoint(&client, &server.base_url).await?; + assert_traces_endpoint_disabled_by_default(&client, &server.base_url).await?; server.shutdown().await } @@ -317,6 +319,47 @@ async fn assert_streaming_completion_emits_done(client: &Client, base_url: &str) Ok(()) } +async fn assert_metrics_endpoint(client: &Client, base_url: &str) -> Result<()> { + let metrics = client + .get(format!("{base_url}/metrics")) + .send() + .await? + .error_for_status()? 
+ .text() + .await?; + for expected in [ + "openinfer_frontend_active_requests 0", + "openinfer_frontend_requests_started_total 2", + "openinfer_frontend_requests_finished_total{outcome=\"length\"} 2", + "openinfer_frontend_completion_tokens_total 6", + ] { + if !metrics.contains(expected) { + bail!("/metrics missing {expected}: {metrics}"); + } + } + Ok(()) +} + +async fn assert_traces_endpoint_disabled_by_default(client: &Client, base_url: &str) -> Result<()> { + let traces: Value = client + .get(format!("{base_url}/openinfer/traces")) + .send() + .await? + .error_for_status()? + .json() + .await?; + if traces["enabled"] != false || traces["capacity"] != 0 { + bail!("trace endpoint should be disabled by default: {traces}"); + } + let trace_list = traces["traces"] + .as_array() + .ok_or_else(|| anyhow!("trace endpoint has no traces array: {traces}"))?; + if !trace_list.is_empty() { + bail!("disabled trace endpoint should have no traces: {traces}"); + } + Ok(()) +} + async fn post_completion(client: &Client, base_url: &str, stream: bool) -> Result { let response = client .post(format!("{base_url}/v1/completions")) diff --git a/openinfer-vllm-frontend/Cargo.toml b/openinfer-vllm-frontend/Cargo.toml index fd68a455..8123c44b 100644 --- a/openinfer-vllm-frontend/Cargo.toml +++ b/openinfer-vllm-frontend/Cargo.toml @@ -16,6 +16,7 @@ serde_json = { workspace = true } tokio = { workspace = true, features = ["full"] } tokio-util = { workspace = true } tower = { workspace = true, features = ["util"] } +tracing = { workspace = true } uuid = { workspace = true, features = ["v4"] } vllm-engine-core-client = { workspace = true } vllm-server = { workspace = true } diff --git a/openinfer-vllm-frontend/src/bridge.rs b/openinfer-vllm-frontend/src/bridge.rs index 337de03a..111fba7e 100644 --- a/openinfer-vllm-frontend/src/bridge.rs +++ b/openinfer-vllm-frontend/src/bridge.rs @@ -27,6 +27,7 @@ use openinfer_engine::engine::{ EngineHandle, GenerateRequest, RequestTag, TokenEvent, 
TokenSink, TokenStreamReceiver, }; +use crate::telemetry::{RequestOutcome, RequestTrace, Telemetry}; use crate::wire::{ convert_finish_reason, convert_sampling, lora_adapter_from_sampling_params, requested_logprobs, to_wire_position_logprobs, @@ -39,6 +40,7 @@ pub(crate) struct LocalEngineBridge { pub(crate) output_address: String, pub(crate) handle: EngineHandle, pub(crate) max_model_len: u32, + pub(crate) telemetry: Telemetry, } impl LocalEngineBridge { @@ -106,7 +108,7 @@ impl LocalEngineBridge { () = shutdown.cancelled() => break, Some(first) = event_rx.recv() => { if let Err(error) = - dispatch_burst(first, &mut event_rx, &mut streams, &output_tx) + dispatch_burst(first, &mut event_rx, &mut streams, &output_tx, &self.telemetry) { warn!("local engine bridge output failed: {error:#}"); } @@ -169,6 +171,11 @@ impl LocalEngineBridge { // finds no stream entry. if let Some(state) = streams.remove(request_id.as_str()) { state.cancelled.store(true, Ordering::Release); + self.telemetry.finish_request( + &state.trace, + RequestOutcome::Aborted, + now_secs_f64(), + ); } } Ok(()) @@ -201,6 +208,7 @@ impl LocalEngineBridge { } = request; let Some(prompt_tokens) = prompt_token_ids else { warn!("request {request_id} dropped: missing prompt_token_ids"); + self.telemetry.request_rejected(0); send_terminal_output( output_tx, request_id, @@ -213,6 +221,7 @@ impl LocalEngineBridge { }; let Some(sampling_params) = sampling_params else { warn!("request {request_id} dropped: missing sampling_params"); + self.telemetry.request_rejected(prompt_tokens.len()); send_terminal_output( output_tx, request_id, @@ -227,6 +236,11 @@ impl LocalEngineBridge { let tag: RequestTag = Arc::from(request_id.as_str()); let cancelled = Arc::new(AtomicBool::new(false)); let token_tx = TokenSink::new(tag.clone(), event_tx.clone(), Arc::clone(&cancelled)); + let trace = RequestTrace::new( + request_id.clone(), + request.arrival_time, + prompt_tokens.len(), + ); self.handle .submit(GenerateRequest { 
request_id: Some(request_id), @@ -241,7 +255,8 @@ impl LocalEngineBridge { }) .context("failed to submit request to scheduler")?; - streams.insert(tag, RequestStreamState::new(cancelled)); + self.telemetry.request_started(); + streams.insert(tag, RequestStreamState::new(cancelled, trace)); Ok(()) } } @@ -255,14 +270,16 @@ struct RequestStreamState { first_token_events: Option>, first_token_prefill_stats: Option, cancelled: Arc, + trace: RequestTrace, } impl RequestStreamState { - fn new(cancelled: Arc) -> Self { + fn new(cancelled: Arc, trace: RequestTrace) -> Self { Self { first_token_events: None, first_token_prefill_stats: None, cancelled, + trace, } } } @@ -277,6 +294,7 @@ fn dispatch_burst( event_rx: &mut TokenStreamReceiver, streams: &mut HashMap, output_tx: &mpsc::UnboundedSender, + telemetry: &Telemetry, ) -> Result<()> { // Bucket the burst by request, keeping first-seen order so outputs are // deterministic and each request's events stay in arrival order. @@ -304,11 +322,12 @@ fn dispatch_burst( let Some(state) = streams.get_mut(&tag) else { continue; }; - let (output, terminated) = reduce_request(&tag, state, events); + let (output, terminal_outcome) = reduce_request(&tag, state, events); if let Some(output) = output { outputs.push(output); } - if terminated { + if let Some(outcome) = terminal_outcome { + telemetry.finish_request(&state.trace, outcome, now_secs_f64()); streams.remove(&tag); finished_requests.insert(tag.to_string()); } @@ -333,21 +352,21 @@ fn dispatch_burst( /// Tokens coalesce, and a trailing terminal rides the same output carrying its /// finish reason; `first_token_events`/`prefill_stats` flush onto whichever /// output goes first. A lone `Scheduled` (no token, no terminal) yields no -/// output — its metadata waits in `state` for the first real output. Returns -/// `(output, terminated)`. +/// output — its metadata waits in `state` for the first real output. 
fn reduce_request( request_id: &str, state: &mut RequestStreamState, events: Vec, -) -> (Option, bool) { +) -> (Option, Option) { let mut token_ids: Vec = Vec::new(); let mut positions: Vec = Vec::new(); let mut has_logprobs = false; let mut finish_reason: Option = None; let mut stop_reason: Option = None; - let mut terminated = false; + let mut terminal_outcome: Option = None; for event in events { + terminal_outcome = state.trace.observe_event(&event).or(terminal_outcome); match event { TokenEvent::Scheduled { queued_at_unix_s, @@ -394,13 +413,11 @@ fn reduce_request( finish_reason: fr, .. } => { finish_reason = Some(convert_finish_reason(fr)); - terminated = true; } TokenEvent::Error { message, .. } => { warn!("request {request_id} failed: {message}"); finish_reason = Some(EngineCoreFinishReason::Error); stop_reason = Some(StopReason::Text(message)); - terminated = true; } TokenEvent::Rejected { message, .. } => { // Rejected means the request could not be admitted, not that it @@ -408,13 +425,13 @@ fn reduce_request( warn!("request {request_id} rejected: {message}"); finish_reason = Some(EngineCoreFinishReason::Error); stop_reason = Some(StopReason::Text(message)); - terminated = true; } } } + let terminated = terminal_outcome.is_some(); if token_ids.is_empty() && !terminated { - return (None, false); + return (None, None); } let logprobs = has_logprobs.then_some(MaybeWireLogprobs::Direct(Logprobs { positions })); @@ -427,7 +444,7 @@ fn reduce_request( state.first_token_events.take(), state.first_token_prefill_stats.take(), ); - (Some(output), terminated) + (Some(output), terminal_outcome) } async fn output_loop( diff --git a/openinfer-vllm-frontend/src/bridge/tests.rs b/openinfer-vllm-frontend/src/bridge/tests.rs index ccfcbed6..52b683e8 100644 --- a/openinfer-vllm-frontend/src/bridge/tests.rs +++ b/openinfer-vllm-frontend/src/bridge/tests.rs @@ -21,6 +21,7 @@ struct Demux { streams: HashMap, output_tx: mpsc::UnboundedSender, output_rx: 
mpsc::UnboundedReceiver, + telemetry: Telemetry, } impl Demux { @@ -33,6 +34,7 @@ impl Demux { streams: HashMap::new(), output_tx, output_rx, + telemetry: Telemetry::new(), } } @@ -40,9 +42,13 @@ impl Demux { fn add(&mut self, id: &str) -> Arc { let tag: RequestTag = Arc::from(id); let cancelled = Arc::new(AtomicBool::new(false)); + self.telemetry.request_started(); self.streams.insert( Arc::clone(&tag), - RequestStreamState::new(Arc::clone(&cancelled)), + RequestStreamState::new( + Arc::clone(&cancelled), + RequestTrace::new(id.to_string(), 1.0, 0), + ), ); cancelled } @@ -62,6 +68,7 @@ impl Demux { &mut self.event_rx, &mut self.streams, &self.output_tx, + &self.telemetry, ) .expect("dispatch burst"); true @@ -147,6 +154,49 @@ fn token_and_finish_in_one_burst_coalesce() { assert!(d.next_output().is_none()); } +#[test] +fn telemetry_records_finished_request_once() { + let mut d = Demux::new(); + let now = now_secs_f64(); + d.add("req-metrics"); + d.emit( + "req-metrics", + TokenEvent::Scheduled { + queued_at_unix_s: now - 0.010, + scheduled_at_unix_s: now, + prompt_tokens: 8, + cached_tokens: 3, + }, + ); + d.emit( + "req-metrics", + TokenEvent::Token { + id: 7, + logprob: None, + }, + ); + d.emit( + "req-metrics", + TokenEvent::Finished { + finish_reason: FinishReason::Length, + prompt_tokens: 8, + completion_tokens: 1, + }, + ); + assert!(d.drain()); + + let text = d.telemetry.render(); + assert!(text.contains("openinfer_frontend_active_requests 0")); + assert!(text.contains("openinfer_frontend_requests_started_total 1")); + assert!(text.contains("openinfer_frontend_requests_finished_total{outcome=\"length\"} 1")); + assert!(text.contains("openinfer_frontend_prompt_tokens_total 8")); + assert!(text.contains("openinfer_frontend_cached_prompt_tokens_total 3")); + assert!(text.contains("openinfer_frontend_completion_tokens_total 1")); + assert!(text.contains("openinfer_frontend_queue_wait_ms_count 1")); + assert!(text.contains("openinfer_frontend_ttft_ms_count 1")); 
+ assert!(text.contains("openinfer_frontend_request_duration_ms_count 1")); +} + /// A lone `Scheduled` (no token yet) emits nothing; its metadata waits in the /// stream state across bursts and flushes onto the first real output. This is /// the reason `RequestStreamState` holds `first_token_*` between bursts. @@ -333,11 +383,13 @@ fn burst_batches_multiple_requests_into_one_message() { #[test] fn aborted_request_drops_late_tokens() { let mut d = Demux::new(); - let cancelled = d.add("req-abort"); + d.add("req-abort"); // Replicate the Abort handler: flip the cancel flag, drop the stream. - cancelled.store(true, Ordering::Relaxed); - d.streams.remove("req-abort"); + let state = d.streams.remove("req-abort").expect("registered stream"); + state.cancelled.store(true, Ordering::Relaxed); + d.telemetry + .finish_request(&state.trace, RequestOutcome::Aborted, now_secs_f64()); d.emit( "req-abort", @@ -351,6 +403,9 @@ fn aborted_request_drops_late_tokens() { d.next_output().is_none(), "no output is produced for an aborted request" ); + let text = d.telemetry.render(); + assert!(text.contains("openinfer_frontend_active_requests 0")); + assert!(text.contains("openinfer_frontend_requests_finished_total{outcome=\"aborted\"} 1")); } /// A rejected request (could not be admitted, e.g. 
too large for the KV cache) diff --git a/openinfer-vllm-frontend/src/lib.rs b/openinfer-vllm-frontend/src/lib.rs index 8d0549a1..98345d46 100644 --- a/openinfer-vllm-frontend/src/lib.rs +++ b/openinfer-vllm-frontend/src/lib.rs @@ -22,13 +22,19 @@ use openinfer_engine::engine::EngineHandle; mod bridge; mod lora; mod sampling_guard; +pub mod telemetry; mod wire; use bridge::{LocalEngineBridge, ipc_endpoint, local_ipc_namespace}; use lora::{bad_request, load_startup_lora_modules, lora_openai_routes, lora_routes}; use sampling_guard::{ServableCap, guard_generation_request}; +use telemetry::{guard_metrics_request, traces_router}; pub use lora::LoraModule; +pub use telemetry::{ + OpenTelemetryOptions, OpenTelemetrySink, RequestMetrics, RequestOutcome, Telemetry, + TelemetryOptions, +}; const COMPLETION_ROUTE_BODY_LIMIT: usize = 2 * 1024 * 1024; @@ -60,6 +66,27 @@ pub async fn serve( port: u16, max_model_len: Option, shutdown: CancellationToken, +) -> Result<()> { + serve_with_telemetry( + engine, + model_path, + served_model_name, + port, + max_model_len, + shutdown, + Telemetry::new(), + ) + .await +} + +pub async fn serve_with_telemetry( + engine: impl Future> + Send + 'static, + model_path: &Path, + served_model_name: Vec, + port: u16, + max_model_len: Option, + shutdown: CancellationToken, + telemetry: Telemetry, ) -> Result<()> { serve_model_on_host( engine, @@ -69,6 +96,7 @@ pub async fn serve( port, resolve_max_model_len(model_path, max_model_len), shutdown, + telemetry, ) .await } @@ -81,6 +109,29 @@ pub async fn serve_model_with_lora_routes( port: u16, max_model_len: u32, shutdown: CancellationToken, +) -> Result<()> { + serve_model_with_lora_routes_and_telemetry( + handle, + model_id, + served_model_name, + lora_modules, + port, + max_model_len, + shutdown, + Telemetry::new(), + ) + .await +} + +pub async fn serve_model_with_lora_routes_and_telemetry( + handle: EngineHandle, + model_id: impl Into, + served_model_name: Vec, + lora_modules: Vec, + port: u16, + 
max_model_len: u32, + shutdown: CancellationToken, + telemetry: Telemetry, ) -> Result<()> { let model_id = model_id.into(); let adapter_names = Arc::new(RwLock::new(HashSet::new())); @@ -97,6 +148,7 @@ pub async fn serve_model_with_lora_routes( port, max_model_len, shutdown, + telemetry, move |router| { let lora_router = lora_routes(handle.clone(), Arc::clone(&adapter_names)); let openai_router = lora_openai_routes( @@ -119,6 +171,7 @@ async fn serve_model_on_host( port: u16, max_model_len: u32, shutdown: CancellationToken, + telemetry: Telemetry, ) -> Result<()> { serve_model_on_host_with_router_extension( engine, @@ -128,6 +181,7 @@ async fn serve_model_on_host( port, max_model_len, shutdown, + telemetry, |router| router, ) .await @@ -141,6 +195,7 @@ async fn serve_model_on_host_with_router_extension( port: u16, max_model_len: u32, shutdown: CancellationToken, + telemetry: Telemetry, extend_router: F, ) -> Result<()> where @@ -161,6 +216,7 @@ where let bridge_shutdown = shutdown.child_token(); let engine_task = tokio::spawn({ let servable_cap = servable_cap.clone(); + let telemetry = telemetry.clone(); let server_shutdown = server_shutdown.clone(); let bridge_shutdown = bridge_shutdown.clone(); let input_address = input_address.clone(); @@ -180,6 +236,7 @@ where output_address, handle, max_model_len: servable_limit.unwrap_or(max_model_len), + telemetry, }; if let Err(error) = bridge.run(bridge_shutdown).await { warn!("local vLLM engine bridge exited: {error:#}"); @@ -223,7 +280,10 @@ where }; let extend_router = move |router: Router| { - extend_router(router).layer(from_fn_with_state(servable_cap, guard_generation_request)) + extend_router(router) + .merge(traces_router(telemetry.clone())) + .layer(from_fn_with_state(telemetry, guard_metrics_request)) + .layer(from_fn_with_state(servable_cap, guard_generation_request)) }; let result = vllm_server::serve_with_router_extension(config, server_shutdown, extend_router).await; diff --git 
a/openinfer-vllm-frontend/src/telemetry.rs b/openinfer-vllm-frontend/src/telemetry.rs new file mode 100644 index 00000000..184aeda4 --- /dev/null +++ b/openinfer-vllm-frontend/src/telemetry.rs @@ -0,0 +1,685 @@ +use std::collections::VecDeque; +use std::fmt::Write as _; +use std::sync::Arc; +use std::sync::Mutex; +use std::sync::atomic::{AtomicI64, AtomicU64, Ordering}; + +use axum::Router; +use axum::body::{Body, to_bytes}; +use axum::extract::State; +use axum::http::header::CONTENT_TYPE; +use axum::http::{HeaderValue, Request}; +use axum::middleware::Next; +use axum::response::{IntoResponse, Response}; +use axum::routing::get; +use serde_json::{Value, json}; + +mod otlp; +mod trace; + +pub use otlp::{OpenTelemetryOptions, OpenTelemetrySink}; +use trace::RequestRecord; +pub(crate) use trace::RequestTrace; + +const PROMETHEUS_TEXT: &str = "text/plain; version=0.0.4; charset=utf-8"; +const JSON: &str = "application/json"; +const DEFAULT_METRIC_PREFIX: &str = "openinfer_frontend"; +const LATENCY_BUCKETS_MS: &[f64] = &[ + 1.0, 2.5, 5.0, 10.0, 25.0, 50.0, 100.0, 250.0, 500.0, 1_000.0, 2_500.0, 5_000.0, 10_000.0, +]; + +#[derive(Clone)] +pub struct Telemetry { + inner: Arc, +} + +struct Inner { + request_log_enabled: bool, + http_trace_log_enabled: bool, + trace_buffer_capacity: usize, + trace_buffer: Option>>, + opentelemetry_sink: Option, + active_requests: AtomicI64, + started_requests: AtomicU64, + finished_requests: [AtomicU64; RequestOutcome::COUNT], + prompt_tokens: AtomicU64, + cached_prompt_tokens: AtomicU64, + completion_tokens: AtomicU64, + metrics: MetricNames, + queue_wait_ms: Histogram, + ttft_ms: Histogram, + request_duration_ms: Histogram, +} + +#[derive(Clone, Copy, Debug, Eq, PartialEq)] +pub enum RequestOutcome { + Length, + Stop, + Error, + Rejected, + Aborted, +} + +impl RequestOutcome { + const COUNT: usize = 5; + const ALL: [Self; Self::COUNT] = [ + Self::Length, + Self::Stop, + Self::Error, + Self::Rejected, + Self::Aborted, + ]; + + fn 
index(self) -> usize { + match self { + Self::Length => 0, + Self::Stop => 1, + Self::Error => 2, + Self::Rejected => 3, + Self::Aborted => 4, + } + } + + pub fn label(self) -> &'static str { + match self { + Self::Length => "length", + Self::Stop => "stop", + Self::Error => "error", + Self::Rejected => "rejected", + Self::Aborted => "aborted", + } + } +} + +#[derive(Clone, Copy, Debug, Default)] +pub struct RequestMetrics { + pub queue_wait_ms: Option, + pub ttft_ms: Option, + pub duration_ms: Option, + pub prompt_tokens: usize, + pub cached_prompt_tokens: usize, + pub completion_tokens: usize, +} + +#[derive(Clone)] +pub struct TelemetryOptions { + pub request_log_enabled: bool, + pub http_trace_log_enabled: bool, + pub trace_buffer_capacity: usize, + /// Prometheus metric prefix. Defaults to `openinfer_frontend`. + pub metric_prefix: String, + /// Prometheus latency histogram buckets in milliseconds. + /// + /// Values are filtered, sorted, and deduplicated. If no usable bucket is + /// provided, the default serving-oriented bucket set is used. + pub latency_buckets_ms: Vec, + /// Optional built-in OTLP payload sink. + /// + /// Custom integrations should subscribe to the emitted `tracing` spans and + /// events instead of adding callbacks here. 
+ pub opentelemetry_sink: Option, +} + +impl Default for TelemetryOptions { + fn default() -> Self { + Self { + request_log_enabled: false, + http_trace_log_enabled: false, + trace_buffer_capacity: 0, + metric_prefix: DEFAULT_METRIC_PREFIX.to_string(), + latency_buckets_ms: LATENCY_BUCKETS_MS.to_vec(), + opentelemetry_sink: None, + } + } +} + +struct MetricNames { + active_requests: String, + requests_started_total: String, + requests_finished_total: String, + prompt_tokens_total: String, + cached_prompt_tokens_total: String, + completion_tokens_total: String, + queue_wait_ms: String, + ttft_ms: String, + request_duration_ms: String, +} + +impl MetricNames { + fn new(prefix: &str) -> Self { + let prefix = if prefix.trim().is_empty() { + DEFAULT_METRIC_PREFIX + } else { + prefix.trim() + }; + Self { + active_requests: format!("{prefix}_active_requests"), + requests_started_total: format!("{prefix}_requests_started_total"), + requests_finished_total: format!("{prefix}_requests_finished_total"), + prompt_tokens_total: format!("{prefix}_prompt_tokens_total"), + cached_prompt_tokens_total: format!("{prefix}_cached_prompt_tokens_total"), + completion_tokens_total: format!("{prefix}_completion_tokens_total"), + queue_wait_ms: format!("{prefix}_queue_wait_ms"), + ttft_ms: format!("{prefix}_ttft_ms"), + request_duration_ms: format!("{prefix}_request_duration_ms"), + } + } +} + +impl Default for Telemetry { + fn default() -> Self { + Self::new() + } +} + +impl Telemetry { + pub fn new() -> Self { + Self::with_options(TelemetryOptions { + request_log_enabled: env_flag("OPENINFER_TELEMETRY_LOG"), + http_trace_log_enabled: env_flag("OPENINFER_HTTP_TRACE"), + trace_buffer_capacity: env_usize("OPENINFER_TRACE_BUFFER").unwrap_or(0), + ..TelemetryOptions::default() + }) + } + + pub fn with_options(options: TelemetryOptions) -> Self { + let TelemetryOptions { + request_log_enabled, + http_trace_log_enabled, + trace_buffer_capacity, + metric_prefix, + latency_buckets_ms, + 
opentelemetry_sink, + } = options; + let latency_buckets_ms = normalize_latency_buckets(latency_buckets_ms); + Self { + inner: Arc::new(Inner { + request_log_enabled, + http_trace_log_enabled, + trace_buffer_capacity, + trace_buffer: (trace_buffer_capacity > 0) + .then(|| Mutex::new(VecDeque::with_capacity(trace_buffer_capacity))), + opentelemetry_sink, + active_requests: AtomicI64::new(0), + started_requests: AtomicU64::new(0), + finished_requests: std::array::from_fn(|_| AtomicU64::new(0)), + prompt_tokens: AtomicU64::new(0), + cached_prompt_tokens: AtomicU64::new(0), + completion_tokens: AtomicU64::new(0), + metrics: MetricNames::new(&metric_prefix), + queue_wait_ms: Histogram::new(&latency_buckets_ms), + ttft_ms: Histogram::new(&latency_buckets_ms), + request_duration_ms: Histogram::new(&latency_buckets_ms), + }), + } + } + + pub(crate) fn request_started(&self) { + self.inner.started_requests.fetch_add(1, Ordering::Relaxed); + self.inner.active_requests.fetch_add(1, Ordering::Relaxed); + } + + pub(crate) fn request_rejected(&self, prompt_tokens: usize) { + self.request_started(); + self.request_finished( + RequestOutcome::Rejected, + RequestMetrics { + prompt_tokens, + ..RequestMetrics::default() + }, + ); + } + + pub(crate) fn trace_enabled(&self) -> bool { + self.inner.request_log_enabled + || self.inner.http_trace_log_enabled + || self.inner.trace_buffer_capacity > 0 + || self.inner.opentelemetry_sink.is_some() + } + + pub(crate) fn record_trace(&self, trace: Value) { + if self.inner.request_log_enabled { + tracing::info!("openinfer_request_log {trace}"); + } + if self.inner.http_trace_log_enabled { + tracing::info!("openinfer_http_trace {trace}"); + } + if let Some(sink) = &self.inner.opentelemetry_sink { + sink.enqueue_trace(&trace); + } + if let Some(buffer) = &self.inner.trace_buffer { + if let Ok(mut buffer) = buffer.lock() { + if buffer.len() == self.inner.trace_buffer_capacity { + buffer.pop_front(); + } + buffer.push_back(trace); + } + } + } + + 
pub(crate) fn request_finished(&self, outcome: RequestOutcome, metrics: RequestMetrics) { + self.inner.finished_requests[outcome.index()].fetch_add(1, Ordering::Relaxed); + self.inner.active_requests.fetch_sub(1, Ordering::Relaxed); + self.inner + .prompt_tokens + .fetch_add(metrics.prompt_tokens as u64, Ordering::Relaxed); + self.inner + .cached_prompt_tokens + .fetch_add(metrics.cached_prompt_tokens as u64, Ordering::Relaxed); + self.inner + .completion_tokens + .fetch_add(metrics.completion_tokens as u64, Ordering::Relaxed); + + if let Some(value) = metrics.queue_wait_ms { + self.inner.queue_wait_ms.record(value); + } + if let Some(value) = metrics.ttft_ms { + self.inner.ttft_ms.record(value); + } + if let Some(value) = metrics.duration_ms { + self.inner.request_duration_ms.record(value); + } + } + + pub(crate) fn record_request(&self, record: RequestRecord) { + self.request_finished(record.outcome, record.metrics); + if let Some(trace) = record.trace { + self.record_trace(trace); + } + } + + pub(crate) fn finish_request( + &self, + trace: &RequestTrace, + outcome: RequestOutcome, + terminal_at_unix_s: f64, + ) { + self.record_request(trace.finish(outcome, terminal_at_unix_s, self.trace_enabled())); + } + + pub(crate) fn render(&self) -> String { + let mut out = String::new(); + render_gauge( + &mut out, + &self.inner.metrics.active_requests, + "Requests currently tracked by the local frontend bridge.", + self.inner.active_requests.load(Ordering::Relaxed), + ); + render_counter( + &mut out, + &self.inner.metrics.requests_started_total, + "Requests accepted by the local frontend bridge.", + self.inner.started_requests.load(Ordering::Relaxed), + ); + + let _ = writeln!( + out, + "# HELP {} Requests terminated by outcome.", + self.inner.metrics.requests_finished_total + ); + let _ = writeln!( + out, + "# TYPE {} counter", + self.inner.metrics.requests_finished_total + ); + for outcome in RequestOutcome::ALL { + let _ = writeln!( + out, + "{}{{outcome=\"{}\"}} {}", 
+ self.inner.metrics.requests_finished_total, + outcome.label(), + self.inner.finished_requests[outcome.index()].load(Ordering::Relaxed) + ); + } + + render_counter( + &mut out, + &self.inner.metrics.prompt_tokens_total, + "Prompt tokens observed at request termination.", + self.inner.prompt_tokens.load(Ordering::Relaxed), + ); + render_counter( + &mut out, + &self.inner.metrics.cached_prompt_tokens_total, + "Prompt tokens reported as prefix-cache hits.", + self.inner.cached_prompt_tokens.load(Ordering::Relaxed), + ); + render_counter( + &mut out, + &self.inner.metrics.completion_tokens_total, + "Completion tokens emitted by terminated requests.", + self.inner.completion_tokens.load(Ordering::Relaxed), + ); + render_histogram( + &mut out, + &self.inner.metrics.queue_wait_ms, + "Milliseconds between engine queue and scheduler admission.", + &self.inner.queue_wait_ms, + ); + render_histogram( + &mut out, + &self.inner.metrics.ttft_ms, + "Milliseconds between engine queue and first token emitted by the bridge.", + &self.inner.ttft_ms, + ); + render_histogram( + &mut out, + &self.inner.metrics.request_duration_ms, + "Milliseconds between engine queue and terminal event.", + &self.inner.request_duration_ms, + ); + out + } + + pub(crate) fn render_traces(&self) -> String { + let traces: Vec = self + .inner + .trace_buffer + .as_ref() + .and_then(|buffer| { + buffer + .lock() + .ok() + .map(|buffer| buffer.iter().cloned().collect()) + }) + .unwrap_or_default(); + json!({ + "enabled": self.inner.trace_buffer_capacity > 0, + "capacity": self.inner.trace_buffer_capacity, + "traces": traces, + }) + .to_string() + } +} + +pub(crate) async fn guard_metrics_request( + State(telemetry): State, + req: Request, + next: Next, +) -> Response { + if req.uri().path() == "/metrics" { + let response = next.run(req).await; + let (mut parts, body) = response.into_parts(); + return match to_bytes(body, usize::MAX).await { + Ok(bytes) => { + let mut text = 
String::from_utf8_lossy(&bytes).into_owned(); + if !text.ends_with('\n') { + text.push('\n'); + } + text.push_str(&telemetry.render()); + parts + .headers + .insert(CONTENT_TYPE, HeaderValue::from_static(PROMETHEUS_TEXT)); + Response::from_parts(parts, Body::from(text)) + } + Err(_) => ([(CONTENT_TYPE, PROMETHEUS_TEXT)], telemetry.render()).into_response(), + }; + } + next.run(req).await +} + +pub(crate) fn traces_router(telemetry: Telemetry) -> Router { + Router::new() + .route("/openinfer/traces", get(traces_handler)) + .with_state(telemetry) +} + +async fn traces_handler(State(telemetry): State) -> impl IntoResponse { + ([(CONTENT_TYPE, JSON)], telemetry.render_traces()) +} + +fn env_flag(name: &str) -> bool { + std::env::var(name) + .is_ok_and(|value| !matches!(value.as_str(), "" | "0" | "false" | "False" | "FALSE")) +} + +fn env_usize(name: &str) -> Option { + std::env::var(name).ok()?.parse().ok() +} + +fn normalize_latency_buckets(mut buckets: Vec) -> Vec { + buckets.retain(|bucket| bucket.is_finite() && *bucket >= 0.0); + buckets.sort_by(|left, right| left.partial_cmp(right).unwrap()); + buckets.dedup_by(|left, right| left == right); + if buckets.is_empty() { + LATENCY_BUCKETS_MS.to_vec() + } else { + buckets + } +} + +struct Histogram { + buckets: Vec, + bucket_hits: Vec, + count: AtomicU64, + sum_micros: AtomicU64, +} + +impl Histogram { + fn new(buckets: &[f64]) -> Self { + Self { + buckets: buckets.to_vec(), + bucket_hits: buckets.iter().map(|_| AtomicU64::new(0)).collect(), + count: AtomicU64::new(0), + sum_micros: AtomicU64::new(0), + } + } + + fn record(&self, value_ms: f64) { + if !value_ms.is_finite() || value_ms < 0.0 { + return; + } + self.count.fetch_add(1, Ordering::Relaxed); + self.sum_micros + .fetch_add((value_ms * 1_000.0).round() as u64, Ordering::Relaxed); + if let Some(index) = self.buckets.iter().position(|bucket| value_ms <= *bucket) { + self.bucket_hits[index].fetch_add(1, Ordering::Relaxed); + } + } +} + +fn render_counter(out: &mut 
String, name: &str, help: &str, value: u64) { + let _ = writeln!(out, "# HELP {name} {help}"); + let _ = writeln!(out, "# TYPE {name} counter"); + let _ = writeln!(out, "{name} {value}"); +} + +fn render_gauge(out: &mut String, name: &str, help: &str, value: i64) { + let _ = writeln!(out, "# HELP {name} {help}"); + let _ = writeln!(out, "# TYPE {name} gauge"); + let _ = writeln!(out, "{name} {value}"); +} + +fn render_histogram(out: &mut String, name: &str, help: &str, histogram: &Histogram) { + let _ = writeln!(out, "# HELP {name} {help}"); + let _ = writeln!(out, "# TYPE {name} histogram"); + + let mut cumulative = 0; + for (bucket, hits) in histogram.buckets.iter().zip(&histogram.bucket_hits) { + cumulative += hits.load(Ordering::Relaxed); + let _ = writeln!( + out, + "{name}_bucket{{le=\"{}\"}} {cumulative}", + bucket_label(*bucket) + ); + } + let count = histogram.count.load(Ordering::Relaxed); + let _ = writeln!(out, "{name}_bucket{{le=\"+Inf\"}} {count}"); + let _ = writeln!( + out, + "{name}_sum {:.3}", + histogram.sum_micros.load(Ordering::Relaxed) as f64 / 1_000.0 + ); + let _ = writeln!(out, "{name}_count {count}"); +} + +fn bucket_label(value: f64) -> String { + if value.fract() == 0.0 { + format!("{value:.0}") + } else { + value.to_string() + } +} + +#[cfg(test)] +mod tests { + use axum::Router; + use axum::body::{Body, to_bytes}; + use axum::http::{Request, StatusCode}; + use axum::middleware::from_fn_with_state; + use axum::routing::get; + use tower::ServiceExt; + + use super::*; + + #[test] + fn telemetry_renders_prometheus_counters_and_histograms() { + let telemetry = Telemetry::new(); + telemetry.request_started(); + telemetry.request_finished( + RequestOutcome::Length, + RequestMetrics { + queue_wait_ms: Some(3.0), + ttft_ms: Some(12.5), + duration_ms: Some(42.0), + prompt_tokens: 11, + cached_prompt_tokens: 4, + completion_tokens: 2, + }, + ); + + let text = telemetry.render(); + assert!(text.contains("openinfer_frontend_active_requests 0")); + 
assert!(text.contains("openinfer_frontend_requests_started_total 1")); + assert!(text.contains("openinfer_frontend_requests_finished_total{outcome=\"length\"} 1")); + assert!(text.contains("openinfer_frontend_prompt_tokens_total 11")); + assert!(text.contains("openinfer_frontend_cached_prompt_tokens_total 4")); + assert!(text.contains("openinfer_frontend_completion_tokens_total 2")); + assert!(text.contains("openinfer_frontend_ttft_ms_count 1")); + } + + #[test] + fn telemetry_options_customize_metric_prefix_and_latency_buckets() { + let telemetry = Telemetry::with_options(TelemetryOptions { + metric_prefix: "tenant_infer".to_string(), + latency_buckets_ms: vec![7.0, 1.0, 7.0], + ..TelemetryOptions::default() + }); + + telemetry.request_started(); + telemetry.request_finished( + RequestOutcome::Stop, + RequestMetrics { + ttft_ms: Some(6.0), + ..RequestMetrics::default() + }, + ); + + let text = telemetry.render(); + assert!(text.contains("tenant_infer_active_requests 0")); + assert!(text.contains("tenant_infer_ttft_ms_bucket{le=\"1\"} 0")); + assert!(text.contains("tenant_infer_ttft_ms_bucket{le=\"7\"} 1")); + assert!(text.contains("tenant_infer_ttft_ms_count 1")); + assert!(!text.contains("openinfer_frontend_active_requests")); + } + + #[tokio::test] + async fn telemetry_exports_traces_to_builtin_otlp_sink() { + let (sender, mut receiver) = tokio::sync::mpsc::channel(1); + let telemetry = Telemetry::with_options(TelemetryOptions { + opentelemetry_sink: Some(OpenTelemetrySink::new( + sender, + OpenTelemetryOptions { + service_name: "openinfer-test".to_string(), + }, + )), + ..TelemetryOptions::default() + }); + + assert!(telemetry.trace_enabled()); + telemetry.record_trace(json!({ + "request_id":"req-1", + "queued_at_unix_s": 1.0, + "terminal_at_unix_s": 1.020, + "finish_reason": "stop", + })); + + let payload = receiver.try_recv().unwrap(); + assert_eq!( + payload["resourceSpans"][0]["resource"]["attributes"][0]["value"]["stringValue"], + "openinfer-test" + ); + } 
+ + #[test] + fn trace_buffer_keeps_latest_request_traces() { + let telemetry = Telemetry::with_options(TelemetryOptions { + trace_buffer_capacity: 2, + ..TelemetryOptions::default() + }); + telemetry.record_trace(json!({"request_id":"old"})); + telemetry.record_trace(json!({"request_id":"new-a"})); + telemetry.record_trace(json!({"request_id":"new-b"})); + + let traces: Value = serde_json::from_str(&telemetry.render_traces()).unwrap(); + assert_eq!(traces["enabled"], true); + assert_eq!(traces["capacity"], 2); + assert_eq!(traces["traces"].as_array().unwrap().len(), 2); + assert!(!traces.to_string().contains("old")); + assert!(traces.to_string().contains("new-a")); + assert!(traces.to_string().contains("new-b")); + } + + #[tokio::test] + async fn metrics_route_returns_text_exposition() { + let telemetry = Telemetry::new(); + telemetry.request_started(); + let router = Router::new() + .route("/metrics", get(|| async { "vllm metrics" })) + .layer(from_fn_with_state(telemetry, guard_metrics_request)); + + let response = router + .oneshot( + Request::builder() + .uri("/metrics") + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap(); + assert_eq!(response.status(), StatusCode::OK); + assert_eq!( + response.headers().get(CONTENT_TYPE).unwrap(), + PROMETHEUS_TEXT + ); + let body = to_bytes(response.into_body(), usize::MAX).await.unwrap(); + let text = String::from_utf8(body.to_vec()).unwrap(); + assert!(text.contains("vllm metrics")); + assert!(text.contains("openinfer_frontend_active_requests 1")); + } + + #[tokio::test] + async fn traces_route_returns_buffered_traces() { + let telemetry = Telemetry::with_options(TelemetryOptions { + trace_buffer_capacity: 4, + ..TelemetryOptions::default() + }); + telemetry.record_trace(json!({"request_id":"req-1"})); + let router = traces_router(telemetry); + + let response = router + .oneshot( + Request::builder() + .uri("/openinfer/traces") + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap(); + 
assert_eq!(response.status(), StatusCode::OK); + assert_eq!(response.headers().get(CONTENT_TYPE).unwrap(), JSON); + let body = to_bytes(response.into_body(), usize::MAX).await.unwrap(); + let traces: Value = serde_json::from_slice(&body).unwrap(); + assert_eq!(traces["traces"][0]["request_id"], "req-1"); + } +} diff --git a/openinfer-vllm-frontend/src/telemetry/otlp.rs b/openinfer-vllm-frontend/src/telemetry/otlp.rs new file mode 100644 index 00000000..7fd9f240 --- /dev/null +++ b/openinfer-vllm-frontend/src/telemetry/otlp.rs @@ -0,0 +1,236 @@ +use serde_json::{Value, json}; +use tokio::sync::mpsc; +use uuid::Uuid; + +const DEFAULT_OTEL_SERVICE_NAME: &str = "openinfer"; +const OTEL_SCOPE_NAME: &str = "openinfer-vllm-frontend"; +const OTEL_REQUEST_SPAN_NAME: &str = "openinfer.request"; + +#[derive(Clone, Debug)] +pub struct OpenTelemetryOptions { + pub service_name: String, +} + +impl OpenTelemetryOptions { + pub fn from_env() -> Self { + Self { + service_name: env_non_empty("OPENINFER_OTEL_SERVICE_NAME") + .or_else(|| env_non_empty("OTEL_SERVICE_NAME")) + .unwrap_or_else(|| DEFAULT_OTEL_SERVICE_NAME.to_string()), + } + } +} + +impl Default for OpenTelemetryOptions { + fn default() -> Self { + Self { + service_name: DEFAULT_OTEL_SERVICE_NAME.to_string(), + } + } +} + +#[derive(Clone)] +pub struct OpenTelemetrySink { + sender: mpsc::Sender, + service_name: String, +} + +impl OpenTelemetrySink { + pub fn new(sender: mpsc::Sender, options: OpenTelemetryOptions) -> Self { + Self { + sender, + service_name: options.service_name, + } + } + + pub(crate) fn enqueue_trace(&self, trace: &Value) { + if let Ok(permit) = self.sender.try_reserve() { + permit.send(opentelemetry_trace_payload(&self.service_name, trace)); + } + } +} + +fn env_non_empty(name: &str) -> Option { + std::env::var(name).ok().filter(|value| !value.is_empty()) +} + +fn opentelemetry_trace_payload(service_name: &str, trace: &Value) -> Value { + json!({ + "resourceSpans": [{ + "resource": { + "attributes": [ 
+ otel_string_attr("service.name", service_name), + otel_string_attr("telemetry.sdk.language", "rust"), + otel_string_attr("telemetry.sdk.name", "openinfer"), + ] + }, + "scopeSpans": [{ + "scope": {"name": OTEL_SCOPE_NAME}, + "spans": [opentelemetry_request_span(trace)] + }] + }] + }) +} + +fn opentelemetry_request_span(trace: &Value) -> Value { + let start = trace + .get("queued_at_unix_s") + .and_then(Value::as_f64) + .map(unix_s_to_nanos) + .unwrap_or_else(|| "0".to_string()); + let end = trace + .get("terminal_at_unix_s") + .and_then(Value::as_f64) + .map(unix_s_to_nanos) + .unwrap_or_else(|| start.clone()); + let finish_reason = trace + .get("finish_reason") + .and_then(Value::as_str) + .unwrap_or("unknown"); + json!({ + "traceId": Uuid::new_v4().simple().to_string(), + "spanId": Uuid::new_v4().simple().to_string()[..16].to_string(), + "name": OTEL_REQUEST_SPAN_NAME, + "kind": 2, + "startTimeUnixNano": start, + "endTimeUnixNano": end, + "attributes": opentelemetry_attributes(trace, finish_reason), + "events": opentelemetry_events(trace), + "status": {"code": opentelemetry_status_code(finish_reason)}, + }) +} + +fn opentelemetry_attributes(trace: &Value, finish_reason: &str) -> Vec { + let mut attributes = Vec::new(); + attributes.push(otel_string_attr("openinfer.finish_reason", finish_reason)); + if let Some(request_id) = trace.get("request_id").and_then(Value::as_str) { + attributes.push(otel_string_attr("openinfer.request_id", request_id)); + } + for (name, key) in [ + ("openinfer.prompt_tokens", "prompt_tokens"), + ("openinfer.cached_tokens", "cached_tokens"), + ("openinfer.completion_tokens", "completion_tokens"), + ] { + if let Some(value) = trace.get(key).and_then(Value::as_u64) { + attributes.push(otel_int_attr(name, value)); + } + } + if let Some(prefill_ms) = trace.get("prefill_ms").and_then(Value::as_f64) { + attributes.push(otel_double_attr("openinfer.prefill_ms", prefill_ms)); + } + attributes +} + +fn opentelemetry_events(trace: &Value) -> Vec { 
+ let mut events = Vec::new(); + for (name, key) in [ + ("scheduled", "scheduled_at_unix_s"), + ("first_token", "first_token_emit_unix_s"), + ] { + if let Some(timestamp) = trace.get(key).and_then(Value::as_f64) { + events.push(json!({ + "name": name, + "timeUnixNano": unix_s_to_nanos(timestamp), + })); + } + } + events +} + +fn opentelemetry_status_code(finish_reason: &str) -> i32 { + match finish_reason { + "error" | "rejected" => 2, + _ => 1, + } +} + +fn otel_string_attr(key: &str, value: &str) -> Value { + json!({"key": key, "value": {"stringValue": value}}) +} + +fn otel_int_attr(key: &str, value: u64) -> Value { + json!({"key": key, "value": {"intValue": value.to_string()}}) +} + +fn otel_double_attr(key: &str, value: f64) -> Value { + json!({"key": key, "value": {"doubleValue": value}}) +} + +fn unix_s_to_nanos(value: f64) -> String { + if !value.is_finite() || value <= 0.0 { + return "0".to_string(); + } + ((value * 1_000_000_000.0).round() as u64).to_string() +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn opentelemetry_payload_uses_otlp_trace_shape() { + let payload = opentelemetry_trace_payload( + "openinfer-test", + &json!({ + "request_id": "req-1", + "queued_at_unix_s": 1.0, + "scheduled_at_unix_s": 1.001, + "first_token_emit_unix_s": 1.010, + "terminal_at_unix_s": 1.020, + "finish_reason": "stop", + "prompt_tokens": 11, + "cached_tokens": 4, + "completion_tokens": 2, + "prefill_ms": 9.0, + }), + ); + + let resource_span = &payload["resourceSpans"][0]; + assert_eq!( + resource_span["resource"]["attributes"][0]["value"]["stringValue"], + "openinfer-test" + ); + let span = &resource_span["scopeSpans"][0]["spans"][0]; + assert_eq!(span["name"], OTEL_REQUEST_SPAN_NAME); + assert_eq!(span["kind"], 2); + assert_eq!(span["startTimeUnixNano"], "1000000000"); + assert_eq!(span["endTimeUnixNano"], "1020000000"); + assert_eq!(span["events"][0]["name"], "scheduled"); + assert_eq!(span["events"][1]["name"], "first_token"); + assert!( + 
span["attributes"] + .to_string() + .contains("openinfer.request_id") + ); + assert!( + span["attributes"] + .to_string() + .contains("openinfer.prompt_tokens") + ); + } + + #[tokio::test] + async fn opentelemetry_sink_queues_payload_for_caller_exporter() { + let (sender, mut receiver) = mpsc::channel(1); + let sink = OpenTelemetrySink::new( + sender, + OpenTelemetryOptions { + service_name: "openinfer-test".to_string(), + }, + ); + + let trace = json!({ + "request_id": "req-1", + "queued_at_unix_s": 1.0, + "terminal_at_unix_s": 1.020, + "finish_reason": "stop", + }); + sink.enqueue_trace(&trace); + + let payload = receiver.try_recv().unwrap(); + assert_eq!( + payload["resourceSpans"][0]["resource"]["attributes"][0]["value"]["stringValue"], + "openinfer-test" + ); + } +} diff --git a/openinfer-vllm-frontend/src/telemetry/trace.rs b/openinfer-vllm-frontend/src/telemetry/trace.rs new file mode 100644 index 00000000..ed9cd672 --- /dev/null +++ b/openinfer-vllm-frontend/src/telemetry/trace.rs @@ -0,0 +1,160 @@ +use openinfer_engine::engine::{FinishReason, TokenEvent}; + +use super::{RequestMetrics, RequestOutcome}; + +pub(crate) struct RequestRecord { + pub(crate) outcome: RequestOutcome, + pub(crate) metrics: RequestMetrics, + pub(crate) trace: Option, +} + +pub(crate) struct RequestTrace { + request_id: String, + queued_at_unix_s: f64, + scheduled_at_unix_s: Option, + first_token_emit_unix_s: Option, + prompt_tokens: usize, + cached_tokens: usize, + completion_tokens: usize, +} + +impl RequestTrace { + pub(crate) fn new(request_id: String, queued_at_unix_s: f64, prompt_tokens: usize) -> Self { + Self { + request_id, + queued_at_unix_s, + scheduled_at_unix_s: None, + first_token_emit_unix_s: None, + prompt_tokens, + cached_tokens: 0, + completion_tokens: 0, + } + } + + pub(crate) fn observe_event(&mut self, event: &TokenEvent) -> Option { + match event { + TokenEvent::Scheduled { + queued_at_unix_s, + scheduled_at_unix_s, + prompt_tokens, + cached_tokens, + } => { + 
self.queued_at_unix_s = *queued_at_unix_s; + self.scheduled_at_unix_s = Some(*scheduled_at_unix_s); + self.prompt_tokens = *prompt_tokens; + self.cached_tokens = *cached_tokens; + None + } + TokenEvent::Token { .. } => { + if self.first_token_emit_unix_s.is_none() { + let first_token_emit_unix_s = now_secs_f64(); + self.first_token_emit_unix_s = Some(first_token_emit_unix_s); + } + self.completion_tokens = self.completion_tokens.saturating_add(1); + None + } + TokenEvent::PromptTokens { .. } => None, + TokenEvent::Finished { + finish_reason, + prompt_tokens, + completion_tokens, + } => { + self.observe_terminal(*prompt_tokens, *completion_tokens); + Some(outcome_from_finish_reason(*finish_reason)) + } + TokenEvent::Error { + prompt_tokens, + completion_tokens, + .. + } => { + self.observe_terminal(*prompt_tokens, *completion_tokens); + Some(RequestOutcome::Error) + } + TokenEvent::Rejected { + prompt_tokens, + completion_tokens, + .. + } => { + self.observe_terminal(*prompt_tokens, *completion_tokens); + Some(RequestOutcome::Rejected) + } + } + } + + pub(crate) fn finish( + &self, + outcome: RequestOutcome, + terminal_at_unix_s: f64, + include_trace: bool, + ) -> RequestRecord { + let metrics = self.metrics(terminal_at_unix_s); + RequestRecord { + outcome, + metrics, + trace: include_trace.then(|| self.to_json(outcome, terminal_at_unix_s)), + } + } + + fn observe_terminal(&mut self, prompt_tokens: usize, completion_tokens: usize) { + self.prompt_tokens = prompt_tokens; + self.completion_tokens = completion_tokens; + } + + fn metrics(&self, terminal_at_unix_s: f64) -> RequestMetrics { + RequestMetrics { + queue_wait_ms: self + .scheduled_at_unix_s + .map(|scheduled| ms_between(self.queued_at_unix_s, scheduled)), + ttft_ms: self + .first_token_emit_unix_s + .map(|first| ms_between(self.queued_at_unix_s, first)), + duration_ms: Some(ms_between(self.queued_at_unix_s, terminal_at_unix_s)), + prompt_tokens: self.prompt_tokens, + cached_prompt_tokens: self.cached_tokens, 
+ completion_tokens: self.completion_tokens, + } + } + + fn to_json(&self, outcome: RequestOutcome, terminal_at_unix_s: f64) -> serde_json::Value { + let mut trace = serde_json::json!({ + "request_id": self.request_id, + "queued_at_unix_s": self.queued_at_unix_s, + "terminal_at_unix_s": terminal_at_unix_s, + "finish_reason": outcome.label(), + "prompt_tokens": self.prompt_tokens, + "cached_tokens": self.cached_tokens, + "completion_tokens": self.completion_tokens, + }); + if let Some(scheduled_at) = self.scheduled_at_unix_s { + trace["scheduled_at_unix_s"] = serde_json::json!(scheduled_at); + } + if let Some(first_token_at) = self.first_token_emit_unix_s { + trace["first_token_emit_unix_s"] = serde_json::json!(first_token_at); + if let Some(scheduled_at) = self.scheduled_at_unix_s { + trace["prefill_ms"] = serde_json::json!(ms_between(scheduled_at, first_token_at)); + } + } + trace + } +} + +fn outcome_from_finish_reason(reason: FinishReason) -> RequestOutcome { + match reason { + FinishReason::Length => RequestOutcome::Length, + FinishReason::Stop => RequestOutcome::Stop, + FinishReason::Error => RequestOutcome::Error, + } +} + +fn ms_between(start_unix_s: f64, end_unix_s: f64) -> f64 { + ((end_unix_s - start_unix_s) * 1_000.0).max(0.0) +} + +fn now_secs_f64() -> f64 { + use std::time::{SystemTime, UNIX_EPOCH}; + + SystemTime::now() + .duration_since(UNIX_EPOCH) + .map(|duration| duration.as_secs_f64()) + .unwrap_or(0.0) +}