diff --git a/Cargo.lock b/Cargo.lock index d88ddd271c..dc34272b32 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -216,6 +216,18 @@ version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9b34d609dfbaf33d6889b2b7106d3ca345eacad44200913df5ba02bfd31d2ba9" +[[package]] +name = "async-channel" +version = "2.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "924ed96dd52d1b75e9c1a3e6275715fd320f5f9439fb5a4a11fa51f4221158d2" +dependencies = [ + "concurrent-queue", + "event-listener-strategy", + "futures-core", + "pin-project-lite", +] + [[package]] name = "async-nats" version = "0.40.0" @@ -1232,6 +1244,15 @@ dependencies = [ "static_assertions", ] +[[package]] +name = "concurrent-queue" +version = "2.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4ca0197aee26d1ae37445ee532fefce43251d24cc7c166799f4d46817f1d3973" +dependencies = [ + "crossbeam-utils", +] + [[package]] name = "config" version = "0.15.17" @@ -2036,6 +2057,16 @@ dependencies = [ "syn 2.0.106", ] +[[package]] +name = "dlpark" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dc178fc3bf4ce54c26ccffcf271ff574954ac4b940f15121be3d69f277194537" +dependencies = [ + "half 2.6.0", + "pyo3", +] + [[package]] name = "dlv-list" version = "0.5.2" @@ -2569,6 +2600,27 @@ dependencies = [ "tower-service", ] +[[package]] +name = "event-listener" +version = "5.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e13b66accf52311f30a0db42147dadea9850cb48cd070028831ae5f5d4b856ab" +dependencies = [ + "concurrent-queue", + "parking", + "pin-project-lite", +] + +[[package]] +name = "event-listener-strategy" +version = "0.5.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8be9f3dfaaffdae2972880079a491a1a8bb7cbed0b8dd7a347f668b4150a3b93" +dependencies = [ + "event-listener", + "pin-project-lite", +] + [[package]] name = "eventsource-stream" version = "0.2.3" @@ -3890,6 +3942,12 @@ dependencies = [ "web-time", ] +[[package]] +name = "indoc" +version = "2.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f4c7245a08504955605670dbf141fceab975f15ca21570696aebe9d2e71576bd" + [[package]] name = "inlinable_string" version = "0.1.15" @@ -3946,6 +4004,15 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "inventory" +version = "0.3.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bc61209c082fbeb19919bee74b176221b27223e27b65d781eb91af24eb1fb46e" +dependencies = [ + "rustversion", +] + [[package]] name = "io-uring" version = "0.7.10" @@ -4203,6 +4270,40 @@ dependencies = [ "winapi-build", ] +[[package]] +name = "kvbm-py3" +version = "0.1.0" +dependencies = [ + "anyhow", + "async-stream", + "async-trait", + "cudarc 0.16.6", + "derive-getters", + "dlpark", + "dynamo-llm", + "dynamo-runtime", + "either", + "futures", + "local-ip-address", + "once_cell", + "prometheus", + "pyo3", + "pyo3-async-runtimes", + "pythonize", + "rand 0.9.2", + "rstest 0.25.0", + "serde", + "serde_json", + "socket2 0.6.0", + "thiserror 2.0.16", + "tokio", + "tokio-stream", + "tokio-util", + "tracing", + "tracing-subscriber", + "uuid 1.18.1", +] + [[package]] name = "lalrpop-util" version = "0.20.2" @@ -4571,6 +4672,15 @@ dependencies = [ "autocfg", ] +[[package]] +name = "memoffset" +version = "0.9.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"488016bfae457b036d996092f6cb448677611ce4449e970ceaf42695203f218a" +dependencies = [ + "autocfg", +] + [[package]] name = "metal" version = "0.27.0" @@ -5115,7 +5225,7 @@ dependencies = [ "bitflags 1.3.2", "cfg-if 1.0.3", "libc", - "memoffset", + "memoffset 0.7.1", "pin-utils", ] @@ -5650,6 +5760,12 @@ dependencies = [ "serde", ] +[[package]] +name = "parking" +version = "2.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f38d5652c16fde515bb1ecef450ab0f6a219d619a7274976324d5e377f7dceba" + [[package]] name = "parking_lot" version = "0.12.5" @@ -6232,6 +6348,107 @@ dependencies = [ "num-traits", ] +[[package]] +name = "pyo3" +version = "0.23.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7778bffd85cf38175ac1f545509665d0b9b92a198ca7941f131f85f7a4f9a872" +dependencies = [ + "cfg-if 1.0.3", + "indoc", + "libc", + "memoffset 0.9.1", + "once_cell", + "portable-atomic", + "pyo3-build-config", + "pyo3-ffi", + "pyo3-macros", + "unindent", +] + +[[package]] +name = "pyo3-async-runtimes" +version = "0.23.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "977dc837525cfd22919ba6a831413854beb7c99a256c03bf8624ad707e45810e" +dependencies = [ + "async-channel", + "clap 4.5.48", + "futures", + "inventory", + "once_cell", + "pin-project-lite", + "pyo3", + "pyo3-async-runtimes-macros", + "tokio", +] + +[[package]] +name = "pyo3-async-runtimes-macros" +version = "0.23.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b2df2884957d2476731f987673befac5d521dff10abb0a7cbe12015bc7702fe9" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.106", +] + +[[package]] +name = "pyo3-build-config" +version = "0.23.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "94f6cbe86ef3bf18998d9df6e0f3fc1050a8c5efa409bf712e661a4366e010fb" +dependencies = [ + "once_cell", + "target-lexicon", +] + +[[package]] +name = "pyo3-ffi" +version = "0.23.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e9f1b4c431c0bb1c8fb0a338709859eed0d030ff6daa34368d3b152a63dfdd8d" +dependencies = [ + "libc", + "pyo3-build-config", +] + +[[package]] +name = "pyo3-macros" +version = "0.23.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fbc2201328f63c4710f68abdf653c89d8dbc2858b88c5d88b0ff38a75288a9da" +dependencies = [ + "proc-macro2", + "pyo3-macros-backend", + "quote", + "syn 2.0.106", +] + +[[package]] +name = "pyo3-macros-backend" +version = "0.23.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fca6726ad0f3da9c9de093d6f116a93c1a38e417ed73bf138472cf4064f72028" +dependencies = [ + "heck 0.5.0", + "proc-macro2", + "pyo3-build-config", + "quote", + "syn 2.0.106", +] + +[[package]] +name = "pythonize" +version = "0.23.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "91a6ee7a084f913f98d70cdc3ebec07e852b735ae3059a1500db2661265da9ff" +dependencies = [ + "pyo3", + "serde", +] + [[package]] name = "qoi" version = "0.4.1" @@ -6820,6 +7037,18 @@ dependencies = [ "rustc_version", ] +[[package]] +name = "rstest" +version = "0.25.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6fc39292f8613e913f7df8fa892b8944ceb47c247b78e1b1ae2f09e019be789d" +dependencies = [ + "futures-timer", + "futures-util", + "rstest_macros 0.25.0", + "rustc_version", +] + [[package]] name = "rstest_macros" version = "0.18.2" @@ -6855,6 +7084,24 @@ dependencies = [ 
"unicode-ident", ] +[[package]] +name = "rstest_macros" +version = "0.25.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1f168d99749d307be9de54d23fd226628d99768225ef08f6ffb52e0182a27746" +dependencies = [ + "cfg-if 1.0.3", + "glob", + "proc-macro-crate", + "proc-macro2", + "quote", + "regex", + "relative-path", + "rustc_version", + "syn 2.0.106", + "unicode-ident", +] + [[package]] name = "rstest_reuse" version = "0.7.0" @@ -9305,6 +9552,12 @@ dependencies = [ "rand 0.8.5", ] +[[package]] +name = "unindent" +version = "0.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7264e107f553ccae879d21fbea1d6724ac785e8c3bfc762137959b5802826ef3" + [[package]] name = "unsafe-libyaml" version = "0.2.11" diff --git a/Cargo.toml b/Cargo.toml index 156b5039c3..afbaf26268 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,6 +12,7 @@ members = [ "lib/bindings/c", "lib/bindings/python/codegen", "lib/engines/*", + "lib/kvbm", ] # Exclude certain packages that are slow to build and we don't ship as flagship # features from default build, but keep them in workspace for convenience. diff --git a/components/src/dynamo/vllm/args.py b/components/src/dynamo/vllm/args.py index 64e55d084d..ec19fd58bf 100644 --- a/components/src/dynamo/vllm/args.py +++ b/components/src/dynamo/vllm/args.py @@ -435,7 +435,7 @@ def create_kv_transfer_config(config: Config) -> Optional[KVTransferConfig]: elif connector == "kvbm": connector_cfg = { "kv_connector": "DynamoConnector", - "kv_connector_module_path": "dynamo.llm.vllm_integration.connector", + "kv_connector_module_path": "kvbm.vllm_integration.connector", "kv_role": "kv_both", } multi_connectors.append(connector_cfg) @@ -450,7 +450,7 @@ def create_kv_transfer_config(config: Config) -> Optional[KVTransferConfig]: kv_connector="PdConnector", kv_role="kv_both", kv_connector_extra_config={"connectors": multi_connectors}, - kv_connector_module_path="dynamo.llm.vllm_integration.connector", + kv_connector_module_path="kvbm.vllm_integration.connector", ) diff --git a/components/src/dynamo/vllm/main.py b/components/src/dynamo/vllm/main.py index d11a3a565b..4edbd5b05e 100644 --- a/components/src/dynamo/vllm/main.py +++ b/components/src/dynamo/vllm/main.py @@ -8,6 +8,7 @@ from typing import Optional import uvloop +from kvbm.vllm_integration.consolidator_config import get_consolidator_endpoints from prometheus_client import REGISTRY from vllm.distributed.kv_events import ZmqEventPublisher from vllm.usage.usage_lib import UsageContext @@ -25,7 +26,6 @@ fetch_llm, register_llm, ) -from dynamo.llm.vllm_integration.consolidator_config import get_consolidator_endpoints from dynamo.runtime import DistributedRuntime, dynamo_worker from dynamo.runtime.logging import configure_dynamo_logging from dynamo.vllm.multimodal_handlers import ( diff --git a/container/Dockerfile b/container/Dockerfile index 4aeaa61c76..b17c57ca7b 100644 --- a/container/Dockerfile +++ b/container/Dockerfile @@ -39,6 +39,7 @@ ARG SCCACHE_REGION="" # NIXL configuration ARG NIXL_UCX_REF=v1.19.0 ARG NIXL_REF=0.7.0 +ARG NIXL_GDRCOPY_REF=v2.5.1 # Python configuration ARG PYTHON_VERSION=3.12 @@ -58,6 +59,7 @@ ARG SCCACHE_BUCKET ARG SCCACHE_REGION ARG NIXL_UCX_REF ARG NIXL_REF +ARG NIXL_GDRCOPY_REF USER root WORKDIR /opt/dynamo @@ -103,46 +105,6 @@ RUN wget --tries=3 --waitretry=5 "https://static.rust-lang.org/rustup/archive/1. 
rm rustup-init && \ chmod -R a+w $RUSTUP_HOME $CARGO_HOME -################################## -########## System Dependencies ### -################################## - -# Install system packages -RUN apt-get update -y \ - && DEBIAN_FRONTEND=noninteractive apt-get install -y --no-install-recommends \ - # NIXL build dependencies - autoconf \ - automake \ - cmake \ - git \ - git-lfs \ - libtool \ - meson \ - net-tools \ - ninja-build \ - pybind11-dev \ - # Rust build dependencies - clang \ - libclang-dev \ - protobuf-compiler \ - && apt-get clean \ - && rm -rf /var/lib/apt/lists/* - -# These headers are missing with the hpcx installer, required -# by UCX to build and use RDMA devices. Reinstall to make sure to recreate -# symlink .so to .so.1 in case some packages are already found. -RUN apt-get update -y \ - && DEBIAN_FRONTEND=noninteractive apt-get -y install --reinstall --no-install-recommends \ - libibverbs-dev \ - rdma-core \ - ibverbs-utils \ - libibumad-dev \ - libnuma-dev \ - librdmacm-dev \ - ibverbs-providers \ - && apt-get clean \ - && rm -rf /var/lib/apt/lists/* - ################################## ########## External Services ##### ################################## @@ -161,110 +123,6 @@ RUN wget --tries=3 --waitretry=5 https://github.com/etcd-io/etcd/releases/downlo rm /tmp/etcd.tar.gz ENV PATH=/usr/local/bin/etcd/:$PATH -################################## -########## UCX Build ############# -################################## - -# Build and install UCX -RUN --mount=type=secret,id=aws-key-id,env=AWS_ACCESS_KEY_ID \ - --mount=type=secret,id=aws-secret-id,env=AWS_SECRET_ACCESS_KEY \ - export SCCACHE_S3_KEY_PREFIX=${SCCACHE_S3_KEY_PREFIX:-${ARCH}} && \ - rm -rf /opt/hpcx/ucx && \ - rm -rf /usr/local/ucx && \ - echo "Building UCX with reference $NIXL_UCX_REF" && \ - cd /usr/local/src && \ - git clone https://github.com/openucx/ucx.git && \ - cd ucx && git checkout $NIXL_UCX_REF && \ - CC=${USE_SCCACHE:+sccache gcc} && \ - CXX=${USE_SCCACHE:+sccache g++} && \ - export CC=${CC} && \ - export CXX=${CXX} && \ - ./autogen.sh && \ - ./configure \ - --prefix=/usr/local/ucx \ - --enable-shared \ - --disable-static \ - --disable-doxygen-doc \ - --enable-optimizations \ - --enable-cma \ - --enable-devel-headers \ - --with-cuda=/usr/local/cuda \ - --with-verbs \ - --with-efa \ - --with-dm \ - --with-gdrcopy=/usr/local \ - --enable-mt && \ - make -j$(nproc) && \ - make -j$(nproc) install-strip && \ - /tmp/use-sccache.sh show-stats "UCX" && \ - echo "/usr/local/ucx/lib" > /etc/ld.so.conf.d/ucx.conf && \ - echo "/usr/local/ucx/lib/ucx" >> /etc/ld.so.conf.d/ucx.conf && \ - ldconfig && \ - cd /usr/local/src && \ - rm -rf ucx - -# UCX environment variables -ENV CPATH=/usr/include \ - PATH=/usr/bin:/usr/local/ucx/bin:$PATH \ - PKG_CONFIG_PATH=/usr/lib/pkgconfig - -################################## -########## NIXL Setup ############ -################################## - -# NIXL environment setup -ENV NIXL_SRC_DIR=/opt/nixl \ - NIXL_PREFIX=/opt/nvidia/nvda_nixl \ - NIXL_LIB_DIR=/opt/nvidia/nvda_nixl/lib/${ARCH_ALT}-linux-gnu \ - NIXL_PLUGIN_DIR=/opt/nvidia/nvda_nixl/lib/${ARCH_ALT}-linux-gnu/plugins - -# Build and install NIXL -RUN --mount=type=secret,id=aws-key-id,env=AWS_ACCESS_KEY_ID \ - --mount=type=secret,id=aws-secret-id,env=AWS_SECRET_ACCESS_KEY \ - git clone --depth 1 --branch ${NIXL_REF} "https://github.com/ai-dynamo/nixl.git" ${NIXL_SRC_DIR} && \ - cd ${NIXL_SRC_DIR} && \ - if [ "$ARCH" = "arm64" ]; then \ - nixl_build_args="-Ddisable_gds_backend=true"; \ - else \ - nixl_build_args=""; \ - 
fi && \ - meson setup build/ --buildtype=release --prefix=$NIXL_PREFIX $nixl_build_args && \ - ninja -C build/ -j$(nproc) && ninja -C build/ install && \ - /tmp/use-sccache.sh show-stats "NIXL" && \ - echo "$NIXL_LIB_DIR" > /etc/ld.so.conf.d/nixl.conf && \ - echo "$NIXL_PLUGIN_DIR" >> /etc/ld.so.conf.d/nixl.conf && \ - ldconfig - -# Build NIXL Python module -# TODO OPS-590: Move gds_path selection based on arch into NIXL build and re-enable gds backend for arm64 -RUN --mount=type=secret,id=aws-key-id,env=AWS_ACCESS_KEY_ID \ - --mount=type=secret,id=aws-secret-id,env=AWS_SECRET_ACCESS_KEY \ - if [ "$ARCH" = "arm64" ]; then \ - cd ${NIXL_SRC_DIR} && uv build . --out-dir /opt/dynamo/wheelhouse/nixl --python $PYTHON_VERSION \ - --config-settings=setup-args="-Ddisable_gds_backend=true"; \ - else \ - cd ${NIXL_SRC_DIR} && uv build . --out-dir /opt/dynamo/wheelhouse/nixl --python $PYTHON_VERSION; \ - fi - -################################## -########## Python Environment #### -################################## - -# Create and activate virtual environment -ARG PYTHON_VERSION -RUN mkdir -p /opt/dynamo/venv && \ - uv venv /opt/dynamo/venv --python $PYTHON_VERSION - -ENV VIRTUAL_ENV=/opt/dynamo/venv \ - PATH="/opt/dynamo/venv/bin:${PATH}" - -# Install common and test dependencies -RUN --mount=type=bind,source=./container/deps/requirements.txt,target=/tmp/requirements.txt \ - --mount=type=bind,source=./container/deps/requirements.test.txt,target=/tmp/requirements.test.txt \ - UV_GIT_LFS=1 uv pip install \ - --no-cache \ - --requirement /tmp/requirements.txt \ - --requirement /tmp/requirements.test.txt ################################## ##### Wheel Build Image ########## @@ -279,27 +137,42 @@ FROM quay.io/pypa/manylinux_2_28_${ARCH_ALT} AS wheel_builder ARG ARCH ARG ARCH_ALT ARG CARGO_BUILD_JOBS +ARG PYTHON_VERSION ARG ENABLE_KVBM ARG USE_SCCACHE ARG SCCACHE_BUCKET ARG SCCACHE_REGION +ARG NIXL_UCX_REF +ARG NIXL_REF +ARG NIXL_GDRCOPY_REF -WORKDIR /opt/dynamo - -# Set environment variables -ENV CARGO_BUILD_JOBS=${CARGO_BUILD_JOBS:-16} \ - RUSTUP_HOME=/usr/local/rustup \ - CARGO_HOME=/usr/local/cargo \ - CARGO_TARGET_DIR=/opt/dynamo/target \ - VIRTUAL_ENV=/opt/dynamo/venv \ - NIXL_PREFIX=/opt/nvidia/nvda_nixl \ - PATH=/usr/local/cargo/bin:/opt/dynamo/venv/bin:$PATH +WORKDIR /workspace # Install system dependencies -RUN dnf update -y \ - && dnf install -y llvm-toolset protobuf-compiler wget unzip \ - && dnf clean all \ - && rm -rf /var/cache/dnf +RUN yum groupinstall -y 'Development Tools' && \ + dnf install -y almalinux-release-synergy && \ + dnf config-manager --set-enabled powertools && \ + dnf install -y \ + # Build tools + cmake \ + ninja-build \ + clang-devel \ + gcc-c++ \ + flex \ + wget \ + # Kernel module build dependencies + dkms \ + # Protobuf support + protobuf-compiler \ + # RDMA/InfiniBand support (required for UCX build with --with-verbs) + libibverbs \ + libibverbs-devel \ + rdma-core \ + rdma-core-devel \ + libibumad \ + libibumad-devel \ + librdmacm-devel \ + numactl-devel # Ensure a modern protoc is available (required for --experimental_allow_proto3_optional) RUN set -eux; \ @@ -319,18 +192,16 @@ RUN set -eux; \ # Point build tools explicitly at the modern protoc ENV PROTOC=/usr/local/bin/protoc +# Set environment variables first so they can be used in COPY commands +ENV CARGO_BUILD_JOBS=${CARGO_BUILD_JOBS:-16} \ + RUSTUP_HOME=/usr/local/rustup \ + CARGO_HOME=/usr/local/cargo \ + CARGO_TARGET_DIR=/opt/dynamo/target \ + PATH=/usr/local/cargo/bin:$PATH + # Copy artifacts from base stage 
COPY --from=base $RUSTUP_HOME $RUSTUP_HOME COPY --from=base $CARGO_HOME $CARGO_HOME -COPY --from=base $NIXL_PREFIX $NIXL_PREFIX - -ARG PYTHON_VERSION -RUN mkdir -p /opt/dynamo/venv && \ - uv venv /opt/dynamo/venv --python $PYTHON_VERSION - -ENV VIRTUAL_ENV=/opt/dynamo/venv \ - PATH="/opt/dynamo/venv/bin:${PATH}" - # Install SCCACHE if requested COPY container/use-sccache.sh /tmp/use-sccache.sh @@ -343,21 +214,110 @@ ENV SCCACHE_BUCKET=${USE_SCCACHE:+${SCCACHE_BUCKET}} \ SCCACHE_REGION=${USE_SCCACHE:+${SCCACHE_REGION}} \ RUSTC_WRAPPER=${USE_SCCACHE:+sccache} +# Copy CUDA from base stage +COPY --from=base /usr/local/cuda /usr/local/cuda +COPY --from=base /etc/ld.so.conf.d/hpcx.conf /etc/ld.so.conf.d/hpcx.conf + +ENV CUDA_PATH=/usr/local/cuda \ + PATH=/usr/local/cuda/bin:$PATH \ + LD_LIBRARY_PATH=/usr/local/cuda/lib64:/usr/local/lib:/usr/local/lib64:$LD_LIBRARY_PATH \ + NVIDIA_DRIVER_CAPABILITIES=video,compute,utility + +# Create virtual environment for building wheels +ENV VIRTUAL_ENV=/workspace/.venv +RUN uv venv ${VIRTUAL_ENV} --python $PYTHON_VERSION && \ + uv pip install --upgrade meson pybind11 patchelf maturin[patchelf] + +# Build and install gdrcopy +RUN git clone --depth 1 --branch ${NIXL_GDRCOPY_REF} https://github.com/NVIDIA/gdrcopy.git && \ + cd gdrcopy/packages && \ + CUDA=/usr/local/cuda ./build-rpm-packages.sh && \ + rpm -Uvh gdrcopy-kmod-*.el8.noarch.rpm && \ + rpm -Uvh gdrcopy-*.el8.${ARCH_ALT}.rpm && \ + rpm -Uvh gdrcopy-devel-*.el8.noarch.rpm + +# Build and install UCX +RUN --mount=type=secret,id=aws-key-id,env=AWS_ACCESS_KEY_ID \ + --mount=type=secret,id=aws-secret-id,env=AWS_SECRET_ACCESS_KEY \ + export SCCACHE_S3_KEY_PREFIX="${SCCACHE_S3_KEY_PREFIX:-${ARCH}}" && \ + CC=${USE_SCCACHE:+sccache gcc} && \ + CXX=${USE_SCCACHE:+sccache g++} && \ + export CC=${CC} && \ + export CXX=${CXX} && \ + cd /usr/local/src && \ + git clone https://github.com/openucx/ucx.git && \ + cd ucx && \ + git checkout $NIXL_UCX_REF && \ + ./autogen.sh && ./configure \ + --prefix=/usr/local/ucx \ + --enable-shared \ + --disable-static \ + --disable-doxygen-doc \ + --enable-optimizations \ + --enable-cma \ + --enable-devel-headers \ + --with-cuda=/usr/local/cuda \ + --with-verbs \ + --with-dm \ + --with-gdrcopy=/usr/local \ + --with-efa \ + --enable-mt && \ + make -j && \ + make -j install-strip && \ + /tmp/use-sccache.sh show-stats "UCX" && \ + echo "/usr/local/ucx/lib" > /etc/ld.so.conf.d/ucx.conf && \ + echo "/usr/local/ucx/lib/ucx" >> /etc/ld.so.conf.d/ucx.conf && \ + ldconfig + +# build and install nixl +RUN --mount=type=secret,id=aws-key-id,env=AWS_ACCESS_KEY_ID \ + --mount=type=secret,id=aws-secret-id,env=AWS_SECRET_ACCESS_KEY \ + export SCCACHE_S3_KEY_PREFIX="${SCCACHE_S3_KEY_PREFIX:-${ARCH}}" && \ + source ${VIRTUAL_ENV}/bin/activate && \ + git clone --depth 1 --branch ${NIXL_REF} "https://github.com/ai-dynamo/nixl.git" && \ + cd nixl && \ + mkdir build && \ + meson setup build/ --prefix=/opt/nvidia/nvda_nixl --buildtype=release \ + -Dcudapath_lib="/usr/local/cuda/lib64" \ + -Dcudapath_inc="/usr/local/cuda/include" \ + -Ducx_path="/usr/local/ucx" && \ + cd build && \ + ninja && \ + ninja install && \ + /tmp/use-sccache.sh show-stats "NIXL" + +ENV NIXL_LIB_DIR=/opt/nvidia/nvda_nixl/lib64 \ + NIXL_PLUGIN_DIR=/opt/nvidia/nvda_nixl/lib64/plugins \ + NIXL_PREFIX=/opt/nvidia/nvda_nixl +ENV LD_LIBRARY_PATH=${NIXL_LIB_DIR}:${NIXL_PLUGIN_DIR}:/usr/local/ucx/lib:/usr/local/ucx/lib/ucx:${LD_LIBRARY_PATH} + +RUN echo "$NIXL_LIB_DIR" > /etc/ld.so.conf.d/nixl.conf && \ + echo "$NIXL_PLUGIN_DIR" >> 
/etc/ld.so.conf.d/nixl.conf && \ + ldconfig + +RUN --mount=type=secret,id=aws-key-id,env=AWS_ACCESS_KEY_ID \ + --mount=type=secret,id=aws-secret-id,env=AWS_SECRET_ACCESS_KEY \ + export SCCACHE_S3_KEY_PREFIX="${SCCACHE_S3_KEY_PREFIX:-${ARCH}}" && \ + cd /workspace/nixl && \ + uv build . --out-dir /opt/dynamo/dist/nixl --python $PYTHON_VERSION + # Copy source code (order matters for layer caching) COPY pyproject.toml README.md LICENSE Cargo.toml Cargo.lock rust-toolchain.toml hatch_build.py /opt/dynamo/ +COPY launch/ /opt/dynamo/launch/ COPY lib/ /opt/dynamo/lib/ COPY components/ /opt/dynamo/components/ -# Build wheels +# Build dynamo wheels RUN --mount=type=secret,id=aws-key-id,env=AWS_ACCESS_KEY_ID \ --mount=type=secret,id=aws-secret-id,env=AWS_SECRET_ACCESS_KEY \ export SCCACHE_S3_KEY_PREFIX=${SCCACHE_S3_KEY_PREFIX:-${ARCH}} && \ + source ${VIRTUAL_ENV}/bin/activate && \ + cd /opt/dynamo && \ uv build --wheel --out-dir /opt/dynamo/dist && \ cd /opt/dynamo/lib/bindings/python && \ - uv pip install maturin[patchelf] && \ + maturin build --release --out /opt/dynamo/dist && \ if [ "$ENABLE_KVBM" = "true" ]; then \ - maturin build --release --features block-manager --out /opt/dynamo/dist; \ - else \ + cd /opt/dynamo/lib/kvbm && \ maturin build --release --out /opt/dynamo/dist; \ fi && \ /tmp/use-sccache.sh show-stats "Dynamo" @@ -368,23 +328,66 @@ RUN --mount=type=secret,id=aws-key-id,env=AWS_ACCESS_KEY_ID \ FROM base AS dev +ARG ENABLE_KVBM +ARG ARCH_ALT + # Application environment variables ENV DYNAMO_HOME=/opt/dynamo \ CARGO_TARGET_DIR=/opt/dynamo/target -WORKDIR /opt/dynamo +# NIXL environment variables +ENV NIXL_PREFIX=/opt/nvidia/nvda_nixl \ + NIXL_LIB_DIR=/opt/nvidia/nvda_nixl/lib/${ARCH_ALT}-linux-gnu \ + NIXL_PLUGIN_DIR=/opt/nvidia/nvda_nixl/lib/${ARCH_ALT}-linux-gnu/plugins +ENV LD_LIBRARY_PATH=${NIXL_LIB_DIR}:${NIXL_PLUGIN_DIR}:/usr/local/ucx/lib:/usr/local/ucx/lib/ucx:${LD_LIBRARY_PATH} + +# Copy ucx and nixl libs +COPY --from=wheel_builder /usr/local/ucx/ /usr/local/ucx/ +COPY --from=wheel_builder ${NIXL_PREFIX}/ ${NIXL_PREFIX}/ +COPY --from=wheel_builder /opt/nvidia/nvda_nixl/lib64/. 
${NIXL_LIB_DIR}/ # Copy built artifacts +COPY --from=wheel_builder /opt/dynamo/dist/nixl/ /opt/dynamo/wheelhouse/nixl/ COPY --from=wheel_builder /opt/dynamo/dist/*.whl /opt/dynamo/wheelhouse/ COPY --from=wheel_builder $CARGO_TARGET_DIR $CARGO_TARGET_DIR COPY --from=wheel_builder $CARGO_HOME $CARGO_HOME -# Install Python packages +RUN apt-get update -y \ + && apt-get install -y --no-install-recommends \ + # required for AIC perf files + git \ + git-lfs \ + # rust build packages + clang \ + libclang-dev \ + protobuf-compiler \ + && apt-get clean \ + && rm -rf /var/lib/apt/lists/* + +# Create and activate virtual environment +ARG PYTHON_VERSION +RUN mkdir -p /opt/dynamo/venv && \ + uv venv /opt/dynamo/venv --python $PYTHON_VERSION + +ENV VIRTUAL_ENV=/opt/dynamo/venv \ + PATH="/opt/dynamo/venv/bin:${PATH}" + +# Install common and test dependencies +RUN --mount=type=bind,source=./container/deps/requirements.txt,target=/tmp/requirements.txt \ + --mount=type=bind,source=./container/deps/requirements.test.txt,target=/tmp/requirements.test.txt \ + UV_GIT_LFS=1 uv pip install \ + --no-cache \ + --requirement /tmp/requirements.txt \ + --requirement /tmp/requirements.test.txt + COPY benchmarks/ /opt/dynamo/benchmarks/ RUN uv pip install \ /opt/dynamo/wheelhouse/ai_dynamo_runtime*.whl \ /opt/dynamo/wheelhouse/ai_dynamo*any.whl \ - /opt/dynamo/wheelhouse/nixl/nixl*.whl \ + /opt/dynamo/wheelhouse/nixl/nixl*.whl && \ + if [ "$ENABLE_KVBM" = "true" ]; then \ + uv pip install /opt/dynamo/wheelhouse/kvbm*.whl; \ + fi \ && cd /opt/dynamo/benchmarks \ && UV_GIT_LFS=1 uv pip install --no-cache . \ && cd - \ @@ -393,7 +396,8 @@ RUN uv pip install \ # Setup launch banner RUN --mount=type=bind,source=./container/launch_message.txt,target=/opt/dynamo/launch_message.txt \ sed '/^#\s/d' /opt/dynamo/launch_message.txt > ~/.launch_screen && \ - echo "cat ~/.launch_screen" >> ~/.bashrc + echo "cat ~/.launch_screen" >> ~/.bashrc && \ + echo "source $VIRTUAL_ENV/bin/activate" >> ~/.bashrc ENTRYPOINT ["/opt/nvidia/nvidia_entrypoint.sh"] CMD [] diff --git a/container/Dockerfile.trtllm b/container/Dockerfile.trtllm index 0345288ab7..7735f160d3 100644 --- a/container/Dockerfile.trtllm +++ b/container/Dockerfile.trtllm @@ -64,6 +64,7 @@ ENV VIRTUAL_ENV=/opt/dynamo/venv ARG ARCH_ALT ARG PYTHON_VERSION +ARG ENABLE_KVBM ENV NIXL_PREFIX=/opt/nvidia/nvda_nixl ENV NIXL_LIB_DIR=$NIXL_PREFIX/lib/${ARCH_ALT}-linux-gnu ENV NIXL_PLUGIN_DIR=$NIXL_LIB_DIR/plugins @@ -241,9 +242,12 @@ ENV LD_LIBRARY_PATH=${TENSORRT_LIB_DIR}:${LD_LIBRARY_PATH} COPY benchmarks/ /opt/dynamo/benchmarks/ COPY --from=dynamo_base /opt/dynamo/wheelhouse/ /opt/dynamo/wheelhouse/ RUN uv pip install \ - /opt/dynamo/wheelhouse/ai_dynamo_runtime*.whl \ - /opt/dynamo/wheelhouse/ai_dynamo*any.whl \ - /opt/dynamo/wheelhouse/nixl/nixl*.whl \ + /opt/dynamo/wheelhouse/ai_dynamo_runtime*.whl \ + /opt/dynamo/wheelhouse/ai_dynamo*any.whl \ + /opt/dynamo/wheelhouse/nixl/nixl*.whl \ + && if [ "${ENABLE_KVBM}" = "true" ]; then \ + uv pip install /opt/dynamo/wheelhouse/kvbm*.whl; \ + fi \ && cd /opt/dynamo/benchmarks \ && UV_GIT_LFS=1 uv pip install --no-cache . 
\ && cd - \ diff --git a/container/Dockerfile.vllm b/container/Dockerfile.vllm index 0d4c5b3ce4..58d1d2adde 100644 --- a/container/Dockerfile.vllm +++ b/container/Dockerfile.vllm @@ -178,6 +178,7 @@ ENV PATH="${VIRTUAL_ENV}/bin:${PATH}" ARG ARCH_ALT ARG PYTHON_VERSION +ARG ENABLE_KVBM ENV NIXL_PREFIX=/opt/nvidia/nvda_nixl ENV NIXL_LIB_DIR=$NIXL_PREFIX/lib/${ARCH_ALT}-linux-gnu ENV NIXL_PLUGIN_DIR=$NIXL_LIB_DIR/plugins @@ -253,9 +254,12 @@ COPY --from=framework ${VIRTUAL_ENV} ${VIRTUAL_ENV} COPY benchmarks/ /opt/dynamo/benchmarks/ COPY --from=dynamo_base /opt/dynamo/wheelhouse/ /opt/dynamo/wheelhouse/ RUN uv pip install \ - /opt/dynamo/wheelhouse/ai_dynamo_runtime*.whl \ - /opt/dynamo/wheelhouse/ai_dynamo*any.whl \ - /opt/dynamo/wheelhouse/nixl/nixl*.whl \ + /opt/dynamo/wheelhouse/ai_dynamo_runtime*.whl \ + /opt/dynamo/wheelhouse/ai_dynamo*any.whl \ + /opt/dynamo/wheelhouse/nixl/nixl*.whl \ + && if [ "${ENABLE_KVBM}" = "true" ]; then \ + uv pip install /opt/dynamo/wheelhouse/kvbm*.whl; \ + fi \ && cd /opt/dynamo/benchmarks \ && UV_GIT_LFS=1 uv pip install --no-cache . \ && cd - \ diff --git a/container/build.sh b/container/build.sh index bf9e483bfc..4d6659c7e0 100755 --- a/container/build.sh +++ b/container/build.sh @@ -780,7 +780,7 @@ if [[ $FRAMEWORK == "VLLM" ]] || [[ $FRAMEWORK == "TRTLLM" ]]; then fi if [ ! -z ${ENABLE_KVBM} ]; then - echo "Enabling the KVBM in the ai-dynamo-runtime" + echo "Enabling the KVBM in the dynamo image" BUILD_ARGS+=" --build-arg ENABLE_KVBM=${ENABLE_KVBM} " fi diff --git a/container/deps/trtllm/install_nixl.sh b/container/deps/trtllm/install_nixl.sh index b9fb752ab3..4bc31d2e3d 100755 --- a/container/deps/trtllm/install_nixl.sh +++ b/container/deps/trtllm/install_nixl.sh @@ -77,4 +77,4 @@ cd builddir && ninja install cd ../.. 
rm -rf nixl* # Remove NIXL source tree to save space -echo "export LD_LIBRARY_PATH=/opt/nvidia/nvda_nixl/lib/${ARCH_NAME}:/opt/nvidia/nvda_nixl/lib64:\$LD_LIBRARY_PATH" >> "${ENV}" \ No newline at end of file +echo "export LD_LIBRARY_PATH=/opt/nvidia/nvda_nixl/lib/${ARCH_NAME}:/opt/nvidia/nvda_nixl/lib64:\$LD_LIBRARY_PATH" >> "${ENV}" diff --git a/docs/kvbm/trtllm-setup.md b/docs/kvbm/trtllm-setup.md index 1b881847bb..0be42b9399 100644 --- a/docs/kvbm/trtllm-setup.md +++ b/docs/kvbm/trtllm-setup.md @@ -87,7 +87,7 @@ kv_cache_config: enable_partial_reuse: false free_gpu_memory_fraction: 0.80 kv_connector_config: - connector_module: dynamo.llm.trtllm_integration.connector + connector_module: kvbm.trtllm_integration.connector connector_scheduler_class: DynamoKVBMConnectorLeader connector_worker_class: DynamoKVBMConnectorWorker EOF diff --git a/docs/kvbm/vllm-setup.md b/docs/kvbm/vllm-setup.md index 7c07d75c44..93f7971c1e 100644 --- a/docs/kvbm/vllm-setup.md +++ b/docs/kvbm/vllm-setup.md @@ -101,7 +101,7 @@ curl localhost:8000/v1/chat/completions -H "Content-Type: application/json" Alternatively, can use `vllm serve` directly to use KVBM for aggregated serving: ```bash -vllm serve --kv-transfer-config '{"kv_connector":"DynamoConnector","kv_role":"kv_both", "kv_connector_module_path": "dynamo.llm.vllm_integration.connector"}' Qwen/Qwen3-0.6B +vllm serve --kv-transfer-config '{"kv_connector":"DynamoConnector","kv_role":"kv_both", "kv_connector_module_path": "kvbm.vllm_integration.connector"}' Qwen/Qwen3-0.6B ``` ## Enable and View KVBM Metrics diff --git a/lib/bindings/python/Cargo.lock b/lib/bindings/python/Cargo.lock index b671326c54..839caf2d5e 100644 --- a/lib/bindings/python/Cargo.lock +++ b/lib/bindings/python/Cargo.lock @@ -358,7 +358,7 @@ version = "0.30.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dbfd150b5dbdb988bcc8fb1fe787eb6b7ee6180ca24da683b61ea5405f3d43ff" dependencies = [ - "bindgen 0.69.5", + "bindgen", "cc", "cmake", "dunce", @@ -558,26 +558,6 @@ dependencies = [ "which", ] -[[package]] -name = "bindgen" -version = "0.71.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5f58bf3d7db68cfbac37cfc485a8d711e87e064c3d0fe0435b92f7a407f9d6b3" -dependencies = [ - "bitflags 2.9.3", - "cexpr", - "clang-sys", - "itertools 0.13.0", - "log", - "prettyplease", - "proc-macro2", - "quote", - "regex", - "rustc-hash 2.1.1", - "shlex", - "syn 2.0.106", -] - [[package]] name = "bit-set" version = "0.5.3" @@ -1463,15 +1443,6 @@ dependencies = [ "uuid", ] -[[package]] -name = "dynamo-kvbm-kernels" -version = "0.6.0" -dependencies = [ - "cc", - "cudarc", - "once_cell", -] - [[package]] name = "dynamo-llm" version = "0.6.1" @@ -1479,7 +1450,6 @@ dependencies = [ "ahash", "aho-corasick", "akin", - "aligned-vec", "anyhow", "async-nats", "async-stream", @@ -1495,13 +1465,11 @@ dependencies = [ "bytes", "candle-core", "chrono", - "cudarc", "dashmap", "derive-getters", "derive_builder", "dialoguer", "dynamo-async-openai", - "dynamo-kvbm-kernels", "dynamo-parsers", "dynamo-runtime", "either", @@ -1518,9 +1486,6 @@ dependencies = [ "minijinja-contrib", "modelexpress-client", "modelexpress-common", - "ndarray", - "nix 0.26.4", - "nixl-sys", "offset-allocator", "oneshot", "parking_lot", @@ -1639,7 +1604,7 @@ dependencies = [ "local-ip-address", "log", "nid", - "nix 0.29.0", + "nix", "nuid", "once_cell", "opentelemetry", @@ -2988,15 +2953,6 @@ dependencies = [ "either", ] -[[package]] -name = "itertools" -version = "0.13.0" -source = 
"registry+https://github.com/rust-lang/crates.io-index" -checksum = "413ee7dfc52ee1a4949ceeb7dbc8a33f2d6c088194d9f922fb8318faf1f01186" -dependencies = [ - "either", -] - [[package]] name = "itertools" version = "0.14.0" @@ -3344,16 +3300,6 @@ version = "0.8.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "47e1ffaa40ddd1f3ed91f717a33c8c0ee23fff369e3aa8772b9605cc1d22f4c3" -[[package]] -name = "matrixmultiply" -version = "0.3.10" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a06de3016e9fae57a36fd14dba131fccf49f74b40b7fbdb472f96e361ec71a08" -dependencies = [ - "autocfg", - "rawpointer", -] - [[package]] name = "maybe-rayon" version = "0.1.1" @@ -3386,15 +3332,6 @@ version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "38d1115007560874e373613744c6fba374c17688327a71c1476d1a5954cc857b" -[[package]] -name = "memoffset" -version = "0.7.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5de893c32cde5f383baa4c04c5d6dbdd735cfd4a794b0debdb2bb1b421da5ff4" -dependencies = [ - "autocfg", -] - [[package]] name = "memoffset" version = "0.9.1" @@ -3572,21 +3509,6 @@ version = "0.10.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1d87ecb2933e8aeadb3e3a02b828fed80a7528047e68b4f424523a0981a3a084" -[[package]] -name = "ndarray" -version = "0.16.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "882ed72dce9365842bf196bdeedf5055305f11fc8c03dee7bb0194a6cad34841" -dependencies = [ - "matrixmultiply", - "num-complex", - "num-integer", - "num-traits", - "portable-atomic", - "portable-atomic-util", - "rawpointer", -] - [[package]] name = "neli" version = "0.6.5" @@ -3640,19 +3562,6 @@ dependencies = [ "thiserror 1.0.69", ] -[[package]] -name = "nix" -version = "0.26.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "598beaf3cc6fdd9a5dfb1630c2800c7acd31df7aaf0f565796fba2b53ca1af1b" -dependencies = [ - "bitflags 1.3.2", - "cfg-if 1.0.3", - "libc", - "memoffset 0.7.1", - "pin-utils", -] - [[package]] name = "nix" version = "0.29.0" @@ -3665,22 +3574,6 @@ dependencies = [ "libc", ] -[[package]] -name = "nixl-sys" -version = "0.7.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a73b92494c94b2ff2d004cd9274d966863089e867dc9cd98bc640aefe7622036" -dependencies = [ - "bindgen 0.71.1", - "cc", - "libc", - "os_info", - "pkg-config", - "serde", - "thiserror 2.0.16", - "tracing", -] - [[package]] name = "nkeys" version = "0.4.5" @@ -4057,18 +3950,6 @@ dependencies = [ "hashbrown 0.14.5", ] -[[package]] -name = "os_info" -version = "3.12.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d0e1ac5fde8d43c34139135df8ea9ee9465394b2d8d20f032d38998f64afffc3" -dependencies = [ - "log", - "plist", - "serde", - "windows-sys 0.52.0", -] - [[package]] name = "overload" version = "0.1.1" @@ -4294,19 +4175,6 @@ version = "0.3.32" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7edddbd0b52d732b21ad9a5fab5c704c14cd949e5e9a1ec5929a24fded1b904c" -[[package]] -name = "plist" -version = "1.7.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3af6b589e163c5a788fab00ce0c0366f6efbb9959c2f9874b224936af7fce7e1" -dependencies = [ - "base64 0.22.1", - "indexmap 2.11.0", - "quick-xml", - "serde", - "time", -] - [[package]] name = "png" version = "0.17.16" @@ -4586,7 +4454,7 @@ dependencies = [ "cfg-if 1.0.3", 
"indoc", "libc", - "memoffset 0.9.1", + "memoffset", "once_cell", "portable-atomic", "pyo3-build-config", @@ -4693,15 +4561,6 @@ version = "2.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a993555f31e5a609f617c12db6250dedcac1b0a85076912c436e6fc9b2c8e6a3" -[[package]] -name = "quick-xml" -version = "0.38.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "42a232e7487fc2ef313d96dde7948e7a3c05101870d8985e4fd8d26aedd27b89" -dependencies = [ - "memchr", -] - [[package]] name = "quinn" version = "0.11.9" @@ -4909,12 +4768,6 @@ dependencies = [ "bitflags 2.9.3", ] -[[package]] -name = "rawpointer" -version = "0.2.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "60a357793950651c4ed0f3f52338f53b2f809f32d83a07f72909fa13e4c6c1e3" - [[package]] name = "rayon" version = "1.11.0" diff --git a/lib/bindings/python/Cargo.toml b/lib/bindings/python/Cargo.toml index d542af8c9b..10f4745793 100644 --- a/lib/bindings/python/Cargo.toml +++ b/lib/bindings/python/Cargo.toml @@ -23,7 +23,6 @@ crate-type = ["cdylib", "rlib"] [features] default = [] -block-manager = ["dynamo-llm/block-manager", "dep:dlpark", "dep:cudarc"] [dependencies] dynamo-llm = { path = "../../llm" } diff --git a/lib/bindings/python/rust/lib.rs b/lib/bindings/python/rust/lib.rs index fa89843a2b..1ff77a78be 100644 --- a/lib/bindings/python/rust/lib.rs +++ b/lib/bindings/python/rust/lib.rs @@ -6,15 +6,20 @@ use futures::StreamExt; use once_cell::sync::OnceCell; use pyo3::IntoPyObjectExt; use pyo3::exceptions::PyStopAsyncIteration; +use pyo3::types::PyCapsule; use pyo3::types::{PyDict, PyString}; use pyo3::{exceptions::PyException, prelude::*}; use rand::seq::IteratorRandom as _; use rs::pipeline::network::Ingress; +use std::ffi::CString; use std::fs; use std::net::{IpAddr, Ipv4Addr, SocketAddr, SocketAddrV4}; use std::path::PathBuf; use std::time::Duration; -use std::{fmt::Display, sync::Arc}; +use std::{ + fmt::Display, + sync::{Arc, Weak}, +}; use tokio::sync::Mutex; use tracing::Instrument; @@ -191,9 +196,6 @@ fn _core(m: &Bound<'_, PyModule>) -> PyResult<()> { prometheus_metrics::add_to_module(&prometheus_metrics)?; m.add_submodule(&prometheus_metrics)?; - #[cfg(feature = "block-manager")] - llm::block_manager::add_to_module(m)?; - Ok(()) } @@ -629,6 +631,21 @@ impl DistributedRuntime { let inner = self.inner.runtime().child_token(); CancellationToken { inner } } + + // This is used to pass the DistributedRuntime from the dynamo-runtime bindings + // to the KVBM bindings, since KVBM cannot directly use the struct from this cdylib. + // TODO: Create a separate crate "dynamo-python" so that all binding crates can import + // from it and share the same crate path. This will allow PyO3 to automatically + // recognize that both bindings use the same PyClass. + #[pyo3(name = "to_capsule")] + fn to_capsule<'py>(&self, py: Python<'py>) -> PyResult> { + let arc: Arc = Arc::new(self.inner.clone()); + let weak: Weak = Arc::downgrade(&arc); + + let name = CString::new("dynamo.runtime.weak").expect("valid capsule name"); + + PyCapsule::new(py, weak, Some(name)) + } } // Bind a TCP port and return a socket held until dropped. 
diff --git a/lib/bindings/python/rust/llm.rs b/lib/bindings/python/rust/llm.rs index 759fec00cf..46988a2ec3 100644 --- a/lib/bindings/python/rust/llm.rs +++ b/lib/bindings/python/rust/llm.rs @@ -33,6 +33,3 @@ pub mod kv; pub mod local_model; pub mod model_card; pub mod preprocessor; - -#[cfg(feature = "block-manager")] -pub mod block_manager; diff --git a/lib/bindings/python/src/dynamo/llm/__init__.py b/lib/bindings/python/src/dynamo/llm/__init__.py index 4a57ae29ad..d507315515 100644 --- a/lib/bindings/python/src/dynamo/llm/__init__.py +++ b/lib/bindings/python/src/dynamo/llm/__init__.py @@ -5,13 +5,6 @@ import logging -try: - from dynamo._core import BlockManager as BlockManager - from dynamo._core import KvbmLeader as KvbmLeader - from dynamo._core import KvbmWorker as KvbmWorker -except ImportError: - pass # BlockManager is not enabled by default - from dynamo._core import ApproxKvIndexer as ApproxKvIndexer from dynamo._core import DisaggregatedRouter as DisaggregatedRouter from dynamo._core import EngineType diff --git a/lib/kvbm/Cargo.toml b/lib/kvbm/Cargo.toml new file mode 100644 index 0000000000..23d85e8487 --- /dev/null +++ b/lib/kvbm/Cargo.toml @@ -0,0 +1,74 @@ +# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 + + +[package] +name = "kvbm-py3" +version = "0.1.0" +edition = "2024" +authors = ["NVIDIA"] +license = "Apache-2.0" +homepage = "https://github.com/ai-dynamo/dynamo" +repository = "https://github.com/ai-dynamo/dynamo.git" + +[lib] +path = "src/lib.rs" +name = "_core" +# "cdylib" is necessary to produce a shared library for Python to import from. +# "rlib" is necessary to support doctests. +crate-type = ["cdylib", "rlib"] + +[features] +default = ["block-manager"] +block-manager = ["dynamo-llm/block-manager", "dep:dlpark", "dep:cudarc"] + +[dependencies] +dynamo-llm = { path = "../llm" } +dynamo-runtime = { path = "../runtime" } + +anyhow = { version = "1" } +async-stream = { version = "0.3" } +async-trait = { version = "0.1" } +derive-getters = "0.5" +either = { version = "1.13", features = ["serde"] } +futures = { version = "0.3" } +local-ip-address = { version = "0.6" } +once_cell = { version = "1.20.3" } +rand = { version = "0.9" } +socket2 = { version = "0.6" } +serde = { version = "1" } +serde_json = { version = "1.0.138" } +thiserror = { version = "2.0" } +tokio = { version = "1.46.0", features = ["full"] } +tokio-stream = { version = "0" } +tokio-util = { version = "0.7", features = ["rt"] } +tracing = { version = "0" } +tracing-subscriber = { version = "0.3", features = ["fmt", "env-filter"] } +uuid = { version = "1.17", features = ["v4", "serde"] } + +# "extension-module" tells pyo3 we want to build an extension module (skips linking against libpython.so) +# "abi3-py310" tells pyo3 (and maturin) to build using the stable ABI with minimum Python version 3.10 +pyo3 = { version = "0.23.4", default-features = false, features = [ + "macros", + "experimental-async", + "experimental-inspect", + "extension-module", + "py-clone", + "abi3-py310", +] } + +pyo3-async-runtimes = { version = "0.23.0", default-features = false, features = [ + "attributes", + "testing", + "tokio-runtime", + "unstable-streams", +] } + +pythonize = "0.23" + +dlpark = { version = "0.5", features = ["pyo3", "half"], optional = true } +cudarc = { version = "0.16.2", features = ["cuda-12020"], optional = true } +prometheus = "0.14.0" + +[dev-dependencies] +rstest = "0.25" diff --git a/lib/kvbm/LICENSE 
b/lib/kvbm/LICENSE new file mode 100644 index 0000000000..261eeb9e9f --- /dev/null +++ b/lib/kvbm/LICENSE @@ -0,0 +1,201 @@ + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. 
Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. 
Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright [yyyy] [name of copyright owner] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. 
+ You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. diff --git a/lib/kvbm/README.md b/lib/kvbm/README.md new file mode 100644 index 0000000000..42d8c5523c --- /dev/null +++ b/lib/kvbm/README.md @@ -0,0 +1,117 @@ + + +# Dynamo KVBM + +The Dynamo KVBM is a distributed KV-cache block management system designed for scalable LLM inference. It cleanly separates memory management from inference runtimes (vLLM, TensorRT-LLM, and SGLang), enabling GPU↔CPU↔Disk/Remote tiering, asynchronous block offload/onboard, and efficient block reuse. + +![A block diagram showing a layered architecture view of Dynamo KV Block manager.](../../docs/images/kvbm-architecture.png) + + +## Feature Highlights + +- **Distributed KV-Cache Management:** Unified GPU↔CPU↔Disk↔Remote tiering for scalable LLM inference. + +- **Async Offload & Reuse:** Seamlessly move KV blocks between memory tiers using GDS-accelerated transfers powered by NIXL, without recomputation. +- **Runtime-Agnostic:** Works out-of-the-box with vLLM, TensorRT-LLM, and SGLang via lightweight connectors. +- **Memory-Safe & Modular:** RAII lifecycle and pluggable design for reliability, portability, and backend extensibility. + +## Build and Installation + +The pip wheel is built through a Docker build process: + +```bash +# Build the Docker image with KVBM enabled (from the dynamo repo root) +./container/build.sh --framework none --enable-kvbm --tag local-kvbm +``` + +Once built, you can either: + +**Option 1: Run and use the container directly** +```bash +./container/run.sh --framework none -it +``` + +**Option 2: Extract the wheel file to your local filesystem** +```bash +# Create a temporary container from the built image +docker create --name temp-kvbm-container local-kvbm:latest + +# Copy the wheelhouse (containing the KVBM wheel) into ./dynamo_wheelhouse +docker cp temp-kvbm-container:/opt/dynamo/wheelhouse/ ./dynamo_wheelhouse + +# Clean up the temporary container +docker rm temp-kvbm-container + +# Install the wheel locally +pip install ./dynamo_wheelhouse/kvbm*.whl +``` + +Note that the default pip wheel is currently not compatible with CUDA 13. + + +## Integrations + +### Environment Variables + +| Variable | Description | Default | +|-----------|--------------|----------| +| `DYN_KVBM_CPU_CACHE_GB` | CPU pinned memory cache size (GB) | required | +| `DYN_KVBM_DISK_CACHE_GB` | SSD Disk/Storage system cache size (GB) | optional | +| `DYN_KVBM_LEADER_WORKER_INIT_TIMEOUT_SECS` | Timeout (in seconds) for the KVBM leader and worker to synchronize and allocate the required memory and storage. Increase this value if allocating large amounts of memory or storage.
| 120 | +| `DYN_KVBM_METRICS` | Enable metrics endpoint | `false` | +| `DYN_KVBM_METRICS_PORT` | Metrics port | `6880` | +| `DYN_KVBM_DISABLE_DISK_OFFLOAD_FILTER` | Disable disk offload filtering to remove SSD lifespan protection | `false` | + +### vLLM + +```bash +DYN_KVBM_CPU_CACHE_GB=100 vllm serve \ + --kv-transfer-config '{"kv_connector":"DynamoConnector","kv_role":"kv_both","kv_connector_module_path":"kvbm.vllm_integration.connector"}' \ + Qwen/Qwen3-8B +``` + +For more detailed integration with dynamo, disaggregated serving support and benchmarking, please check [vllm-setup](../../docs/kvbm/vllm-setup.md) + +### TensorRT-LLM + +```bash +cat >/tmp/kvbm_llm_api_config.yaml <=1.0,<2.0", "patchelf"] +build-backend = "maturin" + +[tool.uv] +config-settings = { build-args = '--auditwheel repair --manylinux' } diff --git a/lib/kvbm/python/kvbm/__init__.py b/lib/kvbm/python/kvbm/__init__.py new file mode 100644 index 0000000000..ae0359ceac --- /dev/null +++ b/lib/kvbm/python/kvbm/__init__.py @@ -0,0 +1,8 @@ +# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 + +# flake8: noqa + +from kvbm._core import BlockManager as BlockManager +from kvbm._core import KvbmLeader as KvbmLeader +from kvbm._core import KvbmWorker as KvbmWorker diff --git a/lib/kvbm/python/kvbm/_core.pyi b/lib/kvbm/python/kvbm/_core.pyi new file mode 100644 index 0000000000..192a76e4cc --- /dev/null +++ b/lib/kvbm/python/kvbm/_core.pyi @@ -0,0 +1,231 @@ +# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 + +from typing import Any, List, Optional + +class Layer: + """ + A KV cache block layer + """ + + ... + + def __dlpack__(self, stream: Optional[Any] = None, max_version: Optional[Any] = None, dl_device: Optional[Any] = None, copy: Optional[bool] = None) -> Any: + """ + Get a dlpack capsule of the layer + """ + ... + + def __dlpack_device__(self) -> Any: + """ + Get the dlpack device of the layer + """ + ... + +class Block: + """ + A KV cache block + """ + + ... + + def __len__(self) -> int: + """ + Get the number of layers in the list + """ + ... + + def __getitem__(self, index: int) -> Layer: + """ + Get a layer by index + """ + ... + + def __iter__(self) -> 'Block': + """ + Get an iterator over the layers + """ + ... + + def __next__(self) -> Block: + """ + Get the next layer in the iterator + """ + ... + + def to_list(self) -> List[Layer]: + """ + Get a list of layers + """ + ... + + def __dlpack__(self, stream: Optional[Any] = None, max_version: Optional[Any] = None, dl_device: Optional[Any] = None, copy: Optional[bool] = None) -> Any: + """ + Get a dlpack capsule of the block + Exception raised if the block is not contiguous + """ + ... + + def __dlpack_device__(self) -> Any: + """ + Get the dlpack device of the block + """ + ... + +class BlockList: + """ + A list of KV cache blocks + """ + + ... + + def __len__(self) -> int: + """ + Get the number of blocks in the list + """ + ... + + def __getitem__(self, index: int) -> Block: + """ + Get a block by index + """ + ... + + def __iter__(self) -> 'BlockList': + """ + Get an iterator over the blocks + """ + ... + + def __next__(self) -> Block: + """ + Get the next block in the iterator + """ + ... + + def to_list(self) -> List[Block]: + """ + Get a list of blocks + """ + ... 
+ +class BlockManager: + """ + A KV cache block manager + """ + + def __init__( + self, + worker_id: int, + num_layer: int, + page_size: int, + inner_dim: int, + dtype: Optional[str] = None, + host_num_blocks: Optional[int] = None, + device_num_blocks: Optional[int] = None, + device_id: int = 0 + ) -> None: + """ + Create a `BlockManager` object + + Parameters: + ----------- + worker_id: int + The worker ID for this block manager + num_layer: int + Number of layers in the model + page_size: int + Page size for blocks + inner_dim: int + Inner dimension size + dtype: Optional[str] + Data type (e.g., 'fp16', 'bf16', 'fp32'), defaults to 'fp16' if None + host_num_blocks: Optional[int] + Number of host blocks to allocate, None means no host blocks + device_num_blocks: Optional[int] + Number of device blocks to allocate, None means no device blocks + device_id: int + CUDA device ID, defaults to 0 + """ + ... + + def allocate_host_blocks_blocking(self, count: int) -> BlockList: + """ + Allocate a list of host blocks (blocking call) + + Parameters: + ----------- + count: int + Number of blocks to allocate + + Returns: + -------- + BlockList + List of allocated blocks + """ + ... + + async def allocate_host_blocks(self, count: int) -> BlockList: + """ + Allocate a list of host blocks + + Parameters: + ----------- + count: int + Number of blocks to allocate + + Returns: + -------- + BlockList + List of allocated blocks + """ + ... + + def allocate_device_blocks_blocking(self, count: int) -> BlockList: + """ + Allocate a list of device blocks (blocking call) + + Parameters: + ----------- + count: int + Number of blocks to allocate + + Returns: + -------- + BlockList + List of allocated blocks + """ + ... + + async def allocate_device_blocks(self, count: int) -> BlockList: + """ + Allocate a list of device blocks + + Parameters: + ----------- + count: int + Number of blocks to allocate + + Returns: + -------- + BlockList + List of allocated blocks + """ + ... + +class KvbmCacheManager: + """ + A KV cache manager for VLLM + """ + + def __init__(self, block_manager: BlockManager) -> None: + ... + + +class KvbmRequest: + """ + A request for KV cache + """ + + def __init__(self, request_id: int, tokens: List[int], block_size: int) -> None: + ... 
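As a quick orientation for the stubbed `_core` API above, here is a minimal usage sketch. It relies only on the signatures shown in `_core.pyi`; the constructor arguments (layer count, page size, inner dimension, block counts) are illustrative assumptions rather than defaults, and the zero-copy view assumes a PyTorch build that supports the DLPack protocol.

```python
# Hypothetical sketch based on the kvbm._core stubs above; all values are illustrative.
import torch

from kvbm import BlockManager

manager = BlockManager(
    worker_id=0,
    num_layer=32,           # layers in the model (assumed)
    page_size=16,           # tokens per block (assumed)
    inner_dim=1024,         # per-layer inner dimension (assumed)
    dtype="fp16",
    host_num_blocks=256,    # pinned-host pool size (assumed)
    device_num_blocks=128,  # GPU pool size (assumed)
    device_id=0,
)

# Allocate a few host blocks synchronously; the async variants return the same BlockList.
blocks = manager.allocate_host_blocks_blocking(4)

# A Block iterates over its layers, and each Layer (or a contiguous Block) exposes
# __dlpack__/__dlpack_device__, so it can be viewed as a torch tensor without copying.
first_layer = blocks[0][0]
tensor = torch.from_dlpack(first_layer)
print(len(blocks), len(blocks[0]), tensor.shape)
```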
diff --git a/lib/bindings/python/src/dynamo/llm/trtllm_integration/__init__.py b/lib/kvbm/python/kvbm/trtllm_integration/__init__.py similarity index 100% rename from lib/bindings/python/src/dynamo/llm/trtllm_integration/__init__.py rename to lib/kvbm/python/kvbm/trtllm_integration/__init__.py diff --git a/lib/bindings/python/src/dynamo/llm/trtllm_integration/connector/__init__.py b/lib/kvbm/python/kvbm/trtllm_integration/connector/__init__.py similarity index 100% rename from lib/bindings/python/src/dynamo/llm/trtllm_integration/connector/__init__.py rename to lib/kvbm/python/kvbm/trtllm_integration/connector/__init__.py diff --git a/lib/bindings/python/src/dynamo/llm/trtllm_integration/connector/kvbm_connector_leader.py b/lib/kvbm/python/kvbm/trtllm_integration/connector/kvbm_connector_leader.py similarity index 89% rename from lib/bindings/python/src/dynamo/llm/trtllm_integration/connector/kvbm_connector_leader.py rename to lib/kvbm/python/kvbm/trtllm_integration/connector/kvbm_connector_leader.py index af1db0107a..3a08127321 100644 --- a/lib/bindings/python/src/dynamo/llm/trtllm_integration/connector/kvbm_connector_leader.py +++ b/lib/kvbm/python/kvbm/trtllm_integration/connector/kvbm_connector_leader.py @@ -2,8 +2,13 @@ # SPDX-License-Identifier: Apache-2.0 -from typing import List +from typing import List, Optional +from kvbm import KvbmLeader +from kvbm.trtllm_integration.rust import KvbmRequest +from kvbm.trtllm_integration.rust import KvConnectorLeader as RustKvConnectorLeader +from kvbm.trtllm_integration.rust import SchedulerOutput as RustSchedulerOutput +from kvbm.utils import is_dyn_runtime_enabled from tensorrt_llm._torch.pyexecutor.kv_cache_connector import ( KvCacheConnectorScheduler, SchedulerOutput, @@ -11,19 +16,20 @@ from tensorrt_llm.bindings.internal.batch_manager import LlmRequest from tensorrt_llm.llmapi.llm_args import TorchLlmArgs -from dynamo.llm import KvbmLeader -from dynamo.llm.trtllm_integration.rust import KvbmRequest -from dynamo.llm.trtllm_integration.rust import ( - KvConnectorLeader as RustKvConnectorLeader, -) -from dynamo.llm.trtllm_integration.rust import SchedulerOutput as RustSchedulerOutput -from dynamo.runtime import DistributedRuntime +DistributedRuntime = None +if is_dyn_runtime_enabled(): + from dynamo.runtime import DistributedRuntime class DynamoKVBMConnectorLeader(KvCacheConnectorScheduler): def __init__(self, llm_args: TorchLlmArgs): super().__init__(llm_args) - self.drt = DistributedRuntime.detached() + + drt: Optional[object] = None + if is_dyn_runtime_enabled(): + drt = DistributedRuntime.detached() + + self.drt = drt mappings = self._llm_args.parallel_config.to_mapping() diff --git a/lib/bindings/python/src/dynamo/llm/trtllm_integration/connector/kvbm_connector_worker.py b/lib/kvbm/python/kvbm/trtllm_integration/connector/kvbm_connector_worker.py similarity index 91% rename from lib/bindings/python/src/dynamo/llm/trtllm_integration/connector/kvbm_connector_worker.py rename to lib/kvbm/python/kvbm/trtllm_integration/connector/kvbm_connector_worker.py index af09810cd4..1869732d18 100644 --- a/lib/bindings/python/src/dynamo/llm/trtllm_integration/connector/kvbm_connector_worker.py +++ b/lib/kvbm/python/kvbm/trtllm_integration/connector/kvbm_connector_worker.py @@ -1,15 +1,21 @@ # SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. 
# SPDX-License-Identifier: Apache-2.0 +from typing import Optional + +# Keeping this import is important because it runs the code in nixl’s __init__.py +# to set up the Nixl plugin path. +import nixl # noqa: F401 import torch +from kvbm.trtllm_integration.rust import KvConnectorWorker as RustKvConnectorWorker +from kvbm.utils import is_dyn_runtime_enabled from tensorrt_llm import logger from tensorrt_llm._torch.pyexecutor.kv_cache_connector import KvCacheConnectorWorker from tensorrt_llm.llmapi.llm_args import TorchLlmArgs -from dynamo.llm.trtllm_integration.rust import ( - KvConnectorWorker as RustKvConnectorWorker, -) -from dynamo.runtime import DistributedRuntime +DistributedRuntime = None +if is_dyn_runtime_enabled(): + from dynamo.runtime import DistributedRuntime class DynamoKVBMConnectorWorker(KvCacheConnectorWorker): @@ -31,7 +37,11 @@ def callback(): def __init__(self, llm_args: TorchLlmArgs): super().__init__(llm_args) - self.drt = DistributedRuntime.detached() + drt: Optional[object] = None + if is_dyn_runtime_enabled(): + drt = DistributedRuntime.detached() + + self.drt = drt mappings = self._llm_args.parallel_config.to_mapping() self.rank = mappings.rank diff --git a/lib/bindings/python/src/dynamo/llm/trtllm_integration/rust.py b/lib/kvbm/python/kvbm/trtllm_integration/rust.py similarity index 96% rename from lib/bindings/python/src/dynamo/llm/trtllm_integration/rust.py rename to lib/kvbm/python/kvbm/trtllm_integration/rust.py index cd486e8527..464c7f51d9 100644 --- a/lib/bindings/python/src/dynamo/llm/trtllm_integration/rust.py +++ b/lib/kvbm/python/kvbm/trtllm_integration/rust.py @@ -7,7 +7,7 @@ try: # TODO: use TRTLLM own integration module - from dynamo._core import _vllm_integration + from kvbm._core import _vllm_integration # Runtime - dynamically loaded classes from Rust extension KvbmRequest = getattr(_vllm_integration, "KvbmRequest") diff --git a/lib/kvbm/python/kvbm/utils.py b/lib/kvbm/python/kvbm/utils.py new file mode 100644 index 0000000000..39ee12ccb6 --- /dev/null +++ b/lib/kvbm/python/kvbm/utils.py @@ -0,0 +1,20 @@ +# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 + + +import os + + +def is_dyn_runtime_enabled() -> bool: + """ + Return True if DYN_RUNTIME_ENABLED_KVBM is set to '1' or 'true' (case-insensitive). + DYN_RUNTIME_ENABLED_KVBM indicates if KVBM should use the existing DistributedRuntime + in the current environment. + + WRN: Calling DistributedRuntime.detached() can crash the entire process if + dependencies are not satisfied, and it cannot be caught with try/except in Python. + TODO: Make DistributedRuntime.detached() raise a catchable Python exception and + avoid crashing the process. 
+ """ + val = os.environ.get("DYN_RUNTIME_ENABLED_KVBM", "").strip().lower() + return val in {"1", "true"} diff --git a/lib/bindings/python/src/dynamo/llm/vllm_integration/__init__.py b/lib/kvbm/python/kvbm/vllm_integration/__init__.py similarity index 100% rename from lib/bindings/python/src/dynamo/llm/vllm_integration/__init__.py rename to lib/kvbm/python/kvbm/vllm_integration/__init__.py diff --git a/lib/bindings/python/src/dynamo/llm/vllm_integration/connector/__init__.py b/lib/kvbm/python/kvbm/vllm_integration/connector/__init__.py similarity index 100% rename from lib/bindings/python/src/dynamo/llm/vllm_integration/connector/__init__.py rename to lib/kvbm/python/kvbm/vllm_integration/connector/__init__.py diff --git a/lib/bindings/python/src/dynamo/llm/vllm_integration/connector/dynamo_connector.py b/lib/kvbm/python/kvbm/vllm_integration/connector/dynamo_connector.py similarity index 94% rename from lib/bindings/python/src/dynamo/llm/vllm_integration/connector/dynamo_connector.py rename to lib/kvbm/python/kvbm/vllm_integration/connector/dynamo_connector.py index e54dc7261d..8d06db7055 100644 --- a/lib/bindings/python/src/dynamo/llm/vllm_integration/connector/dynamo_connector.py +++ b/lib/kvbm/python/kvbm/vllm_integration/connector/dynamo_connector.py @@ -26,9 +26,9 @@ from vllm.v1.request import Request -# from dynamo.llm.vllm_integration.kv_cache_utils import KvbmCacheBlocks -from dynamo.llm.vllm_integration.connector_leader import KvConnectorLeader -from dynamo.llm.vllm_integration.connector_worker import KvConnectorWorker +# from kvbm.vllm_integration.kv_cache_utils import KvbmCacheBlocks +from kvbm.vllm_integration.connector_leader import KvConnectorLeader +from kvbm.vllm_integration.connector_worker import KvConnectorWorker EngineId = str diff --git a/lib/bindings/python/src/dynamo/llm/vllm_integration/connector/pd_connector.py b/lib/kvbm/python/kvbm/vllm_integration/connector/pd_connector.py similarity index 98% rename from lib/bindings/python/src/dynamo/llm/vllm_integration/connector/pd_connector.py rename to lib/kvbm/python/kvbm/vllm_integration/connector/pd_connector.py index ab8ed67f3c..0fd4e69646 100644 --- a/lib/bindings/python/src/dynamo/llm/vllm_integration/connector/pd_connector.py +++ b/lib/kvbm/python/kvbm/vllm_integration/connector/pd_connector.py @@ -3,6 +3,7 @@ from dataclasses import dataclass from typing import TYPE_CHECKING +from kvbm.vllm_integration.connector.dynamo_connector import DynamoConnector from vllm.distributed.kv_transfer.kv_connector.v1.base import KVConnectorRole from vllm.distributed.kv_transfer.kv_connector.v1.lmcache_connector import ( LMCacheConnectorV1, @@ -14,8 +15,6 @@ from vllm.distributed.kv_transfer.kv_connector.v1.nixl_connector import NixlConnector from vllm.v1.core.sched.output import SchedulerOutput -from dynamo.llm.vllm_integration.connector.dynamo_connector import DynamoConnector - if TYPE_CHECKING: from vllm.config import VllmConfig from vllm.v1.core.kv_cache_manager import KVCacheBlocks diff --git a/lib/bindings/python/src/dynamo/llm/vllm_integration/connector_leader.py b/lib/kvbm/python/kvbm/vllm_integration/connector_leader.py similarity index 90% rename from lib/bindings/python/src/dynamo/llm/vllm_integration/connector_leader.py rename to lib/kvbm/python/kvbm/vllm_integration/connector_leader.py index f98775427f..3c32f92a1c 100644 --- a/lib/bindings/python/src/dynamo/llm/vllm_integration/connector_leader.py +++ b/lib/kvbm/python/kvbm/vllm_integration/connector_leader.py @@ -20,19 +20,23 @@ from vllm.v1.request import Request 
-# from dynamo.llm.vllm_integration.kv_cache_utils import KvbmCacheBlocks -# from dynamo.llm.vllm_integration.rust import BlockManager, KvbmRequest -# from dynamo.llm.vllm_integration.rust import KvConnectorLeader as RustKvConnectorLeader -# from dynamo.llm.vllm_integration.rust import ( +# from kvbm.vllm_integration.kv_cache_utils import KvbmCacheBlocks +# from kvbm.vllm_integration.rust import BlockManager, KvbmRequest +# from kvbm.vllm_integration.rust import KvConnectorLeader as RustKvConnectorLeader +# from kvbm.vllm_integration.rust import ( # KvConnectorMetadata as RustKvConnectorMetadata, # ) -# from dynamo.llm.vllm_integration.rust import SchedulerOutput as RustSchedulerOutput +# from kvbm.vllm_integration.rust import SchedulerOutput as RustSchedulerOutput -from dynamo.llm import KvbmLeader -from dynamo.llm.vllm_integration.rust import KvbmRequest -from dynamo.llm.vllm_integration.rust import KvConnectorLeader as RustKvConnectorLeader -from dynamo.llm.vllm_integration.rust import SchedulerOutput as RustSchedulerOutput -from dynamo.runtime import DistributedRuntime +from kvbm import KvbmLeader +from kvbm.utils import is_dyn_runtime_enabled +from kvbm.vllm_integration.rust import KvbmRequest +from kvbm.vllm_integration.rust import KvConnectorLeader as RustKvConnectorLeader +from kvbm.vllm_integration.rust import SchedulerOutput as RustSchedulerOutput + +DistributedRuntime = None +if is_dyn_runtime_enabled(): + from dynamo.runtime import DistributedRuntime class DynamoConnectorMetadata(KVConnectorMetadata): @@ -51,12 +55,14 @@ class KvConnectorLeader: """ def __init__(self, vllm_config: "VllmConfig", engine_id: str, **kwargs): - drt = kwargs.get("drt", None) - if drt is None: - self.drt = DistributedRuntime.detached() + drt: Optional[object] = kwargs.get("drt") + + if drt is None and is_dyn_runtime_enabled(): + drt = DistributedRuntime.detached() else: - self.drt = drt + drt = None + self.drt = drt self.vllm_config = vllm_config world_size = vllm_config.parallel_config.world_size diff --git a/lib/bindings/python/src/dynamo/llm/vllm_integration/connector_worker.py b/lib/kvbm/python/kvbm/vllm_integration/connector_worker.py similarity index 89% rename from lib/bindings/python/src/dynamo/llm/vllm_integration/connector_worker.py rename to lib/kvbm/python/kvbm/vllm_integration/connector_worker.py index 13560ecbcb..052ae77b45 100644 --- a/lib/bindings/python/src/dynamo/llm/vllm_integration/connector_worker.py +++ b/lib/kvbm/python/kvbm/vllm_integration/connector_worker.py @@ -9,7 +9,11 @@ from typing import TYPE_CHECKING, Optional +# Keeping this import is important because it runs the code in nixl’s __init__.py +# to set up the Nixl plugin path when there is no pre-defined NIXL_PLUGIN_DIR +import nixl # noqa: F401 import torch +from kvbm.utils import is_dyn_runtime_enabled from vllm.config import VllmConfig from vllm.distributed.kv_transfer.kv_connector.v1.base import KVConnectorMetadata from vllm.model_executor.models.utils import extract_layer_index @@ -21,15 +25,18 @@ from vllm.forward_context import ForwardContext -# from dynamo.llm.vllm_integration.kv_cache_utils import KvbmCacheBlocks -# from dynamo.llm.vllm_integration.rust import BlockManager -# from dynamo.llm.vllm_integration.rust import ( +# from kvbm.vllm_integration.kv_cache_utils import KvbmCacheBlocks +# from kvbm.vllm_integration.rust import BlockManager +# from kvbm.vllm_integration.rust import ( # KvConnectorMetadata as RustKvConnectorMetadata, # KvConnectorWorker as RustKvConnectorWorker, # ) -from 
dynamo.llm.vllm_integration.rust import KvConnectorWorker as RustKvConnectorWorker -from dynamo.runtime import DistributedRuntime +from kvbm.vllm_integration.rust import KvConnectorWorker as RustKvConnectorWorker + +DistributedRuntime = None +if is_dyn_runtime_enabled(): + from dynamo.runtime import DistributedRuntime class DynamoConnectorMetadata(KVConnectorMetadata): @@ -40,11 +47,14 @@ def __init__(self, metadata: bytes): class KvConnectorWorker: def __init__(self, vllm_config: "VllmConfig", engine_id: str, **kwargs): - drt = kwargs.get("drt", None) - if drt is None: - self.drt = DistributedRuntime.detached() + drt: Optional[object] = kwargs.get("drt") + + if drt is None and is_dyn_runtime_enabled(): + drt = DistributedRuntime.detached() else: - self.drt = drt + drt = None + + self.drt = drt self.vllm_config = vllm_config self._connector = RustKvConnectorWorker(self.drt, engine_id) diff --git a/lib/bindings/python/src/dynamo/llm/vllm_integration/consolidator_config.py b/lib/kvbm/python/kvbm/vllm_integration/consolidator_config.py similarity index 100% rename from lib/bindings/python/src/dynamo/llm/vllm_integration/consolidator_config.py rename to lib/kvbm/python/kvbm/vllm_integration/consolidator_config.py diff --git a/lib/bindings/python/src/dynamo/llm/vllm_integration/kv_cache_manager.py b/lib/kvbm/python/kvbm/vllm_integration/kv_cache_manager.py similarity index 98% rename from lib/bindings/python/src/dynamo/llm/vllm_integration/kv_cache_manager.py rename to lib/kvbm/python/kvbm/vllm_integration/kv_cache_manager.py index 3c4e79cb8c..8e616a5937 100644 --- a/lib/bindings/python/src/dynamo/llm/vllm_integration/kv_cache_manager.py +++ b/lib/kvbm/python/kvbm/vllm_integration/kv_cache_manager.py @@ -26,10 +26,10 @@ from vllm.v1.core.kv_cache_manager import KVCacheBlocks from vllm.v1.request import Request -from dynamo.llm.vllm_integration.kv_cache_utils import KvbmCacheBlocks -from dynamo.llm.vllm_integration.rust import BlockManager -from dynamo.llm.vllm_integration.rust import KvbmCacheManager as RustKvbmCacheManager -from dynamo.llm.vllm_integration.rust import KvbmRequest, SlotUpdate +from kvbm.vllm_integration.kv_cache_utils import KvbmCacheBlocks +from kvbm.vllm_integration.rust import BlockManager +from kvbm.vllm_integration.rust import KvbmCacheManager as RustKvbmCacheManager +from kvbm.vllm_integration.rust import KvbmRequest, SlotUpdate class KvbmCacheManager(KVConnectorBase_V1): diff --git a/lib/bindings/python/src/dynamo/llm/vllm_integration/kv_cache_utils.py b/lib/kvbm/python/kvbm/vllm_integration/kv_cache_utils.py similarity index 96% rename from lib/bindings/python/src/dynamo/llm/vllm_integration/kv_cache_utils.py rename to lib/kvbm/python/kvbm/vllm_integration/kv_cache_utils.py index 5acb8b33f9..63cb5298ff 100644 --- a/lib/bindings/python/src/dynamo/llm/vllm_integration/kv_cache_utils.py +++ b/lib/kvbm/python/kvbm/vllm_integration/kv_cache_utils.py @@ -9,11 +9,10 @@ from typing import List +from kvbm.vllm_integration.rust import BlockState, BlockStates, KvbmBlockList from vllm.v1.core.kv_cache_manager import KVCacheBlocks from vllm.v1.core.kv_cache_utils import KVCacheBlock -from dynamo.llm.vllm_integration.rust import BlockState, BlockStates, KvbmBlockList - # from vllm.logger import init_logger # logger = init_logger(__name__) diff --git a/lib/bindings/python/src/dynamo/llm/vllm_integration/rust.py b/lib/kvbm/python/kvbm/vllm_integration/rust.py similarity index 94% rename from lib/bindings/python/src/dynamo/llm/vllm_integration/rust.py rename to 
lib/kvbm/python/kvbm/vllm_integration/rust.py index 2b985399de..d0727d0c5d 100644 --- a/lib/bindings/python/src/dynamo/llm/vllm_integration/rust.py +++ b/lib/kvbm/python/kvbm/vllm_integration/rust.py @@ -6,7 +6,7 @@ """ try: - from dynamo._core import _vllm_integration + from kvbm._core import _vllm_integration # Runtime - dynamically loaded classes from Rust extension KvbmCacheManager = getattr(_vllm_integration, "KvbmCacheManager") @@ -20,7 +20,7 @@ KvConnectorLeader = getattr(_vllm_integration, "PyKvConnectorLeader") SchedulerOutput = getattr(_vllm_integration, "SchedulerOutput") - from dynamo.llm import BlockManager + from kvbm import BlockManager except ImportError: print("Failed to import Dynamo KVBM. vLLM integration will not be available.") diff --git a/lib/bindings/python/rust/llm/block_manager.rs b/lib/kvbm/src/block_manager.rs similarity index 94% rename from lib/bindings/python/rust/llm/block_manager.rs rename to lib/kvbm/src/block_manager.rs index d6a047cbbd..37dd906d2f 100644 --- a/lib/bindings/python/rust/llm/block_manager.rs +++ b/lib/kvbm/src/block_manager.rs @@ -9,39 +9,11 @@ use dynamo_llm::block_manager::block::{ use dynamo_llm::block_manager::kv_consolidator::KvEventConsolidatorConfig; use dynamo_llm::block_manager::offload::filter::FrequencyFilter; use dynamo_llm::block_manager::{BasicMetadata, BlockParallelismStrategy}; - +use dynamo_runtime::DistributedRuntime; use pyo3::PyResult; use std::time::Duration; use tokio_util::sync::CancellationToken; -/// Creates a disk offload filter based on environment configuration. -/// Returns `Ok(None)` if the filter is disabled via `DYN_KVBM_DISABLE_DISK_OFFLOAD_FILTER`, -/// otherwise constructs a `FrequencyFilter` with standard parameters. -fn create_disk_offload_filter( - cancel_token: &CancellationToken, - runtime: &tokio::runtime::Handle, -) -> Result>> { - // Check if disk offload filter is disabled via environment variable - let disable_filter = std::env::var("DYN_KVBM_DISABLE_DISK_OFFLOAD_FILTER") - .map(|v| v == "true" || v == "1") - .unwrap_or(false); - - if disable_filter { - return Ok(None); - } - - // TODO: These values seem plausible for most use cases, but we need to figure out a better way to configure them. - let frequency_filter = FrequencyFilter::new( - 2, - Duration::from_secs(600), - 1_000_000, - cancel_token.child_token(), - runtime.clone(), - )?; - - Ok(Some(Arc::new(frequency_filter))) -} - mod controller; mod distributed; @@ -73,11 +45,39 @@ type VllmController = Arc< >, >; +/// Creates a disk offload filter based on environment configuration. +/// Returns `Ok(None)` if the filter is disabled via `DYN_KVBM_DISABLE_DISK_OFFLOAD_FILTER`, +/// otherwise constructs a `FrequencyFilter` with standard parameters. +fn create_disk_offload_filter( + cancel_token: &CancellationToken, + runtime: &tokio::runtime::Handle, +) -> Result>> { + // Check if disk offload filter is disabled via environment variable + let disable_filter = std::env::var("DYN_KVBM_DISABLE_DISK_OFFLOAD_FILTER") + .map(|v| v == "true" || v == "1") + .unwrap_or(false); + + if disable_filter { + return Ok(None); + } + + // TODO: These values seem plausible for most use cases, but we need to figure out a better way to configure them. 
+ let frequency_filter = FrequencyFilter::new( + 2, + Duration::from_secs(600), + 1_000_000, + cancel_token.child_token(), + runtime.clone(), + )?; + + Ok(Some(Arc::new(frequency_filter))) +} + #[pyclass] #[derive(Clone)] pub struct BlockManager { inner: VllmBlockManager, - drt: DistributedRuntime, + _drt: Option>, _controller: Option, } @@ -126,19 +126,17 @@ impl BlockManager { if leader.num_host_blocks() > 0 { tracing::info!("Using {} host blocks", leader.num_host_blocks()); - let mut host_layout_config = dynamo_llm::block_manager::KvManagerLayoutConfig::builder() .num_blocks(leader.num_host_blocks()) .logical(Some(BlockParallelismStrategy::LeaderWorkerSharded)); - if leader.num_disk_blocks() > 0 { - if let Some(filter) = - create_disk_offload_filter(&cancel_token, &rt.inner().runtime().primary()) + if leader.num_disk_blocks() > 0 + && let Some(filter) = + create_disk_offload_filter(&cancel_token, &get_current_tokio_handle()) .map_err(to_pyerr)? - { - host_layout_config = host_layout_config.offload_filter(Some(filter)); - } + { + host_layout_config = host_layout_config.offload_filter(Some(filter)); } config = config.host_layout(host_layout_config.build().map_err(to_pyerr)?); @@ -181,7 +179,7 @@ impl BlockManager { // ) }; - let rt = drt.inner().runtime().primary(); + let rt = get_current_tokio_handle(); let config = config.build().map_err(to_pyerr)?; Ok(BlockManager { @@ -197,7 +195,7 @@ impl BlockManager { .await }) .map_err(to_pyerr)?, - drt, + _drt: drt, _controller: None, }) } @@ -213,11 +211,7 @@ impl BlockManager { } let block_manager = self.inner.clone(); - let controller = self - .drt - .inner() - .runtime() - .primary() + let controller = get_current_tokio_handle() .block_on(controller::Controller::new( block_manager, component.inner.clone(), @@ -280,6 +274,7 @@ impl BlockManagerBuilder { self.disable_device_pool = yes; self } + pub fn kvbm_metrics( mut self, metrics: dynamo_llm::block_manager::metrics_kvbm::KvbmMetrics, @@ -339,12 +334,11 @@ impl BlockManagerBuilder { .num_blocks(leader_inner.num_host_blocks()) .logical(Some(BlockParallelismStrategy::LeaderWorkerSharded)); - if leader_inner.num_disk_blocks() > 0 { - if let Some(filter) = - create_disk_offload_filter(&cancel_token, &drt.inner().runtime().primary())? - { - host_layout_config = host_layout_config.offload_filter(Some(filter)); - } + if leader_inner.num_disk_blocks() > 0 + && let Some(filter) = + create_disk_offload_filter(&cancel_token, &get_current_tokio_handle())? 
+ { + host_layout_config = host_layout_config.offload_filter(Some(filter)); } config = config.host_layout(host_layout_config.build()?); @@ -382,7 +376,7 @@ impl BlockManagerBuilder { Ok(BlockManager { inner, - drt, + _drt: drt, _controller: None, }) } diff --git a/lib/bindings/python/rust/llm/block_manager/block.rs b/lib/kvbm/src/block_manager/block.rs similarity index 100% rename from lib/bindings/python/rust/llm/block_manager/block.rs rename to lib/kvbm/src/block_manager/block.rs diff --git a/lib/bindings/python/rust/llm/block_manager/block_list.rs b/lib/kvbm/src/block_manager/block_list.rs similarity index 100% rename from lib/bindings/python/rust/llm/block_manager/block_list.rs rename to lib/kvbm/src/block_manager/block_list.rs diff --git a/lib/bindings/python/rust/llm/block_manager/controller.rs b/lib/kvbm/src/block_manager/controller.rs similarity index 100% rename from lib/bindings/python/rust/llm/block_manager/controller.rs rename to lib/kvbm/src/block_manager/controller.rs diff --git a/lib/bindings/python/rust/llm/block_manager/distributed.rs b/lib/kvbm/src/block_manager/distributed.rs similarity index 100% rename from lib/bindings/python/rust/llm/block_manager/distributed.rs rename to lib/kvbm/src/block_manager/distributed.rs diff --git a/lib/bindings/python/rust/llm/block_manager/distributed/leader.rs b/lib/kvbm/src/block_manager/distributed/leader.rs similarity index 86% rename from lib/bindings/python/rust/llm/block_manager/distributed/leader.rs rename to lib/kvbm/src/block_manager/distributed/leader.rs index 55de38485a..a68cd2e3b1 100644 --- a/lib/bindings/python/rust/llm/block_manager/distributed/leader.rs +++ b/lib/kvbm/src/block_manager/distributed/leader.rs @@ -2,6 +2,7 @@ // SPDX-License-Identifier: Apache-2.0 use super::*; + use derive_getters::Dissolve; use llm_rs::block_manager::distributed::{ KvbmLeader as KvbmLeaderImpl, KvbmLeaderConfig, KvbmLeaderNumBlocksConfig, @@ -57,7 +58,7 @@ fn get_leader_init_timeout_secs(override_key: &str) -> u64 { #[derive(Clone, Dissolve)] pub struct KvbmLeader { leader: Arc, - drt: DistributedRuntime, + drt: Option>, } impl KvbmLeader { @@ -69,8 +70,16 @@ impl KvbmLeader { #[pymethods] impl KvbmLeader { #[new] - #[pyo3(signature = (world_size, drt))] - fn new(world_size: usize, drt: DistributedRuntime) -> PyResult { + #[pyo3(signature = (world_size, drt=None))] + fn new(world_size: usize, drt: Option) -> PyResult { + let drt: Option> = Python::with_gil(|py| { + if let Some(obj) = drt { + extract_distributed_runtime_from_obj(py, obj) + } else { + Ok(None) + } + })?; + let leader_init_timeout_sec: u64 = get_leader_init_timeout_secs(LEADER_WORKER_INIT_TIMEOUT_SECS); @@ -86,7 +95,7 @@ impl KvbmLeader { config.sanity_check().map_err(to_pyerr)?; - let rt = drt.inner().runtime().primary(); + let rt = get_current_tokio_handle(); let leader = rt.block_on(async move { KvbmLeaderImpl::new(config).await.map_err(to_pyerr) })?; diff --git a/lib/bindings/python/rust/llm/block_manager/distributed/utils.rs b/lib/kvbm/src/block_manager/distributed/utils.rs similarity index 100% rename from lib/bindings/python/rust/llm/block_manager/distributed/utils.rs rename to lib/kvbm/src/block_manager/distributed/utils.rs diff --git a/lib/bindings/python/rust/llm/block_manager/distributed/worker.rs b/lib/kvbm/src/block_manager/distributed/worker.rs similarity index 93% rename from lib/bindings/python/rust/llm/block_manager/distributed/worker.rs rename to lib/kvbm/src/block_manager/distributed/worker.rs index c08223b387..2d9ca29ba8 100644 --- 
a/lib/bindings/python/rust/llm/block_manager/distributed/worker.rs +++ b/lib/kvbm/src/block_manager/distributed/worker.rs @@ -1,11 +1,10 @@ // SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. // SPDX-License-Identifier: Apache-2.0 -use utils::{get_leader_zmq_ack_url, get_leader_zmq_pub_url}; - use super::*; use std::sync::Arc; +use utils::{get_leader_zmq_ack_url, get_leader_zmq_pub_url}; use llm_rs::block_manager::distributed::{ BlockTransferHandler as RustBlockTransferHandler, KvbmWorker as KvbmWorkerImpl, @@ -132,7 +131,7 @@ impl BlockTransferHandler { #[derive(Clone)] pub struct KvbmWorker { inner: Arc>, - _drt: DistributedRuntime, + _drt: Option>, } impl KvbmWorker { @@ -151,19 +150,21 @@ impl KvbmWorker { tensors: Vec>, device_id: usize, dtype_width_bytes: usize, - drt: Option, + drt: Option, layout_blocking: bool, device_layout_type: Option, host_layout_type: Option, disk_layout_type: Option, ) -> PyResult { - let py_drt = drt.ok_or_else(|| { - pyo3::exceptions::PyValueError::new_err("DistributedRuntime (drt) must be provided") + let drt: Option> = Python::with_gil(|py| { + if let Some(obj) = drt { + extract_distributed_runtime_from_obj(py, obj) + } else { + Ok(None) + } })?; - // rusty drt - let drt = py_drt.inner.clone(); - let rt = drt.runtime().primary(); + let rt = get_current_tokio_handle(); let mut vllm_tensors: Vec> = Vec::with_capacity(tensors.len()); @@ -173,7 +174,7 @@ impl KvbmWorker { } let config = KvbmWorkerConfig::builder() - .drt(drt) + .cancel_token(get_current_cancel_token()) .num_device_blocks(num_device_blocks) .page_size(page_size) .tensors(vllm_tensors) @@ -208,7 +209,7 @@ impl KvbmWorker { Ok(Self { inner: Arc::new(Mutex::new(worker)), - _drt: py_drt, + _drt: drt, }) } } diff --git a/lib/bindings/python/rust/llm/block_manager/dlpack.rs b/lib/kvbm/src/block_manager/dlpack.rs similarity index 100% rename from lib/bindings/python/rust/llm/block_manager/dlpack.rs rename to lib/kvbm/src/block_manager/dlpack.rs diff --git a/lib/bindings/python/rust/llm/block_manager/layer.rs b/lib/kvbm/src/block_manager/layer.rs similarity index 100% rename from lib/bindings/python/rust/llm/block_manager/layer.rs rename to lib/kvbm/src/block_manager/layer.rs diff --git a/lib/bindings/python/rust/llm/block_manager/vllm.rs b/lib/kvbm/src/block_manager/vllm.rs similarity index 99% rename from lib/bindings/python/rust/llm/block_manager/vllm.rs rename to lib/kvbm/src/block_manager/vllm.rs index 524af640ae..1cc4430d23 100644 --- a/lib/bindings/python/rust/llm/block_manager/vllm.rs +++ b/lib/kvbm/src/block_manager/vllm.rs @@ -22,9 +22,9 @@ use dynamo_llm::{ tokens::{SaltHash, SequenceHash, TokenBlockSequence, Tokens}, }; -// use crate::llm::block_manager::BlockManager as PyBlockManager; -use crate::llm::block_manager::BlockManager as PyBlockManager; -use crate::llm::block_manager::VllmBlockManager; +// use crate::block_manager::BlockManager as PyBlockManager; +use crate::block_manager::BlockManager as PyBlockManager; +use crate::block_manager::VllmBlockManager; use crate::to_pyerr; diff --git a/lib/bindings/python/rust/llm/block_manager/vllm/block_list.rs b/lib/kvbm/src/block_manager/vllm/block_list.rs similarity index 100% rename from lib/bindings/python/rust/llm/block_manager/vllm/block_list.rs rename to lib/kvbm/src/block_manager/vllm/block_list.rs diff --git a/lib/bindings/python/rust/llm/block_manager/vllm/connector.rs b/lib/kvbm/src/block_manager/vllm/connector.rs similarity index 100% rename from 
lib/bindings/python/rust/llm/block_manager/vllm/connector.rs rename to lib/kvbm/src/block_manager/vllm/connector.rs diff --git a/lib/bindings/python/rust/llm/block_manager/vllm/connector/leader.rs b/lib/kvbm/src/block_manager/vllm/connector/leader.rs similarity index 98% rename from lib/bindings/python/rust/llm/block_manager/vllm/connector/leader.rs rename to lib/kvbm/src/block_manager/vllm/connector/leader.rs index 876e79c8a2..363be2ace5 100644 --- a/lib/bindings/python/rust/llm/block_manager/vllm/connector/leader.rs +++ b/lib/kvbm/src/block_manager/vllm/connector/leader.rs @@ -6,15 +6,14 @@ pub mod slot; use super::*; use dynamo_llm::block_manager::metrics_kvbm::{KvbmMetrics, KvbmMetricsRegistry}; -use dynamo_runtime::DistributedRuntime; use slot::{ConnectorSlotManager, SlotError, SlotManager, SlotState}; -use crate::DistributedRuntime as PyDistributedRuntime; -use crate::llm::block_manager::BlockManagerBuilder; -use crate::llm::block_manager::{ +use crate::block_manager::BlockManagerBuilder; +use crate::block_manager::{ VllmBlockManager, distributed::KvbmLeader as PyKvbmLeader, vllm::KvbmRequest, vllm::connector::leader::slot::VllmConnectorSlot, }; +use crate::get_current_tokio_handle; use dynamo_llm::block_manager::{ BasicMetadata, DiskStorage, ImmutableBlock, PinnedStorage, @@ -89,7 +88,6 @@ pub struct KvConnectorLeader { impl KvConnectorLeader { fn new( worker_id: String, - drt: PyDistributedRuntime, page_size: usize, leader_py: PyKvbmLeader, consolidator_vllm_endpoint: Option, @@ -101,8 +99,7 @@ impl KvConnectorLeader { ); let leader = leader_py.get_inner().clone(); - let drt = drt.inner().clone(); - let handle: Handle = drt.runtime().primary(); + let handle: Handle = get_current_tokio_handle(); let kvbm_metrics = KvbmMetrics::new( &KvbmMetricsRegistry::default(), @@ -161,7 +158,6 @@ impl KvConnectorLeader { let sm = ConnectorSlotManager::new( block_manager.get_block_manager().clone(), leader.clone(), - drt.clone(), kvbm_metrics_clone.clone(), ); @@ -567,12 +563,14 @@ impl PyKvConnectorLeader { #[pyo3(signature = (worker_id, drt, page_size, leader, consolidator_vllm_endpoint=None, consolidator_output_endpoint=None))] pub fn new( worker_id: String, - drt: PyDistributedRuntime, + drt: Option, page_size: usize, leader: PyKvbmLeader, consolidator_vllm_endpoint: Option, consolidator_output_endpoint: Option, - ) -> Self { + ) -> PyResult { + let _ = &drt; // drt is currently un-used in leader + // Initialize logging for the vLLM connector dynamo_runtime::logging::init(); @@ -583,7 +581,6 @@ impl PyKvConnectorLeader { let connector_leader: Box = if enable_kvbm_record { Box::new(recorder::KvConnectorLeaderRecorder::new( worker_id, - drt, page_size, leader, consolidator_vllm_endpoint, @@ -592,14 +589,13 @@ impl PyKvConnectorLeader { } else { Box::new(KvConnectorLeader::new( worker_id, - drt, page_size, leader, consolidator_vllm_endpoint, consolidator_output_endpoint, )) }; - Self { connector_leader } + Ok(Self { connector_leader }) } fn get_num_new_matched_tokens( diff --git a/lib/bindings/python/rust/llm/block_manager/vllm/connector/leader/recorder.rs b/lib/kvbm/src/block_manager/vllm/connector/leader/recorder.rs similarity index 97% rename from lib/bindings/python/rust/llm/block_manager/vllm/connector/leader/recorder.rs rename to lib/kvbm/src/block_manager/vllm/connector/leader/recorder.rs index 59129ddac8..fedeab4330 100644 --- a/lib/bindings/python/rust/llm/block_manager/vllm/connector/leader/recorder.rs +++ b/lib/kvbm/src/block_manager/vllm/connector/leader/recorder.rs @@ -87,7 +87,6 @@ 
pub struct KvConnectorLeaderRecorder { impl KvConnectorLeaderRecorder { pub fn new( worker_id: String, - drt: PyDistributedRuntime, page_size: usize, leader_py: PyKvbmLeader, consolidator_vllm_endpoint: Option, @@ -99,8 +98,7 @@ impl KvConnectorLeaderRecorder { ); let leader = leader_py.get_inner().clone(); - let drt = drt.inner().clone(); - let handle: Handle = drt.runtime().primary(); + let handle: Handle = get_current_tokio_handle(); let kvbm_metrics = KvbmMetrics::new( &KvbmMetricsRegistry::default(), @@ -113,9 +111,7 @@ impl KvConnectorLeaderRecorder { let output_path = "/tmp/records.jsonl"; tracing::info!("recording events to {}", output_path); - let recorder = drt - .runtime() - .primary() + let recorder = get_current_tokio_handle() .block_on(async { Recorder::new(token, &output_path, None, None, None).await }) .unwrap(); @@ -123,8 +119,7 @@ impl KvConnectorLeaderRecorder { let recorder_tx = recorder.event_sender(); // todo(kvbm): make this a critical task - drt.runtime() - .primary() + get_current_tokio_handle() .spawn(Self::forward_unbounded_to_sender(unbounded_rx, recorder_tx)); let slot_manager_cell = Arc::new(OnceLock::new()); @@ -172,7 +167,6 @@ impl KvConnectorLeaderRecorder { let sm = ConnectorSlotManager::new( block_manager.get_block_manager().clone(), leader.clone(), - drt.clone(), kvbm_metrics_clone.clone(), ); diff --git a/lib/bindings/python/rust/llm/block_manager/vllm/connector/leader/slot.rs b/lib/kvbm/src/block_manager/vllm/connector/leader/slot.rs similarity index 98% rename from lib/bindings/python/rust/llm/block_manager/vllm/connector/leader/slot.rs rename to lib/kvbm/src/block_manager/vllm/connector/leader/slot.rs index cf1c51117b..c633f74d7c 100644 --- a/lib/bindings/python/rust/llm/block_manager/vllm/connector/leader/slot.rs +++ b/lib/kvbm/src/block_manager/vllm/connector/leader/slot.rs @@ -16,6 +16,8 @@ use dynamo_llm::{ use dynamo_runtime::utils::task::CriticalTaskExecutionHandle; use tokio_util::sync::CancellationToken; +use crate::get_current_cancel_token; + use super::*; #[derive(Debug, thiserror::Error)] @@ -191,7 +193,6 @@ impl ConnectorSlotManager { pub fn new( block_manager: VllmBlockManager, leader: Arc, - drt: DistributedRuntime, kvbm_metrics: KvbmMetrics, ) -> Self { tracing::debug!( @@ -202,15 +203,20 @@ impl ConnectorSlotManager { let (xfer_tx, xfer_rx) = mpsc::unbounded_channel(); let mut xfer_engine = LocalTransferEngine::new(block_manager.clone(), leader, xfer_rx); - let primary_token = drt.primary_token(); - let runtime_primary = drt.runtime().primary(); - - let drt_for_task = drt; + let primary_token = get_current_cancel_token(); + let primary_token_clone = primary_token.clone(); + let runtime_primary = get_current_tokio_handle(); + let runtime_primary_clone = runtime_primary.clone(); let xfer_engine_task = CriticalTaskExecutionHandle::new_with_runtime( |cancellation_token| async move { xfer_engine - .execute(cancellation_token, drt_for_task, kvbm_metrics) + .execute( + cancellation_token, + runtime_primary_clone, + primary_token_clone, + kvbm_metrics, + ) .await }, primary_token, @@ -1163,12 +1169,12 @@ impl LocalTransferEngine { async fn execute( &mut self, cancellation_token: CancellationToken, - drt: DistributedRuntime, + task_handle: Handle, + task_token: CancellationToken, kvbm_metrics: KvbmMetrics, ) -> anyhow::Result<()> { let (onboard_tx, mut onboard_rx) = mpsc::unbounded_channel(); let (offload_tx, mut offload_rx) = mpsc::unbounded_channel(); - let drt_clone = drt.clone(); // Clone resources needed for tasks let 
block_manager_offload = self.block_manager.clone(); @@ -1194,9 +1200,9 @@ impl LocalTransferEngine { } Ok(()) }, - drt.primary_token(), + task_token.clone(), "LocalOnboardTask", - &drt.runtime().primary(), + &task_handle, ) .unwrap(); let offload_task = CriticalTaskExecutionHandle::new_with_runtime( @@ -1219,9 +1225,9 @@ impl LocalTransferEngine { } Ok(()) }, - drt_clone.primary_token(), + task_token, "LocalOffloadTask", - &drt_clone.runtime().primary(), + &task_handle, ) .unwrap(); diff --git a/lib/bindings/python/rust/llm/block_manager/vllm/connector/trtllm_leader.rs b/lib/kvbm/src/block_manager/vllm/connector/trtllm_leader.rs similarity index 95% rename from lib/bindings/python/rust/llm/block_manager/vllm/connector/trtllm_leader.rs rename to lib/kvbm/src/block_manager/vllm/connector/trtllm_leader.rs index b8a163c559..61faf382e6 100644 --- a/lib/bindings/python/rust/llm/block_manager/vllm/connector/trtllm_leader.rs +++ b/lib/kvbm/src/block_manager/vllm/connector/trtllm_leader.rs @@ -3,15 +3,15 @@ use super::*; -use crate::DistributedRuntime as PyDistributedRuntime; -use crate::llm::block_manager::BlockManagerBuilder; -use crate::llm::block_manager::vllm::connector::leader::slot::{ +use crate::block_manager::BlockManagerBuilder; +use crate::block_manager::vllm::connector::leader::slot::{ ConnectorSlotManager, SlotManager, SlotState, }; -use crate::llm::block_manager::vllm::connector::leader::{ +use crate::block_manager::vllm::connector::leader::{ kvbm_metrics_endpoint_enabled, parse_kvbm_metrics_port, }; -use crate::llm::block_manager::{distributed::KvbmLeader as PyKvbmLeader, vllm::KvbmRequest}; +use crate::block_manager::{distributed::KvbmLeader as PyKvbmLeader, vllm::KvbmRequest}; +use crate::get_current_tokio_handle; use anyhow; use dynamo_llm::block_manager::metrics_kvbm::{KvbmMetrics, KvbmMetricsRegistry}; use std::collections::HashSet; @@ -63,20 +63,14 @@ pub struct KvConnectorLeader { } impl KvConnectorLeader { - fn new( - worker_id: u64, - drt: PyDistributedRuntime, - page_size: usize, - leader_py: PyKvbmLeader, - ) -> Self { + fn new(worker_id: u64, page_size: usize, leader_py: PyKvbmLeader) -> Self { tracing::info!( "KvConnectorLeader initialized with worker_id: {}", worker_id ); let leader = leader_py.get_inner().clone(); - let drt = drt.inner().clone(); - let handle: Handle = drt.runtime().primary(); + let handle: Handle = get_current_tokio_handle(); let kvbm_metrics = KvbmMetrics::new( &KvbmMetricsRegistry::default(), @@ -120,7 +114,6 @@ impl KvConnectorLeader { let sm = ConnectorSlotManager::new( block_manager.get_block_manager().clone(), leader.clone(), - drt.clone(), kvbm_metrics_clone.clone(), ); @@ -448,13 +441,14 @@ impl PyTrtllmKvConnectorLeader { #[pyo3(signature = (worker_id, drt, page_size, leader))] pub fn new( worker_id: u64, - drt: PyDistributedRuntime, + drt: Option, page_size: usize, leader: PyKvbmLeader, - ) -> Self { + ) -> PyResult { + let _ = &drt; // drt is currently un-used in leader let connector_leader: Box = - Box::new(KvConnectorLeader::new(worker_id, drt, page_size, leader)); - Self { connector_leader } + Box::new(KvConnectorLeader::new(worker_id, page_size, leader)); + Ok(Self { connector_leader }) } fn get_num_new_matched_tokens( diff --git a/lib/bindings/python/rust/llm/block_manager/vllm/connector/trtllm_worker.rs b/lib/kvbm/src/block_manager/vllm/connector/trtllm_worker.rs similarity index 92% rename from lib/bindings/python/rust/llm/block_manager/vllm/connector/trtllm_worker.rs rename to 
lib/kvbm/src/block_manager/vllm/connector/trtllm_worker.rs index ad85437f25..633eb73e1f 100644 --- a/lib/bindings/python/rust/llm/block_manager/vllm/connector/trtllm_worker.rs +++ b/lib/kvbm/src/block_manager/vllm/connector/trtllm_worker.rs @@ -10,18 +10,18 @@ use std::collections::HashSet; use std::sync::{Arc, OnceLock}; use super::*; -use crate::llm::block_manager::distributed::{get_leader_zmq_ack_url, get_leader_zmq_pub_url}; -use crate::llm::block_manager::vllm::connector::worker::event_sync_blocking; +use crate::block_manager::distributed::{get_leader_zmq_ack_url, get_leader_zmq_pub_url}; +use crate::block_manager::vllm::connector::worker::event_sync_blocking; +use crate::{block_manager::distributed::VllmTensor, to_pyerr}; +use dynamo_runtime::DistributedRuntime; + use crate::{ - DistributedRuntime as PyDistributedRuntime, llm::block_manager::distributed::VllmTensor, - to_pyerr, + extract_distributed_runtime_from_obj, get_current_cancel_token, get_current_tokio_handle, }; - use anyhow; use dynamo_llm::block_manager::distributed::{KvbmWorker, KvbmWorkerConfig}; use dynamo_llm::block_manager::layout::LayoutType; use dynamo_llm::block_manager::storage::torch::TorchTensor; -use dynamo_runtime::DistributedRuntime; use dynamo_runtime::utils::task::CriticalTaskExecutionHandle; pub trait Worker: Send + Sync { @@ -51,7 +51,7 @@ pub trait Worker: Send + Sync { } pub struct KvConnectorWorker { - drt: DistributedRuntime, + _drt: Option>, kvbm_worker: OnceLock, connector: WorkerSchedulerClient, transfer_client: TransferSchedulerClient, @@ -74,18 +74,18 @@ pub struct KvConnectorWorker { } impl KvConnectorWorker { - fn new(py_drt: PyDistributedRuntime, trtllm_rank: String) -> anyhow::Result { - let drt = py_drt.inner.clone(); - let runtime = drt.runtime().primary(); + fn new(drt: Option>, trtllm_rank: String) -> anyhow::Result { + let runtime = get_current_tokio_handle(); - let (scheduler, worker_client, transfer_client) = Scheduler::new(drt.primary_token()); + let (scheduler, worker_client, transfer_client) = + Scheduler::new(get_current_cancel_token()); CriticalTaskExecutionHandle::new_with_runtime( move |_| { let mut scheduler = scheduler; async move { scheduler.run().await } }, - drt.primary_token(), + get_current_cancel_token(), "kv-connector-scheduler-task", &runtime, )? 
@@ -97,7 +97,7 @@ impl KvConnectorWorker { ); Ok(Self { - drt, + _drt: drt, kvbm_worker: OnceLock::new(), connector: worker_client, transfer_client, @@ -131,7 +131,7 @@ impl Worker for KvConnectorWorker { let kv_cache_tensors = vec![kv_cache_tensor as Arc]; let config = KvbmWorkerConfig::builder() - .drt(self.drt.clone()) + .cancel_token(get_current_cancel_token()) .num_device_blocks(num_device_blocks) .page_size(page_size) .tensors(kv_cache_tensors) @@ -147,7 +147,7 @@ impl Worker for KvConnectorWorker { self.layer_events = raw_event_handles; - let worker = self.drt.runtime().primary().block_on(async move { + let worker = get_current_tokio_handle().block_on(async move { let worker = KvbmWorker::new(config, true).await?; anyhow::Ok(worker) })?; @@ -405,9 +405,17 @@ pub struct PyTrtllmKvConnectorWorker { impl PyTrtllmKvConnectorWorker { #[new] #[pyo3(signature = (py_drt, trtllm_rank))] - pub fn new(py_drt: PyDistributedRuntime, trtllm_rank: String) -> PyResult { + pub fn new(py_drt: Option, trtllm_rank: String) -> PyResult { + let drt: Option> = Python::with_gil(|py| { + if let Some(obj) = py_drt { + extract_distributed_runtime_from_obj(py, obj) + } else { + Ok(None) + } + })?; + let connector_worker: Box = - Box::new(KvConnectorWorker::new(py_drt, trtllm_rank).map_err(to_pyerr)?); + Box::new(KvConnectorWorker::new(drt, trtllm_rank).map_err(to_pyerr)?); Ok(Self { connector_worker }) } diff --git a/lib/bindings/python/rust/llm/block_manager/vllm/connector/worker.rs b/lib/kvbm/src/block_manager/vllm/connector/worker.rs similarity index 94% rename from lib/bindings/python/rust/llm/block_manager/vllm/connector/worker.rs rename to lib/kvbm/src/block_manager/vllm/connector/worker.rs index 4c93ac7185..1b12d28cad 100644 --- a/lib/bindings/python/rust/llm/block_manager/vllm/connector/worker.rs +++ b/lib/kvbm/src/block_manager/vllm/connector/worker.rs @@ -10,13 +10,13 @@ use std::collections::HashSet; use std::sync::{Arc, OnceLock}; use super::*; -use crate::llm::block_manager::distributed::{get_leader_zmq_ack_url, get_leader_zmq_pub_url}; +use crate::block_manager::distributed::{get_leader_zmq_ack_url, get_leader_zmq_pub_url}; +use crate::{block_manager::distributed::VllmTensor, to_pyerr}; + +use crate::block_manager::distributed::PyLayoutType; use crate::{ - DistributedRuntime as PyDistributedRuntime, llm::block_manager::distributed::VllmTensor, - to_pyerr, + extract_distributed_runtime_from_obj, get_current_cancel_token, get_current_tokio_handle, }; - -use crate::llm::block_manager::distributed::PyLayoutType; use anyhow; use dynamo_llm::block_manager::distributed::{KvbmWorker, KvbmWorkerConfig}; use dynamo_llm::block_manager::layout::LayoutType; @@ -51,7 +51,7 @@ pub trait Worker: Send + Sync { } pub struct KvConnectorWorker { - drt: DistributedRuntime, + _drt: Option>, kvbm_worker: OnceLock, connector: WorkerSchedulerClient, transfer_client: TransferSchedulerClient, @@ -76,18 +76,18 @@ pub struct KvConnectorWorker { } impl KvConnectorWorker { - fn new(py_drt: PyDistributedRuntime, vllm_worker_id: String) -> anyhow::Result { - let drt = py_drt.inner.clone(); - let runtime = drt.runtime().primary(); + fn new(drt: Option>, vllm_worker_id: String) -> anyhow::Result { + let runtime = get_current_tokio_handle(); - let (scheduler, worker_client, transfer_client) = Scheduler::new(drt.primary_token()); + let (scheduler, worker_client, transfer_client) = + Scheduler::new(get_current_cancel_token()); CriticalTaskExecutionHandle::new_with_runtime( move |_| { let mut scheduler = scheduler; async move { 
scheduler.run().await } }, - drt.primary_token(), + get_current_cancel_token(), "kv-connector-scheduler-task", &runtime, )? @@ -99,7 +99,7 @@ impl KvConnectorWorker { ); Ok(Self { - drt, + _drt: drt, kvbm_worker: OnceLock::new(), connector: worker_client, transfer_client, @@ -194,21 +194,21 @@ impl Worker for KvConnectorWorker { }; let config = KvbmWorkerConfig::builder() - .drt(self.drt.clone()) + .cancel_token(get_current_cancel_token()) .num_device_blocks(num_device_blocks) .page_size(page_size) .tensors(vllm_tensors) .device_id(device_id) .dtype_width_bytes(dtype_width_bytes) - .leader_pub_url(get_leader_zmq_pub_url()) - .leader_ack_url(get_leader_zmq_ack_url()) .scheduler_client(Some(self.transfer_client.clone())) .device_layout_type(detected_device_layout_type) .host_layout_type(host_layout_type.unwrap_or(LayoutType::FullyContiguous)) .disk_layout_type(disk_layout_type.unwrap_or(LayoutType::FullyContiguous)) + .leader_pub_url(get_leader_zmq_pub_url()) + .leader_ack_url(get_leader_zmq_ack_url()) .build()?; - let worker = self.drt.runtime().primary().block_on(async move { + let worker = get_current_tokio_handle().block_on(async move { let worker = KvbmWorker::new(config, false).await?; anyhow::Ok(worker) })?; @@ -447,9 +447,17 @@ pub struct PyKvConnectorWorker { impl PyKvConnectorWorker { #[new] #[pyo3(signature = (py_drt, vllm_worker_id))] - pub fn new(py_drt: PyDistributedRuntime, vllm_worker_id: String) -> PyResult { + pub fn new(py_drt: Option, vllm_worker_id: String) -> PyResult { + let drt: Option> = Python::with_gil(|py| { + if let Some(obj) = py_drt { + extract_distributed_runtime_from_obj(py, obj) + } else { + Ok(None) + } + })?; + let connector_worker: Box = - Box::new(KvConnectorWorker::new(py_drt, vllm_worker_id).map_err(to_pyerr)?); + Box::new(KvConnectorWorker::new(drt, vllm_worker_id).map_err(to_pyerr)?); Ok(Self { connector_worker }) } diff --git a/lib/bindings/python/rust/llm/block_manager/vllm/request.rs b/lib/kvbm/src/block_manager/vllm/request.rs similarity index 100% rename from lib/bindings/python/rust/llm/block_manager/vllm/request.rs rename to lib/kvbm/src/block_manager/vllm/request.rs diff --git a/lib/bindings/python/rust/llm/block_manager/vllm/slot.rs b/lib/kvbm/src/block_manager/vllm/slot.rs similarity index 100% rename from lib/bindings/python/rust/llm/block_manager/vllm/slot.rs rename to lib/kvbm/src/block_manager/vllm/slot.rs diff --git a/lib/bindings/python/rust/llm/block_manager/vllm/slot_manager_test_plan.md b/lib/kvbm/src/block_manager/vllm/slot_manager_test_plan.md similarity index 100% rename from lib/bindings/python/rust/llm/block_manager/vllm/slot_manager_test_plan.md rename to lib/kvbm/src/block_manager/vllm/slot_manager_test_plan.md diff --git a/lib/bindings/python/rust/llm/block_manager/vllm/slot_test_plan.md b/lib/kvbm/src/block_manager/vllm/slot_test_plan.md similarity index 100% rename from lib/bindings/python/rust/llm/block_manager/vllm/slot_test_plan.md rename to lib/kvbm/src/block_manager/vllm/slot_test_plan.md diff --git a/lib/kvbm/src/lib.rs b/lib/kvbm/src/lib.rs new file mode 100644 index 0000000000..99910ca1c3 --- /dev/null +++ b/lib/kvbm/src/lib.rs @@ -0,0 +1,120 @@ +// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. 
+// SPDX-License-Identifier: Apache-2.0 +use pyo3::exceptions::{PyRuntimeError, PyTypeError}; +use pyo3::types::{PyCapsule, PyCapsuleMethods}; +use pyo3::{exceptions::PyException, prelude::*}; +use std::sync::OnceLock; +use std::sync::Weak; +use std::{fmt::Display, sync::Arc}; +use tokio::sync::Mutex; +use tokio_util::sync::CancellationToken; + +use dynamo_runtime::{self as rs, RuntimeConfig, logging, traits::DistributedRuntimeProvider}; + +use dynamo_llm::{self as llm_rs}; + +mod block_manager; + +/// A Python module implemented in Rust. The name of this function must match +/// the `lib.name` setting in the `Cargo.toml`, else Python will not be able to +/// import the module. +#[pymodule] +fn _core(m: &Bound<'_, PyModule>) -> PyResult<()> { + logging::init(); + + init_pyo3_tokio_rt(); + #[cfg(feature = "block-manager")] + block_manager::add_to_module(m)?; + + Ok(()) +} + +static PYO3_TOKIO_INIT: OnceLock<()> = OnceLock::new(); +static PYO3_TOKIO_RT: OnceLock = OnceLock::new(); +static PYO3_TOKIO_CANCEL_TOKEN: OnceLock = OnceLock::new(); + +// The runtime's threads do not survive when passing DistributedRuntime across bindings, +// so we need to reinitialize the runtime thread pool. +// This is also required in environments without a DistributedRuntime. +fn init_pyo3_tokio_rt() { + PYO3_TOKIO_INIT.get_or_init(|| { + let cfg = + RuntimeConfig::from_settings().expect("failed to build runtime config from settings"); + + let rt = tokio::runtime::Builder::new_multi_thread() + .worker_threads( + cfg.num_worker_threads + .unwrap_or_else(|| std::thread::available_parallelism().unwrap().get()), + ) + .max_blocking_threads(cfg.max_blocking_threads) + .enable_all() + .build() + .expect("failed to build fallback tokio runtime for pyo3_async_runtimes"); + + let _ = PYO3_TOKIO_RT.set(rt); + let rt_ref = PYO3_TOKIO_RT.get().expect("runtime missing after set"); + + // Initialize the shared cancellation token + let cancel_token = CancellationToken::new(); + let _ = PYO3_TOKIO_CANCEL_TOKEN.set(cancel_token); + + // Initialize pyo3-async runtimes with this runtime + let _ = pyo3_async_runtimes::tokio::init_with_runtime(rt_ref); + }); +} + +pub fn get_current_tokio_handle() -> tokio::runtime::Handle { + PYO3_TOKIO_RT + .get() + .expect("Tokio runtime not initialized!") + .handle() + .clone() +} + +pub fn get_current_cancel_token() -> CancellationToken { + PYO3_TOKIO_CANCEL_TOKEN + .get() + .expect("Cancellation token not initialized!") + .clone() +} + +pub fn to_pyerr(err: E) -> PyErr +where + E: Display, +{ + PyException::new_err(format!("{}", err)) +} + +#[pyclass] +#[derive(Clone)] +struct Component { + inner: rs::component::Component, +} + +pub fn extract_distributed_runtime_from_obj( + py: Python<'_>, + drt_obj: PyObject, +) -> PyResult>> { + if drt_obj.is_none(py) { + return Ok(None); + } + + let obj = drt_obj.bind(py); + + let cls = py.import("dynamo._core")?.getattr("DistributedRuntime")?; + if !obj.is_instance(&cls)? 
{ + return Err(PyTypeError::new_err( + "expected dynamo._core.DistributedRuntime", + )); + } + + let cap_any = obj.call_method0("to_capsule")?; + let cap: &Bound<'_, PyCapsule> = cap_any.downcast()?; + let weak: &Weak = unsafe { cap.reference::>() }; + + let strong = weak.upgrade().ok_or_else(|| { + PyRuntimeError::new_err("runtime is no longer alive (weak ref upgrade failed)") + })?; + + Ok(Some(strong)) +} diff --git a/lib/bindings/python/tests/test_kvbm_vllm_integration.py b/lib/kvbm/tests/test_kvbm_vllm_integration.py similarity index 99% rename from lib/bindings/python/tests/test_kvbm_vllm_integration.py rename to lib/kvbm/tests/test_kvbm_vllm_integration.py index bf58d2269b..20083650ac 100644 --- a/lib/bindings/python/tests/test_kvbm_vllm_integration.py +++ b/lib/kvbm/tests/test_kvbm_vllm_integration.py @@ -26,8 +26,8 @@ VLLM_NOT_AVAILABLE = True try: - from dynamo.llm import BlockManager - from dynamo.llm.vllm_integration.kv_cache_manager import KvbmCacheManager + from kvbm import BlockManager + from kvbm.vllm_integration.kv_cache_manager import KvbmCacheManager KVBM_NOT_AVAILABLE = False except ImportError: @@ -819,7 +819,7 @@ def test_kvbm_wrong_blocks_provided(): @pytest.mark.skipif(KVBM_NOT_AVAILABLE, reason="KVBM not available") @pytest.mark.skipif(VLLM_NOT_AVAILABLE, reason="VLLM not available") -@patch("dynamo.llm.vllm_integration.kv_cache_manager.KvbmCacheManager") +@patch("kvbm.vllm_integration.kv_cache_manager.KvbmCacheManager") def test_kvbm_new_matched_tokens_edge_case(MockCacheManager): PAGE_SIZE = 4 NUM_BLOCKS = 3 diff --git a/lib/llm/src/block_manager/distributed.rs b/lib/llm/src/block_manager/distributed.rs index c1a79492d0..7c50d8ea99 100644 --- a/lib/llm/src/block_manager/distributed.rs +++ b/lib/llm/src/block_manager/distributed.rs @@ -115,12 +115,6 @@ mod tests { } } - fn get_unique_barrier_id() -> String { - static COUNTER: AtomicUsize = AtomicUsize::new(0); - - COUNTER.fetch_add(1, Ordering::Relaxed).to_string() - } - async fn build_leader_and_workers(num_workers: usize) -> Result<(KvbmLeader, Vec)> { let mut workers = Vec::new(); diff --git a/lib/llm/src/block_manager/distributed/leader.rs b/lib/llm/src/block_manager/distributed/leader.rs index a64bb6d428..8988934b67 100644 --- a/lib/llm/src/block_manager/distributed/leader.rs +++ b/lib/llm/src/block_manager/distributed/leader.rs @@ -69,6 +69,7 @@ impl KvbmLeaderConfig { "leader_pub_url and leader_ack_url must differ (same endpoint would fail to bind)." 
             );
         }
+
         let cpu = &self.host_blocks_config;
         let disk = &self.disk_blocks_config;
         let cpu_configured = cpu.num_blocks_overriden > 0 || cpu.cache_size_in_gb > 0.0;
diff --git a/lib/llm/src/block_manager/distributed/worker.rs b/lib/llm/src/block_manager/distributed/worker.rs
index 2aaa207d89..d7503e79cf 100644
--- a/lib/llm/src/block_manager/distributed/worker.rs
+++ b/lib/llm/src/block_manager/distributed/worker.rs
@@ -29,7 +29,7 @@ use std::sync::atomic::{AtomicBool, Ordering};
 use tokio::runtime::Handle;
 use tokio_util::sync::CancellationToken;
 
-use dynamo_runtime::{DistributedRuntime, utils::task::CriticalTaskExecutionHandle};
+use dynamo_runtime::utils::task::CriticalTaskExecutionHandle;
 use tokio::sync::{Mutex, RwLock, oneshot};
 
 struct WorkerState {
@@ -362,7 +362,7 @@ impl Handler for BlockTransferDispatch {
 #[derive(Builder, Clone)]
 #[builder(pattern = "owned")]
 pub struct KvbmWorkerConfig {
-    drt: DistributedRuntime,
+    cancel_token: CancellationToken,
 
     num_device_blocks: usize,
 
@@ -531,7 +531,7 @@ impl KvbmWorker {
         CriticalTaskExecutionHandle,
         oneshot::Receiver<BlockTransferHandler>,
     )> {
-        let cancel_token = config.drt.primary_token().clone();
+        let cancel_token = config.cancel_token.clone();
 
         // establish a oneshot channel to get back the raw BlockTransferHandler
         let (handler_tx, handler_rx) = oneshot::channel();
@@ -582,7 +582,7 @@ impl KvbmWorker {
         CriticalTaskExecutionHandle,
         oneshot::Receiver<BlockTransferHandler>,
     )> {
-        let cancel_token = config.drt.primary_token().clone();
+        let cancel_token = config.cancel_token.clone();
         let scheduler_client = config.scheduler_client.clone();
 
         // channel to get BlockTransferHandler back to the caller
@@ -682,8 +682,7 @@ impl KvbmWorker {
         scheduler_client: Option,
         bytes_per_block: usize,
     ) -> anyhow::Result<()> {
-        let drt = config.drt.clone();
-        let worker_id = drt.connection_id() as usize;
+        let worker_id = config.device_id;
 
         // Readiness gating for ping
         let state = Arc::new(WorkerState::new());
diff --git a/lib/llm/src/block_manager/state/resources.rs b/lib/llm/src/block_manager/state/resources.rs
index 93eb48809f..7d99d0c484 100644
--- a/lib/llm/src/block_manager/state/resources.rs
+++ b/lib/llm/src/block_manager/state/resources.rs
@@ -57,13 +57,6 @@ impl Resources {
 
         tracing::debug!("Creating NIXL backends");
 
-        if let Ok((_, ucx_params)) = agent.get_plugin_params("UCX") {
-            let backend = agent.create_backend("UCX", &ucx_params)?;
-            nixl_backends.insert("UCX".to_string(), Arc::new(backend));
-        } else {
-            tracing::warn!("No UCX plugin found; will not create UCX backend");
-        }
-
         if config.disk_layout.is_some() {
             if let Ok((_, gds_mt_params)) = agent.get_plugin_params("GDS_MT") {
                 let backend = agent.create_backend("GDS_MT", &gds_mt_params)?;
diff --git a/pyproject.toml b/pyproject.toml
index 8bbbfc03d7..9c3e63a9f4 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -147,6 +147,7 @@ addopts = [
     "--ignore-glob=*model.py",
     "--ignore-glob=*vllm_integration*",
     "--ignore-glob=*trtllm_integration*",
+    "--ignore-glob=*kvbm/python/kvbm*",
     "--ignore-glob=*_inc.py",
     "--ignore-glob=*/llm/tensorrtllm*",
     "--ignore-glob=docs/*",
diff --git a/tests/kvbm/README.md b/tests/kvbm_integration/README.md
similarity index 95%
rename from tests/kvbm/README.md
rename to tests/kvbm_integration/README.md
index ee0bf922f1..a4ef4900d2 100644
--- a/tests/kvbm/README.md
+++ b/tests/kvbm_integration/README.md
@@ -34,10 +34,10 @@ pytest -v -m "kvbm" -s
 Run the determinism test file directly inside dynamo repo:
 
 ```bash
-pytest -v tests/kvbm/test_determinism_agg.py -s
+pytest -v tests/kvbm_integration/test_determinism_agg.py -s
 
 # disagg needs 2 GPUs to run
-pytest -v tests/kvbm/test_determinism_disagg.py -s
+pytest -v tests/kvbm_integration/test_determinism_disagg.py -s
 ```
 
 ## Configuration
@@ -84,7 +84,7 @@ pytest -v -m "kvbm" -s
 ## Requirements
 
 - `vllm` executable available in PATH inside the test environment.
-- The connector module path must be valid: `dynamo.llm.vllm_integration.connector`.
+- The connector module path must be valid: `kvbm.vllm_integration.connector`.
 - NATS and etcd services (provided automatically by the `runtime_services` fixture).
 - `datasets` library for IFEval concurrent testing (included in test dependencies).
 - For containerized workflows, follow the top-level `tests/README.md` guidance to build/run the appropriate image, then execute pytest inside the container.
diff --git a/tests/kvbm/__init__.py b/tests/kvbm_integration/__init__.py
similarity index 100%
rename from tests/kvbm/__init__.py
rename to tests/kvbm_integration/__init__.py
diff --git a/tests/kvbm/common.py b/tests/kvbm_integration/common.py
similarity index 100%
rename from tests/kvbm/common.py
rename to tests/kvbm_integration/common.py
diff --git a/tests/kvbm/engine_config_with_cuda_graph_and_kvbm.yaml b/tests/kvbm_integration/engine_config_with_cuda_graph_and_kvbm.yaml
similarity index 87%
rename from tests/kvbm/engine_config_with_cuda_graph_and_kvbm.yaml
rename to tests/kvbm_integration/engine_config_with_cuda_graph_and_kvbm.yaml
index 21b58f4882..4abec5599c 100644
--- a/tests/kvbm/engine_config_with_cuda_graph_and_kvbm.yaml
+++ b/tests/kvbm_integration/engine_config_with_cuda_graph_and_kvbm.yaml
@@ -9,6 +9,6 @@ kv_cache_config:
   free_gpu_memory_fraction: 0.80
   max_tokens: 8192
 kv_connector_config:
-  connector_module: dynamo.llm.trtllm_integration.connector
+  connector_module: kvbm.trtllm_integration.connector
   connector_scheduler_class: DynamoKVBMConnectorLeader
   connector_worker_class: DynamoKVBMConnectorWorker
diff --git a/tests/kvbm/engine_config_without_cuda_graph_and_kvbm.yaml b/tests/kvbm_integration/engine_config_without_cuda_graph_and_kvbm.yaml
similarity index 87%
rename from tests/kvbm/engine_config_without_cuda_graph_and_kvbm.yaml
rename to tests/kvbm_integration/engine_config_without_cuda_graph_and_kvbm.yaml
index e7f5fcd118..00c26471ac 100644
--- a/tests/kvbm/engine_config_without_cuda_graph_and_kvbm.yaml
+++ b/tests/kvbm_integration/engine_config_without_cuda_graph_and_kvbm.yaml
@@ -8,6 +8,6 @@ kv_cache_config:
   free_gpu_memory_fraction: 0.80
   max_tokens: 8192
 kv_connector_config:
-  connector_module: dynamo.llm.trtllm_integration.connector
+  connector_module: kvbm.trtllm_integration.connector
   connector_scheduler_class: DynamoKVBMConnectorLeader
   connector_worker_class: DynamoKVBMConnectorWorker
diff --git a/tests/kvbm/test_consolidator_router_e2e.py b/tests/kvbm_integration/test_consolidator_router_e2e.py
similarity index 99%
rename from tests/kvbm/test_consolidator_router_e2e.py
rename to tests/kvbm_integration/test_consolidator_router_e2e.py
index 25fff7edb6..22e6042ed2 100755
--- a/tests/kvbm/test_consolidator_router_e2e.py
+++ b/tests/kvbm_integration/test_consolidator_router_e2e.py
@@ -23,7 +23,7 @@
 import pytest
 import requests
 
-from tests.kvbm.common import ApiTester, check_logs_for_patterns
+from tests.kvbm_integration.common import ApiTester, check_logs_for_patterns
 from tests.utils.managed_process import ManagedProcess
 
 # Check if vLLM is available
diff --git a/tests/kvbm/test_cuda_graph.py b/tests/kvbm_integration/test_cuda_graph.py
similarity index 100%
rename from tests/kvbm/test_cuda_graph.py
rename to tests/kvbm_integration/test_cuda_graph.py
diff --git a/tests/kvbm/test_determinism_agg.py b/tests/kvbm_integration/test_determinism_agg.py
similarity index 99%
rename from tests/kvbm/test_determinism_agg.py
rename to tests/kvbm_integration/test_determinism_agg.py
index fa8b6ba364..9f76eb6823 100755
--- a/tests/kvbm/test_determinism_agg.py
+++ b/tests/kvbm_integration/test_determinism_agg.py
@@ -109,7 +109,7 @@ def _set_up_vllm_config(self, gpu_cache_blocks):
             "--port",
             str(self.port),
             "--kv-transfer-config",
-            '{"kv_connector":"DynamoConnector","kv_role":"kv_both", "kv_connector_module_path": "dynamo.llm.vllm_integration.connector"}',
+            '{"kv_connector":"DynamoConnector","kv_role":"kv_both", "kv_connector_module_path": "kvbm.vllm_integration.connector"}',
             os.environ.get("KVBM_MODEL_ID", "deepseek-ai/DeepSeek-R1-Distill-Llama-8B"),
             "--max-model-len",
             "8000",  # required to fit on L4 GPU when using 8b model
@@ -132,7 +132,7 @@ def _set_up_trtllm_config(self, gpu_cache_blocks):
             "free_gpu_memory_fraction": 0.10,  # Set a small GPU fraction so that we can evict/reset the on-device kv cache faster
         }
         llm_api_config["kv_connector_config"] = {
-            "connector_module": "dynamo.llm.trtllm_integration.connector",
+            "connector_module": "kvbm.trtllm_integration.connector",
             "connector_scheduler_class": "DynamoKVBMConnectorLeader",
             "connector_worker_class": "DynamoKVBMConnectorWorker",
         }
diff --git a/tests/kvbm/test_determinism_disagg.py b/tests/kvbm_integration/test_determinism_disagg.py
similarity index 100%
rename from tests/kvbm/test_determinism_disagg.py
rename to tests/kvbm_integration/test_determinism_disagg.py
diff --git a/tests/serve/test_vllm.py b/tests/serve/test_vllm.py
index 62f857d760..24c03c9473 100644
--- a/tests/serve/test_vllm.py
+++ b/tests/serve/test_vllm.py
@@ -100,8 +100,8 @@ class VLLMConfig(EngineConfig):
         ],
         timeout=700,
         request_payloads=[
-            chat_payload_default(),
-            completion_payload_default(),
+            chat_payload_default(expected_response=["joke"]),
+            completion_payload_default(expected_response=["joke"]),
         ],
     ),
     "multimodal_agg_llava": VLLMConfig(