diff --git a/server/CMakeLists.txt b/server/CMakeLists.txt index d6f2b0db6..246ed58bb 100644 --- a/server/CMakeLists.txt +++ b/server/CMakeLists.txt @@ -700,6 +700,7 @@ if(DFLASH27B_TESTS) # ─── dflash_server: native C++ HTTP server ───────────────────────── if(EXISTS "${CMAKE_CURRENT_SOURCE_DIR}/src/server/server_main.cpp") + find_package(CURL REQUIRED) add_executable(dflash_server src/server/server_main.cpp src/server/http_server.cpp @@ -713,7 +714,7 @@ if(DFLASH27B_TESTS) DFLASH27B_BACKEND_CUDA=1 DFLASH27B_CUDA_MIN_SM=${_dflash_cuda_min_sm}) endif() - target_link_libraries(dflash_server PRIVATE dflash_common ggml ${DFLASH27B_GGML_BACKEND_TARGET} pthread) + target_link_libraries(dflash_server PRIVATE dflash_common ggml ${DFLASH27B_GGML_BACKEND_TARGET} pthread CURL::libcurl) if(DFLASH27B_GPU_BACKEND STREQUAL "cuda") find_package(CUDAToolkit REQUIRED) target_link_libraries(dflash_server PRIVATE CUDA::cudart) @@ -780,7 +781,7 @@ if(DFLASH27B_TESTS) DFLASH27B_BACKEND_CUDA=1 DFLASH27B_CUDA_MIN_SM=${_dflash_cuda_min_sm}) endif() - target_link_libraries(test_server_unit PRIVATE dflash_common ggml ${DFLASH27B_GGML_BACKEND_TARGET}) + target_link_libraries(test_server_unit PRIVATE dflash_common ggml ${DFLASH27B_GGML_BACKEND_TARGET} CURL::libcurl) if(DFLASH27B_GPU_BACKEND STREQUAL "cuda") find_package(CUDAToolkit REQUIRED) target_link_libraries(test_server_unit PRIVATE CUDA::cudart) diff --git a/server/src/qwen3/qwen3_drafter.cpp b/server/src/qwen3/qwen3_drafter.cpp index 4cf35431a..ff230788d 100644 --- a/server/src/qwen3/qwen3_drafter.cpp +++ b/server/src/qwen3/qwen3_drafter.cpp @@ -126,7 +126,16 @@ bool load_drafter(const std::string & gguf_path, int /*gpu_layers*/, bool load_drafter(const std::string & gguf_path, int /*gpu_layers*/, int gpu, DrafterContext & out) { - return load_drafter(gguf_path, /*gpu_layers=*/999, DrafterArch::Qwen3_0p6b, gpu, out); + DrafterArch arch = DrafterArch::Qwen3_0p6b; + { + std::string lower = gguf_path; + for (auto & c : lower) c = (char)std::tolower((unsigned char)c); + if (lower.find("qwen3.5") != std::string::npos || + lower.find("qwen35") != std::string::npos) { + arch = DrafterArch::Qwen35_0p8b; + } + } + return load_drafter(gguf_path, /*gpu_layers=*/999, arch, gpu, out); } bool load_drafter(const std::string & gguf_path, int /*gpu_layers*/, diff --git a/server/src/server/http_server.cpp b/server/src/server/http_server.cpp index ab37805bf..ac565ff05 100644 --- a/server/src/server/http_server.cpp +++ b/server/src/server/http_server.cpp @@ -7,6 +7,8 @@ #include "sse_emitter.h" #include "tool_hint.h" +#include + #include #include #include @@ -24,6 +26,218 @@ namespace dflash::common { +// ─── piecewise keep-ratio curve ───────────────────────────────────────── + +static float pflash_keep_ratio(const ServerConfig & cfg, int n_tokens) { + if (cfg.pflash_curve.empty()) return cfg.pflash_keep_ratio; + const auto & curve = cfg.pflash_curve; + if (n_tokens <= curve.front().first) return curve.front().second; + if (n_tokens >= curve.back().first) return curve.back().second; + for (size_t i = 0; i + 1 < curve.size(); ++i) { + if (n_tokens <= curve[i + 1].first) { + float t = (float)(n_tokens - curve[i].first) / + (float)(curve[i + 1].first - curve[i].first); + return curve[i].second + t * (curve[i + 1].second - curve[i].second); + } + } + return curve.back().second; +} + +// ─── curl helpers for upstream proxy ───────────────────────────────────── + +struct CurlWriteCtx { + int client_fd; + bool streaming; + bool first_chunk; + bool chat_rewrite; // rewrite completions → chat format + std::string buffer; // accumulates non-streaming response + std::string response_id; + std::string model; +}; + +static size_t curl_write_passthrough(char * ptr, size_t size, size_t nmemb, void * userdata) { + size_t total = size * nmemb; + auto * ctx = static_cast(userdata); + if (ctx->streaming) { + ::send(ctx->client_fd, ptr, total, MSG_NOSIGNAL); + } else { + ctx->buffer.append(ptr, total); + } + return total; +} + +static size_t curl_write_rewrite(char * ptr, size_t size, size_t nmemb, void * userdata) { + size_t total = size * nmemb; + auto * ctx = static_cast(userdata); + + if (!ctx->streaming) { + ctx->buffer.append(ptr, total); + return total; + } + + // Streaming: rewrite completions SSE chunks → chat completions format. + ctx->buffer.append(ptr, total); + std::string & buf = ctx->buffer; + size_t pos = 0; + while (true) { + size_t nl = buf.find('\n', pos); + if (nl == std::string::npos) break; + std::string line = buf.substr(pos, nl - pos); + pos = nl + 1; + if (line.empty() || line == "\r") { + std::string out = "\n"; + ::send(ctx->client_fd, out.data(), out.size(), MSG_NOSIGNAL); + continue; + } + if (line.size() > 0 && line.back() == '\r') line.pop_back(); + if (line.rfind("data: ", 0) != 0) { + line += "\n"; + ::send(ctx->client_fd, line.data(), line.size(), MSG_NOSIGNAL); + continue; + } + std::string payload = line.substr(6); + if (payload == "[DONE]") { + std::string out = "data: [DONE]\n\n"; + ::send(ctx->client_fd, out.data(), out.size(), MSG_NOSIGNAL); + continue; + } + try { + auto j = json::parse(payload); + j["object"] = "chat.completion.chunk"; + if (j.contains("choices") && j["choices"].is_array()) { + for (auto & c : j["choices"]) { + json delta; + if (ctx->first_chunk) { + delta["role"] = "assistant"; + ctx->first_chunk = false; + } + if (c.contains("text")) { + delta["content"] = c["text"]; + c.erase("text"); + } + c["delta"] = delta; + c.erase("index"); // re-add below + if (!c.contains("index")) c["index"] = 0; + } + } + std::string out = "data: " + j.dump() + "\n\n"; + ::send(ctx->client_fd, out.data(), out.size(), MSG_NOSIGNAL); + } catch (...) { + std::string out = line + "\n"; + ::send(ctx->client_fd, out.data(), out.size(), MSG_NOSIGNAL); + } + } + buf.erase(0, pos); + return total; +} + +static json rewrite_completions_to_chat(const json & comp_resp) { + json chat_resp; + chat_resp["id"] = comp_resp.value("id", ""); + chat_resp["object"] = "chat.completion"; + chat_resp["created"] = comp_resp.value("created", 0); + chat_resp["model"] = comp_resp.value("model", ""); + if (comp_resp.contains("usage")) chat_resp["usage"] = comp_resp["usage"]; + json choices = json::array(); + if (comp_resp.contains("choices") && comp_resp["choices"].is_array()) { + for (const auto & c : comp_resp["choices"]) { + json choice; + choice["index"] = c.value("index", 0); + choice["finish_reason"] = c.value("finish_reason", "stop"); + json msg = {{"role", "assistant"}, {"content", c.value("text", "")}}; + choice["message"] = msg; + choices.push_back(choice); + } + } + chat_resp["choices"] = choices; + return chat_resp; +} + +static bool curl_forward(int client_fd, const std::string & url, + const std::string & api_key, const json & body, + bool streaming, bool rewrite_to_chat, + const std::string & response_id, + const std::string & model) { + CURL * curl = curl_easy_init(); + if (!curl) return false; + + std::string body_str = body.dump(); + + struct curl_slist * headers = nullptr; + headers = curl_slist_append(headers, "Content-Type: application/json"); + if (!api_key.empty()) { + std::string auth = "Authorization: Bearer " + api_key; + headers = curl_slist_append(headers, auth.c_str()); + } + + CurlWriteCtx ctx; + ctx.client_fd = client_fd; + ctx.streaming = streaming; + ctx.first_chunk = true; + ctx.chat_rewrite = rewrite_to_chat; + ctx.response_id = response_id; + ctx.model = model; + + curl_easy_setopt(curl, CURLOPT_URL, url.c_str()); + curl_easy_setopt(curl, CURLOPT_POSTFIELDS, body_str.c_str()); + curl_easy_setopt(curl, CURLOPT_POSTFIELDSIZE, (long)body_str.size()); + curl_easy_setopt(curl, CURLOPT_HTTPHEADER, headers); + curl_easy_setopt(curl, CURLOPT_TIMEOUT, 600L); + curl_easy_setopt(curl, CURLOPT_WRITEDATA, &ctx); + + if (rewrite_to_chat) { + curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, curl_write_rewrite); + } else { + curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, curl_write_passthrough); + } + + if (streaming) { + std::string sse_header = + "HTTP/1.1 200 OK\r\n" + "Content-Type: text/event-stream\r\n" + "Cache-Control: no-cache\r\n" + "Connection: keep-alive\r\n" + "\r\n"; + ::send(client_fd, sse_header.data(), sse_header.size(), MSG_NOSIGNAL); + } + + CURLcode res = curl_easy_perform(curl); + + if (!streaming && res == CURLE_OK) { + // Non-streaming: send accumulated response. + if (rewrite_to_chat) { + try { + json resp = json::parse(ctx.buffer); + json chat_resp = rewrite_completions_to_chat(resp); + std::string out = chat_resp.dump(); + std::string http = + "HTTP/1.1 200 OK\r\n" + "Content-Type: application/json\r\n" + "Content-Length: " + std::to_string(out.size()) + "\r\n" + "\r\n" + out; + ::send(client_fd, http.data(), http.size(), MSG_NOSIGNAL); + } catch (...) { + std::string http = + "HTTP/1.1 502 Bad Gateway\r\n" + "Content-Type: application/json\r\n" + "\r\n{\"error\":\"upstream response parse failed\"}"; + ::send(client_fd, http.data(), http.size(), MSG_NOSIGNAL); + } + } else { + std::string http = + "HTTP/1.1 200 OK\r\n" + "Content-Type: application/json\r\n" + "Content-Length: " + std::to_string(ctx.buffer.size()) + "\r\n" + "\r\n" + ctx.buffer; + ::send(client_fd, http.data(), http.size(), MSG_NOSIGNAL); + } + } + + curl_slist_free_all(headers); + curl_easy_cleanup(curl); + return res == CURLE_OK; +} + // ─── /props constants ─────────────────────────────────────────────────── // // SERVER_NAME / SERVER_VERSION mirror the Python server's identity strings @@ -461,11 +675,13 @@ HttpServer::HttpServer(ModelBackend & backend, config.disk_cache_continued_interval, config.disk_cache_cold_max_tokens}, backend) { + curl_global_init(CURL_GLOBAL_DEFAULT); disk_cache_.init(); } HttpServer::~HttpServer() { shutdown(); + curl_global_cleanup(); } void HttpServer::shutdown() { @@ -726,6 +942,7 @@ bool HttpServer::route_request(int fd, const HttpRequest & hr) { try { json body = json::parse(hr.body); + req.raw_body = body; // Common fields. req.stream = body.value("stream", false); @@ -1107,10 +1324,9 @@ void HttpServer::worker_loop() { req.max_output, json_array_size(req.tools)); - // Send SSE headers. - if (req.stream) { + // Send SSE headers (skip when proxying — curl_forward handles its own headers). + if (req.stream && config_.pflash_upstream_base.empty()) { if (!send_sse_headers(fd)) { - // Client already disconnected before we started. finish_job(); continue; } @@ -1122,8 +1338,8 @@ void HttpServer::worker_loop() { &tool_memory_, req.stop_sequences); - // Emit initial SSE events. - if (req.stream) { + // Emit initial SSE events (skip when proxying). + if (req.stream && config_.pflash_upstream_base.empty()) { bool start_ok = true; for (const auto & chunk : emitter.emit_start()) { if (!send_all(fd, chunk.data(), chunk.size())) { @@ -1176,9 +1392,9 @@ void HttpServer::worker_loop() { // 3. Compress via typed API ModelBackend::CompressRequest creq; creq.input_ids = std::move(drafter_ids); - // Bandit: use per-session keep_ratio if session_id provided. + // Bandit overrides curve when session_id is present. creq.keep_ratio = req.session_id.empty() - ? config_.pflash_keep_ratio + ? pflash_keep_ratio(config_, n_prompt) : sessions_.get_keep_ratio(req.session_id); creq.drafter_path = config_.pflash_drafter_path; creq.drafter_gpu = config_.pflash_drafter_gpu; @@ -1206,7 +1422,55 @@ void HttpServer::worker_loop() { std::string compressed_text = drafter_tokenizer_->decode(cresult.compressed_ids); - // 5. Re-tokenize with target tokenizer + // 5. Query survival check: verify the last user + // message survived compression. If < 80% of its + // tokens are present, re-append the full query. + std::string last_user_text; + if (req.messages.is_array()) { + for (int mi = (int)req.messages.size() - 1; mi >= 0; --mi) { + if (req.messages[mi].value("role", "") == "user") { + auto & c = req.messages[mi]["content"]; + if (c.is_string()) { + last_user_text = c.get(); + } else if (c.is_array()) { + for (const auto & part : c) { + std::string ptype = part.value("type", ""); + if (ptype == "text" || ptype == "input_text" || + ptype == "output_text") { + last_user_text += part.value("text", ""); + } + } + } + break; + } + } + } + if (!last_user_text.empty() && drafter_tokenizer_) { + auto query_ids = drafter_tokenizer_->encode(last_user_text); + int query_kept = 0; + if (!query_ids.empty()) { + int qi = (int)query_ids.size() - 1; + for (int ki = (int)cresult.compressed_ids.size() - 1; ki >= 0 && qi >= 0; --ki) { + if (cresult.compressed_ids[ki] == query_ids[qi]) { + ++query_kept; + --qi; + } + } + } + float survival = (float)query_kept / std::max(1, (int)query_ids.size()); + std::fprintf(stderr, "[pflash] query survival: %d/%d (%.0f%%)\n", + query_kept, (int)query_ids.size(), survival * 100.0f); + if (survival < 0.80f && (int)query_ids.size() < 1000) { + compressed_text += "\n" + last_user_text; + std::fprintf(stderr, "[pflash] query below 80%% — re-appended full query (%d tokens)\n", + (int)query_ids.size()); + } else if (survival < 0.80f) { + std::fprintf(stderr, "[pflash] query below 80%% but too large to re-append (%d tokens)\n", + (int)query_ids.size()); + } + } + + // 6. Re-tokenize with target tokenizer effective_prompt = tokenizer_.encode(compressed_text); pflash_compressed = true; @@ -1229,6 +1493,56 @@ void HttpServer::worker_loop() { } } + // ── Upstream proxy: forward to remote server if configured ──── + if (!config_.pflash_upstream_base.empty()) { + const std::string & upstream = config_.pflash_upstream_base; + const std::string & upstream_key = config_.pflash_upstream_key; + const std::string & upstream_model = config_.pflash_upstream_model.empty() + ? req.model : config_.pflash_upstream_model; + + if (pflash_compressed) { + std::string compressed_text = tokenizer_.decode(effective_prompt); + compressed_text += "\n<|im_start|>assistant\n"; + + json comp_body; + comp_body["model"] = upstream_model; + comp_body["prompt"] = compressed_text; + comp_body["stream"] = req.stream; + if (req.raw_body.contains("max_tokens")) + comp_body["max_tokens"] = req.raw_body["max_tokens"]; + else + comp_body["max_tokens"] = req.max_output; + for (const char * key : {"temperature", "top_p", "top_k", "min_p", + "frequency_penalty", "presence_penalty", + "stop", "seed"}) { + if (req.raw_body.contains(key)) comp_body[key] = req.raw_body[key]; + } + + std::fprintf(stderr, + "[pflash-proxy] compressed forward → %s/completions prompt=%zu tokens model=%s\n", + upstream.c_str(), effective_prompt.size(), upstream_model.c_str()); + + curl_forward(fd, upstream + "/completions", + upstream_key, comp_body, + req.stream, /*rewrite_to_chat=*/true, + req.response_id, upstream_model); + } else { + json fwd_body = req.raw_body; + fwd_body["model"] = upstream_model; + + std::fprintf(stderr, + "[pflash-proxy] passthrough → %s/chat/completions model=%s\n", + upstream.c_str(), upstream_model.c_str()); + + curl_forward(fd, upstream + "/chat/completions", + upstream_key, fwd_body, + req.stream, /*rewrite_to_chat=*/false, + req.response_id, upstream_model); + } + finish_job(); + continue; + } + // Build generate request. // // Thinking-budget v2 (Level 2): when caller opts in via diff --git a/server/src/server/http_server.h b/server/src/server/http_server.h index 2fb3e4661..35ff4e74c 100644 --- a/server/src/server/http_server.h +++ b/server/src/server/http_server.h @@ -149,6 +149,13 @@ struct ServerConfig { bool pflash_remote_drafter = false; // use IPC drafter for mixed backends RemoteDraftConfig pflash_remote; // IPC binary/work-dir for remote PFlash drafter bool pflash_skip_park = false; // skip park/unpark for >=32GB GPUs + // Passthrough proxy — forward to upstream OpenAI-compatible server + std::string pflash_upstream_base; // e.g. "http://localhost:8080/v1" + std::string pflash_upstream_key; // Bearer token for upstream + std::string pflash_upstream_model; // model name in forwarded requests + // Piecewise keep-ratio curve: (token_threshold, keep_ratio) sorted ascending. + // If empty, uses pflash_keep_ratio as flat value. + std::vector> pflash_curve; bool lazy_draft = false; // park decode draft when idle to save VRAM // Disk prefix cache @@ -180,6 +187,8 @@ struct ParsedRequest { json tool_choice; // Original messages (for response formatting) json messages; + // Original request body (for upstream proxy forwarding) + json raw_body; // Response ID std::string response_id; // Thinking/reasoning state diff --git a/server/src/server/server_main.cpp b/server/src/server/server_main.cpp index 3dcb23a5a..ee12673fe 100644 --- a/server/src/server/server_main.cpp +++ b/server/src/server/server_main.cpp @@ -22,6 +22,7 @@ #include "gguf.h" +#include #include #include #include @@ -384,6 +385,30 @@ int main(int argc, char ** argv) { sconfig.pflash_drafter_path = argv[++i]; } else if (std::strcmp(argv[i], "--prefill-skip-park") == 0) { sconfig.pflash_skip_park = true; + } else if (std::strcmp(argv[i], "--prefill-upstream-base") == 0 && i + 1 < argc) { + sconfig.pflash_upstream_base = argv[++i]; + // Strip trailing slash + while (!sconfig.pflash_upstream_base.empty() && sconfig.pflash_upstream_base.back() == '/') + sconfig.pflash_upstream_base.pop_back(); + } else if (std::strcmp(argv[i], "--prefill-upstream-key") == 0 && i + 1 < argc) { + sconfig.pflash_upstream_key = argv[++i]; + } else if (std::strcmp(argv[i], "--prefill-upstream-model") == 0 && i + 1 < argc) { + sconfig.pflash_upstream_model = argv[++i]; + } else if (std::strcmp(argv[i], "--prefill-curve") == 0 && i + 1 < argc) { + sconfig.pflash_curve.clear(); + while (i + 1 < argc && argv[i + 1][0] != '-') { + const char * arg = argv[++i]; + const char * colon = std::strchr(arg, ':'); + if (!colon) { + std::fprintf(stderr, "[server] --prefill-curve: bad format '%s' (expected TOKENS:RATIO)\n", arg); + print_usage(argv[0]); + return 1; + } + int tok = std::atoi(arg); + float ratio = (float)std::atof(colon + 1); + sconfig.pflash_curve.push_back({tok, ratio}); + } + std::sort(sconfig.pflash_curve.begin(), sconfig.pflash_curve.end()); } else if (std::strcmp(argv[i], "--lazy-draft") == 0) { sconfig.lazy_draft = true; } else if (std::strcmp(argv[i], "--chat-template-file") == 0 && i + 1 < argc) { @@ -540,6 +565,17 @@ int main(int argc, char ** argv) { sconfig.pflash_threshold, sconfig.pflash_keep_ratio, sconfig.pflash_drafter_gpu, (int)sconfig.pflash_skip_park); + if (!sconfig.pflash_curve.empty()) { + std::fprintf(stderr, "[server] pflash curve:"); + for (const auto & p : sconfig.pflash_curve) + std::fprintf(stderr, " %d:%.3f", p.first, p.second); + std::fprintf(stderr, "\n"); + } + if (!sconfig.pflash_upstream_base.empty()) { + std::fprintf(stderr, "[server] pflash upstream: %s model=%s\n", + sconfig.pflash_upstream_base.c_str(), + sconfig.pflash_upstream_model.c_str()); + } } // Create backend. diff --git a/server/test/test_server_unit.cpp b/server/test/test_server_unit.cpp index 1415aab30..a0b868a49 100644 --- a/server/test/test_server_unit.cpp +++ b/server/test/test_server_unit.cpp @@ -953,6 +953,74 @@ static void test_pflash_threshold_always_mode() { TEST_ASSERT(should); } +static void test_pflash_config_upstream_defaults() { + ServerConfig cfg; + TEST_ASSERT(cfg.pflash_upstream_base.empty()); + TEST_ASSERT(cfg.pflash_upstream_key.empty()); + TEST_ASSERT(cfg.pflash_upstream_model.empty()); + TEST_ASSERT(cfg.pflash_curve.empty()); +} + +static void test_pflash_curve_interpolation() { + ServerConfig cfg; + cfg.pflash_curve = {{10000, 0.50f}, {40000, 0.20f}, {100000, 0.10f}}; + + // Replicate the piecewise logic from http_server.cpp + auto keep = [&](int n) -> float { + const auto & curve = cfg.pflash_curve; + if (n <= curve.front().first) return curve.front().second; + if (n >= curve.back().first) return curve.back().second; + for (size_t i = 0; i + 1 < curve.size(); ++i) { + if (n <= curve[i + 1].first) { + float t = (float)(n - curve[i].first) / + (float)(curve[i + 1].first - curve[i].first); + return curve[i].second + t * (curve[i + 1].second - curve[i].second); + } + } + return curve.back().second; + }; + + // Below first breakpoint + TEST_ASSERT(keep(5000) == 0.50f); + // At first breakpoint + TEST_ASSERT(keep(10000) == 0.50f); + // Midpoint between 10k and 40k + float mid = keep(25000); + TEST_ASSERT(mid > 0.20f && mid < 0.50f); + // At second breakpoint + TEST_ASSERT(std::fabs(keep(40000) - 0.20f) < 0.001f); + // Above last breakpoint + TEST_ASSERT(keep(200000) == 0.10f); +} + +static void test_pflash_curve_empty_uses_flat() { + ServerConfig cfg; + cfg.pflash_keep_ratio = 0.05f; + // With empty curve, should fall back to flat ratio + TEST_ASSERT(cfg.pflash_curve.empty()); + TEST_ASSERT(cfg.pflash_keep_ratio == 0.05f); +} + +static void test_pflash_upstream_proxy_config() { + ServerConfig cfg; + cfg.pflash_upstream_base = "http://localhost:8080/v1"; + cfg.pflash_upstream_key = "test-key"; + cfg.pflash_upstream_model = "test-model"; + + TEST_ASSERT(!cfg.pflash_upstream_base.empty()); + TEST_ASSERT(cfg.pflash_upstream_key == "test-key"); + TEST_ASSERT(cfg.pflash_upstream_model == "test-model"); +} + +static void test_pflash_raw_body_preserved() { + ParsedRequest req; + req.raw_body = {{"model", "test"}, {"messages", json::array()}, {"temperature", 0.7}}; + + TEST_ASSERT(req.raw_body.contains("model")); + TEST_ASSERT(req.raw_body.contains("temperature")); + TEST_ASSERT(req.raw_body["temperature"].get() > 0.6f); +} + static void test_pflash_placement_same_backend_local() { DevicePlacement target; target.backend = compiled_placement_backend(); @@ -2526,6 +2594,11 @@ int main() { RUN_TEST(test_pflash_compress_result_defaults); RUN_TEST(test_pflash_threshold_auto_mode); RUN_TEST(test_pflash_threshold_always_mode); + RUN_TEST(test_pflash_config_upstream_defaults); + RUN_TEST(test_pflash_curve_interpolation); + RUN_TEST(test_pflash_curve_empty_uses_flat); + RUN_TEST(test_pflash_upstream_proxy_config); + RUN_TEST(test_pflash_raw_body_preserved); RUN_TEST(test_pflash_placement_same_backend_local); RUN_TEST(test_pflash_placement_mixed_backend_remote); RUN_TEST(test_pflash_placement_auto_draft_follows_target);