Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "vllm_router_rs"
version = "0.1.24"
version = "0.1.25"
edition = "2021"

[features]
Expand Down
3 changes: 1 addition & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"

[project]
name = "vllm-router"
version = "0.1.24"
version = "0.1.25"
description = "High-performance Rust-based load balancer for VLLM with multiple routing algorithms and prefill-decode disaggregation support"
authors = [{name = "Byron Hsu", email = "byronhsu1230@gmail.com"}]
requires-python = ">=3.8"
Expand Down Expand Up @@ -46,4 +46,3 @@ license-files = []

[tool.distutils.bdist_wheel]
py_limited_api = "cp38"

172 changes: 46 additions & 126 deletions src/routers/http/routed_experts_merge.rs
Original file line number Diff line number Diff line change
@@ -1,24 +1,25 @@
//! Stitch compact routed-experts payloads for vLLM P/D disaggregation.

use base64::{engine::general_purpose::STANDARD, Engine as _};
use serde_json::Value;

const NPY_MAGIC: &[u8; 6] = b"\x93NUMPY";
const NPY_V1_HEADER_PREFIX_LEN: usize = 10;
use serde_json::{json, Value};

#[derive(Clone, Debug, PartialEq, Eq)]
struct RoutedExpertsPayload {
seq_len: usize,
layers: usize,
topk: usize,
descr: String,
item_size: usize,
data: Vec<u8>,
}

impl RoutedExpertsPayload {
fn suffix_rows(&self, row_start: usize) -> Result<Self, String> {
let row_size = self.layers * self.topk * self.item_size;
if row_start > self.seq_len {
return Err(format!(
"decode routed_experts has {} rows, expected at least {row_start}",
self.seq_len
));
}
let row_size = self.layers * self.topk;
let byte_start = row_start * row_size;
let data = self
.data
Expand All @@ -35,24 +36,15 @@ impl RoutedExpertsPayload {
seq_len: self.seq_len - row_start,
layers: self.layers,
topk: self.topk,
descr: self.descr.clone(),
item_size: self.item_size,
data,
})
}

fn concat_rows(&self, other: &Self) -> Result<Self, String> {
if self.descr != other.descr || self.layers != other.layers || self.topk != other.topk {
if self.layers != other.layers || self.topk != other.topk {
return Err(format!(
"cannot concatenate routed_experts with shapes/dtypes ({}, {}, {}, {}) and ({}, {}, {}, {})",
self.seq_len,
self.layers,
self.topk,
self.descr,
other.seq_len,
other.layers,
other.topk,
other.descr,
"cannot concatenate routed_experts with shapes ({}, {}, {}) and ({}, {}, {})",
self.seq_len, self.layers, self.topk, other.seq_len, other.layers, other.topk,
));
}

Expand All @@ -64,8 +56,6 @@ impl RoutedExpertsPayload {
seq_len: self.seq_len + other.seq_len,
layers: self.layers,
topk: self.topk,
descr: self.descr.clone(),
item_size: self.item_size,
data,
})
}
Expand Down Expand Up @@ -103,7 +93,7 @@ pub fn merge_routed_experts_in_json(
let decode = decode_routed_experts_value(routed_experts, "decode routed_experts")?;
let completion = decode.suffix_rows(prompt.seq_len)?;
let merged = prompt.concat_rows(&completion)?;
choice["routed_experts"] = Value::String(encode_routed_experts_payload(&merged)?);
choice["routed_experts"] = encode_routed_experts_payload(&merged);
}

