Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions server/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,7 @@ add_library(dflash_common STATIC
src/common/pflash_drafter_ipc.cpp
src/common/dflash_draft_graph.cpp
src/common/dflash_spec_decode.cpp
src/common/layer_split_backend.cpp
src/qwen35/graph_builders.cpp
src/qwen35moe/qwen35moe_ffn.cpp
src/qwen35moe/qwen35moe_backend.cpp
Expand All @@ -256,6 +257,7 @@ add_library(dflash_common STATIC
src/qwen35/layer_split_forward.cpp
src/qwen35/layer_split_daemon.cpp
src/qwen35/qwen35_backend.cpp
src/qwen35/qwen35_layer_split_adapter.cpp
src/qwen35/qwen35_dflash_target.cpp
src/qwen35/qwen35_layer_split_dflash_target.cpp
src/qwen35/layer_split_daemon_loop.cpp
Expand All @@ -264,6 +266,7 @@ add_library(dflash_common STATIC
src/common/daemon_loop.cpp
src/common/gguf_inspect.cpp
src/common/backend_factory.cpp
src/placement/placement_config.cpp
src/common/layer_split_utils.cpp
src/common/ddtree.cpp
src/common/peer_access.cpp
Expand Down
28 changes: 28 additions & 0 deletions server/src/common/backend_factory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,11 @@
#include "laguna_backend.h"
#include "qwen3_backend.h"
#include "gemma4_backend.h"
#include "layer_split_backend.h"
#include "qwen35_layer_split_adapter.h"

#include <cstdio>
#include <algorithm>

namespace dflash::common {

Expand Down Expand Up @@ -42,6 +45,31 @@ std::unique_ptr<ModelBackend> create_backend(const BackendArgs & args) {
std::fprintf(stderr, "[backend_factory] detected arch=%s\n", arch.c_str());

if (arch == "qwen35") {
if (args.device.is_layer_split()) {
Qwen35LayerSplitAdapterConfig cfg;
cfg.target_path = args.model_path;
cfg.draft_path = args.draft_path;
cfg.device = args.device;
cfg.draft_gpu = args.draft_device.gpu;
cfg.remote_draft = args.remote_draft;
cfg.fa_window = args.fa_window;

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2: Layer-split qwen35 path silently drops multiple runtime/decode options present in non-split path

Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At server/src/common/backend_factory.cpp, line 55:

<comment>Layer-split qwen35 path silently drops multiple runtime/decode options present in non-split path</comment>

<file context>
@@ -42,6 +45,30 @@ std::unique_ptr<ModelBackend> create_backend(const BackendArgs & args) {
+            cfg.device             = args.device;
+            cfg.draft_gpu          = args.draft_device.gpu;
+            cfg.remote_draft       = args.remote_draft;
+            cfg.fa_window          = args.fa_window;
+            cfg.kq_stride_pad      = args.kq_stride_pad;
+            cfg.draft_ctx_max      = args.draft_ctx_max;
</file context>

cfg.kq_stride_pad = args.kq_stride_pad;
cfg.draft_swa_window = args.draft_swa_window;
cfg.draft_ctx_max = args.draft_ctx_max;
cfg.max_verify_tokens = args.ddtree_mode
? std::max<int>(DFLASH27B_DRAFT_BLOCK_SIZE, args.ddtree_budget + 1)
: DFLASH27B_DRAFT_BLOCK_SIZE;
cfg.run_dflash = args.draft_path != nullptr;

auto adapter = std::make_unique<Qwen35LayerSplitAdapter>(cfg);
auto backend = std::make_unique<LayerSplitBackend>(std::move(adapter));
if (!backend->init()) {
std::fprintf(stderr, "[backend_factory] LayerSplitBackend(qwen35) init failed\n");
return nullptr;
}
return backend;
}

Qwen35Config cfg;
cfg.target_path = args.model_path;
cfg.draft_path = args.draft_path;
Expand Down
58 changes: 58 additions & 0 deletions server/src/common/dflash_draft_ipc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,64 @@ bool DFlashDraftIpcClient::propose(
#endif
}

bool DFlashDraftIpcClient::get_feature_range(int start_pos, int n_tokens,
std::vector<float> & out) {
#if defined(_WIN32)
(void)start_pos; (void)n_tokens; (void)out;
return false;
#else
FILE * cmd = process_.command_stream();
const int stream_fd = process_.stream_fd();
if (!active_ || !cmd || stream_fd < 0 || ring_cap_ <= 0 ||
start_pos < 0 || n_tokens <= 0 || n_tokens > ring_cap_) return false;
const size_t count =
(size_t)n_tokens * (size_t)n_target_layers_ * (size_t)hidden_size_;
std::fprintf(cmd, "get_feature_range %d %d\n", start_pos, n_tokens);
std::fflush(cmd);
int32_t status = -1;
bool ok = read_exact_fd(stream_fd, &status, sizeof(status)) && status == 0;
if (ok) {
out.assign(count, 0.0f);
ok = read_exact_fd(stream_fd, out.data(), out.size() * sizeof(float));
}
if (!ok) {
std::fprintf(stderr, "draft-ipc get_feature_range failed status=%d\n", status);
}
return ok;
#endif
}

bool DFlashDraftIpcClient::set_feature_range(int start_pos, int n_tokens,
const std::vector<float> & data) {
#if defined(_WIN32)
(void)start_pos; (void)n_tokens; (void)data;
return false;
#else
FILE * cmd = process_.command_stream();
const int stream_fd = process_.stream_fd();
if (!active_ || !cmd || stream_fd < 0 || ring_cap_ <= 0 ||
start_pos < 0 || n_tokens <= 0 || n_tokens > ring_cap_) return false;
const size_t expected =
(size_t)n_tokens * (size_t)n_target_layers_ * (size_t)hidden_size_;
if (data.size() != expected) return false;
const std::string path = process_.next_path("feature_range");
if (!write_binary_file(path, data.data(), data.size() * sizeof(float))) {
std::fprintf(stderr, "draft-ipc write feature range failed: %s\n", path.c_str());
return false;
}
std::fprintf(cmd, "set_feature_range %d %d %s\n",
start_pos, n_tokens, path.c_str());
std::fflush(cmd);
int32_t status = -1;
const bool ok = read_exact_fd(stream_fd, &status, sizeof(status)) && status == 0;
std::remove(path.c_str());
if (!ok) {
std::fprintf(stderr, "draft-ipc set_feature_range failed status=%d\n", status);
}
return ok;
#endif
}

void DFlashDraftIpcClient::close() {
process_.close();
active_ = false;
Expand Down
4 changes: 4 additions & 0 deletions server/src/common/dflash_draft_ipc.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,10 @@ class DFlashDraftIpcClient {
const std::vector<float> & noise_embed,
std::vector<float> & hidden_out);

bool get_feature_range(int start_pos, int n_tokens, std::vector<float> & out);
bool set_feature_range(int start_pos, int n_tokens,
const std::vector<float> & data);

bool active() const { return active_; }
int ring_cap() const { return ring_cap_; }
int hidden_size() const { return hidden_size_; }
Expand Down
63 changes: 63 additions & 0 deletions server/src/common/dflash_draft_ipc_daemon.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,69 @@ int run_dflash_draft_ipc_daemon(const char * draft_path,
stream_status(stream_fd, 0);
continue;
}
if (cmd == "get_feature_range") {
int start_pos = -1;
int n_tokens = 0;
iss >> start_pos >> n_tokens;
if (start_pos < 0 || n_tokens <= 0 || n_tokens > feature_ring.cap) {
std::fprintf(stderr, "[draft-ipc-daemon] bad get_feature_range: %s\n",
line.c_str());
stream_status(stream_fd, -1);
continue;
}
const int fc_in = feature_ring.n_target_layers * feature_ring.hidden_size;
const size_t row_bytes = (size_t)fc_in * sizeof(float);
const size_t src_stride = feature_ring.target_feat->nb[1];
std::vector<float> data((size_t)n_tokens * (size_t)fc_in);
for (int i = 0; i < n_tokens; ++i) {
const int slot = (start_pos + i) % feature_ring.cap;
ggml_backend_tensor_get(feature_ring.target_feat,
data.data() + (size_t)i * (size_t)fc_in,
(size_t)slot * src_stride,
row_bytes);
}
const size_t bytes = data.size() * sizeof(float);
if (!stream_status(stream_fd, 0) ||
!write_exact_fd(stream_fd, data.data(), bytes)) {
std::fprintf(stderr, "[draft-ipc-daemon] feature range stream failed\n");
break;
}
continue;
}
if (cmd == "set_feature_range") {
int start_pos = -1;
int n_tokens = 0;
iss >> start_pos >> n_tokens;
std::string path = read_line_tail(iss);
if (start_pos < 0 || n_tokens <= 0 || n_tokens > feature_ring.cap ||
path.empty()) {
std::fprintf(stderr, "[draft-ipc-daemon] bad set_feature_range: %s\n",
line.c_str());
stream_status(stream_fd, -1);
continue;
}
const int fc_in = feature_ring.n_target_layers * feature_ring.hidden_size;
const size_t row_bytes = (size_t)fc_in * sizeof(float);
const size_t dst_stride = feature_ring.target_feat->nb[1];
const size_t bytes = (size_t)n_tokens * row_bytes;
std::vector<float> data(bytes / sizeof(float));
if (!read_binary_file_exact(path, data.data(), bytes)) {
std::fprintf(stderr, "[draft-ipc-daemon] read feature range failed: %s\n",
path.c_str());
stream_status(stream_fd, -1);
continue;
}
for (int i = 0; i < n_tokens; ++i) {
const int slot = (start_pos + i) % feature_ring.cap;
ggml_backend_tensor_set(feature_ring.target_feat,
data.data() + (size_t)i * (size_t)fc_in,
(size_t)slot * dst_stride,
row_bytes);
}
ggml_backend_synchronize(backend);
stream_status(stream_fd, 0);
continue;
}
if (cmd == "propose") {
int committed = -1;
int ctx_len = 0;
Expand Down
12 changes: 6 additions & 6 deletions server/src/common/dflash_layer_split_runtime.h
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
// dflash_layer_split_runtime.h — target-agnostic runtime types for the
// DFlash layer-split pipeline.
// target layer-split pipeline.
//
// Hosts the small pieces that are reused by every architecture's layer-split
// driver: a runtime-configuration struct (replaces former globals) and an
// activation double-buffer used to ferry hidden states between shards.
// Architecture-specific shard layouts (e.g. qwen35's TargetLayerSplitShard
// that embeds TargetWeights/TargetCache) live in their own headers.
// Hosts the small runtime pieces reused by layer-split drivers: a
// runtime-configuration struct and the activation double-buffer used to ferry
// hidden states between shards. Shared placement/load-plan/shard metadata lives
// in common/layer_split_utils.h; architecture-specific shard payloads keep their
// own weights/cache/graph types.

#pragma once

Expand Down
40 changes: 33 additions & 7 deletions server/src/common/dflash_spec_decode.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,30 @@ bool run_dflash_spec_decode(
int draft_ctx_max,
int stream_fd,
DFlashDraftIpcClient * remote_draft,
const std::vector<int32_t> * hint_tokens) {
const std::vector<int32_t> * hint_tokens,
int base_pos) {
DaemonIO io;
io.stream_fd = stream_fd;
return run_dflash_spec_decode(target, draft_weights, draft_backend,
feature_ring, prompt, n_gen, last_tok,
out_path, draft_ctx_max, io,
remote_draft, hint_tokens, base_pos);
}

bool run_dflash_spec_decode(
DFlashTarget & target,
DraftWeights & draft_weights,
ggml_backend_t draft_backend,
DraftFeatureMirror & feature_ring,
const std::vector<int32_t> & prompt,
int n_gen,
int last_tok,
const char * out_path,
int draft_ctx_max,
const DaemonIO & io,
DFlashDraftIpcClient * remote_draft,
const std::vector<int32_t> * hint_tokens,
int base_pos) {
const bool use_remote_draft = remote_draft && remote_draft->active();
if (!use_remote_draft && !feature_ring.target_feat) return false;

Expand All @@ -54,7 +77,7 @@ bool run_dflash_spec_decode(
std::vector<float> remote_hidden; // host buffer for remote-draft hidden states

std::vector<int32_t> out_all = prompt;
int committed = (int)prompt.size();
int committed = base_pos + (int)prompt.size();
int n_generated = 0;
int n_draft_steps = 0;
int n_accept_sum = 0;
Expand Down Expand Up @@ -199,15 +222,19 @@ bool run_dflash_spec_decode(
last_tok = replay_last_tok;

bool hit_eos = false;
int emitted = 0;
for (int i = 0; i < commit_n; i++) {
out_all.push_back(replay_tok[i]);
stream_emit_fd(stream_fd, replay_tok[i]);
io.emit(replay_tok[i]);
if (io.cancelled) break;
++emitted;
if (target.is_eos(replay_tok[i])) hit_eos = true;
}
committed += commit_n;
n_generated += commit_n;
n_accept_sum += std::min(accept_n, commit_n);
committed += emitted;
n_generated += emitted;
n_accept_sum += std::min(accept_n, emitted);
n_draft_steps++;
if (io.cancelled) break;
if (hit_eos) break;
}
if (!use_remote_draft && draft_backend) ggml_backend_synchronize(draft_backend);
Expand All @@ -231,4 +258,3 @@ bool run_dflash_spec_decode(
}

} // namespace dflash::common

19 changes: 18 additions & 1 deletion server/src/common/dflash_spec_decode.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
#include "dflash_target.h"
#include "dflash_feature_ring.h"
#include "dflash_draft_ipc.h"
#include "model_backend.h"

#include "ggml.h"
#include "ggml-backend.h"
Expand Down Expand Up @@ -53,6 +54,22 @@ bool run_dflash_spec_decode(
int draft_ctx_max,
int stream_fd = -1,
DFlashDraftIpcClient * remote_draft = nullptr,
const std::vector<int32_t> * hint_tokens = nullptr);
const std::vector<int32_t> * hint_tokens = nullptr,
int base_pos = 0);

bool run_dflash_spec_decode(
DFlashTarget & target,
DraftWeights & draft_weights,
ggml_backend_t draft_backend,
DraftFeatureMirror & feature_ring,
const std::vector<int32_t> & prompt,
int n_gen,
int last_tok,
const char * out_path,
int draft_ctx_max,
const DaemonIO & io,
DFlashDraftIpcClient * remote_draft = nullptr,
const std::vector<int32_t> * hint_tokens = nullptr,
int base_pos = 0);

} // namespace dflash::common
Loading
Loading