Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ log = "0.4"
logforth = { version = "0.29", features = ["starter-log", "layout-text", "diagnostic-task-local"] }
libc = "0.2"
memmap2 = "0.9"
metrics = "0.24.6"
mio = "=1.1.1"
mockall = "0.13.1"
nvtx = "1.3.0"
Expand Down
1 change: 1 addition & 0 deletions docs/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ Organized by domain (model line / subsystem / playbook / lesson) instead of by l
| `subsystems/frontend/simulated-inference-engine.md` | CPU-only simulated model crate for vLLM/OpenAI frontend and `vllm bench serve` validation without CUDA, real model weights, or real-model performance claims. |
| `subsystems/frontend/cpu-profiling-baseline.md` | Frontend CPU profiling baseline using `openinfer-sim` with fixed TTFT=5ms/TPOT=12ms: 200 req / concurrency=16 shows ~150ms TTFT overhead (no dominant hotspot), heap allocation ~10%, stream polling ~7.5%, IPC ~1%; reproducible benchmark command and perf evidence documented. |
| `subsystems/frontend/startup-time.md` | Qwen3-4B warm startup-to-ready 3.25s → ~1.45s: frontend tokenizer load runs concurrently with the engine load (HTTP still binds only after the engine registers), and the source safetensors mmap is kept alive to dodge ~0.4s of munmap stalling the next cudaMalloc. |
| `subsystems/frontend/telemetry-system.md` | Lightweight telemetry: Grafana/Prometheus `/metrics`, optional OTLP payload sink, opt-in request logs/traces, sparse `tracing` spans, and a small `metrics` facade at the engine boundary. |

## subsystems / correctness

Expand Down
178 changes: 178 additions & 0 deletions docs/subsystems/frontend/telemetry-system.md

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions openinfer-engine/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ version = "0.1.0"
edition = "2024"

[dependencies]
metrics = { workspace = true }
tokio = { workspace = true, features = ["sync"] }
tracing = { workspace = true }

[dev-dependencies]
tokio = { workspace = true, features = ["macros", "rt"] }
Expand Down
232 changes: 230 additions & 2 deletions openinfer-engine/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,65 @@ pub type RequestTag = Arc<str>;
pub type TokenStreamSender = mpsc::UnboundedSender<(RequestTag, TokenEvent)>;
pub type TokenStreamReceiver = mpsc::UnboundedReceiver<(RequestTag, TokenEvent)>;

// Request tracing level policy:
// - DEBUG span/event: successful request lifecycle, useful while debugging.
// - WARN terminal event: rejected requests.
// - ERROR terminal event: execution failures.
// High-volume success counts and token totals belong to `metrics`; the shared
// serving path intentionally does not emit per-token tracing events.
/// Builds the DEBUG-level `openinfer.request` span for a single request.
///
/// Only `request_id` is populated at creation time. Every other field is
/// declared as `tracing::field::Empty` so the span carries the field names
/// up front and the values can be recorded on it later.
macro_rules! request_span {
    ($id:expr) => {
        tracing::debug_span!(
            "openinfer.request",
            request_id = $id,
            // Placeholders: declared now, recorded once the values are known.
            queued_at_unix_s = tracing::field::Empty,
            scheduled_at_unix_s = tracing::field::Empty,
            terminal_at_unix_s = tracing::field::Empty,
            finish_reason = tracing::field::Empty,
            prompt_tokens = tracing::field::Empty,
            cached_prompt_tokens = tracing::field::Empty,
            completion_tokens = tracing::field::Empty,
        )
    };
}

/// Emits the DEBUG `openinfer_request_scheduled` event as a child of the
/// given span.
///
/// Arguments, in order: parent span, queue timestamp, schedule timestamp,
/// prompt token count, cached prompt token count. Both token counts are cast
/// to `u64` before being attached as fields.
macro_rules! request_scheduled_event {
    ($parent:expr, $queued_at:expr, $scheduled_at:expr, $prompt:expr, $cached:expr) => {
        tracing::debug!(
            parent: $parent,
            queued_at_unix_s = $queued_at,
            scheduled_at_unix_s = $scheduled_at,
            prompt_tokens = $prompt as u64,
            cached_prompt_tokens = $cached as u64,
            "openinfer_request_scheduled"
        )
    };
}