Ok(true)
Expand Down Expand Up @@ -133,99 +123,47 @@ fn decode_has_routed_experts(decode_json: &Value) -> bool {

fn decode_routed_experts_value(value: &Value, name: &str) -> Result<RoutedExpertsPayload, String> {
let payload = value
.as_str()
.ok_or_else(|| format!("{name} must be a base64 .npy string"))?;
.as_object()
.ok_or_else(|| format!("{name} must be an object with base64 data and shape"))?;
let data_payload = payload
.get("data")
.and_then(Value::as_str)
.ok_or_else(|| format!("{name} data must be a base64 string"))?;
let (seq_len, layers, topk) = parse_shape(payload.get("shape"), name)?;
let bytes = STANDARD
.decode(payload)
.decode(data_payload)
.map_err(|error| format!("{name} base64 decode failed: {error}"))?;
parse_npy_payload(&bytes, name)
}

fn parse_npy_payload(bytes: &[u8], name: &str) -> Result<RoutedExpertsPayload, String> {
if bytes.len() < NPY_V1_HEADER_PREFIX_LEN || &bytes[..6] != NPY_MAGIC {
return Err(format!("{name} is not a NumPy .npy payload"));
}

let (header_len, data_start) = match bytes[6] {
1 => (
u16::from_le_bytes([bytes[8], bytes[9]]) as usize,
NPY_V1_HEADER_PREFIX_LEN,
),
2 | 3 => (
u32::from_le_bytes([bytes[8], bytes[9], bytes[10], bytes[11]]) as usize,
12,
),
version => return Err(format!("{name} has unsupported .npy version {version}")),
};

let header_end = data_start + header_len;
let header = std::str::from_utf8(&bytes[data_start..header_end])
.map_err(|error| format!("{name} header decode failed: {error}"))?;
let descr = parse_descr(header, name)?;
let item_size = dtype_item_size(&descr, name)?;
let (seq_len, layers, topk) = parse_shape(header, name)?;
let data = bytes[header_end..].to_vec();
let expected_data_len = seq_len * layers * topk * item_size;
if data.len() != expected_data_len {
let expected_data_len = seq_len
.checked_mul(layers)
.and_then(|size| size.checked_mul(topk))
.ok_or_else(|| format!("{name} shape is too large"))?;
if bytes.len() != expected_data_len {
return Err(format!(
"{name} has {} data bytes, expected {expected_data_len}",
data.len()
bytes.len()
));
}

Ok(RoutedExpertsPayload {
seq_len,
layers,
topk,
descr,
item_size,
data,
data: bytes,
})
}

fn parse_descr(header: &str, name: &str) -> Result<String, String> {
let after_key = header
.split("'descr':")
.nth(1)
.or_else(|| header.split("\"descr\":").nth(1))
.ok_or_else(|| format!("{name} header is missing descr"))?;
let quote = after_key
.find(['\'', '"'])
.ok_or_else(|| format!("{name} descr is missing opening quote"))?;
let after_quote = &after_key[quote + 1..];
let end_quote = after_quote
.find(['\'', '"'])
.ok_or_else(|| format!("{name} descr is missing closing quote"))?;
Ok(after_quote[..end_quote].to_string())
}

fn dtype_item_size(descr: &str, name: &str) -> Result<usize, String> {
match descr {
"|u1" => Ok(1),
"<i2" => Ok(2),
"<i4" => Ok(4),
_ => Err(format!("{name} has unsupported dtype {descr}")),
}
}

fn parse_shape(header: &str, name: &str) -> Result<(usize, usize, usize), String> {
let shape_header = header
.split("shape")
.nth(1)
.ok_or_else(|| format!("{name} header is missing shape"))?;
let shape_start = shape_header
.find('(')
.ok_or_else(|| format!("{name} shape is missing '('"))?;
let shape_end = shape_header[shape_start + 1..]
.find(')')
.ok_or_else(|| format!("{name} shape is missing ')'"))?
+ shape_start
+ 1;
let dims = shape_header[shape_start + 1..shape_end]
.split(',')
.map(str::trim)
.filter(|value| !value.is_empty())
.map(|value| value.parse::<usize>().map_err(|error| error.to_string()))
fn parse_shape(value: Option<&Value>, name: &str) -> Result<(usize, usize, usize), String> {
let shape = value
.and_then(Value::as_array)
.ok_or_else(|| format!("{name} shape must be an array"))?;
let dims = shape
.iter()
.map(|value| {
let dim = value
.as_u64()
.ok_or_else(|| "shape dimension must be a non-negative integer".to_string())?;
usize::try_from(dim).map_err(|error| error.to_string())
})
.collect::<Result<Vec<_>, _>>()
.map_err(|error| format!("{name} shape parse failed: {error}"))?;

Expand All @@ -235,44 +173,26 @@ fn parse_shape(header: &str, name: &str) -> Result<(usize, usize, usize), String
}
}

fn encode_routed_experts_payload(payload: &RoutedExpertsPayload) -> Result<String, String> {
let header_body = format!(
"{{'descr': '{}', 'fortran_order': False, 'shape': ({}, {}, {}), }}",
payload.descr, payload.seq_len, payload.layers, payload.topk
);
let mut header = header_body.into_bytes();
let padding = (16 - ((NPY_V1_HEADER_PREFIX_LEN + header.len() + 1) % 16)) % 16;
header.extend(std::iter::repeat_n(b' ', padding));
header.push(b'\n');

let header_len = u16::try_from(header.len())
.map_err(|_| "routed_experts NumPy header is too large for v1 .npy".to_string())?;
let mut bytes =
Vec::with_capacity(NPY_V1_HEADER_PREFIX_LEN + header.len() + payload.data.len());
bytes.extend_from_slice(NPY_MAGIC);
bytes.push(1);
bytes.push(0);
bytes.extend_from_slice(&header_len.to_le_bytes());
bytes.extend_from_slice(&header);
bytes.extend_from_slice(&payload.data);
Ok(STANDARD.encode(bytes))
fn encode_routed_experts_payload(payload: &RoutedExpertsPayload) -> Value {
json!({
"data": STANDARD.encode(&payload.data),
"shape": [payload.seq_len, payload.layers, payload.topk],
})
}

#[cfg(test)]
mod tests {
use super::*;
use serde_json::json;

fn uint8_payload(seq_len: usize, layers: usize, topk: usize, data: &[u8]) -> String {
fn uint8_payload(seq_len: usize, layers: usize, topk: usize, data: &[u8]) -> Value {
let payload = RoutedExpertsPayload {
seq_len,
layers,
topk,
descr: "|u1".to_string(),
item_size: 1,
data: data.to_vec(),
};
encode_routed_experts_payload(&payload).unwrap()
encode_routed_experts_payload(&payload)
}

#[test]
Expand Down
Loading