diff --git a/dflash/CMakeLists.txt b/dflash/CMakeLists.txt index 71d81a255..c84a5f56f 100644 --- a/dflash/CMakeLists.txt +++ b/dflash/CMakeLists.txt @@ -671,6 +671,27 @@ if(DFLASH27B_TESTS) endif() endif() + if(EXISTS "${CMAKE_CURRENT_SOURCE_DIR}/src/ipc/dflash_draft_ipc_main.cpp") + add_executable(dflash_draft_ipc_daemon + src/ipc/dflash_draft_ipc_main.cpp + ) + target_include_directories(dflash_draft_ipc_daemon PRIVATE ${DFLASH27B_SRC_INCLUDE_DIRS}) + if(DFLASH27B_GPU_BACKEND STREQUAL "hip") + target_compile_definitions(dflash_draft_ipc_daemon PRIVATE DFLASH27B_BACKEND_HIP=1 GGML_USE_HIP) + else() + target_compile_definitions(dflash_draft_ipc_daemon PRIVATE + DFLASH27B_BACKEND_CUDA=1 + DFLASH27B_CUDA_MIN_SM=${_dflash_cuda_min_sm}) + endif() + target_link_libraries(dflash_draft_ipc_daemon PRIVATE dflash_common ggml ${DFLASH27B_GGML_BACKEND_TARGET} pthread) + if(DFLASH27B_GPU_BACKEND STREQUAL "cuda") + find_package(CUDAToolkit REQUIRED) + target_link_libraries(dflash_draft_ipc_daemon PRIVATE CUDA::cudart) + else() + target_link_libraries(dflash_draft_ipc_daemon PRIVATE hip::host) + endif() + endif() + # Tokenizer test harness (no GPU needed — links static lib for tokenizer + GGUF reader) if(EXISTS "${CMAKE_CURRENT_SOURCE_DIR}/test/test_tokenizer_harness.cpp") add_executable(test_tokenizer_harness test/test_tokenizer_harness.cpp) diff --git a/dflash/src/common/backend_factory.cpp b/dflash/src/common/backend_factory.cpp index 236989548..e5136d4e5 100644 --- a/dflash/src/common/backend_factory.cpp +++ b/dflash/src/common/backend_factory.cpp @@ -17,6 +17,10 @@ std::string detect_arch(const char * model_path) { return info.arch; } +bool arch_supports_remote_draft(const std::string & arch) { + return arch == "qwen35"; +} + std::unique_ptr create_backend(const BackendArgs & args) { if (!args.model_path) { std::fprintf(stderr, "[backend_factory] model_path is null\n"); @@ -38,6 +42,7 @@ std::unique_ptr create_backend(const BackendArgs & args) { cfg.draft_path = 
args.draft_path; cfg.device = args.device; cfg.draft_gpu = args.draft_device.gpu; + cfg.remote_draft = args.remote_draft; cfg.stream_fd = args.stream_fd; cfg.fa_window = args.fa_window; cfg.kq_stride_pad = args.kq_stride_pad; diff --git a/dflash/src/common/backend_factory.h b/dflash/src/common/backend_factory.h index 1e959948b..64a2ac973 100644 --- a/dflash/src/common/backend_factory.h +++ b/dflash/src/common/backend_factory.h @@ -12,6 +12,7 @@ #include "model_backend.h" #include "placement/placement_config.h" +#include "placement/remote_draft_config.h" #include #include @@ -31,6 +32,7 @@ struct BackendArgs { // Device placement DevicePlacement device; DevicePlacement draft_device; + RemoteDraftConfig remote_draft; // I/O — only used when running under daemon_loop (legacy). The new // server passes -1 and uses on_token callbacks instead. @@ -62,4 +64,6 @@ std::unique_ptr create_backend(const BackendArgs & args); // Useful for early dispatch (e.g. printing which backend will be used). std::string detect_arch(const char * model_path); +bool arch_supports_remote_draft(const std::string & arch); + } // namespace dflash::common diff --git a/dflash/src/common/model_backend.h b/dflash/src/common/model_backend.h index fc1682ee6..64441bd91 100644 --- a/dflash/src/common/model_backend.h +++ b/dflash/src/common/model_backend.h @@ -183,6 +183,12 @@ struct ModelBackend { // growth over time. Default is a no-op. virtual void release_scratch() {} + // Return true when the backend can route draft execution through the + // common remote-draft IPC transport. Model families that do not implement + // the DFlash feature boundary keep the default false and are rejected by + // the server before startup. + virtual bool supports_remote_draft() const { return false; } + // ── Cleanup ────────────────────────────────────────────────────── // Release all resources (weights, cache, snapshots, drafter). // Called by run_daemon() before returning. 
diff --git a/dflash/src/ipc/dflash_draft_ipc_main.cpp b/dflash/src/ipc/dflash_draft_ipc_main.cpp
new file mode 100644
index 000000000..97034008a
--- /dev/null
+++ b/dflash/src/ipc/dflash_draft_ipc_main.cpp
@@ -0,0 +1,101 @@
+// Standalone DFlash draft IPC daemon entry point.
+
+#include "dflash_draft_ipc.h"
+
+#include <algorithm>
+#include <cerrno>
+#include <climits>
+#include <cstddef>
+#include <cstdio>
+#include <cstdlib>
+#include <cstring>
+
+using namespace dflash::common;
+
+namespace {
+
+// Parses `text` as a complete base-10 int. Unlike std::atoi, an empty string,
+// trailing garbage or an out-of-range value is reported instead of becoming 0.
+bool parse_int(const char * text, int & out) {
+    if (text == nullptr || *text == '\0') return false;
+    errno = 0;
+    char * end = nullptr;
+    const long value = std::strtol(text, &end, 10);
+    if (errno != 0 || *end != '\0' || value < INT_MIN || value > INT_MAX) return false;
+    out = static_cast<int>(value);
+    return true;
+}
+
+// Tests argv[i] against option `name`, accepting "--name=value" and
+// "--name value". Returns false when argv[i] is some other option. On a match
+// `value` is the value text (nullptr if the value is missing) and `i` has been
+// advanced past a separately supplied value.
+bool match_option(const char * name, int argc, char ** argv, int & i, const char *& value) {
+    const std::size_t len = std::strlen(name);
+    if (std::strncmp(argv[i], name, len) != 0) return false;
+    if (argv[i][len] == '=') {
+        value = argv[i] + len + 1;
+        return true;
+    }
+    if (argv[i][len] != '\0') return false;  // longer name sharing the prefix
+    value = (i + 1 < argc) ? argv[++i] : nullptr;
+    return true;
+}
+
+}  // namespace
+
+// Entry point: `<prog> --draft-ipc-daemon <draft_path> [options]`.
+// Returns 2 on a command-line error, otherwise whatever
+// run_dflash_draft_ipc_daemon() returns.
+int main(int argc, char ** argv) {
+    if (argc < 3 || std::strcmp(argv[1], "--draft-ipc-daemon") != 0) {
+        std::fprintf(stderr,
+                     "usage: %s --draft-ipc-daemon <draft_path> "
+                     "--ring-cap=N --stream-fd=FD [--draft-gpu=N]\n",
+                     argv[0]);
+        return 2;
+    }
+
+    const char * draft_path = argv[2];
+    int ring_cap = 4096;
+    int draft_gpu = 0;
+    int stream_fd = -1;
+
+    struct IntOption {
+        const char * name;
+        int * dest;
+    };
+    const IntOption options[] = {
+        {"--ring-cap", &ring_cap},
+        {"--draft-gpu", &draft_gpu},
+        {"--stream-fd", &stream_fd},
+    };
+
+    for (int i = 3; i < argc; i++) {
+        const char * arg = argv[i];  // keep for diagnostics; match_option may advance i
+        const IntOption * matched = nullptr;
+        const char * value = nullptr;
+        for (const IntOption & opt : options) {
+            if (match_option(opt.name, argc, argv, i, value)) {
+                matched = &opt;
+                break;
+            }
+        }
+        if (matched == nullptr) {
+            std::fprintf(stderr, "[draft-ipc-daemon] unknown option: %s\n", arg);
+            return 2;
+        }
+        if (value == nullptr || !parse_int(value, *matched->dest)) {
+            std::fprintf(stderr, "[draft-ipc-daemon] %s requires an integer value\n",
+                         matched->name);
+            return 2;
+        }
+    }
+
+    if (ring_cap <= 0) {
+        std::fprintf(stderr, "[draft-ipc-daemon] --ring-cap must be positive\n");
+        return 2;
+    }
+    draft_gpu = std::max(0, draft_gpu);  // negative GPU indices fall back to device 0
+
+    return run_dflash_draft_ipc_daemon(draft_path, ring_cap, draft_gpu, stream_fd);
+}
diff --git a/dflash/src/placement/remote_draft_config.h b/dflash/src/placement/remote_draft_config.h
new file mode 100644
index 000000000..a4f998f58
--- /dev/null
+++ b/dflash/src/placement/remote_draft_config.h
@@ -0,0 +1,18 @@
+// Remote draft execution configuration for mixed-backend target/draft placement.
+
+#pragma once
+
+#include <string>
+
+namespace dflash::common {
+// Settings for running the draft model in a separate draft IPC daemon.
+struct RemoteDraftConfig {
+    std::string ipc_bin;       // draft IPC daemon binary; empty means the draft runs locally
+    std::string work_dir;      // scratch directory for the IPC transport (optional)
+    int         ring_cap = 0;  // feature ring capacity; <= 0 means "not specified"
+
+    bool enabled() const { return !ipc_bin.empty(); }
+    bool has_aux_options() const { return !work_dir.empty() || ring_cap > 0; }
+};
+
+} // namespace dflash::common
diff --git a/dflash/src/qwen35/qwen35_backend.cpp b/dflash/src/qwen35/qwen35_backend.cpp
index f2ea5cecb..1d636c531 100644
--- a/dflash/src/qwen35/qwen35_backend.cpp
+++ b/dflash/src/qwen35/qwen35_backend.cpp
@@ -20,10 +20,22 @@
 #include
 #include
 #include
