diff --git a/server/CMakeLists.txt b/server/CMakeLists.txt index d42762f3..ced5b164 100644 --- a/server/CMakeLists.txt +++ b/server/CMakeLists.txt @@ -726,6 +726,18 @@ if(DFLASH27B_TESTS) else() target_link_libraries(dflash_server PRIVATE hip::host) endif() + + # Copy share/status.html next to the binary so it can be found at runtime. + add_custom_command(TARGET dflash_server POST_BUILD + COMMAND ${CMAKE_COMMAND} -E make_directory + "$/share" + COMMAND ${CMAKE_COMMAND} -E copy_if_different + "${CMAKE_CURRENT_SOURCE_DIR}/share/status.html" + "$/share/status.html" + COMMENT "Copying status.html to build/share/" + ) + install(FILES "${CMAKE_CURRENT_SOURCE_DIR}/share/status.html" + DESTINATION share) endif() if(EXISTS "${CMAKE_CURRENT_SOURCE_DIR}/src/ipc/backend_ipc_main.cpp") diff --git a/server/share/status.html b/server/share/status.html new file mode 100644 index 00000000..dbfe6f3a --- /dev/null +++ b/server/share/status.html @@ -0,0 +1,343 @@ + + + + + +DFlash Server Status + + + +

⚡ DFlash Server Status + disconnected +

+ +
+

Current Request

+
+ idle + + 0 + total requests + +
+ +
+ + + +
+

Prefill Performance

+
+
+
Prefill tok/s
+
+
+ +
+

Decode Performance