/// Emits the terminal `openinfer_request_finished` event as a child of the
/// given span, at the tracing level named by the first argument (`debug`,
/// `warn`, `error`, ...).
///
/// Arguments, in order: level macro name, parent span, terminal timestamp,
/// finish-reason label, prompt token count, completion token count, and
/// optionally a message value exposing `.as_str()`. Token counts are cast to
/// `u64` before being attached as fields.
///
/// NOTE(review): the second arm attaches an explicit `message` field next to
/// the literal event message. `tracing` appears to store the literal under
/// the same field name, so confirm how the configured subscriber renders the
/// two values.
macro_rules! request_terminal_event {
    // Terminal event without a free-form message.
    ($lvl:ident, $parent:expr, $terminal_at:expr, $reason:expr, $prompt:expr, $completion:expr) => {
        tracing::$lvl!(
            parent: $parent,
            terminal_at_unix_s = $terminal_at,
            finish_reason = $reason,
            prompt_tokens = $prompt as u64,
            completion_tokens = $completion as u64,
            "openinfer_request_finished"
        )
    };
    // Terminal event that also carries a free-form message.
    ($lvl:ident, $parent:expr, $terminal_at:expr, $reason:expr, $prompt:expr, $completion:expr, $msg:expr) => {
        tracing::$lvl!(
            parent: $parent,
            terminal_at_unix_s = $terminal_at,
            finish_reason = $reason,
            prompt_tokens = $prompt as u64,
            completion_tokens = $completion as u64,
            message = $msg.as_str(),
            "openinfer_request_finished"
        )
    };
}

/// Per-request handle the scheduler holds to emit [`TokenEvent`]s.
///
/// Drop-in for the former `UnboundedSender<TokenEvent>`: it keeps the same
Expand All @@ -203,11 +262,18 @@ pub struct TokenSink {
tag: RequestTag,
tx: TokenStreamSender,
cancelled: Arc<AtomicBool>,
span: tracing::Span,
}