+#include
+#include <cstring>
 #include
 
 namespace dflash::common {
 
+namespace {
+// BF16 is the upper half of an IEEE-754 binary32; memcpy avoids union type punning (UB in C++).
+static float bf16_bits_to_f32(uint16_t bits) {
+    const uint32_t widened = static_cast<uint32_t>(bits) << 16;
+    float value;
+    std::memcpy(&value, &widened, sizeof(value));
+    return value;
+}
+} // namespace
+
 #define IS_EOS_TOK(tok, w) \
     ( ((w).eos_chat_id >= 0 && (tok) == (w).eos_chat_id) \
     || ((w).eos_id >= 0 && (tok) == (w).eos_id ) )
@@ -37,14 +49,15 @@ Qwen35Backend::~Qwen35Backend() { shutdown(); } // ── init() ────────────────────────────────────────────────────────────── bool Qwen35Backend::init() { - split_gpus_ = (cfg_.device.gpu != cfg_.draft_gpu); + const bool use_remote_draft = cfg_.remote_draft.enabled(); + split_gpus_ = !use_remote_draft && (cfg_.device.gpu != cfg_.draft_gpu); target_backend_ = ggml_backend_cuda_init(cfg_.device.gpu); if (!target_backend_) { std::fprintf(stderr, "target cuda init failed\n"); return false; } - draft_backend_ = target_backend_; + draft_backend_ = use_remote_draft ? nullptr : target_backend_; if (split_gpus_) { draft_backend_ = ggml_backend_cuda_init(cfg_.draft_gpu); if (!draft_backend_) {
@@ -72,7 +85,22 @@ bool Qwen35Backend::init() { std::printf("[target] %s\n", dflash27b_last_error()); // Load draft - if (cfg_.draft_path) { + if (cfg_.draft_path && use_remote_draft) { + const int cap = cfg_.remote_draft.ring_cap > 0 + ?
std::min(cfg_.remote_draft.ring_cap, cfg_.device.max_ctx) + : std::min(cfg_.device.max_ctx, cfg_.draft_ctx_max); + if (!remote_draft_.start(cfg_.remote_draft.ipc_bin, cfg_.draft_path, + cfg_.draft_gpu, cap, + cfg_.remote_draft.work_dir)) { + std::fprintf(stderr, "remote draft start failed\n"); + return false; + } + dw_.n_embd = DFLASH27B_TARGET_HIDDEN; + dw_.block_size = DFLASH27B_DRAFT_BLOCK_SIZE; + dw_.n_target_layers = DFLASH27B_DRAFT_N_TARGET_LAYERS; + std::printf("[draft] remote ipc ready gpu=%d cap=%d\n", + cfg_.draft_gpu, cap); + } else if (cfg_.draft_path) { std::string dp(cfg_.draft_path); bool draft_ok = (dp.size() >= 5 && dp.substr(dp.size() - 5) == ".gguf") ? load_draft_gguf(cfg_.draft_path, draft_backend_, dw_, &w_) @@ -104,7 +132,7 @@ bool Qwen35Backend::init() { // Init feature mirror when draft model is available (needed for spec decode). // On single-GPU, this is an F32 conversion buffer; on split-GPU, a cross-device mirror. - if (cfg_.draft_path) { + if (cfg_.draft_path && !use_remote_draft) { const int mirror_cap = std::min({cfg_.draft_ctx_max, cfg_.device.max_ctx, cache_.target_feat_cap > 0 ? 
cache_.target_feat_cap : cfg_.device.max_ctx}); if (!draft_feature_mirror_init(feature_mirror_, draft_backend_, @@ -130,10 +158,15 @@ void Qwen35Backend::print_ready_banner() const { bool Qwen35Backend::park(const std::string & what) { bool want_draft = (what.empty() || what == "all" || what == "draft"); bool want_target = (what.empty() || what == "all" || what == "target"); + const bool use_remote_draft = cfg_.remote_draft.enabled(); if (want_draft && !draft_parked_) { - step_graph_destroy(draft_sg_); - free_draft_weights(dw_); + if (use_remote_draft) { + remote_draft_.close(); + } else { + step_graph_destroy(draft_sg_); + free_draft_weights(dw_); + } draft_parked_ = true; std::printf("[park] draft released\n"); std::fflush(stdout); } @@ -149,6 +182,7 @@ bool Qwen35Backend::park(const std::string & what) { bool Qwen35Backend::unpark(const std::string & what) { bool want_target = (what.empty() || what == "all" || what == "target"); bool want_draft = (what.empty() || what == "all" || what == "draft"); + const bool use_remote_draft = cfg_.remote_draft.enabled(); if (want_target && target_parked_) { if (!load_target_gguf(cfg_.target_path, target_backend_, w_)) { @@ -159,18 +193,30 @@ bool Qwen35Backend::unpark(const std::string & what) { std::printf("[unpark] target restored\n"); std::fflush(stdout); } if (want_draft && draft_parked_ && cfg_.draft_path) { - std::string dp(cfg_.draft_path); - bool draft_ok = (dp.size() >= 5 && dp.substr(dp.size() - 5) == ".gguf") - ? load_draft_gguf(cfg_.draft_path, draft_backend_, dw_, &w_) - : load_draft_safetensors(cfg_.draft_path, draft_backend_, dw_, &w_); - if (!draft_ok) { - std::fprintf(stderr, "[unpark] draft: %s\n", dflash27b_last_error()); - return false; - } - if (cfg_.draft_swa_window > 0) { - dw_.swa_window = cfg_.draft_swa_window; - for (int il = 0; il < dw_.n_layer - 1; il++) - dw_.layers[il].is_swa = true; + if (use_remote_draft) { + const int cap = cfg_.remote_draft.ring_cap > 0 + ? 
std::min(cfg_.remote_draft.ring_cap, cfg_.device.max_ctx) + : std::min(cfg_.device.max_ctx, cfg_.draft_ctx_max); + if (!remote_draft_.start(cfg_.remote_draft.ipc_bin, cfg_.draft_path, + cfg_.draft_gpu, cap, + cfg_.remote_draft.work_dir)) { + std::fprintf(stderr, "[unpark] remote draft failed\n"); + return false; + } + } else { + std::string dp(cfg_.draft_path); + bool draft_ok = (dp.size() >= 5 && dp.substr(dp.size() - 5) == ".gguf") + ? load_draft_gguf(cfg_.draft_path, draft_backend_, dw_, &w_) + : load_draft_safetensors(cfg_.draft_path, draft_backend_, dw_, &w_); + if (!draft_ok) { + std::fprintf(stderr, "[unpark] draft: %s\n", dflash27b_last_error()); + return false; + } + if (cfg_.draft_swa_window > 0) { + dw_.swa_window = cfg_.draft_swa_window; + for (int il = 0; il < dw_.n_layer - 1; il++) + dw_.layers[il].is_swa = true; + } } draft_parked_ = false; std::printf("[unpark] draft restored\n"); std::fflush(stdout); @@ -421,16 +467,18 @@ DFlashTarget * Qwen35Backend::dflash_target() { // ── Shutdown ──────────────────────────────────────────────────────────── void Qwen35Backend::shutdown() { + const bool use_remote_draft = cfg_.remote_draft.enabled(); free_drafter(); step_graph_destroy(sg_); step_graph_destroy(draft_sg_); step_graph_destroy(proj_sg_); + remote_draft_.close(); draft_feature_mirror_free(feature_mirror_); for (int i = 0; i < PREFIX_SLOTS; i++) { free_prefix_snapshot(prefix_snapshots_[i]); } if (!target_parked_) free_target_weights(w_); - if (!draft_parked_) free_draft_weights(dw_); + if (!use_remote_draft && !draft_parked_) free_draft_weights(dw_); free_target_cache(cache_); if (split_gpus_ && draft_backend_) { ggml_backend_free(draft_backend_); @@ -702,8 +750,10 @@ int Qwen35Backend::do_prefill(const std::vector & tokens, snap_slot = -1; } - // Sync feature mirror if active - if (feature_mirror_.target_feat && !draft_parked_) { + // Sync draft-side features if active. 
+ if (remote_draft_.active() && !draft_parked_) { + if (!sync_remote_draft_features(kv_pos, n_tokens)) return -1; + } else if (feature_mirror_.target_feat && !draft_parked_) { draft_feature_mirror_sync_range(cache_.target_feat, cache_.target_feat_cap, feature_mirror_, kv_pos, n_tokens); }
@@ -793,6 +843,39 @@ bool Qwen35Backend::do_ar_decode(int committed, int n_gen,
     return true;
 }
 
+// Sends target features for positions [start_pos, start_pos + n_tokens) to the remote draft as
+// F32, one capture layer per message. False on an invalid range or failed send, else true.
+bool Qwen35Backend::sync_remote_draft_features(int start_pos, int n_tokens) {
+    if (!remote_draft_.active() || !cache_.target_feat || n_tokens <= 0) return true;
+    // A negative start would produce a negative ring slot and an out-of-range read offset.
+    if (cache_.target_feat_cap <= 0 || start_pos < 0) return false;
+
+    const int    n_capture   = w_.n_capture_layers;
+    const size_t width       = static_cast<size_t>(w_.n_embd);
+    const size_t row_bytes   = cache_.target_feat->nb[1];
+    const size_t layer_bytes = width * sizeof(uint16_t);
+    std::vector<float>    slice(static_cast<size_t>(n_tokens) * width);
+    std::vector<uint16_t> bf16(width);
+    ggml_backend_synchronize(target_backend_);
+    for (int cap_idx = 0; cap_idx < n_capture; ++cap_idx) {
+        for (int t = 0; t < n_tokens; ++t) {
+            // Each ring row holds the capture layers back to back as BF16 values.
+            const size_t slot = static_cast<size_t>((start_pos + t) % cache_.target_feat_cap);
+            ggml_backend_tensor_get(cache_.target_feat, bf16.data(),
+                slot * row_bytes + static_cast<size_t>(cap_idx) * layer_bytes, layer_bytes);
+            float * dst = slice.data() + static_cast<size_t>(t) * width;
+            for (size_t h = 0; h < width; ++h) dst[h] = bf16_bits_to_f32(bf16[h]);
+        }
+        if (!remote_draft_.send_feature_slice(cap_idx, start_pos, n_tokens, slice)) {
+            std::fprintf(stderr,
+                         "spec-decode: remote feature sync failed capture=%d\n",
+                         cap_idx);
+            return false;
+        }
+    }
+    return true;
+}
+
 // ── DFlash speculative decode loop ─────────────────────────────────────
 
 bool Qwen35Backend::do_spec_decode(int committed, int n_gen,
@@ -815,7 +898,9 @@ bool Qwen35Backend::do_spec_decode(int committed, int n_gen, // - greedy decoding (no logit processing) — spec decode uses argmax verification const bool can_spec = cfg_.draft_path && !draft_parked_ - &&
feature_mirror_.target_feat + && (cfg_.remote_draft.enabled() + ? remote_draft_.active() + : feature_mirror_.target_feat != nullptr) && !sampler_.needs_logit_processing(); if (!can_spec) { @@ -829,7 +914,8 @@ bool Qwen35Backend::do_spec_decode(int committed, int n_gen, // ── DFlash spec-decode: draft → verify → accept → replay ────────── DFlashTarget * target = dflash_target(); - const int q_len = dw_.block_size; + const bool use_remote_draft = cfg_.remote_draft.enabled() && remote_draft_.active(); + const int q_len = dw_.block_size > 0 ? dw_.block_size : DFLASH27B_DRAFT_BLOCK_SIZE; StepGraph draft_sg; @@ -864,50 +950,60 @@ bool Qwen35Backend::do_spec_decode(int committed, int n_gen, // 2. Draft compute constexpr int DRAFT_CTX_MAX_DEFAULT = 2048; - const int ring_cap = feature_mirror_.cap; + const int ring_cap = use_remote_draft ? remote_draft_.ring_cap() : feature_mirror_.cap; const int draft_ctx = std::min(committed, std::min(ring_cap, std::max(DRAFT_CTX_MAX_DEFAULT, cfg_.draft_ctx_max))); const int draft_start = committed - draft_ctx; int mirror_slot0 = 0; const bool use_mirror_view = + !use_remote_draft && draft_feature_mirror_can_view(feature_mirror_, committed, draft_ctx, mirror_slot0); - if (!build_draft_step(draft_sg, dw_, /*lm_head=*/nullptr, draft_backend_, - draft_ctx, use_mirror_view ? 
&feature_mirror_ : nullptr, - committed, - /*ctx_len_max=*/std::min(ring_cap, std::max(DRAFT_CTX_MAX_DEFAULT, cfg_.draft_ctx_max)))) { - std::fprintf(stderr, "spec-decode: draft build failed\n"); - step_graph_destroy(draft_sg); - return false; - } - if (!use_mirror_view && - !copy_feature_ring_range_to_tensor(feature_mirror_, draft_sg.target_hidden_cat, - draft_start, draft_ctx)) { - std::fprintf(stderr, "spec-decode: feature copy failed\n"); - step_graph_destroy(draft_sg); - return false; - } - ggml_backend_tensor_set(draft_sg.inp_embed, noise_embed.data(), 0, - sizeof(float) * noise_embed.size()); - pos_k.resize((size_t)draft_ctx + q_len); - for (int i = 0; i < q_len; i++) pos_q[i] = draft_ctx + i; - for (int i = 0; i < draft_ctx + q_len; i++) pos_k[i] = i; - ggml_backend_tensor_set(draft_sg.positions, pos_q.data(), 0, - sizeof(int32_t) * pos_q.size()); - ggml_backend_tensor_set(draft_sg.positions_k, pos_k.data(), 0, - sizeof(int32_t) * pos_k.size()); - - auto st = ggml_backend_graph_compute(draft_backend_, draft_sg.gf); - if (st != GGML_STATUS_SUCCESS) { - std::fprintf(stderr, "spec-decode: draft compute failed\n"); - step_graph_destroy(draft_sg); - return false; - } + if (use_remote_draft) { + local_hidden.clear(); + if (!remote_draft_.propose(committed, draft_ctx, noise_embed, local_hidden)) { + std::fprintf(stderr, "spec-decode: remote draft propose failed\n"); + step_graph_destroy(draft_sg); + return false; + } + } else { + if (!build_draft_step(draft_sg, dw_, /*lm_head=*/nullptr, draft_backend_, + draft_ctx, use_mirror_view ? 
&feature_mirror_ : nullptr, + committed, + /*ctx_len_max=*/std::min(ring_cap, std::max(DRAFT_CTX_MAX_DEFAULT, cfg_.draft_ctx_max)))) { + std::fprintf(stderr, "spec-decode: draft build failed\n"); + step_graph_destroy(draft_sg); + return false; + } + if (!use_mirror_view && + !copy_feature_ring_range_to_tensor(feature_mirror_, draft_sg.target_hidden_cat, + draft_start, draft_ctx)) { + std::fprintf(stderr, "spec-decode: feature copy failed\n"); + step_graph_destroy(draft_sg); + return false; + } + ggml_backend_tensor_set(draft_sg.inp_embed, noise_embed.data(), 0, + sizeof(float) * noise_embed.size()); + pos_k.resize((size_t)draft_ctx + q_len); + for (int i = 0; i < q_len; i++) pos_q[i] = draft_ctx + i; + for (int i = 0; i < draft_ctx + q_len; i++) pos_k[i] = i; + ggml_backend_tensor_set(draft_sg.positions, pos_q.data(), 0, + sizeof(int32_t) * pos_q.size()); + ggml_backend_tensor_set(draft_sg.positions_k, pos_k.data(), 0, + sizeof(int32_t) * pos_k.size()); + + auto st = ggml_backend_graph_compute(draft_backend_, draft_sg.gf); + if (st != GGML_STATUS_SUCCESS) { + std::fprintf(stderr, "spec-decode: draft compute failed\n"); + step_graph_destroy(draft_sg); + return false; + } - // Read draft hidden states to host for LM-head projection. - local_hidden.resize((size_t)hidden * q_len); - ggml_backend_tensor_get(draft_sg.hidden_states, local_hidden.data(), 0, - sizeof(float) * local_hidden.size()); + // Read draft hidden states to host for LM-head projection. + local_hidden.resize((size_t)hidden * q_len); + ggml_backend_tensor_get(draft_sg.hidden_states, local_hidden.data(), 0, + sizeof(float) * local_hidden.size()); + } // 3. Project draft hidden → token IDs via target LM head if (!target->project_hidden_to_tokens(local_hidden.data(), q_len, draft_tok)) { @@ -979,7 +1075,12 @@ bool Qwen35Backend::do_spec_decode(int committed, int n_gen, last_tok = replay_last_tok; // 7. 
Sync features for replayed range to mirror (needed for next draft step) - if (feature_mirror_.target_feat && cache_.target_feat) { + if (use_remote_draft && cache_.target_feat) { + if (!sync_remote_draft_features(committed, commit_n)) { + step_graph_destroy(draft_sg); + return false; + } + } else if (feature_mirror_.target_feat && cache_.target_feat) { draft_feature_mirror_sync_range(cache_.target_feat, cache_.target_feat_cap, feature_mirror_, committed, commit_n); } diff --git a/dflash/src/qwen35/qwen35_backend.h b/dflash/src/qwen35/qwen35_backend.h index 506e30da4..277a8d3d5 100644 --- a/dflash/src/qwen35/qwen35_backend.h +++ b/dflash/src/qwen35/qwen35_backend.h @@ -13,7 +13,9 @@ #include "common/model_backend.h" #include "common/dflash_target.h" +#include "common/dflash_draft_ipc.h" #include "placement/placement_config.h" +#include "placement/remote_draft_config.h" #include "step_graph.h" #include "ddtree.h" #include "dflash_feature_ring.h" @@ -37,6 +39,7 @@ struct Qwen35Config { const char * draft_path = nullptr; DevicePlacement device; // target GPU placement int draft_gpu = 0; + RemoteDraftConfig remote_draft; int stream_fd = -1; // FA/KV @@ -107,6 +110,7 @@ class Qwen35Backend : public ModelBackend { bool supports_dflash_spec_decode() const override { return true; } DFlashTarget * dflash_target() override; + bool supports_remote_draft() const override { return true; } void shutdown() override; @@ -136,6 +140,7 @@ class Qwen35Backend : public ModelBackend { // ── Draft feature mirror (cross-GPU feature transfer) ──────────── DraftFeatureMirror feature_mirror_; + DFlashDraftIpcClient remote_draft_; // ── Prefix cache (snapshots) ───────────────────────────────────── static constexpr int PREFIX_SLOTS = 64; @@ -181,6 +186,8 @@ class Qwen35Backend : public ModelBackend { std::vector & out_tokens, const DaemonIO & io); + bool sync_remote_draft_features(int start_pos, int n_tokens); + // Chain-mode verify (single batch of q_len tokens). 
int verify_chain(int committed, const int32_t * draft_tok, int q_len); diff --git a/dflash/src/server/server_main.cpp b/dflash/src/server/server_main.cpp index dba9401de..166b41df2 100644 --- a/dflash/src/server/server_main.cpp +++ b/dflash/src/server/server_main.cpp @@ -65,24 +65,42 @@ static bool validate_server_placement(const BackendArgs & bargs) { placement_backend_name(compiled)); return false; } - if (!placement_backend_supported(bargs.draft_device.backend)) { - std::fprintf(stderr, - "[server] --draft-device=%s is unsupported in this binary " - "(compiled backend: %s)\n", - placement_device_name(bargs.draft_device).c_str(), - placement_backend_name(compiled)); - return false; - } const PlacementBackend target = bargs.device.backend == PlacementBackend::Auto ? compiled : bargs.device.backend; const PlacementBackend draft = bargs.draft_device.backend == PlacementBackend::Auto ? target : bargs.draft_device.backend; + if (!bargs.remote_draft.enabled() && bargs.remote_draft.has_aux_options()) { + std::fprintf(stderr, + "[server] --draft-ipc-work-dir and --draft-ipc-ring-cap require " + "--draft-ipc-bin\n"); + return false; + } if (target != draft) { + if (!bargs.remote_draft.enabled()) { + std::fprintf(stderr, + "[server] mixed target/draft backends require --draft-ipc-bin " + "(target=%s draft=%s)\n", + placement_backend_name(target), placement_backend_name(draft)); + return false; + } + if (!bargs.draft_path) { + std::fprintf(stderr, + "[server] mixed target/draft backends require --draft \n"); + return false; + } + } else if (bargs.remote_draft.enabled()) { std::fprintf(stderr, - "[server] mixed target/draft backends are not implemented in the " - "native server yet (target=%s draft=%s)\n", + "[server] --draft-ipc-bin is only needed for mixed target/draft " + "backends (target=%s draft=%s)\n", placement_backend_name(target), placement_backend_name(draft)); return false; + } else if (!placement_backend_supported(bargs.draft_device.backend)) { + 
std::fprintf(stderr, + "[server] --draft-device=%s is unsupported in this binary " + "(compiled backend: %s)\n", + placement_device_name(bargs.draft_device).c_str(), + placement_backend_name(compiled)); + return false; } if (!bargs.device.layer_split_gpus.empty()) { std::fprintf(stderr, @@ -111,6 +129,9 @@ static void print_usage(const char * prog) { " --max-tokens Default max output tokens (default: 4096)\n" " --target-device Target device (default: auto:0)\n" " --draft-device Draft device (default: auto:0)\n" + " --draft-ipc-bin Remote draft IPC daemon for mixed backends\n" + " --draft-ipc-work-dir Remote draft IPC scratch directory\n" + " --draft-ipc-ring-cap Remote draft feature ring capacity\n" " --target-devices Reserved layer-split devices, e.g. cuda:0,cuda:1\n" " --target-layer-split Reserved layer-split weights\n" " --peer-access Enable peer access for multi-GPU placement\n" @@ -197,6 +218,16 @@ int main(int argc, char ** argv) { std::fprintf(stderr, "[server] bad --draft-device value (expected backend:gpu)\n"); return 2; } + } else if (std::strcmp(argv[i], "--draft-ipc-bin") == 0 && i + 1 < argc) { + bargs.remote_draft.ipc_bin = argv[++i]; + } else if (std::strcmp(argv[i], "--draft-ipc-work-dir") == 0 && i + 1 < argc) { + bargs.remote_draft.work_dir = argv[++i]; + } else if (std::strcmp(argv[i], "--draft-ipc-ring-cap") == 0 && i + 1 < argc) { + bargs.remote_draft.ring_cap = std::atoi(argv[++i]); + if (bargs.remote_draft.ring_cap <= 0) { + std::fprintf(stderr, "[server] bad --draft-ipc-ring-cap value\n"); + return 2; + } } else if (std::strcmp(argv[i], "--target-devices") == 0 && i + 1 < argc) { if (target_device_seen) { std::fprintf(stderr, "[server] --target-devices conflicts with --target-device\n"); @@ -301,6 +332,21 @@ int main(int argc, char ** argv) { if (!validate_server_placement(bargs)) return 2; + if (bargs.remote_draft.enabled()) { + const std::string arch = detect_arch(bargs.model_path); + if (arch.empty()) { + std::fprintf(stderr, + "[server] 
failed to detect model architecture for remote draft validation\n"); + return 1; + } + if (!arch_supports_remote_draft(arch)) { + std::fprintf(stderr, + "[server] model architecture '%s' does not support remote draft execution\n", + arch.c_str()); + return 2; + } + } + // Sync max_ctx: if --max-ctx was not provided, use the backend's default. // This prevents the HTTP server from accepting prompts larger than the // KV cache the backend actually allocates. @@ -376,6 +422,12 @@ int main(int argc, char ** argv) { std::fprintf(stderr, "[server] backend creation failed\n"); return 1; } + if (bargs.remote_draft.enabled() && !backend->supports_remote_draft()) { + std::fprintf(stderr, + "[server] detected model backend does not support remote draft execution\n"); + backend->shutdown(); + return 2; + } // Start HTTP server. std::fprintf(stderr, "\n"); @@ -399,6 +451,18 @@ int main(int argc, char ** argv) { } std::fprintf(stderr, "[server] │ draft_device = %s\n", placement_device_name(bargs.draft_device).c_str()); + std::fprintf(stderr, "[server] │ draft_exec = %s\n", + bargs.remote_draft.enabled() ? "remote-ipc" : "local"); + if (bargs.remote_draft.enabled()) { + std::fprintf(stderr, "[server] │ draft_ipc_bin = %s\n", + bargs.remote_draft.ipc_bin.c_str()); + if (!bargs.remote_draft.work_dir.empty()) { + std::fprintf(stderr, "[server] │ draft_ipc_dir = %s\n", + bargs.remote_draft.work_dir.c_str()); + } + std::fprintf(stderr, "[server] │ draft_ipc_cap = %d\n", + bargs.remote_draft.ring_cap); + } std::fprintf(stderr, "[server] │ peer_access = %s\n", bargs.device.peer_access ? "ON" : "off"); std::fprintf(stderr, "[server] │ chunk = %d\n", bargs.chunk);