+
+
+
Decode tok/s
+
Accept Rate %
+
+
+ + + + diff --git a/server/src/common/dflash_spec_decode.cpp b/server/src/common/dflash_spec_decode.cpp index 141e45e9..3b075493 100644 --- a/server/src/common/dflash_spec_decode.cpp +++ b/server/src/common/dflash_spec_decode.cpp @@ -169,6 +169,11 @@ bool run_dflash_spec_decode( } } + // Notify observer with draft tokens for this step. + if (io.observer) { + io.observer("draft", draft_tok); + } + // ── Verify pass: speculative target forward over q_len tokens ──── if (!target.snapshot_kv()) { std::fprintf(stderr, "dflash-spec snapshot_kv failed\n"); @@ -234,6 +239,12 @@ bool run_dflash_spec_decode( n_generated += emitted; n_accept_sum += std::min(accept_n, emitted); n_draft_steps++; + + // Notify observer with accepted tokens for this step. + if (io.observer) { + io.observer("verify", replay_tok); + } + if (io.cancelled) break; if (hit_eos) break; } diff --git a/server/src/common/model_backend.h b/server/src/common/model_backend.h index b808d0c3..af5f6ba7 100644 --- a/server/src/common/model_backend.h +++ b/server/src/common/model_backend.h @@ -27,6 +27,14 @@ namespace dflash::common { // Return true to continue generation, false to abort. using TokenCallback = std::function; +// Inference observer callback for live status updates. Called by backends +// at each spec-decode step to report phase/detail. When empty, backends +// skip the call (zero overhead). +// phase: "draft", "verify", "accept", "prefill_chunk" +// detail: JSON string with step-specific data +using InferenceObserver = std::function & tokens)>; + // ─── I/O handle passed to backend methods that need protocol output ───── struct DaemonIO { int stream_fd = -1; @@ -37,6 +45,10 @@ struct DaemonIO { TokenCallback on_token; mutable bool cancelled = false; + // Optional inference observer for /status page. When set, backends call + // this at each spec-decode step with draft tokens and phase info. + InferenceObserver observer; + // Write a single int32 to the stream fd (token or -1 sentinel). // Also invokes on_token if set. Sets cancelled=true if on_token // returns false (client disconnected). diff --git a/server/src/qwen35/qwen35_backend.cpp b/server/src/qwen35/qwen35_backend.cpp index 4a3d9674..1c442d83 100644 --- a/server/src/qwen35/qwen35_backend.cpp +++ b/server/src/qwen35/qwen35_backend.cpp @@ -1315,6 +1315,11 @@ bool Qwen35Backend::do_spec_decode(int committed, int n_gen, } } + // Notify observer with draft tokens for this step. + if (io.observer) { + io.observer("draft", draft_tok); + } + // 4. Verify: snapshot KV, run target forward over draft tokens if (!target->snapshot_kv()) { step_graph_destroy(draft_sg); @@ -1391,6 +1396,12 @@ bool Qwen35Backend::do_spec_decode(int committed, int n_gen, n_generated += emitted; n_accept_sum += std::min(accept_n, emitted); n_draft_steps++; + + // Notify observer with accepted tokens for this step. + if (io.observer) { + io.observer("verify", replay_tok); + } + if (io.cancelled) break; if (hit_eos) break; } diff --git a/server/src/server/http_server.cpp b/server/src/server/http_server.cpp index e751cad7..0cd5f2a8 100644 --- a/server/src/server/http_server.cpp +++ b/server/src/server/http_server.cpp @@ -12,6 +12,8 @@ #include #include #include +#include +#include #include #include @@ -20,6 +22,7 @@ #include #include #include +#include #include namespace dflash::common { @@ -44,6 +47,9 @@ static constexpr char kServerName[] = "luce-dflash"; static const std::vector kApiEndpoints = { "GET /health", "GET /props", + "GET /status", + "GET /status/events", + "GET /status/json", "GET /v1/models", "POST /v1/chat/completions", "POST /v1/messages", @@ -465,6 +471,136 @@ HttpServer::HttpServer(ModelBackend & backend, config.disk_cache_cold_max_tokens}, backend) { disk_cache_.init(); + status_html_path_ = resolve_status_html(); +} + +// Resolve path to share/status.html at startup. +std::string HttpServer::resolve_status_html() { + // 1. DFLASH_SHARE_DIR env var + if (const char * dir = std::getenv("DFLASH_SHARE_DIR")) { + std::string path = std::string(dir) + "/status.html"; + struct stat st; + if (::stat(path.c_str(), &st) == 0) return path; + } + // 2. share/ relative to /proc/self/exe (build dir or installed prefix) + char exe_buf[1024] = {}; + ssize_t len = ::readlink("/proc/self/exe", exe_buf, sizeof(exe_buf) - 1); + if (len > 0) { + exe_buf[len] = '\0'; + std::string exe_dir(exe_buf); + auto slash = exe_dir.rfind('/'); + if (slash != std::string::npos) { + exe_dir = exe_dir.substr(0, slash); + // 2a. /share/status.html (build directory layout) + { + std::string path = exe_dir + "/share/status.html"; + struct stat st; + if (::stat(path.c_str(), &st) == 0) return path; + } + // 2b. /../share/status.html (installed prefix layout) + { + std::string path = exe_dir + "/../share/status.html"; + struct stat st; + if (::stat(path.c_str(), &st) == 0) return path; + } + } + } + // 3. ./share/status.html (development) + { + struct stat st; + if (::stat("share/status.html", &st) == 0) return "share/status.html"; + } + return {}; +} + +// Send data to an SSE client fd with a short (1s) timeout to avoid stalling +// the inference worker. Returns false if the send fails or times out. +static bool sse_try_send(int fd, const void * data, size_t len) { + const char * p = static_cast(data); + size_t sent = 0; + auto deadline = std::chrono::steady_clock::now() + std::chrono::seconds(1); + while (sent < len) { + auto remaining = std::chrono::duration_cast( + deadline - std::chrono::steady_clock::now()).count(); + if (remaining <= 0) return false; + + struct pollfd pfd = {fd, POLLOUT, 0}; + int ret; + do { + ret = poll(&pfd, 1, static_cast(std::min(remaining, (long)50))); + } while (ret < 0 && errno == EINTR); + if (ret < 0 || (pfd.revents & (POLLERR | POLLHUP | POLLNVAL))) return false; + if (ret == 0) continue; + + ssize_t n = ::send(fd, p + sent, len - sent, MSG_NOSIGNAL); + if (n < 0) { + if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK) continue; + return false; + } + sent += n; + } + return true; +} + +// Broadcast current status as SSE event to all connected /status/events clients. +void HttpServer::broadcast_status() { + std::string event = status_.to_sse_event(); + std::lock_guard lk(sse_mu_); + std::vector dead; + for (int fd : sse_fds_) { + if (!sse_try_send(fd, event.data(), event.size())) { + dead.push_back(fd); + } + } + for (int fd : dead) { + ::close(fd); + sse_fds_.erase(std::remove(sse_fds_.begin(), sse_fds_.end(), fd), + sse_fds_.end()); + } +} + +// Broadcast a token text delta as an incremental SSE event. +void HttpServer::broadcast_token(const std::string & text) { + // Token text may contain incomplete UTF-8 (tokens can split multi-byte + // codepoints). Manually build the SSE payload with json string escaping + // that replaces invalid UTF-8 with U+FFFD instead of throwing. + json j; + j["text"] = text; + std::string event = "event: token\ndata: " + + j.dump(-1, ' ', false, json::error_handler_t::replace) + "\n\n"; + std::lock_guard lk(sse_mu_); + std::vector dead; + for (int fd : sse_fds_) { + if (!sse_try_send(fd, event.data(), event.size())) { + dead.push_back(fd); + } + } + for (int fd : dead) { + ::close(fd); + sse_fds_.erase(std::remove(sse_fds_.begin(), sse_fds_.end(), fd), + sse_fds_.end()); + } +} + +// Send an SSE comment as a heartbeat to detect disconnected clients when idle. +// Uses non-blocking sends to avoid stalling the worker thread on slow clients. +void HttpServer::sse_heartbeat() { + static const char ping[] = ":heartbeat\n\n"; + std::lock_guard lk(sse_mu_); + std::vector dead; + for (int fd : sse_fds_) { + // Non-blocking send: if the socket buffer can't accept 12 bytes + // immediately, the client is too far behind — treat as dead. + ssize_t n = ::send(fd, ping, sizeof(ping) - 1, MSG_NOSIGNAL | MSG_DONTWAIT); + if (n <= 0) { + dead.push_back(fd); + } + } + for (int fd : dead) { + ::close(fd); + sse_fds_.erase(std::remove(sse_fds_.begin(), sse_fds_.end(), fd), + sse_fds_.end()); + } } HttpServer::~HttpServer() { @@ -483,6 +619,13 @@ void HttpServer::shutdown() { worker_thread_.join(); } + // Close SSE client connections. + { + std::lock_guard lk(sse_mu_); + for (int fd : sse_fds_) ::close(fd); + sse_fds_.clear(); + } + // Drain any pending jobs. { std::lock_guard lk(queue_mu_); @@ -679,6 +822,61 @@ void HttpServer::handle_client(int fd) { return; } + // Status page: serve HTML file from disk. + if (hr.method == "GET" && hr.path == "/status") { + if (status_html_path_.empty()) { + send_error(fd, 404, + "status.html not found. Set DFLASH_SHARE_DIR or place it in share/status.html"); + ::close(fd); + return; + } + std::ifstream ifs(status_html_path_); + if (!ifs.is_open()) { + send_error(fd, 500, "failed to open status.html"); + ::close(fd); + return; + } + std::ostringstream oss; + oss << ifs.rdbuf(); + send_response(fd, 200, "text/html; charset=utf-8", oss.str()); + ::close(fd); + return; + } + + // Status JSON snapshot (for non-SSE clients / debugging). + if (hr.method == "GET" && hr.path == "/status/json") { + send_response(fd, 200, "application/json", + status_.to_json().dump(-1, ' ', false, json::error_handler_t::replace) + "\n"); + ::close(fd); + return; + } + + // Status SSE stream: hold connection open and push updates. + if (hr.method == "GET" && hr.path == "/status/events") { + // Send SSE headers. + const char * headers = + "HTTP/1.1 200 OK\r\n" + "Content-Type: text/event-stream\r\n" + "Cache-Control: no-cache\r\n" + "Connection: keep-alive\r\n" + "Access-Control-Allow-Origin: *\r\n" + "\r\n"; + if (!send_all(fd, headers, std::strlen(headers))) { + ::close(fd); + return; + } + // Send initial state immediately. + std::string initial = status_.to_sse_event(); + send_all(fd, initial.data(), initial.size()); + // Register for future broadcasts. The fd is NOT closed here — it stays + // open until the client disconnects (detected on next broadcast send). + { + std::lock_guard lk(sse_mu_); + sse_fds_.push_back(fd); + } + return; // Do NOT close fd — it's now owned by the SSE broadcast loop. + } + // Models endpoint. if (hr.method == "GET" && hr.path == "/v1/models") { // Codex sends ?client_version= — serve the Codex-specific schema. @@ -1114,6 +1312,37 @@ void HttpServer::worker_loop() { const auto & req = job->req; auto started_at = std::chrono::steady_clock::now(); + // Track live status for /status page. RAII guard ensures idle on all paths. + std::string prompt_excerpt; + if (!req.prompt_tokens.empty()) { + // Decode first ~40 tokens as a prompt excerpt (cheap, bounded). + const int excerpt_len = std::min((int)req.prompt_tokens.size(), 40); + std::vector excerpt_toks(req.prompt_tokens.begin(), + req.prompt_tokens.begin() + excerpt_len); + prompt_excerpt = tokenizer_.decode(excerpt_toks); + if (prompt_excerpt.size() > 200) prompt_excerpt.resize(200); + } + { + ServerStatus::RequestInfo info; + info.model = req.model; + info.format = api_format_name(req.format); + info.session_id = req.session_id; + info.max_output = req.max_output; + info.temperature = req.sampler.temp; + info.top_p = req.sampler.top_p; + info.top_k = req.sampler.top_k; + info.thinking_enabled = req.thinking_enabled; + status_.set_running(prompt_excerpt, (int)req.prompt_tokens.size(), req.stream, info); + } + // Store messages JSON for request inspection (truncate to avoid huge payloads). + if (!req.messages.is_null()) { + std::string msg_str = req.messages.dump(); + if (msg_str.size() > 4096) msg_str.resize(4096); + status_.set_messages(msg_str); + } + broadcast_status(); + StatusGuard status_guard{status_}; + auto finish_job = [&]() { std::lock_guard lk(job->mu); job->done = true; @@ -1440,10 +1669,25 @@ void HttpServer::worker_loop() { snap_slot, snap_cut); + // Update status page with cache/pflash/spec-decode flags. + status_.set_flags(using_restore, pflash_compressed, !config_.draft_path.empty()); + broadcast_status(); + // Set up DaemonIO with on_token callback for streaming + disconnect. DaemonIO io; io.stream_fd = -1; // no pipe — we write SSE directly + // Inference observer: updates status page with draft tokens per step. + io.observer = [&](const char * phase, const std::vector & tokens) { + std::vector token_strs; + token_strs.reserve(tokens.size()); + for (int32_t t : tokens) { + token_strs.push_back(tokenizer_.token_text(t)); + } + status_.set_draft_tokens(token_strs); + broadcast_status(); + }; + int completion_tokens = 0; bool client_disconnected = false; @@ -1451,6 +1695,12 @@ void HttpServer::worker_loop() { if (client_disconnected) return false; completion_tokens++; + // Update status page every 10 tokens (low overhead). + if (completion_tokens % 10 == 0) { + status_.update_completion_tokens(completion_tokens); + broadcast_status(); + } + // Skip EOS/EOT/special tokens — don't forward to SSE. int32_t eos = tokenizer_.eos_id(); int32_t eot = tokenizer_.eos_chat_id(); @@ -1460,6 +1710,7 @@ void HttpServer::worker_loop() { // Gemma4 thinking channel: map <|channel> → , \n if (raw == "<|channel>") { + broadcast_token(""); if (req.stream) { auto chunks = emitter.emit_token(""); for (const auto & chunk : chunks) @@ -1468,6 +1719,7 @@ void HttpServer::worker_loop() { return true; } if (raw == "") { + broadcast_token("\n"); if (req.stream) { auto chunks = emitter.emit_token("\n"); for (const auto & chunk : chunks) @@ -1484,6 +1736,7 @@ void HttpServer::worker_loop() { // reasoning_content with empty visible content. Forward the text // form into the emitter so parse_reasoning() can split correctly. if (raw == "" || raw == "") { + broadcast_token(raw == "" ? "\n" : ""); if (req.stream) { auto chunks = emitter.emit_token( raw == "" ? "\n" : ""); @@ -1502,6 +1755,11 @@ void HttpServer::worker_loop() { std::string text = tokenizer_.token_text(token); + // Send token text to status page clients (browser accumulates). + if (!text.empty()) { + broadcast_token(text); + } + if (req.stream && !text.empty()) { auto chunks = emitter.emit_token(text); for (const auto & chunk : chunks) { @@ -1534,6 +1792,10 @@ void HttpServer::worker_loop() { backend_.unpark("draft"); // reload decode draft (~3.3 GB) } + // Transition status to decode phase. + status_.set_decode(); + broadcast_status(); + GenerateResult result; if (using_restore) { result = backend_.restore_and_generate_with_empty_spec_fallback(cache_slot, gen_req, io); @@ -1630,6 +1892,31 @@ void HttpServer::worker_loop() { // message_delta usage, Responses response.completed usage). // See docs/specs/thinking-budget.md §6.3. GenTimings gen_timings{ result.prefill_s, result.decode_s }; + + // Record performance for /status page. + if (result.ok) { + PerfRecord perf; + perf.prompt_tokens = (int)req.prompt_tokens.size(); + perf.completion_tokens = completion_tokens; + // Use actual prefilled token count: on cache hit the backend only + // prefills the delta beyond the cached prefix, so dividing the full + // prompt size by delta time would be wrong. + const int prefill_tokens = using_restore + ? std::max(0, (int)effective_prompt.size() - prefix_len) + : (int)effective_prompt.size(); + perf.prefill_tok_s = (result.prefill_s > 0.0) + ? (double)prefill_tokens / result.prefill_s : 0.0; + perf.decode_tok_s = (result.decode_s > 0.0) + ? (double)completion_tokens / result.decode_s : 0.0; + perf.accept_rate = result.accept_rate; + perf.cache_hit = using_restore; + perf.pflash = pflash_compressed; + perf.spec_decode = result.spec_decode_ran; + perf.timestamp = std::chrono::steady_clock::now(); + status_.record_perf(perf); + status_.update_completion_tokens(completion_tokens); + broadcast_status(); + } if (req.stream && !client_disconnected) { auto final_chunks = emitter.emit_finish(completion_tokens, &gen_timings); for (const auto & chunk : final_chunks) { @@ -1961,7 +2248,15 @@ void HttpServer::enqueue(ServerJob * job) { ServerJob * HttpServer::dequeue() { std::unique_lock lk(queue_mu_); - queue_cv_.wait(lk, [this]() { return queue_head_ != nullptr || stopping_.load(); }); + // Use timed wait so the worker periodically wakes to send SSE heartbeats. + while (!queue_head_ && !stopping_.load()) { + if (queue_cv_.wait_for(lk, std::chrono::seconds(30)) == std::cv_status::timeout) { + // Send SSE heartbeat (comment line) to detect disconnected clients. + lk.unlock(); + sse_heartbeat(); + lk.lock(); + } + } if (!queue_head_) return nullptr; ServerJob * j = queue_head_; queue_head_ = j->next; diff --git a/server/src/server/http_server.h b/server/src/server/http_server.h index 71c544ac..a33578ce 100644 --- a/server/src/server/http_server.h +++ b/server/src/server/http_server.h @@ -23,6 +23,7 @@ #include "common/pflash_drafter_ipc.h" #include "model_card.h" #include "adaptive_keep_ratio.h" +#include "server_status.h" #include #include @@ -289,6 +290,26 @@ class HttpServer { // Per-session adaptive keep_ratio bandit state. HttpServerSessions sessions_; + // Live status tracker (read by /status/json, written by worker thread). + ServerStatus status_; + + // SSE client connections for /status/events push. + std::mutex sse_mu_; + std::vector sse_fds_; + + // Broadcast current status to all SSE clients. Removes dead fds. + void broadcast_status(); + + // Broadcast incremental token text to SSE clients. + void broadcast_token(const std::string & text); + + // Send SSE heartbeat comment to prune disconnected clients. + void sse_heartbeat(); + + // Resolve and cache path to share/status.html. + std::string status_html_path_; + std::string resolve_status_html(); + // Track prompt tokens for each snapshot slot (for shutdown save). std::unordered_map> slot_tokens_; diff --git a/server/src/server/server_status.h b/server/src/server/server_status.h new file mode 100644 index 00000000..98794e91 --- /dev/null +++ b/server/src/server/server_status.h @@ -0,0 +1,241 @@ +// Server status tracking for the /status introspection page. +// +// Thread-safe status tracker: worker thread writes, HTTP client threads read. +// Designed for minimal overhead on the inference hot path. + +#pragma once + +#include + +#include +#include +#include +#include +#include + +namespace dflash::common { + +using json = nlohmann::json; + +// Performance record for one completed request. +struct PerfRecord { + double prefill_tok_s = 0.0; + double decode_tok_s = 0.0; + float accept_rate = 0.0f; + int prompt_tokens = 0; + int completion_tokens = 0; + bool cache_hit = false; + bool pflash = false; + bool spec_decode = false; + std::chrono::steady_clock::time_point timestamp; +}; + +// Live inference phase. +enum class InferencePhase { + IDLE, + PREFILL, + DECODE, +}; + +static inline const char * phase_name(InferencePhase p) { + switch (p) { + case InferencePhase::IDLE: return "idle"; + case InferencePhase::PREFILL: return "prefill"; + case InferencePhase::DECODE: return "decode"; + default: return "unknown"; + } +} + +class ServerStatus { +public: + static constexpr int kMaxHistory = 50; + + // Request details passed at set_running time. + struct RequestInfo { + std::string model; + std::string format; // "chat", "anthropic", "responses" + std::string session_id; + int max_output = 0; + float temperature = 0.0f; + float top_p = 1.0f; + int top_k = 0; + bool thinking_enabled = false; + }; + + // Called by worker thread to update live state. + void set_running(const std::string & prompt_excerpt, int prompt_tokens, + bool is_stream, const RequestInfo & info) { + std::lock_guard lk(mu_); + phase_ = InferencePhase::PREFILL; + prompt_excerpt_ = prompt_excerpt; + prompt_tokens_ = prompt_tokens; + completion_tokens_ = 0; + is_stream_ = is_stream; + draft_tokens_.clear(); + request_info_ = info; + cache_hit_ = false; + pflash_ = false; + spec_decode_ = false; + started_at_ = std::chrono::steady_clock::now(); + } + + void set_messages(const std::string & messages_json) { + std::lock_guard lk(mu_); + messages_json_ = messages_json; + } + + void set_decode() { + std::lock_guard lk(mu_); + phase_ = InferencePhase::DECODE; + } + + void set_flags(bool cache_hit, bool pflash, bool spec_decode) { + std::lock_guard lk(mu_); + cache_hit_ = cache_hit; + pflash_ = pflash; + spec_decode_ = spec_decode; + } + + void update_completion_tokens(int n) { + std::lock_guard lk(mu_); + completion_tokens_ = n; + } + + void set_draft_tokens(const std::vector & tokens) { + std::lock_guard lk(mu_); + draft_tokens_ = tokens; + } + + void set_idle() { + std::lock_guard lk(mu_); + phase_ = InferencePhase::IDLE; + prompt_excerpt_.clear(); + draft_tokens_.clear(); + } + + void record_perf(const PerfRecord & rec) { + std::lock_guard lk(mu_); + if ((int)perf_history_.size() >= kMaxHistory) { + perf_history_.erase(perf_history_.begin()); + } + perf_history_.push_back(rec); + total_requests_++; + } + + // Snapshot current state as JSON (thread-safe). + json to_json() const { + InferencePhase phase; + std::string prompt_excerpt; + int prompt_tokens = 0; + int completion_tokens = 0; + bool is_stream = false; + std::vector draft_tokens; + std::vector history; + int total_requests = 0; + double elapsed_s = 0.0; + RequestInfo info; + bool cache_hit = false, pflash = false, spec_decode = false; + std::string messages_json; + + { + std::lock_guard lk(mu_); + phase = phase_; + prompt_excerpt = prompt_excerpt_; + prompt_tokens = prompt_tokens_; + completion_tokens = completion_tokens_; + is_stream = is_stream_; + draft_tokens = draft_tokens_; + history = perf_history_; + total_requests = total_requests_; + info = request_info_; + cache_hit = cache_hit_; + pflash = pflash_; + spec_decode = spec_decode_; + messages_json = messages_json_; + if (phase != InferencePhase::IDLE) { + elapsed_s = std::chrono::duration( + std::chrono::steady_clock::now() - started_at_).count(); + } + } + + json j; + j["phase"] = phase_name(phase); + j["total_requests"] = total_requests; + + if (phase != InferencePhase::IDLE) { + j["current"] = { + {"prompt_excerpt", prompt_excerpt}, + {"prompt_tokens", prompt_tokens}, + {"completion_tokens", completion_tokens}, + {"stream", is_stream}, + {"elapsed_s", elapsed_s}, + {"draft_tokens", draft_tokens}, + {"model", info.model}, + {"format", info.format}, + {"max_output", info.max_output}, + {"temperature", info.temperature}, + {"top_p", info.top_p}, + {"top_k", info.top_k}, + {"thinking_enabled", info.thinking_enabled}, + {"session_id", info.session_id}, + {"cache_hit", cache_hit}, + {"pflash", pflash}, + {"spec_decode", spec_decode}, + {"messages", messages_json}, + }; + } else { + j["current"] = nullptr; + } + + json perf = json::array(); + for (const auto & r : history) { + perf.push_back({ + {"prefill_tok_s", r.prefill_tok_s}, + {"decode_tok_s", r.decode_tok_s}, + {"accept_rate", r.accept_rate}, + {"prompt_tokens", r.prompt_tokens}, + {"completion_tokens", r.completion_tokens}, + {"cache_hit", r.cache_hit}, + {"pflash", r.pflash}, + {"spec_decode", r.spec_decode}, + }); + } + j["perf_history"] = perf; + + return j; + } + + std::string to_sse_event() const { + std::string data = to_json().dump(-1, ' ', false, json::error_handler_t::replace); + return "event: status\ndata: " + data + "\n\n"; + } + +private: + mutable std::mutex mu_; + + // Live state. + InferencePhase phase_ = InferencePhase::IDLE; + std::string prompt_excerpt_; + int prompt_tokens_ = 0; + int completion_tokens_ = 0; + bool is_stream_ = false; + std::vector draft_tokens_; + std::chrono::steady_clock::time_point started_at_; + RequestInfo request_info_; + bool cache_hit_ = false; + bool pflash_ = false; + bool spec_decode_ = false; + std::string messages_json_; + + // History. + std::vector perf_history_; + int total_requests_ = 0; +}; + +// RAII guard that resets status to idle on scope exit. +struct StatusGuard { + ServerStatus & status; + ~StatusGuard() { status.set_idle(); } +}; + +} // namespace dflash::common