impl TokenSink {
pub fn new(tag: RequestTag, tx: TokenStreamSender, cancelled: Arc<AtomicBool>) -> Self {
Self { tag, tx, cancelled }
let span = request_span!(tag.as_ref());
Self {
tag,
tx,
cancelled,
span,
}
}

/// Emit one event for this request. Returns `Err` (handing the event back)
Expand All @@ -219,6 +285,7 @@ impl TokenSink {
if self.cancelled.load(Ordering::Acquire) {
return Err(mpsc::error::SendError(event));
}
self.observe_event(&event);
self.tx.send((self.tag.clone(), event)).map_err(|err| {
let (_, event) = err.0;
mpsc::error::SendError(event)
Expand All @@ -235,6 +302,155 @@ impl TokenSink {
&self.tag
}

fn observe_event(&self, event: &TokenEvent) {
match event {
TokenEvent::Scheduled {
queued_at_unix_s,
scheduled_at_unix_s,
prompt_tokens,
cached_tokens,
} => {
metrics::counter!("openinfer_engine_requests_scheduled_total").increment(1);
metrics::counter!("openinfer_engine_cached_prompt_tokens_total")
.increment(*cached_tokens as u64);
metrics::histogram!("openinfer_engine_queue_wait_ms")
.record((scheduled_at_unix_s - queued_at_unix_s).max(0.0) * 1_000.0);
self.span.record("queued_at_unix_s", *queued_at_unix_s);
self.span
.record("scheduled_at_unix_s", *scheduled_at_unix_s);
self.span.record("prompt_tokens", *prompt_tokens as u64);
self.span
.record("cached_prompt_tokens", *cached_tokens as u64);
request_scheduled_event!(
&self.span,
*queued_at_unix_s,
*scheduled_at_unix_s,
*prompt_tokens,
*cached_tokens
);
}
TokenEvent::Token { .. } | TokenEvent::PromptTokens { .. } => {}
TokenEvent::Finished {
finish_reason,
prompt_tokens,
completion_tokens,
} => {
let terminal_at_unix_s = unix_now_s();
let finish_reason = finish_reason_label(*finish_reason);
metrics::counter!(
"openinfer_engine_requests_finished_total",
"outcome" => finish_reason
)
.increment(1);
metrics::counter!("openinfer_engine_prompt_tokens_total")
.increment(*prompt_tokens as u64);
metrics::counter!("openinfer_engine_completion_tokens_total")
.increment(*completion_tokens as u64);
self.record_terminal_fields(
terminal_at_unix_s,
finish_reason,
*prompt_tokens,
*completion_tokens,
);
if finish_reason == "error" {
request_terminal_event!(
error,
&self.span,
terminal_at_unix_s,
finish_reason,
*prompt_tokens,
*completion_tokens
);
} else {
request_terminal_event!(
debug,
&self.span,
terminal_at_unix_s,
finish_reason,
*prompt_tokens,
*completion_tokens
);
}
}
TokenEvent::Error {
message,
prompt_tokens,
completion_tokens,
} => {
let terminal_at_unix_s = unix_now_s();
metrics::counter!(
"openinfer_engine_requests_finished_total",
"outcome" => "error"
)
.increment(1);
metrics::counter!("openinfer_engine_prompt_tokens_total")
.increment(*prompt_tokens as u64);
metrics::counter!("openinfer_engine_completion_tokens_total")
.increment(*completion_tokens as u64);
self.record_terminal_fields(
terminal_at_unix_s,
"error",
*prompt_tokens,
*completion_tokens,
);
request_terminal_event!(
error,
&self.span,
terminal_at_unix_s,
"error",
*prompt_tokens,
*completion_tokens,
message
);
}
TokenEvent::Rejected {
message,
prompt_tokens,
completion_tokens,
} => {
let terminal_at_unix_s = unix_now_s();
metrics::counter!(
"openinfer_engine_requests_finished_total",
"outcome" => "rejected"
)
.increment(1);
metrics::counter!("openinfer_engine_prompt_tokens_total")
.increment(*prompt_tokens as u64);
metrics::counter!("openinfer_engine_completion_tokens_total")
.increment(*completion_tokens as u64);
self.record_terminal_fields(
terminal_at_unix_s,
"rejected",
*prompt_tokens,
*completion_tokens,
);
request_terminal_event!(
warn,
&self.span,
terminal_at_unix_s,
"rejected",
*prompt_tokens,
*completion_tokens,
message
);
}
}
}

/// Fills in the terminal fields on this request's span.
///
/// Writes `terminal_at_unix_s`, `finish_reason`, `prompt_tokens`, and
/// `completion_tokens` onto `self.span`, in that order. Token counts are
/// cast to `u64` before being recorded.
fn record_terminal_fields(
    &self,
    terminal_at_unix_s: f64,
    finish_reason: &'static str,
    prompt_tokens: usize,
    completion_tokens: usize,
) {
    // `Span::record` returns the span again, so the writes can be chained.
    self.span
        .record("terminal_at_unix_s", terminal_at_unix_s)
        .record("finish_reason", finish_reason)
        .record("prompt_tokens", prompt_tokens as u64)
        .record("completion_tokens", completion_tokens as u64);
}

/// A sink backed by its own private channel, for direct drivers
/// (benchmarks, integration tests, the simulator) that consume one
/// request's events without the shared frontend demux. The returned
Expand All @@ -246,6 +462,14 @@ impl TokenSink {
}
}

/// Returns the static lowercase label for a [`FinishReason`].
///
/// The match is exhaustive on purpose: adding a variant to `FinishReason`
/// fails to compile here until a label is chosen for it.
fn finish_reason_label(reason: FinishReason) -> &'static str {
    match reason {
        FinishReason::Stop => "stop",
        FinishReason::Length => "length",
        FinishReason::Error => "error",
    }
}

/// Seconds since `UNIX_EPOCH` as `f64` — the clock base for `TokenEvent`
/// timestamps.
pub fn unix_now_s() -> f64 {
Expand Down Expand Up @@ -371,7 +595,7 @@ impl EngineHandle {
&self,
req: GenerateRequest,
) -> std::result::Result<(), mpsc::error::SendError<GenerateRequest>> {
match self.inner.submit_tx.as_ref() {
let result = match self.inner.submit_tx.as_ref() {
Some(submit_tx) => submit_tx.send(req),
None => match self.inner.command_tx.as_ref() {
Some(command_tx) => command_tx
Expand All @@ -382,7 +606,11 @@ impl EngineHandle {
}),
None => Err(mpsc::error::SendError(req)),
},
};
if result.is_ok() {
metrics::counter!("openinfer_engine_requests_submitted_total").increment(1);
}
result
}

pub fn supports_lora_control(&self) -> bool {
Expand Down
Loading