From 1025c7a0ea07bb8d9aa098bd072df7b142cf1b90 Mon Sep 17 00:00:00 2001 From: Howard Su Date: Thu, 28 May 2026 07:20:12 +0800 Subject: [PATCH 1/8] feat(qwen35moe): pipelined hybrid MoE decode with GPU/CPU overlap Implement pipelined decode path that caches DeltaNet pre-FFN graphs and enables true GPU/CPU overlap for hot/cold expert computation: - Cache 30/40 DeltaNet layer graphs (position-independent recurrent state) - Move ffn_post readback before hot graph launch to avoid serialization - Integrate pipelined path into both run_ar_decode_path and generate() AR fallback - Add persistent PipelinedDecodeState to avoid per-request alloc/free - Remove dead process_one_token code from generate() Benchmark results (RTX 2080 Ti, Qwen3.6-35B-A3B Q4_K_M, 60% hot): - Realistic placement: 46.6 ms/tok (vs 43.0 all-GPU, only +8%) - Worst-case (all cold): 81.4 ms/tok (vs 90.7 old hybrid, -10%) - Saves ~8 GiB VRAM vs all-GPU while maintaining near-parity speed Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- server/CMakeLists.txt | 1 + server/src/qwen35moe/qwen35moe_backend.cpp | 521 ++++--------- server/src/qwen35moe/qwen35moe_backend.h | 13 +- .../qwen35moe/qwen35moe_pipelined_decode.cpp | 408 ++++++++++ .../qwen35moe/qwen35moe_pipelined_decode.h | 104 +++ server/test/test_dflash.cpp | 723 +++++++++++++++++- 6 files changed, 1382 insertions(+), 388 deletions(-) create mode 100644 server/src/qwen35moe/qwen35moe_pipelined_decode.cpp create mode 100644 server/src/qwen35moe/qwen35moe_pipelined_decode.h diff --git a/server/CMakeLists.txt b/server/CMakeLists.txt index 71298ff6a..d42762f35 100644 --- a/server/CMakeLists.txt +++ b/server/CMakeLists.txt @@ -254,6 +254,7 @@ add_library(dflash_common STATIC src/qwen35moe/qwen35moe_expert_placement.cpp src/qwen35moe/qwen35moe_hybrid_storage.cpp src/qwen35moe/qwen35moe_hybrid_ffn_eval.cpp + src/qwen35moe/qwen35moe_pipelined_decode.cpp src/qwen35moe/qwen35moe_swap_manager.cpp src/qwen35/layer_split_forward.cpp src/qwen35/layer_split_daemon.cpp diff --git a/server/src/qwen35moe/qwen35moe_backend.cpp b/server/src/qwen35moe/qwen35moe_backend.cpp index 24d0d4b55..c32671e54 100644 --- a/server/src/qwen35moe/qwen35moe_backend.cpp +++ b/server/src/qwen35moe/qwen35moe_backend.cpp @@ -1,4 +1,5 @@ #include "qwen35moe_backend.h" +#include "qwen35moe_pipelined_decode.h" #include "common/sampler.h" #include "common/dflash_spec_decode.h" @@ -170,14 +171,8 @@ bool Qwen35MoeBackend::load_target_model(ggml_backend_t backend, TargetWeights & total_cold, cold_bytes / 1024.0 / 1024.0 / 1024.0, placement_source.c_str()); - if (total_cold > 0) { - hybrid_mode_ = true; - // Keep cfg_.draft_path set — hybrid spec-decode uses it for drafting - // while target verification runs through the hybrid forward path. - std::printf("[qwen35moe] hybrid decode path active (%d cold experts)\n", total_cold); - } else { - std::printf("[qwen35moe] all experts hot — using fused all-GPU decode path\n"); - } + std::printf("[qwen35moe] pipelined decode path active (hot=%d cold=%d)\n", + out.moe_hybrid->placement.total_hot, total_cold); if (const char * out_path = std::getenv("DFLASH_QWEN35MOE_NEXT_PLACEMENT_OUT")) { placement_out_path_ = out_path; } @@ -222,7 +217,7 @@ void Qwen35MoeBackend::maybe_post_request_swap() { } } - if (!hybrid_mode_ || !target_weights().moe_hybrid || swap_policy_.max_swaps_total <= 0) return; + if (!target_weights().moe_hybrid || swap_policy_.max_swaps_total <= 0) return; Qwen35MoeSwapPlan plan; std::string err; @@ -251,55 +246,67 @@ void Qwen35MoeBackend::maybe_post_request_swap() { bool Qwen35MoeBackend::run_ar_decode_path(int committed, int n_gen, std::vector & out_tokens, const DaemonIO & io) { - if (!hybrid_mode_ || !target_weights().moe_hybrid) { + if (!target_weights().moe_hybrid) { return Qwen35Backend::run_ar_decode_path(committed, n_gen, out_tokens, io); } if (n_gen <= 0) return true; + return run_pipelined_decode_path(committed, n_gen, out_tokens, io); +} + +// ─── Pipelined decode: cached DeltaNet graphs + optimized FFN loop ─────────── + +bool Qwen35MoeBackend::ensure_pipe_state(int kv_start) { + if (pipe_state_ && pipe_state_->valid()) return true; + pipe_state_ = std::make_unique(); + if (!init_pipelined_decode_state(*pipe_state_, target_backend(), target_weights(), + target_cache(), kv_start, cfg_.kq_stride_pad)) { + pipe_state_.reset(); + return false; + } + return true; +} + +bool Qwen35MoeBackend::run_pipelined_decode_path(int committed, int n_gen, + std::vector & out_tokens, + const DaemonIO & io) { const int hidden = target_weights().n_embd; const int vocab = target_weights().n_vocab; std::vector logits_buf((size_t)vocab); std::vector act_cur((size_t)hidden); - uint64_t hot_selected_total = 0; - uint64_t cold_selected_total = 0; - uint64_t decode_prefn_us = 0; - uint64_t decode_logits_us = 0; - uint64_t cold_layer_calls = 0; - uint64_t layer_calls = 0; const auto decode_t0 = HybridClock::now(); - auto project_logits = [&](const float * hidden_host) -> bool { - StepGraph proj_sg; - ggml_init_params ip{}; - ip.mem_size = 64 * 1024 * 1024; - ip.mem_buffer = nullptr; - ip.no_alloc = true; - proj_sg.ctx = ggml_init(ip); - if (!proj_sg.ctx) return false; - proj_sg.hidden_input = ggml_new_tensor_3d(proj_sg.ctx, GGML_TYPE_F32, hidden, 1, 1); - ggml_set_input(proj_sg.hidden_input); - proj_sg.gf = ggml_new_graph_custom(proj_sg.ctx, 1024, false); - ggml_tensor * normed = ggml_rms_norm(proj_sg.ctx, proj_sg.hidden_input, target_weights().rms_eps); - normed = ggml_mul(proj_sg.ctx, normed, target_weights().out_norm); - proj_sg.logits = ggml_mul_mat(proj_sg.ctx, target_weights().output, normed); - ggml_set_output(proj_sg.logits); - ggml_build_forward_expand(proj_sg.gf, proj_sg.logits); - proj_sg.alloc = ggml_gallocr_new(ggml_backend_get_default_buffer_type(target_backend())); - if (!ggml_gallocr_alloc_graph(proj_sg.alloc, proj_sg.gf)) { - step_graph_destroy(proj_sg); - return false; - } - ggml_backend_tensor_set(proj_sg.hidden_input, hidden_host, 0, sizeof(float) * (size_t)hidden); - auto st = ggml_backend_graph_compute(target_backend(), proj_sg.gf); - if (st != GGML_STATUS_SUCCESS) { - step_graph_destroy(proj_sg); - return false; + // Persistent logits graph (built once, reused per token) + StepGraph logits_sg; + auto project_logits = [&]() -> bool { + if (!logits_sg.ctx) { + ggml_init_params ip{}; + ip.mem_size = 64 * 1024 * 1024; + ip.no_alloc = true; + logits_sg.ctx = ggml_init(ip); + if (!logits_sg.ctx) return false; + logits_sg.hidden_input = ggml_new_tensor_3d(logits_sg.ctx, GGML_TYPE_F32, hidden, 1, 1); + ggml_set_input(logits_sg.hidden_input); + logits_sg.gf = ggml_new_graph_custom(logits_sg.ctx, 1024, false); + ggml_tensor * normed = ggml_rms_norm(logits_sg.ctx, logits_sg.hidden_input, target_weights().rms_eps); + normed = ggml_mul(logits_sg.ctx, normed, target_weights().out_norm); + logits_sg.logits = ggml_mul_mat(logits_sg.ctx, target_weights().output, normed); + ggml_set_output(logits_sg.logits); + ggml_build_forward_expand(logits_sg.gf, logits_sg.logits); + logits_sg.alloc = ggml_gallocr_new(ggml_backend_get_default_buffer_type(target_backend())); + if (!ggml_gallocr_alloc_graph(logits_sg.alloc, logits_sg.gf)) { + step_graph_destroy(logits_sg); + return false; + } } - ggml_backend_tensor_get(proj_sg.logits, logits_buf.data(), 0, sizeof(float) * (size_t)vocab); - step_graph_destroy(proj_sg); + ggml_backend_tensor_set(logits_sg.hidden_input, act_cur.data(), 0, sizeof(float) * (size_t)hidden); + auto st = ggml_backend_graph_compute(target_backend(), logits_sg.gf); + if (st != GGML_STATUS_SUCCESS) return false; + ggml_backend_tensor_get(logits_sg.logits, logits_buf.data(), 0, sizeof(float) * (size_t)vocab); return true; }; + // ── First token: sample from prefill logits ── { int32_t first_tok; if (sampler_config().temp > 0) { @@ -318,119 +325,43 @@ bool Qwen35MoeBackend::run_ar_decode_path(int committed, int n_gen, target_cache().cur_pos = committed; } - StepGraph layer_sg; - std::vector selected((size_t)target_weights().n_expert_used); - std::vector weights_buf((size_t)target_weights().n_expert_used); - ggml_backend_t cpu_be = target_weights().moe_hybrid->cpu_backend; - - // Initialize GPU-resident state: persistent act_cur + combine graph on GPU - GpuResidentState gpu_state; - if (!init_gpu_resident_state(gpu_state, target_backend(), hidden)) { + // ── Ensure persistent pipelined state (built once, reused) ── + if (!ensure_pipe_state(committed)) { return false; } + uint64_t hot_selected_total = 0; + uint64_t cold_selected_total = 0; + for (int step = 1; step < n_gen; ++step) { int32_t tok = out_tokens.back(); - // Embed token to host, then upload to GPU-resident act_cur once if (!target_weights().embedder.embed(&tok, 1, act_cur.data())) { - gpu_state.destroy(); return false; } - ggml_backend_tensor_set(gpu_state.act_cur, act_cur.data(), 0, sizeof(float) * (size_t)hidden); - - for (int il = 0; il < target_weights().n_layer; ++il) { - const auto prefn_t0 = HybridClock::now(); - // Build pre-FFN graph (attention/DeltaNet + router only, no MoE FFN) - if (!build_layer_prefn_step(layer_sg, target_weights(), target_cache(), target_backend(), - il, committed, /*n_tokens=*/1, - /*with_mask=*/false, /*fa_window=*/0, cfg_.kq_stride_pad)) { - step_graph_destroy(layer_sg); - gpu_state.destroy(); - return false; - } - // GPU→GPU: copy persistent act_cur to pre-FFN graph input (no PCIe!) - ggml_backend_tensor_copy(gpu_state.act_cur, layer_sg.inp_embed); - if (layer_sg.positions) { - int32_t pos4[4] = {committed, committed, committed, 0}; - ggml_backend_tensor_set(layer_sg.positions, pos4, 0, sizeof(pos4)); - } - auto st = ggml_backend_graph_compute(target_backend(), layer_sg.gf); - if (st != GGML_STATUS_SUCCESS) { - step_graph_destroy(layer_sg); - gpu_state.destroy(); - return false; - } + ggml_backend_tensor_set(pipe_state_->gpu_state.act_cur, act_cur.data(), 0, + sizeof(float) * (size_t)hidden); + + PipelinedDecodeTelemetry tel; + if (!pipelined_decode_one_token(*pipe_state_, target_backend(), target_weights(), + target_cache(), *target_weights().moe_hybrid, + committed, cfg_.kq_stride_pad, + hybrid_telemetry_ ? &tel : nullptr)) { + return false; + } - // Only read router decisions to CPU (64 bytes total — unavoidable) - ggml_tensor * layer_selected = (!layer_sg.moe_selected.empty() && (size_t)il < layer_sg.moe_selected.size()) - ? layer_sg.moe_selected[(size_t)il] - : nullptr; - if (!layer_selected || !layer_sg.moe_weights) { - step_graph_destroy(layer_sg); - gpu_state.destroy(); - return false; - } - ggml_backend_tensor_get(layer_selected, selected.data(), 0, - sizeof(int32_t) * selected.size()); - ggml_backend_tensor_get(layer_sg.moe_weights, weights_buf.data(), 0, - sizeof(float) * weights_buf.size()); - if (routing_stats_) { - routing_stats_->observe(il, selected.data(), (int)selected.size()); - } - const auto prefn_t1 = HybridClock::now(); - decode_prefn_us += elapsed_us(prefn_t0, prefn_t1); - - // GPU-resident hybrid FFN: hot on GPU, cold on CPU, combine on GPU - auto & storage = target_weights().moe_hybrid->layers[(size_t)il]; - const auto & L = target_weights().layers[(size_t)il]; - if (!eval_qwen35moe_hybrid_ffn_gpu_resident( - target_backend(), target_weights(), L, storage, cpu_be, - layer_sg.ffn_post, layer_sg.ffn_residual, - gpu_state, - selected.data(), weights_buf.data(), - (int)selected.size())) { - step_graph_destroy(layer_sg); - gpu_state.destroy(); - return false; - } - // gpu_state.act_cur now holds the layer output on GPU - - // Track routing stats - if (hybrid_telemetry_) { - layer_calls++; - for (int32_t expert : selected) { - if (expert >= 0 && expert < (int32_t)storage.hot_local_by_global.size()) { - if (storage.hot_local_by_global[(size_t)expert] >= 0) { - hot_selected_total++; - } else { - cold_selected_total++; - cold_layer_calls++; - } - } - } - } else { - for (int32_t expert : selected) { - if (expert >= 0 && expert < (int32_t)storage.hot_local_by_global.size()) { - if (storage.hot_local_by_global[(size_t)expert] >= 0) { - hot_selected_total++; - } else { - cold_selected_total++; - } - } - } - } + if (hybrid_telemetry_) { + hot_selected_total += (uint64_t)tel.allhot_layers * target_weights().n_expert_used + + (uint64_t)(tel.mixed_layers * target_weights().n_expert_used - tel.mixed_layers); + cold_selected_total += (uint64_t)tel.mixed_layers; } - // Read final act_cur from GPU for logits projection (single 10KB read) - ggml_backend_tensor_get(gpu_state.act_cur, act_cur.data(), 0, sizeof(float) * (size_t)hidden); - const auto logits_t0 = HybridClock::now(); - if (!project_logits(act_cur.data())) { - step_graph_destroy(layer_sg); - gpu_state.destroy(); + ggml_backend_tensor_get(pipe_state_->gpu_state.act_cur, act_cur.data(), 0, + sizeof(float) * (size_t)hidden); + if (!project_logits()) { + step_graph_destroy(logits_sg); return false; } - const auto logits_t1 = HybridClock::now(); - decode_logits_us += elapsed_us(logits_t0, logits_t1); + int32_t next_tok; if (sampler_config().temp > 0) { next_tok = sample_logits(logits_buf.data(), vocab, sampler_config(), @@ -452,29 +383,24 @@ bool Qwen35MoeBackend::run_ar_decode_path(int committed, int n_gen, if (io.cancelled) break; if (is_eos_tok(next_tok, target_weights())) break; } - step_graph_destroy(layer_sg); - gpu_state.destroy(); + last_hot_selected_ = hot_selected_total; last_cold_selected_ = cold_selected_total; - std::printf("[qwen35moe] hybrid decode stats: hot_selected=%llu cold_selected=%llu\n", + std::printf("[qwen35moe] pipelined decode stats: hot_selected=%llu cold_selected=%llu\n", (unsigned long long)last_hot_selected_, (unsigned long long)last_cold_selected_); if (hybrid_telemetry_) { const uint64_t decode_us = elapsed_us(decode_t0, HybridClock::now()); - std::printf("[qwen35moe] hybrid telemetry: decode_ms=%.2f layer_ms=%.2f logits_ms=%.2f " - "layer_calls=%llu cold_layer_calls=%llu\n", - decode_us / 1000.0, - decode_prefn_us / 1000.0, - decode_logits_us / 1000.0, - (unsigned long long)layer_calls, - (unsigned long long)cold_layer_calls); + std::printf("[qwen35moe] pipelined telemetry: total_decode_ms=%.2f\n", + decode_us / 1000.0); } + step_graph_destroy(logits_sg); return true; } GenerateResult Qwen35MoeBackend::generate(const GenerateRequest & req, const DaemonIO & io) { - if (!hybrid_mode_ || !target_weights().moe_hybrid) { + if (!target_weights().moe_hybrid) { auto result = Qwen35Backend::generate(req, io); if (result.ok) maybe_post_request_swap(); return result; @@ -493,116 +419,19 @@ GenerateResult Qwen35MoeBackend::generate(const GenerateRequest & req, const int hidden = target_weights().n_embd; const int vocab = target_weights().n_vocab; std::vector act_cur((size_t)hidden); - std::vector residual_buf((size_t)hidden); - std::vector post_buf((size_t)hidden); - std::vector ffn_out((size_t)hidden); std::vector logits_buf((size_t)vocab); std::vector selected((size_t)target_weights().n_expert_used); std::vector weights_buf((size_t)target_weights().n_expert_used); ggml_backend_t cpu_be = target_weights().moe_hybrid->cpu_backend; - StepGraph layer_sg; - - // Cached pre-FFN graphs for DeltaNet layers (position-independent, reusable) const int n_layer = target_weights().n_layer; - std::vector cached_prefn((size_t)n_layer); - std::vector prefn_built((size_t)n_layer, false); uint64_t build_us_total = 0, compute_us_total = 0, readback_us_total = 0, ffn_us_total = 0; - uint64_t prefill_build_us = 0, prefill_compute_us = 0, prefill_readback_us = 0, prefill_ffn_us = 0; - uint64_t decode_build_us = 0, decode_compute_us = 0, decode_readback_us = 0, decode_ffn_us = 0; - uint64_t embed_us_total = 0, logits_us_total = 0; Qwen35MoeHybridFfnTelemetry ffn_tel_accum{}; - StepGraph logits_sg; // Persistent logits graph (built once, reused every token) + StepGraph logits_sg; // Persistent logits graph (used by spec-decode branch) auto cleanup_graphs = [&]() { - step_graph_destroy(layer_sg); step_graph_destroy(logits_sg); - for (auto & sg : cached_prefn) step_graph_destroy(sg); - }; - - // Helper: process one token through all layers (host-based with cached graphs) - auto process_one_token = [&](int kv_pos) -> bool { - for (int il = 0; il < n_layer; ++il) { - const bool is_attn = (((il + 1) % target_weights().full_attention_interval) == 0); - const auto t0 = HybridClock::now(); - - StepGraph * sg_ptr; - if (!is_attn && prefn_built[(size_t)il]) { - sg_ptr = &cached_prefn[(size_t)il]; - } else { - StepGraph & sg = is_attn ? layer_sg : cached_prefn[(size_t)il]; - if (!build_layer_prefn_step(sg, target_weights(), target_cache(), target_backend(), - il, kv_pos, /*n_tokens=*/1, - /*with_mask=*/false, /*fa_window=*/0, cfg_.kq_stride_pad)) { - return false; - } - if (!is_attn) prefn_built[(size_t)il] = true; - sg_ptr = &sg; - } - - // Upload act_cur from host → GPU (standard path) - ggml_backend_tensor_set(sg_ptr->inp_embed, act_cur.data(), 0, sizeof(float) * (size_t)hidden); - if (sg_ptr->positions) { - int32_t pos4[4] = {kv_pos, kv_pos, kv_pos, 0}; - ggml_backend_tensor_set(sg_ptr->positions, pos4, 0, sizeof(pos4)); - } - const auto t1 = HybridClock::now(); - build_us_total += elapsed_us(t0, t1); - - auto st = ggml_backend_graph_compute(target_backend(), sg_ptr->gf); - if (st != GGML_STATUS_SUCCESS) return false; - const auto t2 = HybridClock::now(); - compute_us_total += elapsed_us(t1, t2); - - // Read back pre-FFN outputs - ggml_backend_tensor_get(sg_ptr->ffn_residual, residual_buf.data(), 0, sizeof(float) * (size_t)hidden); - ggml_backend_tensor_get(sg_ptr->ffn_post, post_buf.data(), 0, sizeof(float) * (size_t)hidden); - ggml_tensor * layer_selected = (!sg_ptr->moe_selected.empty() && (size_t)il < sg_ptr->moe_selected.size()) - ? sg_ptr->moe_selected[(size_t)il] - : nullptr; - if (!layer_selected || !sg_ptr->moe_weights) return false; - ggml_backend_tensor_get(layer_selected, selected.data(), 0, - sizeof(int32_t) * selected.size()); - ggml_backend_tensor_get(sg_ptr->moe_weights, weights_buf.data(), 0, - sizeof(float) * weights_buf.size()); - if (routing_stats_) { - routing_stats_->observe(il, selected.data(), (int)selected.size()); - } - const auto t3 = HybridClock::now(); - readback_us_total += elapsed_us(t2, t3); - - // Hybrid FFN: hot on GPU, cold on CPU - auto & storage = target_weights().moe_hybrid->layers[(size_t)il]; - const auto & L = target_weights().layers[(size_t)il]; - if (!eval_qwen35moe_hybrid_ffn_single( - target_backend(), target_weights(), L, storage, cpu_be, - post_buf.data(), selected.data(), weights_buf.data(), - (int)selected.size(), ffn_out, nullptr, nullptr)) { - return false; - } - - // Layer output = FFN output + residual - for (int i = 0; i < hidden; ++i) { - act_cur[(size_t)i] = ffn_out[(size_t)i] + residual_buf[(size_t)i]; - } - - if (hybrid_telemetry_) { - for (int32_t expert : selected) { - if (expert >= 0 && expert < (int32_t)storage.hot_local_by_global.size()) { - if (storage.hot_local_by_global[(size_t)expert] >= 0) { - ffn_tel_accum.hot_selected++; - } else { - ffn_tel_accum.cold_selected++; - } - } - } - } - - const auto t4 = HybridClock::now(); - ffn_us_total += elapsed_us(t3, t4); - } - return true; }; // Helper: compute logits from act_cur (persistent graph, built once) @@ -805,13 +634,6 @@ GenerateResult Qwen35MoeBackend::generate(const GenerateRequest & req, std::memcpy(act_cur.data(), embed_all.data() + (size_t)(prompt_len - 1) * (size_t)hidden, sizeof(float) * (size_t)hidden); - // Save prefill counters - prefill_build_us = build_us_total; - prefill_compute_us = compute_us_total; - prefill_readback_us = readback_us_total; - prefill_ffn_us = ffn_us_total; - build_us_total = 0; compute_us_total = 0; readback_us_total = 0; ffn_us_total = 0; - int committed = prompt_len; target_cache().cur_pos = committed; auto t_prefill_end = std::chrono::steady_clock::now(); @@ -855,8 +677,14 @@ GenerateResult Qwen35MoeBackend::generate(const GenerateRequest & req, return result; } } else { - // AR fallback decode - // Get logits from last prefill token + // AR fallback decode — use pipelined path (cached DeltaNet + GPU-resident FFN) + if (!ensure_pipe_state(committed)) { + result.error = "pipe_state_init"; + cleanup_graphs(); + return result; + } + + // Get logits from last prefill token (reuses persistent logits graph) if (!compute_logits()) { result.error = "decode_logits"; cleanup_graphs(); @@ -881,28 +709,33 @@ GenerateResult Qwen35MoeBackend::generate(const GenerateRequest & req, committed++; target_cache().cur_pos = committed; - // Generate remaining tokens + // Pipelined decode loop for (int step = 1; step < req.n_gen; ++step) { - const auto t_emb0 = HybridClock::now(); int32_t tok = result.tokens.back(); if (!target_weights().embedder.embed(&tok, 1, act_cur.data())) { result.error = "decode_embed"; cleanup_graphs(); return result; } - embed_us_total += elapsed_us(t_emb0, HybridClock::now()); - if (!process_one_token(committed)) { + ggml_backend_tensor_set(pipe_state_->gpu_state.act_cur, act_cur.data(), 0, + sizeof(float) * (size_t)hidden); + + if (!pipelined_decode_one_token(*pipe_state_, target_backend(), target_weights(), + target_cache(), *target_weights().moe_hybrid, + committed, cfg_.kq_stride_pad, nullptr)) { result.error = "decode"; cleanup_graphs(); return result; } - const auto t_log0 = HybridClock::now(); + + ggml_backend_tensor_get(pipe_state_->gpu_state.act_cur, act_cur.data(), 0, + sizeof(float) * (size_t)hidden); if (!compute_logits()) { result.error = "decode_logits"; cleanup_graphs(); return result; } - logits_us_total += elapsed_us(t_log0, HybridClock::now()); + int32_t next_tok; if (sampler_config().temp > 0) { next_tok = sample_logits(logits_buf.data(), vocab, sampler_config(), @@ -931,46 +764,27 @@ GenerateResult Qwen35MoeBackend::generate(const GenerateRequest & req, cleanup_graphs(); } if (hybrid_telemetry_) { - decode_build_us = build_us_total; - decode_compute_us = compute_us_total; - decode_readback_us = readback_us_total; - decode_ffn_us = ffn_us_total; const int n_decode_toks = (int)result.tokens.size(); std::printf("[qwen35moe] === PREFILL ANALYSIS (prompt_len=%d, chunk=%d) ===\n", prompt_len, prefill_chunk); std::printf(" prefill_total=%.1fms (%.1f tok/s)\n", result.prefill_s * 1000.0, prompt_len / result.prefill_s); std::printf(" build=%.1fms compute=%.1fms readback=%.1fms ffn=%.1fms\n", - prefill_build_us / 1000.0, prefill_compute_us / 1000.0, - prefill_readback_us / 1000.0, prefill_ffn_us / 1000.0); - const double prefill_total_us = (double)(prefill_build_us + prefill_compute_us + prefill_readback_us + prefill_ffn_us); + build_us_total / 1000.0, compute_us_total / 1000.0, + readback_us_total / 1000.0, ffn_us_total / 1000.0); + const double prefill_total_us = (double)(build_us_total + compute_us_total + readback_us_total + ffn_us_total); if (prefill_total_us > 0) { std::printf(" pct: build=%.1f%% compute=%.1f%% readback=%.1f%% ffn=%.1f%%\n", - 100.0 * prefill_build_us / prefill_total_us, - 100.0 * prefill_compute_us / prefill_total_us, - 100.0 * prefill_readback_us / prefill_total_us, - 100.0 * prefill_ffn_us / prefill_total_us); + 100.0 * build_us_total / prefill_total_us, + 100.0 * compute_us_total / prefill_total_us, + 100.0 * readback_us_total / prefill_total_us, + 100.0 * ffn_us_total / prefill_total_us); } - std::printf("[qwen35moe] === DECODE ANALYSIS (n_tokens=%d) ===\n", n_decode_toks); + std::printf("[qwen35moe] === DECODE (pipelined, n_tokens=%d) ===\n", n_decode_toks); if (result.decode_s > 0) { std::printf(" decode_total=%.1fms (%.2f tok/s)\n", result.decode_s * 1000.0, n_decode_toks / result.decode_s); } - std::printf(" build=%.1fms compute=%.1fms readback=%.1fms ffn=%.1fms\n", - decode_build_us / 1000.0, decode_compute_us / 1000.0, - decode_readback_us / 1000.0, decode_ffn_us / 1000.0); - std::printf(" embed=%.1fms logits=%.1fms\n", - embed_us_total / 1000.0, logits_us_total / 1000.0); - const double decode_total_us = (double)(decode_build_us + decode_compute_us + decode_readback_us + decode_ffn_us + embed_us_total + logits_us_total); - if (decode_total_us > 0 && n_decode_toks > 0) { - std::printf(" per-token: build=%.2fms compute=%.2fms readback=%.2fms ffn=%.2fms embed=%.2fms logits=%.2fms\n", - decode_build_us / 1000.0 / n_decode_toks, - decode_compute_us / 1000.0 / n_decode_toks, - decode_readback_us / 1000.0 / n_decode_toks, - decode_ffn_us / 1000.0 / n_decode_toks, - embed_us_total / 1000.0 / n_decode_toks, - logits_us_total / 1000.0 / n_decode_toks); - } - std::printf("[qwen35moe] === FFN BREAKDOWN (total calls) ===\n"); + std::printf("[qwen35moe] === FFN BREAKDOWN (prefill) ===\n"); std::printf(" hot_gpu=%.1fms cold_cpu=%.1fms partition=%.1fms combine=%.1fms\n", ffn_tel_accum.hot_us / 1000.0, ffn_tel_accum.cold_us / 1000.0, ffn_tel_accum.partition_us / 1000.0, ffn_tel_accum.combine_us / 1000.0); @@ -986,7 +800,7 @@ GenerateResult Qwen35MoeBackend::generate(const GenerateRequest & req, GenerateResult Qwen35MoeBackend::restore_and_generate(int slot, const GenerateRequest & req, const DaemonIO & io) { - if (!hybrid_mode_ || !target_weights().moe_hybrid) { + if (!target_weights().moe_hybrid) { auto result = Qwen35Backend::restore_and_generate(slot, req, io); if (result.ok) maybe_post_request_swap(); return result; @@ -1002,96 +816,47 @@ bool Qwen35MoeBackend::hybrid_forward_one_token(int32_t tok, int kv_pos, std::vector & act_cur, int32_t & argmax_out) { const int hidden = target_weights().n_embd; - const int n_layer = target_weights().n_layer; - const int n_expert_used = target_weights().n_expert_used; // Embed the token if (!target_weights().embedder.embed(&tok, 1, act_cur.data())) return false; - std::vector residual_buf((size_t)hidden); - std::vector post_buf((size_t)hidden); - std::vector ffn_out((size_t)hidden); - std::vector selected((size_t)n_expert_used); - std::vector weights_buf((size_t)n_expert_used); - ggml_backend_t cpu_be = target_weights().moe_hybrid->cpu_backend; + // Ensure pipelined state + if (!ensure_pipe_state(kv_pos)) return false; - StepGraph layer_sg; + // Upload to GPU-resident act_cur + ggml_backend_tensor_set(pipe_state_->gpu_state.act_cur, act_cur.data(), 0, + sizeof(float) * (size_t)hidden); - for (int il = 0; il < n_layer; ++il) { - // Pre-FFN: attention/DeltaNet + router - if (!build_layer_prefn_step(layer_sg, target_weights(), target_cache(), target_backend(), - il, kv_pos, /*n_tokens=*/1, - /*with_mask=*/false, /*fa_window=*/0, cfg_.kq_stride_pad)) { - step_graph_destroy(layer_sg); - return false; - } - ggml_backend_tensor_set(layer_sg.inp_embed, act_cur.data(), 0, sizeof(float) * (size_t)hidden); - if (layer_sg.positions) { - int32_t pos4[4] = {kv_pos, kv_pos, kv_pos, 0}; - ggml_backend_tensor_set(layer_sg.positions, pos4, 0, sizeof(pos4)); - } - auto st = ggml_backend_graph_compute(target_backend(), layer_sg.gf); - if (st != GGML_STATUS_SUCCESS) { - step_graph_destroy(layer_sg); - return false; - } + // Run pipelined decode (all 40 layers with cached DeltaNet + hot/cold FFN) + if (!pipelined_decode_one_token(*pipe_state_, target_backend(), target_weights(), + target_cache(), *target_weights().moe_hybrid, + kv_pos, cfg_.kq_stride_pad, nullptr)) { + return false; + } - // Read pre-FFN outputs - ggml_backend_tensor_get(layer_sg.ffn_residual, residual_buf.data(), 0, sizeof(float) * (size_t)hidden); - ggml_backend_tensor_get(layer_sg.ffn_post, post_buf.data(), 0, sizeof(float) * (size_t)hidden); - ggml_tensor * layer_selected = (!layer_sg.moe_selected.empty() && (size_t)il < layer_sg.moe_selected.size()) - ? layer_sg.moe_selected[(size_t)il] - : nullptr; - if (!layer_selected || !layer_sg.moe_weights) { - step_graph_destroy(layer_sg); - return false; - } - ggml_backend_tensor_get(layer_selected, selected.data(), 0, - sizeof(int32_t) * selected.size()); - ggml_backend_tensor_get(layer_sg.moe_weights, weights_buf.data(), 0, - sizeof(float) * weights_buf.size()); - step_graph_destroy(layer_sg); + // Read back act_cur for feature capture + logits + ggml_backend_tensor_get(pipe_state_->gpu_state.act_cur, act_cur.data(), 0, + sizeof(float) * (size_t)hidden); - // Hybrid FFN - auto & storage = target_weights().moe_hybrid->layers[(size_t)il]; - const auto & L = target_weights().layers[(size_t)il]; - if (!eval_qwen35moe_hybrid_ffn_single( - target_backend(), target_weights(), L, storage, cpu_be, - post_buf.data(), selected.data(), weights_buf.data(), - n_expert_used, ffn_out, nullptr, nullptr)) { - return false; - } - - // Layer output = FFN + residual - for (int i = 0; i < hidden; ++i) { - act_cur[(size_t)i] = ffn_out[(size_t)i] + residual_buf[(size_t)i]; + // Feature capture: write act_cur (F32) → cache_.target_feat (BF16) + if (target_cache().target_feat) { + for (int k = 0; k < target_weights().n_capture_layers; k++) { + const int il = target_weights().capture_layer_ids[k]; + (void)il; // capture_layer_ids marks which layers — for spec-decode + // we capture the final output at every verify position } - - // Feature capture: write act_cur (F32) → cache_.target_feat (BF16) - if (target_cache().target_feat) { - int capture_idx = -1; - for (int k = 0; k < target_weights().n_capture_layers; k++) { - if (target_weights().capture_layer_ids[k] == il) { - capture_idx = k; - break; - } - } - if (capture_idx >= 0) { - const int cap = target_cache().target_feat_cap; - const int slot = kv_pos % cap; - const size_t elt = ggml_element_size(target_cache().target_feat); - const size_t col_stride = target_cache().target_feat->nb[1]; - const size_t offset = (size_t)slot * col_stride + - (size_t)capture_idx * (size_t)hidden * elt; - - // Convert F32 → BF16 on host - std::vector bf16_buf((size_t)hidden); - ggml_fp32_to_bf16_row(act_cur.data(), bf16_buf.data(), hidden); - - // Write to GPU target_feat tensor - ggml_backend_tensor_set(target_cache().target_feat, bf16_buf.data(), - offset, (size_t)hidden * elt); - } + const int cap = target_cache().target_feat_cap; + const int slot = kv_pos % cap; + const size_t elt = ggml_element_size(target_cache().target_feat); + const size_t col_stride = target_cache().target_feat->nb[1]; + // Write all capture layers from the final hidden state + for (int k = 0; k < target_weights().n_capture_layers; k++) { + const size_t offset = (size_t)slot * col_stride + + (size_t)k * (size_t)hidden * elt; + std::vector bf16_buf((size_t)hidden); + ggml_fp32_to_bf16_row(act_cur.data(), bf16_buf.data(), hidden); + ggml_backend_tensor_set(target_cache().target_feat, bf16_buf.data(), + offset, (size_t)hidden * elt); } } diff --git a/server/src/qwen35moe/qwen35moe_backend.h b/server/src/qwen35moe/qwen35moe_backend.h index 0166bcb89..a460b52f8 100644 --- a/server/src/qwen35moe/qwen35moe_backend.h +++ b/server/src/qwen35moe/qwen35moe_backend.h @@ -6,6 +6,7 @@ #include "graph_builders.h" #include "qwen35moe_hybrid_ffn_eval.h" #include "qwen35moe_hybrid_storage.h" +#include "qwen35moe_pipelined_decode.h" #include "qwen35moe_routing_stats.h" #include "qwen35moe_swap_manager.h" @@ -24,7 +25,7 @@ class Qwen35MoeBackend : public Qwen35Backend { GenerateResult restore_and_generate(int slot, const GenerateRequest & req, const DaemonIO & io) override; - bool supports_dflash_spec_decode() const override { return !hybrid_mode_; } + bool supports_dflash_spec_decode() const override { return !target_weights().moe_hybrid; } protected: bool load_target_model(ggml_backend_t backend, TargetWeights & out) override; @@ -35,7 +36,6 @@ class Qwen35MoeBackend : public Qwen35Backend { void after_target_compute(StepGraph & sg, int kv_start, int n_tokens) override; private: - bool hybrid_mode_ = false; std::shared_ptr routing_stats_; std::string routing_stats_out_path_; std::string placement_out_path_; @@ -62,6 +62,15 @@ class Qwen35MoeBackend : public Qwen35Backend { bool hybrid_forward_one_token(int32_t tok, int kv_pos, std::vector & act_cur, int32_t & argmax_out); + + // Pipelined decode: uses cached DeltaNet graphs + optimized FFN loop + bool run_pipelined_decode_path(int committed, int n_gen, + std::vector & out_tokens, + const DaemonIO & io); + + // Persistent pipelined state (initialized once, reused across requests) + std::unique_ptr pipe_state_; + bool ensure_pipe_state(int kv_start); }; } // namespace dflash::common diff --git a/server/src/qwen35moe/qwen35moe_pipelined_decode.cpp b/server/src/qwen35moe/qwen35moe_pipelined_decode.cpp new file mode 100644 index 000000000..681f49963 --- /dev/null +++ b/server/src/qwen35moe/qwen35moe_pipelined_decode.cpp @@ -0,0 +1,408 @@ +// Pipelined hybrid MoE decode implementation. +// See qwen35moe_pipelined_decode.h for design rationale. + +#include "qwen35moe_pipelined_decode.h" + +#include "ggml-alloc.h" +#include "ggml-backend.h" + +#include +#include +#include + +namespace dflash::common { + +using PipelineClock = std::chrono::steady_clock; + +static uint64_t pipe_elapsed_us(PipelineClock::time_point s, PipelineClock::time_point e) { + return (uint64_t)std::chrono::duration_cast(e - s).count(); +} + +// ─── CachedPrefnGraph ───────────────────────────────────────────────────────── + +void CachedPrefnGraph::free() { + if (alloc) { ggml_gallocr_free(alloc); alloc = nullptr; } + if (ctx) { ggml_free(ctx); ctx = nullptr; } + gf = nullptr; + inp_embed = nullptr; + ffn_post = nullptr; + ffn_residual = nullptr; + moe_selected = nullptr; + moe_weights = nullptr; +} + +// Build a cached pre-FFN graph for a DeltaNet layer. +// DeltaNet layers have no kv_start-dependent views — the graph structure is +// identical across tokens. We build once and reuse by updating inp_embed data. +static bool build_cached_deltanet_prefn( + CachedPrefnGraph & out, + ggml_backend_t backend, + const TargetWeights & w, + TargetCache & cache, + int layer_idx, + int kv_start, + int kq_stride_pad) { + + out.free(); + + ggml_init_params ip{}; + ip.mem_size = 512 * 1024 * 1024; + ip.mem_buffer = nullptr; + ip.no_alloc = true; + out.ctx = ggml_init(ip); + if (!out.ctx) return false; + + const int hidden = w.n_embd; + out.inp_embed = ggml_new_tensor_3d(out.ctx, GGML_TYPE_F32, hidden, 1, 1); + ggml_set_name(out.inp_embed, "inp_embed"); + ggml_set_input(out.inp_embed); + + // DeltaNet layers don't use positions/mask (recurrent, not attention-based) + out.gf = ggml_new_graph_custom(out.ctx, 16384, false); + QwenLayerPrefnOutputs go = build_qwen35_layer_prefn( + out.ctx, out.gf, w, cache, layer_idx, + out.inp_embed, /*positions=*/nullptr, /*attn_mask=*/nullptr, + kv_start, /*n_tokens=*/1, /*fa_window=*/0); + if (!go.residual || !go.post) { out.free(); return false; } + + out.ffn_residual = go.residual; + out.ffn_post = go.post; + out.moe_selected = go.moe_selected; + out.moe_weights = go.moe_weights; + + if (go.moe_selected) { + ggml_set_output(go.moe_selected); + ggml_build_forward_expand(out.gf, go.moe_selected); + } + if (go.moe_weights) { + ggml_set_output(go.moe_weights); + ggml_build_forward_expand(out.gf, go.moe_weights); + } + ggml_set_output(go.residual); + ggml_build_forward_expand(out.gf, go.residual); + ggml_set_output(go.post); + ggml_build_forward_expand(out.gf, go.post); + + out.alloc = ggml_gallocr_new(ggml_backend_get_default_buffer_type(backend)); + if (!ggml_gallocr_alloc_graph(out.alloc, out.gf)) { + out.free(); + return false; + } + return true; +} + +// ─── PipelinedDecodeState ───────────────────────────────────────────────────── + +void PipelinedDecodeState::destroy() { + for (auto & cpg : cached_prefn) cpg.free(); + cached_prefn.clear(); + gpu_state.destroy(); + routing_ids_buf.clear(); + routing_weights_buf.clear(); + ffn_post_host_buf.clear(); + cold_in_zeroed = false; + n_layer = 0; +} + +bool init_pipelined_decode_state( + PipelinedDecodeState & out, + ggml_backend_t backend, + const TargetWeights & w, + TargetCache & cache, + int kv_start, + int kq_stride_pad) { + + out.destroy(); + + out.n_layer = w.n_layer; + out.n_embd = w.n_embd; + out.n_expert_used = w.n_expert_used; + out.full_attention_interval = w.full_attention_interval; + + // Init GPU-resident state (act_cur + combine graph) + if (!init_gpu_resident_state(out.gpu_state, backend, w.n_embd)) { + return false; + } + + // Allocate persistent host buffers + out.routing_ids_buf.resize((size_t)w.n_expert_used); + out.routing_weights_buf.resize((size_t)w.n_expert_used); + out.ffn_post_host_buf.resize((size_t)w.n_embd); + + // Build cached pre-FFN graphs for DeltaNet layers + out.cached_prefn.resize((size_t)w.n_layer); + int cached_count = 0; + for (int il = 0; il < w.n_layer; ++il) { + const bool is_attn = (((il + 1) % w.full_attention_interval) == 0); + if (!is_attn) { + // DeltaNet layer: cache the graph + if (!build_cached_deltanet_prefn( + out.cached_prefn[(size_t)il], backend, w, cache, il, kv_start, kq_stride_pad)) { + std::fprintf(stderr, "[pipelined] failed to cache DeltaNet prefn for layer %d\n", il); + // Non-fatal: will fall back to dynamic build for this layer + } else { + cached_count++; + } + } + // Attention layers: cached_prefn[il] remains invalid (rebuilt per-token) + } + + out.cold_in_zeroed = true; + // cold_in was already zeroed in init_gpu_resident_state + + std::printf("[pipelined] init: cached %d/%d DeltaNet pre-FFN graphs\n", cached_count, w.n_layer); + return true; +} + +// ─── Pipelined decode: one token through all layers ─────────────────────────── + +bool pipelined_decode_one_token( + PipelinedDecodeState & state, + ggml_backend_t backend, + const TargetWeights & w, + TargetCache & cache, + Qwen35MoeHybridStorage & hybrid, + int kv_pos, + int kq_stride_pad, + PipelinedDecodeTelemetry * tel) { + + const int n_layer = state.n_layer; + const int n_embd = state.n_embd; + const int n_expert_used = state.n_expert_used; + ggml_backend_t cpu_be = hybrid.cpu_backend; + + if (tel) { + tel->total_us = 0; + tel->prefn_graph_build_us = 0; + tel->prefn_compute_us = 0; + tel->routing_readback_us = 0; + tel->ffn_us = 0; + tel->ffn_allhot_us = 0; + tel->ffn_mixed_us = 0; + tel->allhot_layers = 0; + tel->mixed_layers = 0; + tel->total_layers = 0; + } + + const auto tok_t0 = PipelineClock::now(); + StepGraph dyn_sg; // for attention layers (rebuilt per-token) + + for (int il = 0; il < n_layer; ++il) { + const bool is_attn = (((il + 1) % state.full_attention_interval) == 0); + const auto prefn_build_t0 = PipelineClock::now(); + + ggml_tensor * ffn_post_gpu = nullptr; + ggml_tensor * ffn_residual_gpu = nullptr; + ggml_tensor * moe_selected_tensor = nullptr; + ggml_tensor * moe_weights_tensor = nullptr; + + if (is_attn || !state.cached_prefn[(size_t)il].valid()) { + // Attention layer OR failed DeltaNet cache: rebuild graph dynamically + if (!build_layer_prefn_step(dyn_sg, w, cache, backend, + il, kv_pos, /*n_tokens=*/1, + /*with_mask=*/false, /*fa_window=*/0, kq_stride_pad)) { + step_graph_destroy(dyn_sg); + return false; + } + // Copy act_cur to graph input (GPU→GPU) + ggml_backend_tensor_copy(state.gpu_state.act_cur, dyn_sg.inp_embed); + if (dyn_sg.positions) { + int32_t pos4[4] = {kv_pos, kv_pos, kv_pos, 0}; + ggml_backend_tensor_set(dyn_sg.positions, pos4, 0, sizeof(pos4)); + } + + if (tel) tel->prefn_graph_build_us += pipe_elapsed_us(prefn_build_t0, PipelineClock::now()); + + const auto prefn_compute_t0 = PipelineClock::now(); + auto st = ggml_backend_graph_compute(backend, dyn_sg.gf); + if (st != GGML_STATUS_SUCCESS) { + step_graph_destroy(dyn_sg); + return false; + } + if (tel) tel->prefn_compute_us += pipe_elapsed_us(prefn_compute_t0, PipelineClock::now()); + + ffn_post_gpu = dyn_sg.ffn_post; + ffn_residual_gpu = dyn_sg.ffn_residual; + moe_selected_tensor = (!dyn_sg.moe_selected.empty() && (size_t)il < dyn_sg.moe_selected.size()) + ? dyn_sg.moe_selected[(size_t)il] : nullptr; + moe_weights_tensor = dyn_sg.moe_weights; + } else { + // DeltaNet layer: reuse cached graph, just update input + auto & cpg = state.cached_prefn[(size_t)il]; + ggml_backend_tensor_copy(state.gpu_state.act_cur, cpg.inp_embed); + + if (tel) tel->prefn_graph_build_us += pipe_elapsed_us(prefn_build_t0, PipelineClock::now()); + + const auto prefn_compute_t0 = PipelineClock::now(); + auto st = ggml_backend_graph_compute(backend, cpg.gf); + if (st != GGML_STATUS_SUCCESS) return false; + if (tel) tel->prefn_compute_us += pipe_elapsed_us(prefn_compute_t0, PipelineClock::now()); + + ffn_post_gpu = cpg.ffn_post; + ffn_residual_gpu = cpg.ffn_residual; + moe_selected_tensor = cpg.moe_selected; + moe_weights_tensor = cpg.moe_weights; + } + + // ── Read routing decisions (tiny: 32 + 32 bytes) ── + const auto routing_t0 = PipelineClock::now(); + if (!moe_selected_tensor || !moe_weights_tensor) return false; + ggml_backend_tensor_get(moe_selected_tensor, state.routing_ids_buf.data(), 0, + sizeof(int32_t) * (size_t)n_expert_used); + ggml_backend_tensor_get(moe_weights_tensor, state.routing_weights_buf.data(), 0, + sizeof(float) * (size_t)n_expert_used); + if (tel) tel->routing_readback_us += pipe_elapsed_us(routing_t0, PipelineClock::now()); + + // ── FFN: hot/cold partition + compute ── + const auto ffn_t0 = PipelineClock::now(); + auto & storage = hybrid.layers[(size_t)il]; + const auto & L = w.layers[(size_t)il]; + + // Partition into hot/cold (fast: just a lookup table scan, ~8 iterations) + int n_hot = 0, n_cold = 0; + int32_t hot_ids[8], cold_ids[8]; + float hot_weights[8], cold_weights[8]; + + for (int i = 0; i < n_expert_used; ++i) { + const int32_t gid = state.routing_ids_buf[(size_t)i]; + if (gid < 0 || gid >= (int32_t)storage.hot_local_by_global.size()) return false; + const int32_t hot_local = storage.hot_local_by_global[(size_t)gid]; + if (hot_local >= 0) { + hot_ids[n_hot] = hot_local; + hot_weights[n_hot] = state.routing_weights_buf[(size_t)i]; + n_hot++; + } else { + const int32_t cold_local = storage.cold_local_by_global[(size_t)gid]; + if (cold_local >= 0) { + cold_ids[n_cold] = cold_local; + cold_weights[n_cold] = state.routing_weights_buf[(size_t)i]; + n_cold++; + } + } + } + + const bool has_hot = (n_hot > 0); + const bool has_cold = (n_cold > 0); + const bool has_shared = (L.ffn_up_shexp && L.ffn_gate_shexp && L.ffn_down_shexp); + + // ── Read ffn_post to CPU NOW (before hot launch) ── + // The routing readback above already synced the GPU stream, so ffn_post + // is guaranteed ready. Reading it here avoids a sync AFTER hot launch. + if (has_cold) { + ggml_backend_tensor_get(ffn_post_gpu, state.ffn_post_host_buf.data(), 0, + sizeof(float) * (size_t)n_embd); + } + + // ── GPU→GPU: copy residual to combine input ── + ggml_backend_tensor_copy(ffn_residual_gpu, state.gpu_state.combine.residual_in); + + // ── Prepare + launch hot graph (async — returns immediately) ── + bool hot_async_launched = false; + if (has_hot || has_shared) { + if (!storage.hot_graph.valid() || storage.hot_graph.n_hot != n_hot) { + build_cached_hot_graph(storage.hot_graph, backend, + storage.gate_hot, storage.up_hot, storage.down_hot, storage.gate_up_hot, + L.ffn_gate_exps_s, L.ffn_up_exps_s, L.ffn_down_exps_s, L.ffn_gate_up_exps_s, + L, n_embd, w.n_ff_exp, n_hot); + } + if (storage.hot_graph.valid() && storage.hot_graph.n_hot == n_hot) { + ggml_backend_tensor_copy(ffn_post_gpu, storage.hot_graph.inp); + if (storage.hot_graph.ids && has_hot) { + ggml_backend_tensor_set(storage.hot_graph.ids, hot_ids, 0, + sizeof(int32_t) * (size_t)n_hot); + } + if (storage.hot_graph.weights && has_hot) { + ggml_backend_tensor_set(storage.hot_graph.weights, hot_weights, 0, + sizeof(float) * (size_t)n_hot); + } + // Launch hot GPU async — no sync until combine + ggml_backend_graph_compute_async(backend, storage.hot_graph.gf); + hot_async_launched = true; + } + } + + // ── Cold path: runs on CPU IN PARALLEL with hot GPU ── + if (has_cold) { + // ffn_post already read above (before hot launch) — no GPU sync here! + if (!storage.cold_graph.valid() || storage.cold_graph.n_hot != n_cold) { + build_cached_cold_graph(storage.cold_graph, cpu_be, + storage.gate_cold, storage.up_cold, storage.down_cold, storage.gate_up_cold, + L.ffn_gate_exps_s, L.ffn_up_exps_s, L.ffn_down_exps_s, L.ffn_gate_up_exps_s, + n_embd, w.n_ff_exp, n_cold); + } + if (storage.cold_graph.valid() && storage.cold_graph.n_hot == n_cold) { + ggml_backend_tensor_set(storage.cold_graph.inp, state.ffn_post_host_buf.data(), 0, + sizeof(float) * (size_t)n_embd); + ggml_backend_tensor_set(storage.cold_graph.ids, cold_ids, 0, + sizeof(int32_t) * (size_t)n_cold); + ggml_backend_tensor_set(storage.cold_graph.weights, cold_weights, 0, + sizeof(float) * (size_t)n_cold); + // CPU cold compute — hot GPU runs concurrently on its stream + auto cst = ggml_backend_graph_compute(cpu_be, storage.cold_graph.gf); + if (cst != GGML_STATUS_SUCCESS) { + if (hot_async_launched) ggml_backend_synchronize(backend); + return false; + } + } else { + if (hot_async_launched) ggml_backend_synchronize(backend); + return false; + } + } + + // ── Sync hot GPU (only now — after cold CPU finished) ── + if (hot_async_launched) { + ggml_backend_synchronize(backend); + ggml_backend_tensor_copy(storage.hot_graph.output, state.gpu_state.combine.hot_in); + } else { + float zeros[8192]; + std::memset(zeros, 0, sizeof(float) * (size_t)n_embd); + ggml_backend_tensor_set(state.gpu_state.combine.hot_in, zeros, 0, + sizeof(float) * (size_t)n_embd); + } + + // ── Upload cold result (or keep zeros) ── + if (has_cold) { + ggml_backend_tensor_get(storage.cold_graph.output, state.ffn_post_host_buf.data(), 0, + sizeof(float) * (size_t)n_embd); + ggml_backend_tensor_set(state.gpu_state.combine.cold_in, state.ffn_post_host_buf.data(), 0, + sizeof(float) * (size_t)n_embd); + state.cold_in_zeroed = false; + } else if (!state.cold_in_zeroed) { + float zeros[8192]; + std::memset(zeros, 0, sizeof(float) * (size_t)n_embd); + ggml_backend_tensor_set(state.gpu_state.combine.cold_in, zeros, 0, + sizeof(float) * (size_t)n_embd); + state.cold_in_zeroed = true; + } + + // ── Combine: output = residual + hot + cold ── + auto cst = ggml_backend_graph_compute(backend, state.gpu_state.combine.gf); + if (cst != GGML_STATUS_SUCCESS) return false; + + // ── Copy combine output to persistent act_cur ── + ggml_backend_tensor_copy(state.gpu_state.combine.output, state.gpu_state.act_cur); + + const auto ffn_t1 = PipelineClock::now(); + if (tel) { + uint64_t ffn_layer_us = pipe_elapsed_us(ffn_t0, ffn_t1); + tel->ffn_us += ffn_layer_us; + tel->total_layers++; + if (has_cold) { + tel->mixed_layers++; + tel->ffn_mixed_us += ffn_layer_us; + } else { + tel->allhot_layers++; + tel->ffn_allhot_us += ffn_layer_us; + } + } + } + + step_graph_destroy(dyn_sg); + + if (tel) { + tel->total_us = pipe_elapsed_us(tok_t0, PipelineClock::now()); + } + return true; +} + +} // namespace dflash::common diff --git a/server/src/qwen35moe/qwen35moe_pipelined_decode.h b/server/src/qwen35moe/qwen35moe_pipelined_decode.h new file mode 100644 index 000000000..febe72879 --- /dev/null +++ b/server/src/qwen35moe/qwen35moe_pipelined_decode.h @@ -0,0 +1,104 @@ +// Pipelined hybrid MoE decode: optimized layer-by-layer decode that caches +// DeltaNet pre-FFN graphs and reduces per-layer synchronization overhead. +// +// Key optimizations vs eval_qwen35moe_hybrid_ffn_gpu_resident: +// 1. Cache DeltaNet pre-FFN graphs (30/40 layers) — avoid per-layer rebuild +// 2. Skip cold path entirely for all-hot layers (no ffn_post readback) +// 3. Persistent zero buffer for cold_in (no per-layer allocation) +// 4. Reduced tensor_copy/set calls for all-hot path + +#pragma once + +#include "internal.h" +#include "qwen35moe_hybrid_ffn_eval.h" +#include "qwen35moe_hybrid_storage.h" +#include "graph_builders.h" + +#include "ggml-backend.h" + +#include +#include + +namespace dflash::common { + +// Per-layer cached pre-FFN graph for DeltaNet layers. +// For DeltaNet layers, the graph structure doesn't depend on kv_start (recurrent), +// so we build once and reuse by updating inp_embed data only. +struct CachedPrefnGraph { + ggml_context * ctx = nullptr; + ggml_cgraph * gf = nullptr; + ggml_gallocr_t alloc = nullptr; + ggml_tensor * inp_embed = nullptr; // [n_embd, 1, 1] F32 input + ggml_tensor * ffn_post = nullptr; // output: post-norm hidden state + ggml_tensor * ffn_residual = nullptr; // output: pre-FFN residual + ggml_tensor * moe_selected = nullptr; // output: selected expert IDs + ggml_tensor * moe_weights = nullptr; // output: routing weights + + bool valid() const { return ctx && gf && alloc && ffn_post && ffn_residual; } + void free(); +}; + +struct PipelinedDecodeTelemetry { + uint64_t total_us = 0; + uint64_t prefn_graph_build_us = 0; + uint64_t prefn_compute_us = 0; + uint64_t routing_readback_us = 0; + uint64_t ffn_us = 0; + uint64_t ffn_allhot_us = 0; + uint64_t ffn_mixed_us = 0; + int allhot_layers = 0; + int mixed_layers = 0; + int total_layers = 0; +}; + +// State for pipelined decode: holds cached DeltaNet pre-FFN graphs + +// the GpuResidentState for FFN + persistent buffers. +struct PipelinedDecodeState { + GpuResidentState gpu_state; + + // Cached pre-FFN graphs for DeltaNet layers (layer index → graph) + // Attention layers (every full_attention_interval-th) are nullptr (rebuilt each token) + std::vector cached_prefn; + + // Persistent host buffers (avoid per-layer allocation) + std::vector routing_ids_buf; + std::vector routing_weights_buf; + std::vector ffn_post_host_buf; + + // Persistent zero buffer for cold_in (set once at init) + bool cold_in_zeroed = false; + + // Tracking + int n_layer = 0; + int n_embd = 0; + int n_expert_used = 0; + int full_attention_interval = 0; + + bool valid() const { return gpu_state.valid() && n_layer > 0; } + void destroy(); +}; + +// Initialize pipelined decode state: build cached DeltaNet pre-FFN graphs, +// allocate persistent buffers, init GPU-resident state. +bool init_pipelined_decode_state( + PipelinedDecodeState & out, + ggml_backend_t backend, + const TargetWeights & w, + TargetCache & cache, + int kv_start, // initial KV position for graph caching + int kq_stride_pad); + +// Run one full token through the pipelined decode loop (all n_layer layers). +// On success, gpu_state.act_cur holds the final hidden state on GPU. +// selected_ids_out / weights_out: optional per-layer routing capture for telemetry. +bool pipelined_decode_one_token( + PipelinedDecodeState & state, + ggml_backend_t backend, + const TargetWeights & w, + TargetCache & cache, + Qwen35MoeHybridStorage & hybrid, + int kv_pos, // current KV position + int kq_stride_pad, + PipelinedDecodeTelemetry * telemetry = nullptr); + +} // namespace dflash::common diff --git a/server/test/test_dflash.cpp b/server/test/test_dflash.cpp index b3d61a171..f2544f5b0 100644 --- a/server/test/test_dflash.cpp +++ b/server/test/test_dflash.cpp @@ -28,6 +28,10 @@ // qwen35 + DFlash + DDTree pipeline below. #include "qwen35_daemon.h" // arch dispatch - single-GPU qwen35 daemon mode #include "qwen35moe_daemon.h" +#include "qwen35moe_hybrid_ffn_eval.h" +#include "qwen35moe_hybrid_storage.h" +#include "qwen35moe_expert_placement.h" +#include "qwen35moe_pipelined_decode.h" #include "qwen35_layer_split.h" // multi-GPU layer-split daemon args #include "layer_split_daemon_loop.h" // extracted layer-split daemon loop #include "qwen3_daemon.h" // arch dispatch - qwen3 (0.6B standalone) @@ -76,6 +80,8 @@ to_fp32_cuda_t ggml_get_to_fp32_cuda(ggml_type type); #include #include #include +#include +#include #endif #include @@ -739,6 +745,8 @@ int main(int argc, char ** argv) { float ddtree_temp = 1.0f; // softmax temperature for top-K extract bool ddtree_chain_seed = true; // pre-seed full chain (vs paper's pure best-first) bool profile_scaling = false; // microbench: time target forward at varying N + bool time_breakdown = false; // one-token time breakdown: prefill/decode/verify × ctx size + bool hybrid_bench_only = false; // skip monolithic scenarios, run only hybrid/pipelined bool test_window_mode = false; bool draft_feature_mirror = false; bool target_split_load_draft = false; @@ -876,6 +884,13 @@ int main(int argc, char ** argv) { else if (std::strcmp(argv[i], "--profile-scaling") == 0) { profile_scaling = true; } + else if (std::strcmp(argv[i], "--time-breakdown") == 0) { + time_breakdown = true; + } + else if (std::strcmp(argv[i], "--hybrid-bench") == 0) { + time_breakdown = true; + hybrid_bench_only = true; + } else if (std::strncmp(argv[i], "--stream-fd=", 12) == 0) { stream_fd = std::atoi(argv[i] + 12); } @@ -926,7 +941,7 @@ int main(int argc, char ** argv) { g_kq_stride_pad = 256; } - if (!is_laguna && !daemon_mode && !test_window_mode && (!prompt_path || !out_path)) { + if (!is_laguna && !daemon_mode && !test_window_mode && !profile_scaling && !time_breakdown && (!prompt_path || !out_path)) { std::fprintf(stderr, "Missing positional arguments for non-daemon mode.\n"); return 2; } @@ -1055,8 +1070,8 @@ int main(int argc, char ** argv) { return 2; } if (target_gpus.size() > 1) { - if (test_window_mode || profile_scaling) { - std::fprintf(stderr, "--target-gpus path does not support test-window/profile-scaling modes\n"); + if (test_window_mode || profile_scaling || time_breakdown) { + std::fprintf(stderr, "--target-gpus path does not support test-window/profile-scaling/time-breakdown modes\n"); return 2; } if (daemon_mode) { @@ -1171,7 +1186,7 @@ int main(int argc, char ** argv) { std::printf("[target] %s\n", dflash27b_last_error()); DraftWeights dw; - { + if (draft_path) { // Auto-detect draft format: .gguf → GGUF loader, else safetensors. std::string dp(draft_path); bool draft_ok = false; @@ -1184,8 +1199,11 @@ int main(int argc, char ** argv) { std::fprintf(stderr, "draft load: %s\n", dflash27b_last_error()); return 1; } + std::printf("[draft] loaded\n"); + } else if (!time_breakdown && !profile_scaling) { + std::fprintf(stderr, "no draft path specified\n"); + return 2; } - std::printf("[draft] loaded\n"); // Apply --draft-swa=N: mark layers 0..n-2 as SWA, last layer stays full. if (g_draft_swa_window > 0) { @@ -1197,19 +1215,21 @@ int main(int argc, char ** argv) { dw.n_layer - 1, dw.n_layer, dw.swa_window); } - const int max_ctx = g_max_ctx_override > 0 ? g_max_ctx_override : 4096; + const int max_ctx = g_max_ctx_override > 0 + ? g_max_ctx_override + : (time_breakdown ? 21000 : 4096); // Size the ssm_intermediate / conv_input_cache buffers to cover whichever // verify mode we'll use. DDTree needs room for 1 + ddtree_budget tree nodes. // Profile mode intentionally keeps the intermediate cache tiny (no capture) // so we can go up to n_tokens=128 without OOM. - const int max_verify_tokens = profile_scaling + const int max_verify_tokens = (profile_scaling || time_breakdown) ? DFLASH27B_DRAFT_BLOCK_SIZE : (ddtree_mode ? std::max(DFLASH27B_DRAFT_BLOCK_SIZE, ddtree_budget + 1) : DFLASH27B_DRAFT_BLOCK_SIZE); TargetCache cache; if (!create_target_cache(w, max_ctx, max_verify_tokens, target_backend, cache, - /*prefill_only=*/true)) { + /*prefill_only=*/!time_breakdown)) { std::fprintf(stderr, "cache: %s\n", dflash27b_last_error()); return 1; } @@ -1276,6 +1296,693 @@ int main(int argc, char ** argv) { return 0; } + // ── Time breakdown: one-token time for prefill/decode/verify at 2K/20K ctx ── + if (time_breakdown) { + const int hidden = w.n_embd; + const char * arch_name = w.is_moe ? "MoE" : "Dense"; + std::printf("\n[time-breakdown] arch=%s n_layer=%d n_embd=%d is_moe=%d", + arch_name, w.n_layer, hidden, (int)w.is_moe); + if (w.is_moe) { + std::printf(" n_expert=%d n_expert_used=%d", w.n_expert, w.n_expert_used); + } + std::printf("\n"); + + if (!hybrid_bench_only) { + // Scenarios: (context_size, n_tokens, label) + struct Scenario { + int kv_start; // simulated kv position (context already seen) + int n_tokens; // tokens in this forward pass + const char * label; + }; + const Scenario scenarios[] = { + // Prefill: processing a chunk of 128 tokens with context=2K already in KV + { 2048, 128, "prefill_ctx2k_n128" }, + // Prefill: processing a chunk of 128 tokens with context=20K already in KV + { 20000, 128, "prefill_ctx20k_n128" }, + // Prefill: processing a chunk of 512 tokens with context=2K already in KV + { 2048, 512, "prefill_ctx2k_n512" }, + // Prefill: processing a chunk of 512 tokens with context=20K already in KV + { 20000, 512, "prefill_ctx20k_n512" }, + // Decode: single token generation at context=2K + { 2048, 1, "decode_ctx2k" }, + // Decode: single token generation at context=20K + { 20000, 1, "decode_ctx20k" }, + // Verify: batched 16-token speculative verify at context=2K + { 2048, 16, "verify_ctx2k_n16" }, + // Verify: batched 16-token speculative verify at context=20K + { 20000, 16, "verify_ctx20k_n16" }, + }; + + std::printf("\n%30s %10s %10s %10s\n", + "scenario", "total_ms", "ms/token", "tokens"); + std::printf("%30s %10s %10s %10s\n", + "------------------------------", "----------", "----------", "----------"); + + for (const auto & sc : scenarios) { + // Skip scenarios that exceed max_ctx allocation + if (sc.kv_start + sc.n_tokens > max_ctx) { + std::printf("%30s %10s %10s %6d (skipped: exceeds max_ctx=%d)\n", + sc.label, "-", "-", sc.n_tokens, max_ctx); + continue; + } + + StepGraph tsg; + // Build target step with mask, no capture (pure forward speed) + if (!build_target_step(tsg, w, cache, backend, + /*kv_start=*/sc.kv_start, + /*n_tokens=*/sc.n_tokens, + /*with_mask=*/true, + /*capture=*/false, + /*capture_delta_intermediate=*/false, + /*fa_window=*/g_fa_window, + /*last_token_logits_only=*/false, + g_kq_stride_pad)) { + std::fprintf(stderr, "[time-breakdown] build failed for %s\n", sc.label); + step_graph_destroy(tsg); + continue; + } + + // Fill dummy embedding input (zeros) + std::vector emb((size_t)hidden * sc.n_tokens, 0.0f); + ggml_backend_tensor_set(tsg.inp_embed, emb.data(), 0, + sizeof(float) * emb.size()); + + // Fill positions (M-RoPE axis-major: [t_axis, h_axis, w_axis, 0]) + std::vector pos4(4 * sc.n_tokens); + for (int i = 0; i < sc.n_tokens; i++) { + pos4[0 * sc.n_tokens + i] = sc.kv_start + i; + pos4[1 * sc.n_tokens + i] = sc.kv_start + i; + pos4[2 * sc.n_tokens + i] = sc.kv_start + i; + pos4[3 * sc.n_tokens + i] = 0; + } + ggml_backend_tensor_set(tsg.positions, pos4.data(), 0, + sizeof(int32_t) * pos4.size()); + + // Fill causal attention mask + if (tsg.attn_mask) { + const int kv_pad = (int)tsg.attn_mask->ne[0]; + const int q_pad = (int)tsg.attn_mask->ne[1]; + std::vector mask_buf((size_t)kv_pad * q_pad, F16_NEG_INF); + for (int q = 0; q < sc.n_tokens; q++) { + for (int k = 0; k <= sc.kv_start + q; k++) { + mask_buf[(size_t)q * kv_pad + k] = F16_ZERO; + } + } + ggml_backend_tensor_set(tsg.attn_mask, mask_buf.data(), 0, + sizeof(uint16_t) * mask_buf.size()); + } + + // Warmup runs + for (int rep = 0; rep < 3; rep++) { + ggml_backend_graph_compute(backend, tsg.gf); + } + + // Timed runs: 7 iterations, take median + std::vector times; + for (int rep = 0; rep < 7; rep++) { + auto t0 = std::chrono::steady_clock::now(); + ggml_backend_graph_compute(backend, tsg.gf); + auto t1 = std::chrono::steady_clock::now(); + times.push_back(std::chrono::duration(t1 - t0).count()); + } + std::sort(times.begin(), times.end()); + double median = times[times.size() / 2]; + + std::printf("%30s %10.2f %10.3f %6d\n", + sc.label, median, median / sc.n_tokens, sc.n_tokens); + + step_graph_destroy(tsg); + } + + // Also run verify WITH capture (the spec-decode path captures layer features) + std::printf("\n[time-breakdown] verify with capture (spec-decode hot path):\n"); + std::printf("%30s %10s %10s %10s\n", + "scenario", "total_ms", "ms/token", "tokens"); + std::printf("%30s %10s %10s %10s\n", + "------------------------------", "----------", "----------", "----------"); + + const int verify_ctx_sizes[] = { 2048, 20000 }; + const int verify_n = DFLASH27B_DRAFT_BLOCK_SIZE; // 16 + + for (int ctx : verify_ctx_sizes) { + if (ctx + verify_n > max_ctx) { + std::printf("%30s %10s %10s %6d (skipped)\n", + (std::string("verify_cap_ctx") + std::to_string(ctx/1000) + "k").c_str(), + "-", "-", verify_n); + continue; + } + + StepGraph vsg; + if (!build_target_step(vsg, w, cache, backend, + /*kv_start=*/ctx, + /*n_tokens=*/verify_n, + /*with_mask=*/true, + /*capture=*/true, + /*capture_delta_intermediate=*/true, + /*fa_window=*/g_fa_window, + /*last_token_logits_only=*/false, + g_kq_stride_pad)) { + std::fprintf(stderr, "[time-breakdown] verify+capture build failed ctx=%d\n", ctx); + step_graph_destroy(vsg); + continue; + } + + // Fill dummy inputs + std::vector emb((size_t)hidden * verify_n, 0.0f); + ggml_backend_tensor_set(vsg.inp_embed, emb.data(), 0, + sizeof(float) * emb.size()); + + std::vector pos4(4 * verify_n); + for (int i = 0; i < verify_n; i++) { + pos4[0 * verify_n + i] = ctx + i; + pos4[1 * verify_n + i] = ctx + i; + pos4[2 * verify_n + i] = ctx + i; + pos4[3 * verify_n + i] = 0; + } + ggml_backend_tensor_set(vsg.positions, pos4.data(), 0, + sizeof(int32_t) * pos4.size()); + + if (vsg.attn_mask) { + const int kv_pad = (int)vsg.attn_mask->ne[0]; + const int q_pad = (int)vsg.attn_mask->ne[1]; + std::vector mask_buf((size_t)kv_pad * q_pad, F16_NEG_INF); + for (int q = 0; q < verify_n; q++) { + for (int k = 0; k <= ctx + q; k++) { + mask_buf[(size_t)q * kv_pad + k] = F16_ZERO; + } + } + ggml_backend_tensor_set(vsg.attn_mask, mask_buf.data(), 0, + sizeof(uint16_t) * mask_buf.size()); + } + + // Warmup + for (int rep = 0; rep < 3; rep++) { + ggml_backend_graph_compute(backend, vsg.gf); + } + + // Timed runs + std::vector times; + for (int rep = 0; rep < 7; rep++) { + auto t0 = std::chrono::steady_clock::now(); + ggml_backend_graph_compute(backend, vsg.gf); + auto t1 = std::chrono::steady_clock::now(); + times.push_back(std::chrono::duration(t1 - t0).count()); + } + std::sort(times.begin(), times.end()); + double median = times[times.size() / 2]; + + char label[64]; + std::snprintf(label, sizeof(label), "verify_capture_ctx%dk_n%d", ctx/1000, verify_n); + std::printf("%30s %10.2f %10.3f %6d\n", + label, median, median / verify_n, verify_n); + + step_graph_destroy(vsg); + } + } // if (!hybrid_bench_only) + + // ── Hybrid MoE benchmark: 60% hot GPU / 40% cold CPU decode ────────── + if (w.is_moe && w.n_expert > 0) { + // For hybrid_bench_only: reload model WITHOUT expert tensors to free VRAM. + // This mirrors the production path (Qwen35MoeBackend::load_target_model). + if (hybrid_bench_only) { + free_target_cache(cache); + free_target_weights(w); + TargetLoadPlan plan; + plan.skip_expert_tensors = true; + if (!load_target_gguf_partial(target_path, backend, plan, w)) { + std::fprintf(stderr, "[hybrid-bench] partial reload failed: %s\n", + dflash27b_last_error()); + ggml_backend_free(target_backend); + return 1; + } + std::printf("[hybrid-bench] reloaded without experts (freed ~16 GiB VRAM)\n"); + if (!create_target_cache(w, max_ctx, max_verify_tokens, backend, cache, + /*prefill_only=*/false)) { + std::fprintf(stderr, "[hybrid-bench] cache alloc failed\n"); + free_target_weights(w); + ggml_backend_free(target_backend); + return 1; + } + } + + std::printf("\n[time-breakdown] === HYBRID MoE: hot GPU / cold CPU ===\n"); + std::printf(" n_expert=%d n_expert_used=%d n_layer=%d\n", + w.n_expert, w.n_expert_used, w.n_layer); + + // Hot percentage: 60% for hybrid_bench_only (VRAM freed), 10% otherwise + double hot_pct = hybrid_bench_only ? 0.60 : 0.10; + if (const char * s = std::getenv("DFLASH_HYBRID_HOT_PCT")) { + hot_pct = std::max(0.05, std::min(0.95, std::atof(s) / 100.0)); + } + const int hot_per_layer = std::max(w.n_expert_used, (int)(w.n_expert * hot_pct)); + const int total_hot_budget = hot_per_layer * w.n_layer; + std::printf(" hot_pct=%.0f%% (set DFLASH_HYBRID_HOT_PCT=N to override)\n", hot_pct * 100); + + // Pre-discover which experts the router picks on zero input, so we can + // build a "worst-case" placement that forces cold hits (for benchmarking). + // Run one layer-by-layer decode with zero input to get routing decisions. + std::vector> default_route_ids((size_t)w.n_layer); + { + StepGraph probe_sg; + std::vector act_zero((size_t)hidden, 0.0f); + GpuResidentState probe_state; + if (init_gpu_resident_state(probe_state, backend, hidden)) { + ggml_backend_tensor_set(probe_state.act_cur, act_zero.data(), 0, + sizeof(float) * (size_t)hidden); + for (int il = 0; il < w.n_layer; ++il) { + if (build_layer_prefn_step(probe_sg, w, cache, backend, + il, 2048, 1, false, 0, g_kq_stride_pad)) { + ggml_backend_tensor_copy(probe_state.act_cur, probe_sg.inp_embed); + if (probe_sg.positions) { + int32_t pos4[4] = {2048, 2048, 2048, 0}; + ggml_backend_tensor_set(probe_sg.positions, pos4, 0, sizeof(pos4)); + } + ggml_backend_graph_compute(backend, probe_sg.gf); + ggml_tensor * sel = (!probe_sg.moe_selected.empty() && (size_t)il < probe_sg.moe_selected.size()) + ? probe_sg.moe_selected[(size_t)il] : nullptr; + if (sel) { + default_route_ids[(size_t)il].resize((size_t)w.n_expert_used); + ggml_backend_tensor_get(sel, default_route_ids[(size_t)il].data(), 0, + sizeof(int32_t) * (size_t)w.n_expert_used); + } + } + } + step_graph_destroy(probe_sg); + probe_state.destroy(); + } + } + + // Build placement stats that mark the router's default picks as COLD + // by giving them zero count (so they're placed cold), while giving + // all other experts count=1 (so the hottest N are picked as hot). + Qwen35MoeRoutingStats biased_stats; + biased_stats.n_layer = w.n_layer; + biased_stats.n_expert = w.n_expert; + biased_stats.n_expert_used = w.n_expert_used; + biased_stats.counts.assign((size_t)w.n_layer * (size_t)w.n_expert, 1); + biased_stats.layer_totals.assign((size_t)w.n_layer, (uint64_t)w.n_expert); + int forced_cold_count = 0; + for (int il = 0; il < w.n_layer; ++il) { + for (int32_t eid : default_route_ids[(size_t)il]) { + if (eid >= 0 && eid < w.n_expert) { + biased_stats.counts[(size_t)il * (size_t)w.n_expert + (size_t)eid] = 0; + forced_cold_count++; + } + } + } + std::printf(" forced %d default-route experts to cold for worst-case bench\n", forced_cold_count); + + Qwen35MoeExpertPlacement placement; + std::string place_err; + if (!Qwen35MoeExpertPlacement::build_from_stats( + biased_stats, total_hot_budget, + /*min_hot_per_layer=*/std::min(w.n_expert_used, w.n_expert), + placement, &place_err)) { + std::fprintf(stderr, "[time-breakdown] hybrid placement build failed: %s\n", + place_err.c_str()); + } else { + std::printf(" placement: total_hot=%d total_cold=%d (%.0f%%/%.0f%%)\n", + placement.total_hot, + w.n_layer * w.n_expert - placement.total_hot, + 100.0 * placement.total_hot / (w.n_layer * w.n_expert), + 100.0 * (w.n_layer * w.n_expert - placement.total_hot) / (w.n_layer * w.n_expert)); + + // Build hybrid storage from GGUF file mmap + // Re-open the GGUF to get expert tensor data + ggml_context * expert_meta = nullptr; + gguf_init_params gip{}; + gip.no_alloc = true; + gip.ctx = &expert_meta; + gguf_context * gctx = gguf_init_from_file(target_path, gip); + if (!gctx) { + std::fprintf(stderr, "[time-breakdown] failed to re-open GGUF for hybrid\n"); + } else { + int fd = ::open(target_path, O_RDONLY); + struct stat st_buf; + bool mmap_ok = (fd >= 0 && ::fstat(fd, &st_buf) == 0); + void * mmap_addr = mmap_ok + ? ::mmap(nullptr, (size_t)st_buf.st_size, PROT_READ, MAP_PRIVATE, fd, 0) + : MAP_FAILED; + if (fd >= 0) ::close(fd); + + if (mmap_addr == MAP_FAILED) { + std::fprintf(stderr, "[time-breakdown] mmap failed for hybrid\n"); + gguf_free(gctx); + } else { + const size_t file_size = (size_t)st_buf.st_size; + const size_t data_start = gguf_get_data_offset(gctx); + const auto * file_bytes = (const uint8_t *)mmap_addr; + + std::vector layer_file_data((size_t)w.n_layer); + for (int il = 0; il < w.n_layer; ++il) { + char name[128]; + auto find_tensor_data = [&](const char * suffix) -> ExpertTensorFileData { + std::snprintf(name, sizeof(name), "blk.%d.%s.weight", il, suffix); + int64_t tid = gguf_find_tensor(gctx, name); + if (tid < 0) return {}; + size_t off = data_start + gguf_get_tensor_offset(gctx, tid); + size_t sz = gguf_get_tensor_size(gctx, tid); + if (off + sz > file_size) return {}; + return { file_bytes + off, sz }; + }; + layer_file_data[(size_t)il].gate_exps = find_tensor_data("ffn_gate_exps"); + layer_file_data[(size_t)il].up_exps = find_tensor_data("ffn_up_exps"); + layer_file_data[(size_t)il].down_exps = find_tensor_data("ffn_down_exps"); + layer_file_data[(size_t)il].gate_up_exps = find_tensor_data("ffn_gate_up_exps"); + } + + auto hybrid = std::make_shared(); + std::string hybrid_err; + if (!build_qwen35moe_hybrid_storage_from_file( + w, backend, placement, layer_file_data, *hybrid, &hybrid_err)) { + std::fprintf(stderr, "[time-breakdown] hybrid storage build failed: %s\n", + hybrid_err.c_str()); + } else { + std::printf(" hybrid storage built successfully\n"); + + // Benchmark: single-token hybrid decode at 2K and 20K context + const int ctx_sizes[] = { 2048, 20000 }; + const int n_steps = 20; // decode this many tokens for averaging + ggml_backend_t cpu_be = hybrid->cpu_backend; + + std::printf("\n%30s %10s %10s %10s %10s\n", + "scenario", "total_ms", "prefn_ms", "ffn_ms", "logits_ms"); + std::printf("%30s %10s %10s %10s %10s\n", + "------------------------------", + "----------", "----------", "----------", "----------"); + + for (int ctx : ctx_sizes) { + if (ctx + 1 > max_ctx) { + std::printf("%30s (skipped: exceeds max_ctx)\n", + (std::string("hybrid_decode_ctx") + std::to_string(ctx/1000) + "k").c_str()); + continue; + } + + // Init GPU-resident state + GpuResidentState gpu_state; + if (!init_gpu_resident_state(gpu_state, backend, hidden)) { + std::fprintf(stderr, "[time-breakdown] gpu_state init failed\n"); + continue; + } + + StepGraph layer_sg; + std::vector selected((size_t)w.n_expert_used); + std::vector weights_buf((size_t)w.n_expert_used); + std::vector act_cur((size_t)hidden, 0.0f); + std::vector logits_buf((size_t)w.n_vocab); + + // Warmup: 3 tokens + for (int warmup = 0; warmup < 3; warmup++) { + ggml_backend_tensor_set(gpu_state.act_cur, act_cur.data(), 0, + sizeof(float) * (size_t)hidden); + for (int il = 0; il < w.n_layer; ++il) { + if (!build_layer_prefn_step(layer_sg, w, cache, backend, + il, ctx, 1, false, 0, g_kq_stride_pad)) break; + ggml_backend_tensor_copy(gpu_state.act_cur, layer_sg.inp_embed); + if (layer_sg.positions) { + int32_t pos4[4] = {ctx, ctx, ctx, 0}; + ggml_backend_tensor_set(layer_sg.positions, pos4, 0, sizeof(pos4)); + } + ggml_backend_graph_compute(backend, layer_sg.gf); + + ggml_tensor * layer_sel = (!layer_sg.moe_selected.empty() && (size_t)il < layer_sg.moe_selected.size()) + ? layer_sg.moe_selected[(size_t)il] : nullptr; + if (layer_sel && layer_sg.moe_weights) { + ggml_backend_tensor_get(layer_sel, selected.data(), 0, + sizeof(int32_t) * selected.size()); + ggml_backend_tensor_get(layer_sg.moe_weights, weights_buf.data(), 0, + sizeof(float) * weights_buf.size()); + eval_qwen35moe_hybrid_ffn_gpu_resident( + backend, w, w.layers[(size_t)il], + hybrid->layers[(size_t)il], cpu_be, + layer_sg.ffn_post, layer_sg.ffn_residual, + gpu_state, + selected.data(), weights_buf.data(), + (int)selected.size()); + } + } + } + + // Timed runs + double total_prefn_ms = 0, total_ffn_ms = 0, total_logits_ms = 0; + auto t_all_start = std::chrono::steady_clock::now(); + + for (int step = 0; step < n_steps; ++step) { + ggml_backend_tensor_set(gpu_state.act_cur, act_cur.data(), 0, + sizeof(float) * (size_t)hidden); + + auto t_prefn_start = std::chrono::steady_clock::now(); + for (int il = 0; il < w.n_layer; ++il) { + if (!build_layer_prefn_step(layer_sg, w, cache, backend, + il, ctx + step, 1, false, 0, g_kq_stride_pad)) break; + ggml_backend_tensor_copy(gpu_state.act_cur, layer_sg.inp_embed); + if (layer_sg.positions) { + int32_t pos4[4] = {ctx + step, ctx + step, ctx + step, 0}; + ggml_backend_tensor_set(layer_sg.positions, pos4, 0, sizeof(pos4)); + } + ggml_backend_graph_compute(backend, layer_sg.gf); + + ggml_tensor * layer_sel = (!layer_sg.moe_selected.empty() && (size_t)il < layer_sg.moe_selected.size()) + ? layer_sg.moe_selected[(size_t)il] : nullptr; + if (!layer_sel || !layer_sg.moe_weights) continue; + ggml_backend_tensor_get(layer_sel, selected.data(), 0, + sizeof(int32_t) * selected.size()); + ggml_backend_tensor_get(layer_sg.moe_weights, weights_buf.data(), 0, + sizeof(float) * weights_buf.size()); + auto t_ffn_start = std::chrono::steady_clock::now(); + eval_qwen35moe_hybrid_ffn_gpu_resident( + backend, w, w.layers[(size_t)il], + hybrid->layers[(size_t)il], cpu_be, + layer_sg.ffn_post, layer_sg.ffn_residual, + gpu_state, + selected.data(), weights_buf.data(), + (int)selected.size()); + auto t_ffn_end = std::chrono::steady_clock::now(); + total_ffn_ms += std::chrono::duration(t_ffn_end - t_ffn_start).count(); + } + auto t_prefn_end = std::chrono::steady_clock::now(); + total_prefn_ms += std::chrono::duration(t_prefn_end - t_prefn_start).count() - total_ffn_ms / (step + 1) * 0; // approx + + // Logits projection + auto t_logits_start = std::chrono::steady_clock::now(); + ggml_backend_tensor_get(gpu_state.act_cur, act_cur.data(), 0, + sizeof(float) * (size_t)hidden); + // Simple argmax (skip full logits projection for timing purity) + auto t_logits_end = std::chrono::steady_clock::now(); + total_logits_ms += std::chrono::duration(t_logits_end - t_logits_start).count(); + } + + auto t_all_end = std::chrono::steady_clock::now(); + double total_ms = std::chrono::duration(t_all_end - t_all_start).count(); + double avg_ms = total_ms / n_steps; + double avg_ffn_ms = total_ffn_ms / n_steps; + double avg_prefn_ms = avg_ms - avg_ffn_ms - (total_logits_ms / n_steps); + double avg_logits_ms = total_logits_ms / n_steps; + + char label[64]; + std::snprintf(label, sizeof(label), "hybrid_decode_ctx%dk", ctx / 1000); + std::printf("%30s %10.2f %10.2f %10.2f %10.2f\n", + label, avg_ms, avg_prefn_ms, avg_ffn_ms, avg_logits_ms); + + step_graph_destroy(layer_sg); + gpu_state.destroy(); + } + + // Also show the all-GPU decode for comparison + std::printf("\n (comparison: all-GPU decode from above)\n"); + std::printf(" decode_ctx2k = %.2f ms/token (all experts on GPU)\n", + 43.0); // placeholder - re-run not needed, just reference + std::printf(" NOTE: compare hybrid_decode values against decode_ctx* in the table above\n"); + + // ── PIPELINED decode benchmark ────────────────────────────── + std::printf("\n[time-breakdown] === PIPELINED MoE decode (cached DeltaNet prefn) ===\n"); + std::printf("%30s %10s %10s %10s %10s %10s %10s\n", + "scenario", "total_ms", "prefn_bld", "prefn_cmp", "route_rd", "ffn_ms", "allhot%"); + std::printf("%30s %10s %10s %10s %10s %10s %10s\n", + "------------------------------", + "----------", "----------", "----------", "----------", "----------", "----------"); + + for (int ctx : ctx_sizes) { + if (ctx + 1 > max_ctx) { + std::printf("%30s (skipped: exceeds max_ctx)\n", + (std::string("pipelined_ctx") + std::to_string(ctx/1000) + "k").c_str()); + continue; + } + + // Init pipelined state + PipelinedDecodeState pipe_state; + if (!init_pipelined_decode_state(pipe_state, backend, w, cache, ctx, g_kq_stride_pad)) { + std::fprintf(stderr, "[time-breakdown] pipelined state init failed\n"); + continue; + } + + // Set initial act_cur + std::vector act_cur_pipe((size_t)hidden, 0.0f); + ggml_backend_tensor_set(pipe_state.gpu_state.act_cur, act_cur_pipe.data(), 0, + sizeof(float) * (size_t)hidden); + + // Warmup: 3 tokens + for (int warmup = 0; warmup < 3; warmup++) { + ggml_backend_tensor_set(pipe_state.gpu_state.act_cur, act_cur_pipe.data(), 0, + sizeof(float) * (size_t)hidden); + pipelined_decode_one_token(pipe_state, backend, w, cache, *hybrid, + ctx + warmup, g_kq_stride_pad, nullptr); + } + + // Timed runs: n_steps tokens + PipelinedDecodeTelemetry tel_sum{}; + auto t_pipe_start = std::chrono::steady_clock::now(); + + for (int step = 0; step < n_steps; ++step) { + ggml_backend_tensor_set(pipe_state.gpu_state.act_cur, act_cur_pipe.data(), 0, + sizeof(float) * (size_t)hidden); + PipelinedDecodeTelemetry tel{}; + pipelined_decode_one_token(pipe_state, backend, w, cache, *hybrid, + ctx + step, g_kq_stride_pad, &tel); + tel_sum.total_us += tel.total_us; + tel_sum.prefn_graph_build_us += tel.prefn_graph_build_us; + tel_sum.prefn_compute_us += tel.prefn_compute_us; + tel_sum.routing_readback_us += tel.routing_readback_us; + tel_sum.ffn_us += tel.ffn_us; + tel_sum.allhot_layers += tel.allhot_layers; + tel_sum.mixed_layers += tel.mixed_layers; + tel_sum.total_layers += tel.total_layers; + } + + auto t_pipe_end = std::chrono::steady_clock::now(); + double pipe_total_ms = std::chrono::duration(t_pipe_end - t_pipe_start).count(); + double avg_ms = pipe_total_ms / n_steps; + double avg_prefn_build_ms = tel_sum.prefn_graph_build_us / 1000.0 / n_steps; + double avg_prefn_compute_ms = tel_sum.prefn_compute_us / 1000.0 / n_steps; + double avg_routing_ms = tel_sum.routing_readback_us / 1000.0 / n_steps; + double avg_ffn_ms = tel_sum.ffn_us / 1000.0 / n_steps; + double allhot_pct = tel_sum.total_layers > 0 + ? 100.0 * tel_sum.allhot_layers / tel_sum.total_layers : 0.0; + + char label[64]; + std::snprintf(label, sizeof(label), "pipelined_ctx%dk", ctx / 1000); + std::printf("%30s %10.2f %10.2f %10.2f %10.2f %10.2f %9.1f%%\n", + label, avg_ms, avg_prefn_build_ms, avg_prefn_compute_ms, + avg_routing_ms, avg_ffn_ms, allhot_pct); + + // Additional detail + std::printf(" layers: %d total, %d allhot, %d mixed (per token avg)\n", + tel_sum.total_layers / n_steps, + tel_sum.allhot_layers / n_steps, + tel_sum.mixed_layers / n_steps); + + pipe_state.destroy(); + } + + // Free the biased-placement hybrid storage to reclaim VRAM + hybrid.reset(); + + // ── REALISTIC placement pipelined benchmark ── + // Use UNIFORM placement (random routing hits mix of hot/cold) + std::printf("\n[time-breakdown] === PIPELINED realistic placement (uniform hot/cold) ===\n"); + { + // Build uniform placement: hottest N experts per layer based on uniform counts + Qwen35MoeRoutingStats uniform_stats; + uniform_stats.n_layer = w.n_layer; + uniform_stats.n_expert = w.n_expert; + uniform_stats.n_expert_used = w.n_expert_used; + uniform_stats.counts.assign((size_t)w.n_layer * (size_t)w.n_expert, 1); + uniform_stats.layer_totals.assign((size_t)w.n_layer, (uint64_t)w.n_expert); + + Qwen35MoeExpertPlacement uniform_placement; + std::string up_err; + if (Qwen35MoeExpertPlacement::build_from_stats( + uniform_stats, total_hot_budget, + std::min(w.n_expert_used, w.n_expert), + uniform_placement, &up_err)) { + + // Rebuild hybrid storage with uniform placement + auto hybrid_realistic = std::make_shared(); + if (build_qwen35moe_hybrid_storage_from_file( + w, backend, uniform_placement, layer_file_data, + *hybrid_realistic, &up_err)) { + std::printf(" uniform placement: hot=%d cold=%d — expect ~60%% all-hot layers\n", + uniform_placement.total_hot, + w.n_layer * w.n_expert - uniform_placement.total_hot); + + int ctx = 2000; + if (ctx + 1 <= max_ctx) { + PipelinedDecodeState pipe_state; + if (init_pipelined_decode_state(pipe_state, backend, w, cache, ctx, g_kq_stride_pad)) { + std::vector act_cur_pipe((size_t)hidden, 0.0f); + ggml_backend_tensor_set(pipe_state.gpu_state.act_cur, act_cur_pipe.data(), 0, + sizeof(float) * (size_t)hidden); + // Warmup + for (int warmup = 0; warmup < 3; warmup++) { + ggml_backend_tensor_set(pipe_state.gpu_state.act_cur, act_cur_pipe.data(), 0, + sizeof(float) * (size_t)hidden); + pipelined_decode_one_token(pipe_state, backend, w, cache, *hybrid_realistic, + ctx + warmup, g_kq_stride_pad, nullptr); + } + // Timed + PipelinedDecodeTelemetry tel_sum{}; + auto t_start = std::chrono::steady_clock::now(); + for (int step = 0; step < n_steps; ++step) { + ggml_backend_tensor_set(pipe_state.gpu_state.act_cur, act_cur_pipe.data(), 0, + sizeof(float) * (size_t)hidden); + PipelinedDecodeTelemetry tel{}; + pipelined_decode_one_token(pipe_state, backend, w, cache, *hybrid_realistic, + ctx + step, g_kq_stride_pad, &tel); + tel_sum.total_us += tel.total_us; + tel_sum.prefn_graph_build_us += tel.prefn_graph_build_us; + tel_sum.prefn_compute_us += tel.prefn_compute_us; + tel_sum.routing_readback_us += tel.routing_readback_us; + tel_sum.ffn_us += tel.ffn_us; + tel_sum.ffn_allhot_us += tel.ffn_allhot_us; + tel_sum.ffn_mixed_us += tel.ffn_mixed_us; + tel_sum.allhot_layers += tel.allhot_layers; + tel_sum.mixed_layers += tel.mixed_layers; + tel_sum.total_layers += tel.total_layers; + } + auto t_end = std::chrono::steady_clock::now(); + double total_ms = std::chrono::duration(t_end - t_start).count(); + double avg_ms = total_ms / n_steps; + double avg_ffn_ms = tel_sum.ffn_us / 1000.0 / n_steps; + double allhot_pct = tel_sum.total_layers > 0 + ? 100.0 * tel_sum.allhot_layers / tel_sum.total_layers : 0.0; + double avg_allhot_ms = tel_sum.allhot_layers > 0 + ? tel_sum.ffn_allhot_us / 1000.0 / tel_sum.allhot_layers : 0.0; + double avg_mixed_ms = tel_sum.mixed_layers > 0 + ? tel_sum.ffn_mixed_us / 1000.0 / tel_sum.mixed_layers : 0.0; + std::printf(" pipelined_realistic_ctx2k: total=%.2fms ffn=%.2fms allhot=%.1f%%\n", + avg_ms, avg_ffn_ms, allhot_pct); + std::printf(" per-layer: allhot_ffn=%.3fms mixed_ffn=%.3fms\n", + avg_allhot_ms, avg_mixed_ms); + std::printf(" layers/tok: %d allhot, %d mixed\n", + tel_sum.allhot_layers / n_steps, + tel_sum.mixed_layers / n_steps); + pipe_state.destroy(); + } + } + } else { + std::printf(" uniform hybrid build failed: %s\n", up_err.c_str()); + } + } + } + } + + ::munmap(mmap_addr, file_size); + gguf_free(gctx); + } + } + } + } + + std::printf("\n[time-breakdown] done. Model: %s (%d layers, hidden=%d)\n", + arch_name, w.n_layer, hidden); + + free_target_cache(cache); + free_target_weights(w); + if (split_gpus) ggml_backend_free(draft_backend); + ggml_backend_free(target_backend); + return 0; + } + // ── Sliding-window regression tests ────────────────────────────────── if (test_window_mode) { int n_pass = 0, n_fail = 0; From 31eb465cbe37bd36f1dcaab6d2d17b1bc4b27413 Mon Sep 17 00:00:00 2001 From: Howard Su Date: Thu, 28 May 2026 13:03:02 +0800 Subject: [PATCH 2/8] fix: distribute dummy MoE routing evenly to avoid MMQ stream-k OOB When the hybrid MoE pipeline fills unused routing slots with dummy entries, all dummies previously pointed to expert 0. This created a pathological imbalance (e.g. 69/72 rows for one expert) that triggered an out-of-bounds access in the CUDA MMQ stream-k kernel path during down-projection. Distribute dummy slot IDs evenly across all experts in the hot/cold stacks (i % n_experts) so no single expert accumulates excessive dummy rows. The dummy weight remains 0.0 so these rows contribute nothing to output. Also adds: - pipe_state_.reset() between requests to avoid stale DeltaNet graph pointers - RAII destructors for ResidualCombineGraph, GpuResidentState, CachedPrefnGraph, and PipelinedDecodeState to prevent resource leaks Tested: 10/10 requests pass on RTX 2080 Ti at ~15.5 tok/s decode. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- server/src/qwen35moe/qwen35moe_backend.cpp | 5 +++++ server/src/qwen35moe/qwen35moe_hybrid_ffn_eval.cpp | 14 ++++++++++---- server/src/qwen35moe/qwen35moe_hybrid_ffn_eval.h | 2 ++ server/src/qwen35moe/qwen35moe_pipelined_decode.h | 2 ++ 4 files changed, 19 insertions(+), 4 deletions(-) diff --git a/server/src/qwen35moe/qwen35moe_backend.cpp b/server/src/qwen35moe/qwen35moe_backend.cpp index c32671e54..832107f25 100644 --- a/server/src/qwen35moe/qwen35moe_backend.cpp +++ b/server/src/qwen35moe/qwen35moe_backend.cpp @@ -416,6 +416,11 @@ GenerateResult Qwen35MoeBackend::generate(const GenerateRequest & req, reset_recurrent_state(target_cache()); + // Invalidate cached pipelined decode state between requests. + // The cached DeltaNet graphs reference conv_state/ssm_state tensors that were + // just zeroed; rebuilding is cheap (~30 graphs) and avoids stale-pointer crashes. + pipe_state_.reset(); + const int hidden = target_weights().n_embd; const int vocab = target_weights().n_vocab; std::vector act_cur((size_t)hidden); diff --git a/server/src/qwen35moe/qwen35moe_hybrid_ffn_eval.cpp b/server/src/qwen35moe/qwen35moe_hybrid_ffn_eval.cpp index 062e061ea..6a795582e 100644 --- a/server/src/qwen35moe/qwen35moe_hybrid_ffn_eval.cpp +++ b/server/src/qwen35moe/qwen35moe_hybrid_ffn_eval.cpp @@ -910,12 +910,17 @@ bool eval_qwen35moe_hybrid_ffn_batched( if (n_tokens <= 0) return true; // ── Step 1: Partition routing into hot and cold ── - // Use dummy expert ID 0 with weight 0.0 for "other device" slots - // (ggml_mul_mat_id does not handle -1 IDs safely) + // Dummy slots use weight 0.0 and are distributed evenly across all experts + // to avoid pathological routing imbalance that triggers OOB in MMQ stream-k. const int total_slots = n_used * n_tokens; - std::vector hot_sel(total_slots, 0); + const int n_hot_stack = storage.gate_up_hot ? (int)storage.gate_up_hot->ne[2] + : storage.gate_hot ? (int)storage.gate_hot->ne[2] + : 1; + std::vector hot_sel(total_slots); + for (int i = 0; i < total_slots; ++i) hot_sel[i] = i % n_hot_stack; std::vector hot_wts(total_slots, 0.0f); - std::vector cold_sel(total_slots, 0); + std::vector cold_sel(total_slots); + for (int i = 0; i < total_slots; ++i) cold_sel[i] = i % std::max(1, (int)(storage.down_cold ? storage.down_cold->ne[2] : 1)); std::vector cold_wts(total_slots, 0.0f); bool has_hot = false, has_cold = false; @@ -1004,6 +1009,7 @@ bool eval_qwen35moe_hybrid_ffn_batched( ggml_set_output(hot_output); ggml_build_forward_expand(hot_gf, hot_output); hot_alloc = ggml_gallocr_new(ggml_backend_get_default_buffer_type(gpu_backend)); + if (!ggml_gallocr_alloc_graph(hot_alloc, hot_gf)) { if (err) *err = "hybrid batched hot gallocr failed"; ggml_gallocr_free(hot_alloc); ggml_free(hot_ctx); diff --git a/server/src/qwen35moe/qwen35moe_hybrid_ffn_eval.h b/server/src/qwen35moe/qwen35moe_hybrid_ffn_eval.h index ea8fd2241..ae6bcb02e 100644 --- a/server/src/qwen35moe/qwen35moe_hybrid_ffn_eval.h +++ b/server/src/qwen35moe/qwen35moe_hybrid_ffn_eval.h @@ -24,6 +24,7 @@ struct ResidualCombineGraph { ggml_tensor * cold_in = nullptr; // [n_embd] F32 input (zeros when no cold) ggml_tensor * output = nullptr; // [n_embd] F32 output + ~ResidualCombineGraph() { free(); } bool valid() const { return ctx && gf && alloc && output; } void free(); void destroy(); @@ -40,6 +41,7 @@ struct GpuResidentState { ResidualCombineGraph combine; + ~GpuResidentState() { destroy(); } bool valid() const { return ctx && buf && act_cur && combine.valid(); } void destroy(); }; diff --git a/server/src/qwen35moe/qwen35moe_pipelined_decode.h b/server/src/qwen35moe/qwen35moe_pipelined_decode.h index febe72879..e09b19099 100644 --- a/server/src/qwen35moe/qwen35moe_pipelined_decode.h +++ b/server/src/qwen35moe/qwen35moe_pipelined_decode.h @@ -34,6 +34,7 @@ struct CachedPrefnGraph { ggml_tensor * moe_selected = nullptr; // output: selected expert IDs ggml_tensor * moe_weights = nullptr; // output: routing weights + ~CachedPrefnGraph() { free(); } bool valid() const { return ctx && gf && alloc && ffn_post && ffn_residual; } void free(); }; @@ -74,6 +75,7 @@ struct PipelinedDecodeState { int n_expert_used = 0; int full_attention_interval = 0; + ~PipelinedDecodeState() { destroy(); } bool valid() const { return gpu_state.valid() && n_layer > 0; } void destroy(); }; From 6d758251d40fa3334fcd650a03b818d08af9c06d Mon Sep 17 00:00:00 2001 From: Howard Su Date: Thu, 28 May 2026 13:43:36 +0800 Subject: [PATCH 3/8] fix: free cached FFN graphs between requests to prevent GPU memory corruption The per-layer CachedFfnGraph (hot_graph/cold_graph) allocated by the pipelined decode path persisted across requests. After multiple requests with thousands of decode tokens, accumulated GPU allocations caused memory corruption when prefill tried to allocate its own graph buffers. Freeing these cached graphs between requests releases the GPU memory before prefill runs. The graphs are rebuilt cheaply on first decode of each new request. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- server/src/qwen35moe/qwen35moe_backend.cpp | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/server/src/qwen35moe/qwen35moe_backend.cpp b/server/src/qwen35moe/qwen35moe_backend.cpp index 832107f25..6b511bbeb 100644 --- a/server/src/qwen35moe/qwen35moe_backend.cpp +++ b/server/src/qwen35moe/qwen35moe_backend.cpp @@ -421,6 +421,15 @@ GenerateResult Qwen35MoeBackend::generate(const GenerateRequest & req, // just zeroed; rebuilding is cheap (~30 graphs) and avoids stale-pointer crashes. pipe_state_.reset(); + // Free per-layer cached FFN graphs from previous decode to release GPU memory + // before prefill allocates its own graph buffers. + if (target_weights().moe_hybrid) { + for (auto & layer : target_weights().moe_hybrid->layers) { + layer.hot_graph.free(); + layer.cold_graph.free(); + } + } + const int hidden = target_weights().n_embd; const int vocab = target_weights().n_vocab; std::vector act_cur((size_t)hidden); From 04fa1ab458375d1f9343d9d71750e909ff0026af Mon Sep 17 00:00:00 2001 From: Howard Su Date: Thu, 28 May 2026 17:03:58 +0800 Subject: [PATCH 4/8] feat: add decode time-breakdown telemetry to generate path MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When DFLASH_QWEN35MOE_TELEMETRY=1, the pipelined decode loop now collects and prints per-token breakdown: - prefn_build: DeltaNet graph setup time - prefn_compute: GPU pre-FFN compute time - routing_readback: GPU→CPU routing decision transfer - ffn: hybrid MoE FFN (split into allhot/mixed) Also removes leftover debug CUDA sync checks from the prefill path and includes the mmq.cu ids_dst padding fix (submodule update). Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- server/src/qwen35moe/qwen35moe_backend.cpp | 64 +++++++++++-------- server/src/qwen35moe/qwen35moe_backend.h | 2 - .../qwen35moe/qwen35moe_hybrid_ffn_eval.cpp | 1 - 3 files changed, 39 insertions(+), 28 deletions(-) diff --git a/server/src/qwen35moe/qwen35moe_backend.cpp b/server/src/qwen35moe/qwen35moe_backend.cpp index 6b511bbeb..800f88992 100644 --- a/server/src/qwen35moe/qwen35moe_backend.cpp +++ b/server/src/qwen35moe/qwen35moe_backend.cpp @@ -1,5 +1,4 @@ #include "qwen35moe_backend.h" -#include "qwen35moe_pipelined_decode.h" #include "common/sampler.h" #include "common/dflash_spec_decode.h" @@ -274,7 +273,6 @@ bool Qwen35MoeBackend::run_pipelined_decode_path(int committed, int n_gen, const int vocab = target_weights().n_vocab; std::vector logits_buf((size_t)vocab); std::vector act_cur((size_t)hidden); - const auto decode_t0 = HybridClock::now(); // Persistent logits graph (built once, reused per token) StepGraph logits_sg; @@ -330,9 +328,6 @@ bool Qwen35MoeBackend::run_pipelined_decode_path(int committed, int n_gen, return false; } - uint64_t hot_selected_total = 0; - uint64_t cold_selected_total = 0; - for (int step = 1; step < n_gen; ++step) { int32_t tok = out_tokens.back(); if (!target_weights().embedder.embed(&tok, 1, act_cur.data())) { @@ -341,20 +336,12 @@ bool Qwen35MoeBackend::run_pipelined_decode_path(int committed, int n_gen, ggml_backend_tensor_set(pipe_state_->gpu_state.act_cur, act_cur.data(), 0, sizeof(float) * (size_t)hidden); - PipelinedDecodeTelemetry tel; if (!pipelined_decode_one_token(*pipe_state_, target_backend(), target_weights(), target_cache(), *target_weights().moe_hybrid, - committed, cfg_.kq_stride_pad, - hybrid_telemetry_ ? &tel : nullptr)) { + committed, cfg_.kq_stride_pad, nullptr)) { return false; } - if (hybrid_telemetry_) { - hot_selected_total += (uint64_t)tel.allhot_layers * target_weights().n_expert_used - + (uint64_t)(tel.mixed_layers * target_weights().n_expert_used - tel.mixed_layers); - cold_selected_total += (uint64_t)tel.mixed_layers; - } - ggml_backend_tensor_get(pipe_state_->gpu_state.act_cur, act_cur.data(), 0, sizeof(float) * (size_t)hidden); if (!project_logits()) { @@ -384,16 +371,6 @@ bool Qwen35MoeBackend::run_pipelined_decode_path(int committed, int n_gen, if (is_eos_tok(next_tok, target_weights())) break; } - last_hot_selected_ = hot_selected_total; - last_cold_selected_ = cold_selected_total; - std::printf("[qwen35moe] pipelined decode stats: hot_selected=%llu cold_selected=%llu\n", - (unsigned long long)last_hot_selected_, - (unsigned long long)last_cold_selected_); - if (hybrid_telemetry_) { - const uint64_t decode_us = elapsed_us(decode_t0, HybridClock::now()); - std::printf("[qwen35moe] pipelined telemetry: total_decode_ms=%.2f\n", - decode_us / 1000.0); - } step_graph_destroy(logits_sg); return true; } @@ -724,6 +701,7 @@ GenerateResult Qwen35MoeBackend::generate(const GenerateRequest & req, target_cache().cur_pos = committed; // Pipelined decode loop + PipelinedDecodeTelemetry decode_tel_accum{}; for (int step = 1; step < req.n_gen; ++step) { int32_t tok = result.tokens.back(); if (!target_weights().embedder.embed(&tok, 1, act_cur.data())) { @@ -734,13 +712,27 @@ GenerateResult Qwen35MoeBackend::generate(const GenerateRequest & req, ggml_backend_tensor_set(pipe_state_->gpu_state.act_cur, act_cur.data(), 0, sizeof(float) * (size_t)hidden); + PipelinedDecodeTelemetry tel; if (!pipelined_decode_one_token(*pipe_state_, target_backend(), target_weights(), target_cache(), *target_weights().moe_hybrid, - committed, cfg_.kq_stride_pad, nullptr)) { + committed, cfg_.kq_stride_pad, + hybrid_telemetry_ ? &tel : nullptr)) { result.error = "decode"; cleanup_graphs(); return result; } + if (hybrid_telemetry_) { + decode_tel_accum.total_us += tel.total_us; + decode_tel_accum.prefn_graph_build_us += tel.prefn_graph_build_us; + decode_tel_accum.prefn_compute_us += tel.prefn_compute_us; + decode_tel_accum.routing_readback_us += tel.routing_readback_us; + decode_tel_accum.ffn_us += tel.ffn_us; + decode_tel_accum.ffn_allhot_us += tel.ffn_allhot_us; + decode_tel_accum.ffn_mixed_us += tel.ffn_mixed_us; + decode_tel_accum.allhot_layers += tel.allhot_layers; + decode_tel_accum.mixed_layers += tel.mixed_layers; + decode_tel_accum.total_layers += tel.total_layers; + } ggml_backend_tensor_get(pipe_state_->gpu_state.act_cur, act_cur.data(), 0, sizeof(float) * (size_t)hidden); @@ -768,6 +760,28 @@ GenerateResult Qwen35MoeBackend::generate(const GenerateRequest & req, if (out_io.cancelled) break; if (is_eos_tok(next_tok, target_weights())) break; } + if (hybrid_telemetry_) { + const int n_dec = (int)result.tokens.size() - 1; + std::printf("[qwen35moe] === DECODE BREAKDOWN (n_tokens=%d) ===\n", n_dec); + std::printf(" prefn_build=%.1fms prefn_compute=%.1fms routing_readback=%.1fms ffn=%.1fms\n", + decode_tel_accum.prefn_graph_build_us / 1000.0, + decode_tel_accum.prefn_compute_us / 1000.0, + decode_tel_accum.routing_readback_us / 1000.0, + decode_tel_accum.ffn_us / 1000.0); + std::printf(" ffn_allhot=%.1fms ffn_mixed=%.1fms allhot_layers=%d mixed_layers=%d\n", + decode_tel_accum.ffn_allhot_us / 1000.0, + decode_tel_accum.ffn_mixed_us / 1000.0, + decode_tel_accum.allhot_layers, + decode_tel_accum.mixed_layers); + if (n_dec > 0) { + std::printf(" per-token avg: prefn_build=%.2fms prefn_compute=%.2fms readback=%.2fms ffn=%.2fms\n", + decode_tel_accum.prefn_graph_build_us / 1000.0 / n_dec, + decode_tel_accum.prefn_compute_us / 1000.0 / n_dec, + decode_tel_accum.routing_readback_us / 1000.0 / n_dec, + decode_tel_accum.ffn_us / 1000.0 / n_dec); + } + std::fflush(stdout); + } } cleanup_graphs(); } diff --git a/server/src/qwen35moe/qwen35moe_backend.h b/server/src/qwen35moe/qwen35moe_backend.h index a460b52f8..9c9ccf8dd 100644 --- a/server/src/qwen35moe/qwen35moe_backend.h +++ b/server/src/qwen35moe/qwen35moe_backend.h @@ -40,8 +40,6 @@ class Qwen35MoeBackend : public Qwen35Backend { std::string routing_stats_out_path_; std::string placement_out_path_; Qwen35MoeSwapPolicy swap_policy_; - uint64_t last_hot_selected_ = 0; - uint64_t last_cold_selected_ = 0; bool hybrid_telemetry_ = false; void maybe_post_request_swap(); diff --git a/server/src/qwen35moe/qwen35moe_hybrid_ffn_eval.cpp b/server/src/qwen35moe/qwen35moe_hybrid_ffn_eval.cpp index 6a795582e..6e44b2d3f 100644 --- a/server/src/qwen35moe/qwen35moe_hybrid_ffn_eval.cpp +++ b/server/src/qwen35moe/qwen35moe_hybrid_ffn_eval.cpp @@ -1009,7 +1009,6 @@ bool eval_qwen35moe_hybrid_ffn_batched( ggml_set_output(hot_output); ggml_build_forward_expand(hot_gf, hot_output); hot_alloc = ggml_gallocr_new(ggml_backend_get_default_buffer_type(gpu_backend)); - if (!ggml_gallocr_alloc_graph(hot_alloc, hot_gf)) { if (err) *err = "hybrid batched hot gallocr failed"; ggml_gallocr_free(hot_alloc); ggml_free(hot_ctx); From 87f7a99a075f52797c9ca8e1de3b94dac9d132d5 Mon Sep 17 00:00:00 2001 From: Howard Su Date: Thu, 28 May 2026 22:39:52 +0800 Subject: [PATCH 5/8] fix(moe): normalize expert weights by sum (fixes wrong math output) The MoE router was not normalizing top-k expert weights after selection. With softmax gating over 256 experts but only top-8 selected, the weights summed to ~0.03-0.05 instead of 1.0, causing systematically underscaled FFN output across all 40 layers. This produced accumulating errors that made even simple arithmetic wrong (e.g. 7+8=11 instead of 15). Fix: always normalize selected weights by their sum with a clamp to avoid division by zero, matching llama.cpp's norm_w=true behavior for qwen35moe. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- server/src/qwen35moe/qwen35moe_ffn.cpp | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/server/src/qwen35moe/qwen35moe_ffn.cpp b/server/src/qwen35moe/qwen35moe_ffn.cpp index 4ca832673..9c61da851 100644 --- a/server/src/qwen35moe/qwen35moe_ffn.cpp +++ b/server/src/qwen35moe/qwen35moe_ffn.cpp @@ -2,6 +2,8 @@ #include "qwen35_ops.h" +#include + namespace dflash::common { Qwen35MoeRouterOutputs build_qwen35moe_router( @@ -31,11 +33,16 @@ Qwen35MoeRouterOutputs build_qwen35moe_router( ggml_tensor * weights = ggml_get_rows(ctx, probs_3d, selected); weights = ggml_reshape_2d(ctx, weights, n_used, n_tokens); - if (w.expert_gating_func == 2) { + // Always normalize selected expert weights by their sum (matches + // llama.cpp's norm_w=true for qwen35moe). Without this, top-k softmax + // weights sum to much less than 1.0, causing systematically underscaled + // FFN output. + { ggml_tensor * w_sum = ggml_sum_rows(ctx, weights); + w_sum = ggml_clamp(ctx, w_sum, 6.103515625e-5f, INFINITY); weights = ggml_div(ctx, weights, w_sum); } - if (w.expert_weights_scale != 1.0f) { + if (w.expert_weights_scale != 0.0f && w.expert_weights_scale != 1.0f) { weights = ggml_scale(ctx, weights, w.expert_weights_scale); } From c45dd59d8bb06c8242cdf6197150c823640eaab2 Mon Sep 17 00:00:00 2001 From: Howard Su Date: Thu, 28 May 2026 22:42:33 +0800 Subject: [PATCH 6/8] chore: remove noisy pipelined init log Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- server/src/qwen35moe/qwen35moe_pipelined_decode.cpp | 1 - 1 file changed, 1 deletion(-) diff --git a/server/src/qwen35moe/qwen35moe_pipelined_decode.cpp b/server/src/qwen35moe/qwen35moe_pipelined_decode.cpp index 681f49963..78039e4ad 100644 --- a/server/src/qwen35moe/qwen35moe_pipelined_decode.cpp +++ b/server/src/qwen35moe/qwen35moe_pipelined_decode.cpp @@ -150,7 +150,6 @@ bool init_pipelined_decode_state( out.cold_in_zeroed = true; // cold_in was already zeroed in init_gpu_resident_state - std::printf("[pipelined] init: cached %d/%d DeltaNet pre-FFN graphs\n", cached_count, w.n_layer); return true; } From 7f7dda134ded49108f4292b4ad80e2cfef9d9a66 Mon Sep 17 00:00:00 2001 From: Howard Su Date: Thu, 28 May 2026 23:02:45 +0800 Subject: [PATCH 7/8] fix: address PR #289 review comments MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - P1: Delete implicit copy on resource-owning structs (CachedPrefnGraph, ResidualCombineGraph, GpuResidentState, PipelinedDecodeState) to prevent accidental double-free of ggml/GPU resources. Add explicit move ops that null out the source. - P3: Hoist bf16_buf allocation and ggml_fp32_to_bf16_row conversion outside the n_capture_layers loop — all iterations convert the same act_cur data. - P2 (expert_weights_scale): No change needed — our condition (!=0.0f && !=1.0f) already matches llama.cpp llama-graph.cpp:1413 exactly. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- server/src/qwen35moe/qwen35moe_backend.cpp | 11 ++--- .../src/qwen35moe/qwen35moe_hybrid_ffn_eval.h | 40 +++++++++++++++ .../qwen35moe/qwen35moe_pipelined_decode.h | 49 +++++++++++++++++++ 3 files changed, 92 insertions(+), 8 deletions(-) diff --git a/server/src/qwen35moe/qwen35moe_backend.cpp b/server/src/qwen35moe/qwen35moe_backend.cpp index 800f88992..beacce3d3 100644 --- a/server/src/qwen35moe/qwen35moe_backend.cpp +++ b/server/src/qwen35moe/qwen35moe_backend.cpp @@ -868,21 +868,16 @@ bool Qwen35MoeBackend::hybrid_forward_one_token(int32_t tok, int kv_pos, // Feature capture: write act_cur (F32) → cache_.target_feat (BF16) if (target_cache().target_feat) { - for (int k = 0; k < target_weights().n_capture_layers; k++) { - const int il = target_weights().capture_layer_ids[k]; - (void)il; // capture_layer_ids marks which layers — for spec-decode - // we capture the final output at every verify position - } const int cap = target_cache().target_feat_cap; const int slot = kv_pos % cap; const size_t elt = ggml_element_size(target_cache().target_feat); const size_t col_stride = target_cache().target_feat->nb[1]; - // Write all capture layers from the final hidden state + // Convert once — all capture layers store the same final hidden state + std::vector bf16_buf((size_t)hidden); + ggml_fp32_to_bf16_row(act_cur.data(), bf16_buf.data(), hidden); for (int k = 0; k < target_weights().n_capture_layers; k++) { const size_t offset = (size_t)slot * col_stride + (size_t)k * (size_t)hidden * elt; - std::vector bf16_buf((size_t)hidden); - ggml_fp32_to_bf16_row(act_cur.data(), bf16_buf.data(), hidden); ggml_backend_tensor_set(target_cache().target_feat, bf16_buf.data(), offset, (size_t)hidden * elt); } diff --git a/server/src/qwen35moe/qwen35moe_hybrid_ffn_eval.h b/server/src/qwen35moe/qwen35moe_hybrid_ffn_eval.h index ae6bcb02e..55f824a03 100644 --- a/server/src/qwen35moe/qwen35moe_hybrid_ffn_eval.h +++ b/server/src/qwen35moe/qwen35moe_hybrid_ffn_eval.h @@ -24,7 +24,30 @@ struct ResidualCombineGraph { ggml_tensor * cold_in = nullptr; // [n_embd] F32 input (zeros when no cold) ggml_tensor * output = nullptr; // [n_embd] F32 output + ResidualCombineGraph() = default; ~ResidualCombineGraph() { free(); } + ResidualCombineGraph(const ResidualCombineGraph &) = delete; + ResidualCombineGraph & operator=(const ResidualCombineGraph &) = delete; + ResidualCombineGraph(ResidualCombineGraph && o) noexcept + : ctx(o.ctx), gf(o.gf), alloc(o.alloc), + residual_in(o.residual_in), hot_in(o.hot_in), + cold_in(o.cold_in), output(o.output) { + o.ctx = nullptr; o.gf = nullptr; o.alloc = nullptr; + o.residual_in = nullptr; o.hot_in = nullptr; + o.cold_in = nullptr; o.output = nullptr; + } + ResidualCombineGraph & operator=(ResidualCombineGraph && o) noexcept { + if (this != &o) { + free(); + ctx = o.ctx; gf = o.gf; alloc = o.alloc; + residual_in = o.residual_in; hot_in = o.hot_in; + cold_in = o.cold_in; output = o.output; + o.ctx = nullptr; o.gf = nullptr; o.alloc = nullptr; + o.residual_in = nullptr; o.hot_in = nullptr; + o.cold_in = nullptr; o.output = nullptr; + } + return *this; + } bool valid() const { return ctx && gf && alloc && output; } void free(); void destroy(); @@ -41,7 +64,24 @@ struct GpuResidentState { ResidualCombineGraph combine; + GpuResidentState() = default; ~GpuResidentState() { destroy(); } + GpuResidentState(const GpuResidentState &) = delete; + GpuResidentState & operator=(const GpuResidentState &) = delete; + GpuResidentState(GpuResidentState && o) noexcept + : ctx(o.ctx), buf(o.buf), act_cur(o.act_cur), + combine(std::move(o.combine)) { + o.ctx = nullptr; o.buf = nullptr; o.act_cur = nullptr; + } + GpuResidentState & operator=(GpuResidentState && o) noexcept { + if (this != &o) { + destroy(); + ctx = o.ctx; buf = o.buf; act_cur = o.act_cur; + combine = std::move(o.combine); + o.ctx = nullptr; o.buf = nullptr; o.act_cur = nullptr; + } + return *this; + } bool valid() const { return ctx && buf && act_cur && combine.valid(); } void destroy(); }; diff --git a/server/src/qwen35moe/qwen35moe_pipelined_decode.h b/server/src/qwen35moe/qwen35moe_pipelined_decode.h index e09b19099..bc03264eb 100644 --- a/server/src/qwen35moe/qwen35moe_pipelined_decode.h +++ b/server/src/qwen35moe/qwen35moe_pipelined_decode.h @@ -34,7 +34,25 @@ struct CachedPrefnGraph { ggml_tensor * moe_selected = nullptr; // output: selected expert IDs ggml_tensor * moe_weights = nullptr; // output: routing weights + CachedPrefnGraph() = default; ~CachedPrefnGraph() { free(); } + CachedPrefnGraph(const CachedPrefnGraph &) = delete; + CachedPrefnGraph & operator=(const CachedPrefnGraph &) = delete; + CachedPrefnGraph(CachedPrefnGraph && o) noexcept { *this = std::move(o); } + CachedPrefnGraph & operator=(CachedPrefnGraph && o) noexcept { + if (this != &o) { + free(); + ctx = o.ctx; gf = o.gf; alloc = o.alloc; + inp_embed = o.inp_embed; ffn_post = o.ffn_post; + ffn_residual = o.ffn_residual; + moe_selected = o.moe_selected; moe_weights = o.moe_weights; + o.ctx = nullptr; o.gf = nullptr; o.alloc = nullptr; + o.inp_embed = nullptr; o.ffn_post = nullptr; + o.ffn_residual = nullptr; + o.moe_selected = nullptr; o.moe_weights = nullptr; + } + return *this; + } bool valid() const { return ctx && gf && alloc && ffn_post && ffn_residual; } void free(); }; @@ -75,7 +93,38 @@ struct PipelinedDecodeState { int n_expert_used = 0; int full_attention_interval = 0; + PipelinedDecodeState() = default; ~PipelinedDecodeState() { destroy(); } + PipelinedDecodeState(const PipelinedDecodeState &) = delete; + PipelinedDecodeState & operator=(const PipelinedDecodeState &) = delete; + PipelinedDecodeState(PipelinedDecodeState && o) noexcept + : gpu_state(std::move(o.gpu_state)), + cached_prefn(std::move(o.cached_prefn)), + routing_ids_buf(std::move(o.routing_ids_buf)), + routing_weights_buf(std::move(o.routing_weights_buf)), + ffn_post_host_buf(std::move(o.ffn_post_host_buf)), + cold_in_zeroed(o.cold_in_zeroed), + n_layer(o.n_layer), n_embd(o.n_embd), + n_expert_used(o.n_expert_used), + full_attention_interval(o.full_attention_interval) { + o.n_layer = 0; + } + PipelinedDecodeState & operator=(PipelinedDecodeState && o) noexcept { + if (this != &o) { + destroy(); + gpu_state = std::move(o.gpu_state); + cached_prefn = std::move(o.cached_prefn); + routing_ids_buf = std::move(o.routing_ids_buf); + routing_weights_buf = std::move(o.routing_weights_buf); + ffn_post_host_buf = std::move(o.ffn_post_host_buf); + cold_in_zeroed = o.cold_in_zeroed; + n_layer = o.n_layer; n_embd = o.n_embd; + n_expert_used = o.n_expert_used; + full_attention_interval = o.full_attention_interval; + o.n_layer = 0; + } + return *this; + } bool valid() const { return gpu_state.valid() && n_layer > 0; } void destroy(); }; From caf2b112e49bbf02c2412b5146a08d0c361402d6 Mon Sep 17 00:00:00 2001 From: Davide Cifarelli Date: Fri, 29 May 2026 10:57:54 +0200 Subject: [PATCH 8/8] fix(qwen35moe): sub-batch hybrid prefill FFN to avoid MMQ mul_mat_id OOB The routed-expert mul_mat_id MMQ kernel writes out of bounds on Ampere when the per-call token count exceeds ~8: the expert token distribution overshoots the destination tiles on the need_check=false write path. This silently corrupts neighbouring GPU allocations during prefill and crashes with a CUDA illegal memory access at a later decode synchronize (~4th request under the server, in the forced hot/cold split path). Sub-batch the hybrid FFN to 8 tokens per eval_qwen35moe_hybrid_ffn_batched call so the attention prefill can stay at the full chunk size. Verified on an RTX 3090 (24 GiB) forcing a 60/40 hot/cold split via DFLASH_EXPERT_BUDGET_MB=11000: all 10 HumanEval prompts complete and the server stays up (previously crashed at request 4). compute-sanitizer memcheck confirms the OOB write originates in the routed mul_mat_id (mul_mat_q) inside eval_qwen35moe_hybrid_ffn_batched. Co-Authored-By: WOZCODE --- server/src/qwen35moe/qwen35moe_backend.cpp | 36 ++++++++++++++++------ 1 file changed, 27 insertions(+), 9 deletions(-) diff --git a/server/src/qwen35moe/qwen35moe_backend.cpp b/server/src/qwen35moe/qwen35moe_backend.cpp index beacce3d3..c759b51a1 100644 --- a/server/src/qwen35moe/qwen35moe_backend.cpp +++ b/server/src/qwen35moe/qwen35moe_backend.cpp @@ -570,15 +570,33 @@ GenerateResult Qwen35MoeBackend::generate(const GenerateRequest & req, } } - // Batched hybrid FFN for this chunk - std::vector ffn_batch_out; - if (!eval_qwen35moe_hybrid_ffn_batched( - target_backend(), cpu_be, target_weights(), L, storage, - chunk_post.data(), chunk_selected.data(), chunk_weights.data(), - chunk_len, ffn_batch_out, &result.error)) { - step_graph_destroy(prefill_sg); - cleanup_graphs(); - return result; + // Batched hybrid FFN for this chunk. + // The routed-expert mul_mat_id MMQ kernel writes out of bounds on + // Ampere when the per-call token count exceeds ~8: the expert token + // distribution overshoots the destination tiles on the + // need_check=false write path, silently corrupting neighbouring GPU + // allocations during prefill and crashing with an illegal memory + // access at a later decode sync (~4th request under the server). + // Sub-batch the FFN to a safe width so the attention prefill can + // stay at the full chunk size. + std::vector ffn_batch_out((size_t)chunk_len * (size_t)hidden); + constexpr int kFfnSafeBatch = 8; + for (int fb = 0; fb < chunk_len; fb += kFfnSafeBatch) { + const int fl = std::min(kFfnSafeBatch, chunk_len - fb); + std::vector sub_out; + if (!eval_qwen35moe_hybrid_ffn_batched( + target_backend(), cpu_be, target_weights(), L, storage, + chunk_post.data() + (size_t)fb * (size_t)hidden, + chunk_selected.data() + (size_t)fb * (size_t)n_expert_used, + chunk_weights.data() + (size_t)fb * (size_t)n_expert_used, + fl, sub_out, &result.error)) { + step_graph_destroy(prefill_sg); + cleanup_graphs(); + return result; + } + std::memcpy(ffn_batch_out.data() + (size_t)fb * (size_t)hidden, + sub_out.data(), + (size_t)fl * (size_t)hidden * sizeof(float)); } // Combine FFN output + residual → embed_all for next layer