diff --git a/.github/workflows/test-spyre.yml b/.github/workflows/test-spyre.yml deleted file mode 100644 index 566a33264..000000000 --- a/.github/workflows/test-spyre.yml +++ /dev/null @@ -1,27 +0,0 @@ -name: test-sypre - -on: pull_request - -jobs: - test-spyre: - runs-on: ubuntu-latest - steps: - - uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2 - - name: Build docker image - run: docker build . -t vllm-spyre -f Dockerfile.spyre - - name: Run Spyre tests within docker container - run: | - docker run -i --rm --entrypoint /bin/bash vllm-spyre -c ''' - pip install pytest sentence-transformers && \ - python3.12 -c "from transformers import pipeline; pipeline(\"text-generation\", model=\"JackFram/llama-160m\")" && \ - export VARIANT=$(ls /root/.cache/huggingface/hub/models--JackFram--llama-160m/snapshots/) && \ - mkdir -p /models && \ - ln -s /root/.cache/huggingface/hub/models--JackFram--llama-160m/snapshots/${VARIANT} /models/llama-194m && \ - python3.12 -c "from sentence_transformers import SentenceTransformer; SentenceTransformer(\"sentence-transformers/all-roberta-large-v1\")" && \ - export VARIANT=$(ls /root/.cache/huggingface/hub/models--sentence-transformers--all-roberta-large-v1/snapshots/) && \ - ln -s /root/.cache/huggingface/hub/models--sentence-transformers--all-roberta-large-v1/snapshots/${VARIANT} /models/all-roberta-large-v1 && \ - export MASTER_PORT=12355 && \ - export MASTER_ADDR=localhost && \ - export DISTRIBUTED_STRATEGY_IGNORE_MODULES=WordEmbedding && \ - python3.12 -m pytest tests/spyre -v - ''' diff --git a/.yapfignore b/.yapfignore index 7a23d8913..2d6dcf838 100644 --- a/.yapfignore +++ b/.yapfignore @@ -1,3 +1 @@ collect_env.py - -vllm/model_executor/model_loader/spyre_setup.py diff --git a/Dockerfile.spyre b/Dockerfile.spyre deleted file mode 100644 index c68dc5b4d..000000000 --- a/Dockerfile.spyre +++ /dev/null @@ -1,28 +0,0 @@ -# Global Args ################################################################# -ARG BASE_UBI_IMAGE_TAG=9.4 -ARG PYTHON_VERSION=3.12 - -# Base Layer ################################################################## -FROM registry.access.redhat.com/ubi9/ubi-minimal:${BASE_UBI_IMAGE_TAG} AS base -ARG PYTHON_VERSION -ENV PYTHON_VERSION=${PYTHON_VERSION} -WORKDIR /workspace/vllm - -# Install some basic utilities ################################################################## -RUN microdnf update -y && microdnf install -y \ - python${PYTHON_VERSION}-devel python${PYTHON_VERSION}-pip python${PYTHON_VERSION}-wheel git vim gcc g++\ - && microdnf clean all - -# Install build dependencies ################################################################## -RUN --mount=type=bind,source=requirements-build.txt,target=requirements-build.txt \ - python3.12 -m pip install --upgrade pip && \ - pip install -r requirements-build.txt - -# Build vLLM ################################################################## -COPY . . - -ENV VLLM_TARGET_DEVICE=spyre -RUN --mount=type=bind,source=.git,target=.git \ - pip install --no-build-isolation -v -e . 
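The removed workflow above downloads each model through `transformers`/`sentence-transformers` and then symlinks the single cached snapshot into a fixed `/models/...` path that the tests expect. A minimal Python sketch of that shell step, assuming exactly one snapshot is present in the Hugging Face cache (the helper name is illustrative, not part of the repo):

```python
import os
from pathlib import Path

HF_HUB = Path.home() / ".cache" / "huggingface" / "hub"

def link_cached_snapshot(repo_id: str, target: str) -> None:
    """Symlink the cached snapshot of `repo_id` to `target`,
    mirroring the `ls .../snapshots/` + `ln -s` lines in the removed workflow."""
    snapshots = HF_HUB / f"models--{repo_id.replace('/', '--')}" / "snapshots"
    variant = next(snapshots.iterdir())  # assumes exactly one cached snapshot
    Path(target).parent.mkdir(parents=True, exist_ok=True)
    os.symlink(variant, target)

# e.g. link_cached_snapshot("JackFram/llama-160m", "/models/llama-194m")
```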
- -CMD ["/bin/bash"] diff --git a/examples/offline_inference_multi_spyre.py b/examples/offline_inference_multi_spyre.py deleted file mode 100644 index 7bf422d8c..000000000 --- a/examples/offline_inference_multi_spyre.py +++ /dev/null @@ -1,60 +0,0 @@ -import gc -import os -import time - -from vllm import LLM, SamplingParams - -max_tokens = 3 - -os.environ["VLLM_SPYRE_WARMUP_PROMPT_LENS"] = '64' -os.environ["VLLM_SPYRE_WARMUP_NEW_TOKENS"] = str(max_tokens) -os.environ['VLLM_SPYRE_WARMUP_BATCH_SIZES'] = '1' - -# stuff for multi-spyre -os.environ["TORCHINDUCTOR_COMPILE_THREADS"] = "1" -os.environ["DISTRIBUTED_STRATEGY_IGNORE_MODULES"] = "WordEmbedding" -os.environ["MASTER_ADDR"] = "localhost" -os.environ["MASTER_PORT"] = "12355" - -# Sample prompts. -template = ( - "Below is an instruction that describes a task. Write a response that " - "appropriately completes the request. Be polite in your response to the " - "user.\n\n### Instruction:\n{}\n\n### Response:") -prompt1 = template.format( - "Provide a list of instructions for preparing chicken soup for a family " - "of four.") -prompts = [ - prompt1, -] - -# Create a sampling params object. -sampling_params = SamplingParams(max_tokens=max_tokens, - temperature=0.0, - ignore_eos=True) -# Create an LLM. -llm = LLM( - model="/models/llama-194m", - tokenizer="/models/llama-194m", - max_model_len=2048, - block_size=2048, - device="spyre", - tensor_parallel_size=2, -) - -# Generate texts from the prompts. The output is a list of RequestOutput objects -# that contain the prompt, generated text, and other information. -print("=============== GENERATE") -t0 = time.time() -outputs = llm.generate(prompts, sampling_params) -print("Time elaspsed for %d tokens is %.2f sec" % - (len(outputs[0].outputs[0].token_ids), time.time() - t0)) -for output in outputs: - prompt = output.prompt - generated_text = output.outputs[0].text - print(f"Prompt: {prompt!r}, Generated text: {generated_text!r}") -print(output.outputs[0]) - -# needed to prevent ugly stackdump caused by sigterm -del llm -gc.collect() diff --git a/examples/offline_inference_spyre.ipynb b/examples/offline_inference_spyre.ipynb deleted file mode 100644 index 792c73177..000000000 --- a/examples/offline_inference_spyre.ipynb +++ /dev/null @@ -1,313 +0,0 @@ -{ - "cells": [ - { - "cell_type": "code", - "execution_count": 1, - "id": "bb1996e6", - "metadata": {}, - "outputs": [ - { - "name": "stdout", - "output_type": "stream", - "text": [ - "WARNING 08-15 14:54:53 _custom_ops.py:14] Failed to import from vllm._C with ModuleNotFoundError(\"No module named 'vllm._C'\")\n" - ] - }, - { - "name": "stderr", - "output_type": "stream", - "text": [ - "/opt/vllm/lib64/python3.9/site-packages/vllm/connections.py:8: RuntimeWarning: Failed to read commit hash:\n", - "No module named 'vllm.commit_id'\n", - " from vllm.version import __version__ as VLLM_VERSION\n" - ] - } - ], - "source": [ - "import time\n", - "%load_ext wurlitzer" - ] - }, - { - "cell_type": "markdown", - "id": "45172614", - "metadata": {}, - "source": [ - "Offline inference demo\n", - "----------------------------\n", - "This is just a brief demo to show that vLLM with Spyre can be used in the offline mode. \n", - "\n", - "vLLM will determine the Spyre config automatically and warmup the Spyre stack. \n", - "The startup of vLLM (including warmup of Spyre), is expected to take 15 min for prompt length of 64 and maximum number of decode tokens 5 (it will take ~20min for 64/20)." 
- ] - }, - { - "cell_type": "markdown", - "id": "bf37c07b", - "metadata": {}, - "source": [ - "### 1. create vLLM instance \n", - "(for offline usage, including warmup)\n", - "\n", - "The maximum prompt length and maximum number of decode tokens can be specified using the environment variables `VLLM_SPYRE_WARMUP_PROMPT_LENS`, and `VLLM_SPYRE_WARMUP_NEW_TOKENS`. \n", - "Otherwise the default max prompt length of 64 and maximum of 20 decode tokens will be used. " - ] - }, - { - "cell_type": "code", - "execution_count": 2, - "id": "ecf0992b", - "metadata": {}, - "outputs": [], - "source": [ - "import os\n", - "\n", - "os.environ['VLLM_SPYRE_WARMUP_PROMPT_LENS'] = '64'\n", - "os.environ['VLLM_SPYRE_WARMUP_NEW_TOKENS'] = '5'" - ] - }, - { - "cell_type": "code", - "execution_count": 3, - "id": "88b984d7", - "metadata": { - "scrolled": true - }, - "outputs": [ - { - "name": "stdout", - "output_type": "stream", - "text": [ - "INFO 08-15 14:54:54 llm_engine.py:176] Initializing an LLM engine (v0.5.3.post1) with config: model='/tmp/7B-F', speculative_config=None, tokenizer='/tmp/7B-F', skip_tokenizer_init=False, tokenizer_mode=auto, revision=None, rope_scaling=None, rope_theta=None, tokenizer_revision=None, trust_remote_code=False, dtype=torch.float16, max_seq_len=2048, download_dir=None, load_format=LoadFormat.AUTO, tensor_parallel_size=1, pipeline_parallel_size=1, disable_custom_all_reduce=False, quantization=None, enforce_eager=False, kv_cache_dtype=auto, quantization_param_path=None, device_config=cpu, decoding_config=DecodingConfig(guided_decoding_backend='outlines'), observability_config=ObservabilityConfig(otlp_traces_endpoint=None), seed=0, served_model_name=/tmp/7B-F, use_v2_block_manager=False, enable_prefix_caching=False)\n" - ] - }, - { - "name": "stderr", - "output_type": "stream", - "text": [ - "You are using the default legacy behaviour of the . This is expected, and simply means that the `legacy` (previous) behavior will be used so nothing changes for you. If you want to use the new behaviour, set `legacy=False`. 
This should only be set if you understand what it means, and thoroughly read the reason why this was added as explained in https://github.com/huggingface/transformers/pull/24565 - if you loaded a llama tokenizer from a GGUF file you can ignore this message.\n" - ] - }, - { - "name": "stdout", - "output_type": "stream", - "text": [ - "WARNING: Disabled: dynamo_tracer\n", - "WARNING 08-15 14:54:55 utils.py:581] Pin memory is not supported on Spyre device.\n", - "[SpyreWorker] environment configured\n", - "[SpyreWorker] load model...\n", - ">> DEBUG SETUP\n", - "0 / 1 : Python Version : 3.9.18\n", - "0 / 1 : PyTorch Version: 2.2.2+cpu\n", - "0 / 1 : PCI Addr Rank 0 AIU_WORLD_RANK_0=0\n", - "0 / 1 : PCI Addr Rank 0 FLEX_RDMA_PCI_BUS_ADDR_0=0000:1e:00.0\n", - "0 / 1 : FLEX_COMPUTE=SENTIENT\n", - "0 / 1 : FLEX_DEVICE=VFIO\n", - "0 / 1 : DEEPRT_EXPORT_DIR=export/0\n", - "0 / 1 : DTCOMPILER_EXPORT_DIR=export/0\n", - "0 / 1 : AIU_CONFIG_FILE_0=/etc/aiu/senlib_config.json\n", - "0 / 1 : SENLIB_DEVEL_CONFIG_FILE=/etc/aiu/senlib_config.json\n", - "0 / 1 : FLEX_RDMA_PCI_BUS_ADDR_0=0000:1e:00.0\n", - "0 / 1 : FLEX_RDMA_LOCAL_RANK=0\n", - "0 / 1 : FLEX_RDMA_LOCAL_SIZE=1\n", - "0 / 1 : FLEX_RDMA_WORLD_RANK=0\n", - "0 / 1 : FLEX_RDMA_WORLD_SIZE=1\n", - "0 / 1 : Spyre: Enabled (0) (offset=0)\n", - "0 / 1 : Dynamo Backend : sendnn_decoder\n", - "0 / 1 : CPU Cores : 56 x 2 HW threads\n", - "------------------------------------------------------------\n", - "NOTICE: Adjusting torch._dynamo.config.accumulated_cache_size_limit from 64 to 160 to accommodate prompt size of 64 and decode tokens of 5\n", - "NOTICE: Adjusting torch._dynamo.config.cache_size_limit from 8 to 160 to accommodate prompt size of 64 and decode tokens of 5\n", - "\tload model took 62.92104411125183s\n", - "[SpyreWorker] Start warming up 1 different prompt/decode-shape combinations.\n", - "[SpyreWorker] Warmup 1/1 prompt/decode-shape combinations...\n", - "[SpyreWorker] warmup for prompt length 64 and max output tokens 5.\n", - "[SpyreWorker] warmup 1/2...\n", - "[SpyreWorker] warmup 2/2...\n", - "update_lazyhandle() done (duration: 134.3403525352478s)\n", - "[SpyreWorker] ... warmup finished.\n", - "\twarmup took 893.4236354827881s (for prompt length 64 and max output tokens 5)\n", - "[SpyreWorker] All warmups for 1 different prompt/decode-shape combinations finished. Total warmup time 893.4242045879364s.\n" - ] - } - ], - "source": [ - "from vllm import LLM, SamplingParams\n", - "\n", - "# Create an LLM.\n", - "llm = LLM(\n", - " model=\"/models/llama-7b-chat\",\n", - " tokenizer=\"/models/llama-7b-chat\",\n", - " max_model_len=2048,\n", - " block_size=2048,\n", - " device=\"spyre\")" - ] - }, - { - "cell_type": "markdown", - "id": "818488f4", - "metadata": {}, - "source": [ - "### 2. Create the prompt and `SamplingParams`" - ] - }, - { - "cell_type": "code", - "execution_count": 4, - "id": "6c32e3e2", - "metadata": {}, - "outputs": [ - { - "name": "stdout", - "output_type": "stream", - "text": [ - "['Below is an instruction that describes a task. Write a response that appropriately completes the request. Be polite in your response to the user.\\n\\n### Instruction:\\nProvide a list of instructions for preparing chicken soup for a family of four.\\n\\n### Response:']\n" - ] - } - ], - "source": [ - "# Sample prompts.\n", - "template = (\n", - " \"Below is an instruction that describes a task. Write a response that \"\n", - " \"appropriately completes the request. 
Be polite in your response to the \"\n", - " \"user.\\n\\n### Instruction:\\n{}\\n\\n### Response:\"\n", - ")\n", - "prompt1 = template.format(\n", - " \"Provide a list of instructions for preparing chicken soup for a family \"\n", - " \"of four.\"\n", - ")\n", - "prompts = [\n", - " prompt1,\n", - "]\n", - "print(prompts)\n" - ] - }, - { - "cell_type": "code", - "execution_count": 5, - "id": "4cc1277e", - "metadata": {}, - "outputs": [], - "source": [ - "# Create a sampling params object.\n", - "max_tokens = 5\n", - "sampling_params = SamplingParams(max_tokens=max_tokens, temperature=0.0)\n" - ] - }, - { - "cell_type": "markdown", - "id": "cffb16c5", - "metadata": {}, - "source": [ - "### 3. Generate the response" - ] - }, - { - "cell_type": "code", - "execution_count": 6, - "id": "522c0610", - "metadata": {}, - "outputs": [ - { - "name": "stdout", - "output_type": "stream", - "text": [ - "=============== GENERATE\n" - ] - }, - { - "name": "stderr", - "output_type": "stream", - "text": [ - "Processed prompts: 0%| | 0/1 [00:00 bool: return VLLM_TARGET_DEVICE == "cpu" -def _is_spyre() -> bool: - return VLLM_TARGET_DEVICE == "spyre" - - def _is_openvino() -> bool: return VLLM_TARGET_DEVICE == "openvino" @@ -511,8 +507,6 @@ def get_vllm_version() -> str: if neuron_version != MAIN_CUDA_VERSION: neuron_version_str = neuron_version.replace(".", "")[:3] version += f"{sep}neuron{neuron_version_str}" - elif _is_spyre(): - version += f"{sep}spyre" elif _is_hpu(): # Get the Intel Gaudi Software Suite version gaudi_sw_version = str(get_gaudi_sw_version()) @@ -560,7 +554,7 @@ def _read_requirements(filename: str) -> List[str]: return resolved_requirements if _no_device(): - requirements = _read_requirements("requirements-cuda.txt") + requirements = _read_requirements("requirements-common.txt") elif _is_cuda(): requirements = _read_requirements("requirements-cuda.txt") cuda_major, cuda_minor = torch.version.cuda.split(".") @@ -577,8 +571,6 @@ def _read_requirements(filename: str) -> List[str]: requirements = _read_requirements("requirements-rocm.txt") elif _is_neuron(): requirements = _read_requirements("requirements-neuron.txt") - elif _is_spyre(): - requirements = _read_requirements("requirements-spyre.txt") elif _is_hpu(): requirements = _read_requirements("requirements-hpu.txt") elif _is_openvino(): diff --git a/tests/spyre/spyre_util.py b/tests/spyre/spyre_util.py deleted file mode 100644 index c2985453c..000000000 --- a/tests/spyre/spyre_util.py +++ /dev/null @@ -1,283 +0,0 @@ -import math -import os -from typing import Any, Dict, List, Tuple, Union - -import numpy as np -from sentence_transformers import SentenceTransformer, util -from transformers import AutoModelForCausalLM, AutoTokenizer - -from vllm import LLM, SamplingParams - -DISABLE_ASSERTS = False # used for debugging - -ISCLOSE_REL_TOL_CPU = 0.1 -ISCLOSE_REL_TOL_SPYRE = 0.1 - - -# vLLM / Spyre -def generate_spyre_vllm_output(model: str, prompts: List[str], - warmup_shapes: List[Tuple[int, int, int]], - max_model_len: int, block_size: int, - sampling_params: Union[SamplingParams, - List[SamplingParams]], - tensor_parallel_size: int, - backend: str) -> List[Dict[str, Any]]: - - warmup_prompt_length = [t[0] for t in warmup_shapes] - warmup_new_tokens = [t[1] for t in warmup_shapes] - warmup_batch_size = [t[2] for t in warmup_shapes] - - os.environ['VLLM_SPYRE_WARMUP_PROMPT_LENS'] = ','.join( - str(val) for val in warmup_prompt_length) - os.environ['VLLM_SPYRE_WARMUP_NEW_TOKENS'] = ','.join( - str(val) for val in warmup_new_tokens) - 
os.environ['VLLM_SPYRE_WARMUP_BATCH_SIZES'] = ','.join( - str(val) for val in warmup_batch_size) - os.environ['VLLM_SPYRE_DYNAMO_BACKEND'] = backend - - vllm_model = LLM(model=model, - tokenizer=model, - max_model_len=max_model_len, - block_size=block_size, - tensor_parallel_size=tensor_parallel_size, - device="spyre") - - vllm_outputs = vllm_model.generate(prompts, sampling_params) - - results = [] - for req_output in vllm_outputs: - result = {} - result['text'] = req_output.outputs[0].text - result['token_ids'] = req_output.outputs[0].token_ids - result['tokens'] = tuple([ - req_output.outputs[0].logprobs[i][t].decoded_token - for i, t in enumerate(result['token_ids']) - ]) - result['logprobs'] = tuple([ - req_output.outputs[0].logprobs[i][t].logprob - for i, t in enumerate(result['token_ids']) - ]) - results.append(result) - - return results - - -# Hugging Face -def generate_hf_output( - model: str, prompts: List[str], - max_new_tokens: Union[int, List[int]]) -> List[Dict[str, Any]]: - - if not isinstance(max_new_tokens, list): - max_new_tokens = [max_new_tokens] * len(prompts) - - hf_model = AutoModelForCausalLM.from_pretrained(model) - hf_tokenizer = AutoTokenizer.from_pretrained(model) - - results = [] - for prompt_index, prompt in enumerate(prompts): - hf_input_tokens = hf_tokenizer(prompt, return_tensors="pt").input_ids - hf_output = hf_model.generate( - hf_input_tokens, - do_sample=False, - max_new_tokens=max_new_tokens[prompt_index], - return_dict_in_generate=True, - output_scores=True) - - # decode output tokens after first removing input tokens (prompt) - hf_generated_text = hf_tokenizer.batch_decode( - hf_output.sequences[:, len(hf_input_tokens[0]):])[0] - hf_transition_scores = hf_model.compute_transition_scores( - hf_output.sequences, hf_output.scores, normalize_logits=True) - - # return HF generated text, tokens, token ids and logprobs - result = {} - result['text'] = hf_generated_text - result['token_ids'] = [] - result['tokens'] = [] - result['logprobs'] = [] - for tok_index, hf_logprob in enumerate(hf_transition_scores[0]): - hf_token_id = hf_output.sequences[0][tok_index + - len(hf_input_tokens[0])] - result['token_ids'].append(hf_token_id.item()) - result['tokens'].append(hf_tokenizer.decode(hf_token_id)) - result['logprobs'].append(hf_logprob.item()) - result['token_ids'] = tuple(result['token_ids']) - result['tokens'] = tuple(result['tokens']) - result['logprobs'] = tuple(result['logprobs']) - results.append(result) - - return results - - -# compare results -def compare_results(model: str, prompts: List[str], - warmup_shapes: List[Tuple[int, int, - int]], tensor_parallel_size: int, - backend: str, vllm_results: List[Dict[str, Any]], - hf_results: List[Dict[str, Any]]): - - print(f"\nmodel: {model:s}") - print(f"warmup shapes: {warmup_shapes}") - print(f"tp size: {tensor_parallel_size}") - print(f"backend: {backend:s}") - print(f"\n#prompts: {len(prompts):d}") - print(f"#HF results: {len(hf_results):d}" - f"{'' if len(hf_results) == len(prompts) else ' ERROR':s}") - print(f"#vLLM results: {len(vllm_results):d}" - f"{'' if len(vllm_results) == len(prompts) else ' ERROR':s}") - print() - - assert DISABLE_ASSERTS or len(hf_results) == len(vllm_results) - assert DISABLE_ASSERTS or len(hf_results) == len(prompts) - - for prompt_index, (prompt, hf_result, vllm_result) in enumerate( - zip(prompts, hf_results, vllm_results)): - err_msg = '' if hf_result['text'] == vllm_result['text'] else ' ERROR' - print(f"\nprompt {prompt_index:3d}: {repr(prompt):s}") - print("generated:") 
- print(f" HF: {repr(hf_result['text']):s}") - print(f" vLLM: {repr(vllm_result['text']):s}{err_msg}") - print() - - assert DISABLE_ASSERTS or backend == 'sendnn_decoder' or\ - hf_result['token_ids'] == vllm_result['token_ids'] - - if len(hf_result['tokens']) > 0: - print(" token id. token logprob " - " token id. token logprob") - - logprob_abs_diff_list = [] - logprob_rel_diff_list = [] - - for hf_token, hf_token_id, hf_logprob, vllm_token,\ - vllm_token_id, vllm_logprob in zip( - hf_result['tokens'], hf_result['token_ids'], - hf_result['logprobs'], vllm_result['tokens'], - vllm_result['token_ids'], vllm_result['logprobs']): - logprob_abs_diff = math.fabs(hf_logprob - vllm_logprob) - logprob_abs_diff_list.append(logprob_abs_diff) - logprob_rel_diff = math.fabs(logprob_abs_diff / hf_logprob) - logprob_rel_diff_list.append(logprob_rel_diff) - - print( - f"HF: {hf_token_id:8d} {repr(hf_token):14s} " - f"{hf_logprob:14f} " - f"vLLM: {vllm_token_id:8d} {repr(vllm_token):14s} " - f"{vllm_logprob:14f} ", - end='') - - if backend == 'sendnn_decoder': - rel_tol = ISCLOSE_REL_TOL_SPYRE - else: - rel_tol = ISCLOSE_REL_TOL_CPU - - if hf_token_id != vllm_token_id: # different tokens - if backend == 'sendnn_decoder' and math.isclose( - hf_logprob, vllm_logprob, rel_tol=rel_tol): - # probably still OK - print('DIVERGING') - break - else: - print('ERROR') - assert DISABLE_ASSERTS or False - break - else: # identical tokens - if math.isclose(hf_logprob, vllm_logprob, rel_tol=rel_tol): - print() - else: - print('ERROR') - assert DISABLE_ASSERTS or False - break - - print() - print("logprob absolute differences: " - f"average={np.mean(logprob_abs_diff_list):f} " - f"maximum={np.max(logprob_abs_diff_list):f}") - print("logprob relative differences: " - f"average={np.mean(logprob_rel_diff_list):f} " - f"maximum={np.max(logprob_rel_diff_list):f}") - - print() - - -# vLLM / Spyre -def spyre_vllm_embeddings(model: str, prompts: List[str], - warmup_shapes: List[Tuple[int, - int]], max_model_len: int, - block_size: int, tensor_parallel_size: int, - backend: str) -> List[Dict[str, Any]]: - - warmup_prompt_length = [t[0] for t in warmup_shapes] - warmup_new_tokens = [0] * len(warmup_shapes) - warmup_batch_size = [t[1] for t in warmup_shapes] - - os.environ['VLLM_SPYRE_WARMUP_PROMPT_LENS'] = ','.join( - str(val) for val in warmup_prompt_length) - os.environ['VLLM_SPYRE_WARMUP_NEW_TOKENS'] = ','.join( - str(val) for val in warmup_new_tokens) - os.environ['VLLM_SPYRE_WARMUP_BATCH_SIZES'] = ','.join( - str(val) for val in warmup_batch_size) - os.environ['VLLM_SPYRE_DYNAMO_BACKEND'] = backend - - vllm_model = LLM(model=model, - tokenizer=model, - max_model_len=max_model_len, - block_size=block_size, - tensor_parallel_size=tensor_parallel_size, - device="spyre") - - vllm_outputs = vllm_model.encode(prompts) - - results = [] - for req_output in vllm_outputs: - result = {} - result["embeddings"] = req_output.outputs.embedding - results.append(result) - - return results - - -# Hugging Face -def st_embeddings(model: str, prompts: List[str]) -> List[Dict[str, Any]]: - - model = SentenceTransformer(model) - - results = [] - for prompt in prompts: - embeddings = model.encode(prompt) - - # return ST generated embeddings - result = {} - result['embeddings'] = embeddings - results.append(result) - - return results - - -# compare results -def compare_embedding_results(model: str, prompts: List[str], - warmup_shapes: List[Tuple[int, int]], - tensor_parallel_size: int, backend: str, - vllm_results: List[Dict[str, Any]], - hf_results: 
List[Dict[str, Any]]): - - print(f"\nmodel: {model:s}") - print(f"warmup shapes: {warmup_shapes}") - print(f"tp size: {tensor_parallel_size}") - print(f"backend: {backend:s}") - print(f"\n#prompts: {len(prompts):d}") - print(f"#HF results: {len(hf_results):d}" - f"{'' if len(hf_results) == len(prompts) else ' ERROR':s}") - print(f"#vLLM results: {len(vllm_results):d}" - f"{'' if len(vllm_results) == len(prompts) else ' ERROR':s}") - print() - - assert DISABLE_ASSERTS or len(hf_results) == len(vllm_results) - assert DISABLE_ASSERTS or len(hf_results) == len(prompts) - - for hf_result, vllm_result in zip(hf_results, vllm_results): - - sim = util.pytorch_cos_sim(hf_result["embeddings"], - vllm_result["embeddings"]) - - assert math.isclose(sim, 1.0, rel_tol=0.05) diff --git a/tests/spyre/test_spyre_basic.py b/tests/spyre/test_spyre_basic.py deleted file mode 100644 index 0cd53a72a..000000000 --- a/tests/spyre/test_spyre_basic.py +++ /dev/null @@ -1,73 +0,0 @@ -"""Verification of vLLM output by comparing with HF - -Run `python -m pytest tests/spyre/test_spyre_basic.py`. -""" - -from typing import List, Tuple - -import pytest -from spyre_util import (compare_results, generate_hf_output, - generate_spyre_vllm_output) - -from vllm import SamplingParams - - -@pytest.mark.parametrize("model", ["/models/llama-194m"]) -@pytest.mark.parametrize("prompts", [[ - "Provide a list of instructions for preparing" - " chicken soup for a family of four.", "Hello", - "What is the weather today like?", "Who are you?" -]]) -@pytest.mark.parametrize("warmup_shape", [(64, 20, 4), (64, 20, 8), - (128, 20, 4), (128, 20, 8)] - ) # (prompt_length/new_tokens/batch_size) -@pytest.mark.parametrize("backend", - ["eager"]) #, "inductor", "sendnn_decoder"]) -def test_output( - model: str, - prompts: List[str], - warmup_shape: Tuple[int, int, int], - backend: str, -) -> None: - ''' - The warmup is based on a single shape. After the warmup, - one request with the provided prompts is input to vLLM. - The same prompts are also input to HF. The generated output - including text, token ids, and logprobs, is verified to be - identical for vLLM and HF. - - If errors occur, these can be analyzed/debugged by setting - 'DISABLE_ASSERTS = True' in spyre_util.py and by rerunning the - test using 'pytest --capture=no tests/spyre/test_spyre_basic.py' - After debugging, DISABLE_ASSERTS should be reset to 'False'. - ''' - - max_new_tokens = warmup_shape[1] - - vllm_sampling_params = SamplingParams( - max_tokens=max_new_tokens, - temperature=0, - logprobs=0, # return logprobs of generated tokens only - ignore_eos=True) - - vllm_results = generate_spyre_vllm_output( - model=model, - prompts=prompts, - warmup_shapes=[warmup_shape], - max_model_len=2048, - block_size=2048, - sampling_params=vllm_sampling_params, - tensor_parallel_size=1, - backend=backend) - - hf_results = generate_hf_output(model=model, - prompts=prompts, - max_new_tokens=max_new_tokens) - - compare_results(model=model, - prompts=prompts, - warmup_shapes=[warmup_shape], - tensor_parallel_size=1, - backend=backend, - vllm_results=vllm_results, - hf_results=hf_results) diff --git a/tests/spyre/test_spyre_embeddings.py b/tests/spyre/test_spyre_embeddings.py deleted file mode 100644 index 8c8056158..000000000 --- a/tests/spyre/test_spyre_embeddings.py +++ /dev/null @@ -1,54 +0,0 @@ -"""Verification of vLLM output by comparing with HF - -Run `python -m pytest tests/spyre/test_spyre_embeddings.py`. 
-""" - -from typing import List, Tuple - -import pytest -from spyre_util import (compare_embedding_results, spyre_vllm_embeddings, - st_embeddings) - - -@pytest.mark.parametrize("model", ["/models/all-roberta-large-v1"]) -@pytest.mark.parametrize("prompts", [[ - "The capital of France is Paris." - "Provide a list of instructions for preparing" - " chicken soup for a family of four.", "Hello", - "What is the weather today like?", "Who are you?" -]]) -@pytest.mark.parametrize("warmup_shape", - [(64, 4), (64, 8), (128, 4), - (128, 8)]) # (prompt_length/new_tokens/batch_size) -@pytest.mark.parametrize("backend", - ["eager"]) #, "inductor", "sendnn_decoder"]) -def test_output( - model: str, - prompts: List[str], - warmup_shape: Tuple[int, int], - backend: str, -) -> None: - ''' - The warmup is based on a single shape. After the warmup, - one request with the provided prompts is input to vLLM. - The same prompts are also input to HF. The generated embeddings - are verified to be identical for vLLM and SentenceTransformers. - ''' - - vllm_results = spyre_vllm_embeddings(model=model, - prompts=prompts, - warmup_shapes=[warmup_shape], - max_model_len=256, - block_size=256, - tensor_parallel_size=1, - backend=backend) - - hf_results = st_embeddings(model=model, prompts=prompts) - - compare_embedding_results(model=model, - prompts=prompts, - warmup_shapes=[warmup_shape], - tensor_parallel_size=1, - backend=backend, - vllm_results=vllm_results, - hf_results=hf_results) diff --git a/tests/spyre/test_spyre_max_new_tokens.py b/tests/spyre/test_spyre_max_new_tokens.py deleted file mode 100644 index fccb67084..000000000 --- a/tests/spyre/test_spyre_max_new_tokens.py +++ /dev/null @@ -1,102 +0,0 @@ -"""Verification of vLLM output by comparing with HF - -Run `python -m pytest tests/spyre/test_spyre_max_new_tokens.py`. -""" - -from typing import List, Tuple - -import pytest -from spyre_util import (compare_results, generate_hf_output, - generate_spyre_vllm_output) - -from vllm import SamplingParams - -template = ( - "Below is an instruction that describes a task. Write a response that " - "appropriately completes the request. Be polite in your response to the " - "user.\n\n### Instruction:\n{}\n\n### Response:") - -prompt1 = template.format("Provide a recipe for chicken soup.") -prompt2 = template.format("Provide a list of instructions for preparing " - "chicken soup for a family of four.") - - -@pytest.mark.parametrize("model", ["/models/llama-194m"]) -@pytest.mark.parametrize("prompts", [[prompt1, prompt2, prompt2, prompt2], - [prompt2, prompt2, prompt2, prompt1], - [prompt2, prompt2, prompt2, prompt2]]) -@pytest.mark.parametrize("stop_last", [True, False]) -@pytest.mark.parametrize("warmup_shape", [(64, 10, 4)] - ) # (prompt_length/new_tokens/batch_size) -@pytest.mark.parametrize("backend", - ["eager"]) #, "inductor", "sendnn_decoder"]) -def test_output( - model: str, - prompts: List[str], - stop_last: bool, - warmup_shape: Tuple[int, int, int], - backend: str, -) -> None: - ''' - The warmup is based on a single shape. After the warmup, - one request with the provided prompts is input to vLLM. - The same prompts are also input to HF. The generated output - including text, token ids, and logprobs, is verified to be - identical for vLLM and HF. - - If errors occur, these can be analyzed/debugged by setting - 'DISABLE_ASSERTS = True' in spyre_util.py and by rerunning the - test using 'pytest --capture=no tests/spyre/test_spyre_max_new_tokens.py' - After debugging, DISABLE_ASSERTS should be reset to 'False'. 
- ''' - - max_new_tokens_warmup = warmup_shape[1] - max_new_tokens_early_stop = 1 - - vllm_sampling_params_normal = SamplingParams( - max_tokens=max_new_tokens_warmup, - temperature=0, - logprobs=0, # return logprobs of generated tokens only - ignore_eos=False) - - vllm_sampling_params_early_stop = SamplingParams( - max_tokens=max_new_tokens_early_stop, - temperature=0, - logprobs=0, # return logprobs of generated tokens only - ignore_eos=False) - - vllm_sampling_params = [vllm_sampling_params_normal] * 3 - hf_max_new_tokens = [max_new_tokens_warmup] * 3 - - # stop last or first sequence in batch early - if stop_last: - vllm_sampling_params = vllm_sampling_params + [ - vllm_sampling_params_early_stop - ] - hf_max_new_tokens = hf_max_new_tokens + [max_new_tokens_early_stop] - else: - vllm_sampling_params = [vllm_sampling_params_early_stop - ] + vllm_sampling_params - hf_max_new_tokens = [max_new_tokens_early_stop] + hf_max_new_tokens - - vllm_results = generate_spyre_vllm_output( - model=model, - prompts=prompts, - warmup_shapes=[warmup_shape], - max_model_len=2048, - block_size=2048, - sampling_params=vllm_sampling_params, - tensor_parallel_size=1, - backend=backend) - - hf_results = generate_hf_output(model=model, - prompts=prompts, - max_new_tokens=hf_max_new_tokens) - - compare_results(model=model, - prompts=prompts, - warmup_shapes=[warmup_shape], - tensor_parallel_size=1, - backend=backend, - vllm_results=vllm_results, - hf_results=hf_results) diff --git a/tests/spyre/test_spyre_max_prompt_length.py b/tests/spyre/test_spyre_max_prompt_length.py deleted file mode 100644 index e2fdd9e18..000000000 --- a/tests/spyre/test_spyre_max_prompt_length.py +++ /dev/null @@ -1,101 +0,0 @@ -"""Verification of handling prompt length exceeding warmup shapes - -Run `python -m pytest tests/spyre/test_spyre_max_prompt_length.py`. -""" - -from typing import List, Tuple - -import pytest -from spyre_util import (compare_results, generate_hf_output, - generate_spyre_vllm_output) -from transformers import AutoTokenizer - -from vllm import SamplingParams - - -@pytest.mark.parametrize("model", ["/models/llama-194m"]) -@pytest.mark.parametrize("prompts", [ - 7 * [ - "Hello", - "Below is an instruction that describes a task. Write a response" - " that appropriately completes the request. Be polite in your response" - " to the user. Provide a list of instructions for preparing chicken " - "soup for a family of four. Indicate if the weather forecast looks " - "good for today. Explain in a brief summary comprised of at most 50" - " words what you are." - ] -]) -@pytest.mark.parametrize("warmup_shapes", - [[(64, 20, 4)], [(64, 20, 4), (128, 20, 4)]] - ) # (prompt_length/new_tokens/batch_size) -@pytest.mark.parametrize("backend", - ["eager"]) #, "inductor", "sendnn_decoder"]) -def test_output( - model: str, - prompts: List[str], - warmup_shapes: List[Tuple[int, int, int]], - backend: str, -) -> None: - ''' - The warmup is based on one or multiple shapes. After the warmup, - one request with multiple provided prompts is input to vLLM. - At least one provided prompt should have a length longer than the - maximum prompt length defined by the warmup shapes. It is useful - to define enough prompts to fill multiple batches entirely and - partially, in order to test the maximum prompt length check - also in relation with the position of a prompt within a batch (not - likely that this will be an issue, but just to be sure). 
- It is verified that only for the prompts that - do not exceed the maximum prompt length, "non-empty" output is - generated. The output is verified using HF. - - If errors occur, these can be analyzed/debugged by setting - 'DISABLE_ASSERTS = True' in spyre_util.py and by rerunning the test - using 'pytest --capture=no tests/spyre/test_spyre_max_prompt_length.py' - After debugging, DISABLE_ASSERTS should be reset to 'False'. - ''' - - max_prompt_length = max([t[0] for t in warmup_shapes]) - max_new_tokens = max([t[1] for t in warmup_shapes]) - - vllm_sampling_params = SamplingParams( - max_tokens=max_new_tokens, - temperature=0, - logprobs=0, # return logprobs of generated tokens only - ignore_eos=True) - - vllm_results = generate_spyre_vllm_output( - model=model, - prompts=prompts, - warmup_shapes=warmup_shapes, - max_model_len=2048, - block_size=2048, - sampling_params=vllm_sampling_params, - tensor_parallel_size=1, - backend=backend) - - hf_results = generate_hf_output(model=model, - prompts=prompts, - max_new_tokens=max_new_tokens) - - # for prompts longer than the max_prompt_length, the corresponding - # output in hf_results is reset to 'empty' in order to create the - # expected output for vLLM - hf_tokenizer = AutoTokenizer.from_pretrained(model) - for prompt_index, prompt in enumerate(prompts): - hf_input_tokens = hf_tokenizer(prompt, return_tensors="pt").input_ids - if len(hf_input_tokens[0]) > max_prompt_length: - hf_results[prompt_index] = { - 'text': '', - 'token_ids': (), - 'tokens': (), - 'logprobs': () - } - - compare_results(model=model, - prompts=prompts, - warmup_shapes=warmup_shapes, - tensor_parallel_size=1, - backend=backend, - vllm_results=vllm_results, - hf_results=hf_results) diff --git a/tests/spyre/test_spyre_seed.py b/tests/spyre/test_spyre_seed.py deleted file mode 100644 index 01bebdce5..000000000 --- a/tests/spyre/test_spyre_seed.py +++ /dev/null @@ -1,75 +0,0 @@ -"""Verification of seeded random sampling to be deterministic - -Run `python -m pytest tests/spyre/test_spyre_seed.py`. -""" - -import math -from typing import Tuple - -import pytest -from spyre_util import generate_spyre_vllm_output - -from vllm import SamplingParams - - -@pytest.mark.parametrize("model", ["/models/llama-194m"]) -@pytest.mark.parametrize("prompt", [ - "Provide a list of instructions for preparing" - " chicken soup for a family of four." -]) -@pytest.mark.parametrize("temperature", [0.1, 1.0]) -@pytest.mark.parametrize("seed", [42]) -@pytest.mark.parametrize("warmup_shape", [(64, 20, 4), (64, 20, 8), - (128, 20, 4), (128, 20, 8)] - ) # (prompt_length/new_tokens/batch_size) -@pytest.mark.parametrize("backend", - ["eager"]) #, "inductor", "sendnn_decoder"]) -def test_seed( - model: str, - prompt: str, - temperature: float, - seed: int, - warmup_shape: Tuple[int, int, int], - backend: str, -) -> None: - ''' - The warmup is based on a single shape. After the warmup, - output is generated for one request with 16 identical prompts - using random sampling (non-zero temperature) in combination - with a seed. The generated output, including text, token ids, - logprobs is verified to be identical for all 16 sequences. 
- ''' - - max_new_tokens = warmup_shape[1] - - prompts = [prompt] * 16 - - vllm_sampling_params = SamplingParams( - max_tokens=max_new_tokens, - temperature=temperature, - logprobs=0, # return logprobs of generated tokens only - ignore_eos=True, - seed=seed) - - vllm_results = generate_spyre_vllm_output( - model=model, - prompts=prompts, - warmup_shapes=[warmup_shape], - max_model_len=2048, - block_size=2048, - sampling_params=vllm_sampling_params, - tensor_parallel_size=1, - backend=backend) - - # compare all generated outputs against the first generated output - for vllm_result in vllm_results: - assert vllm_result['text'] == vllm_results[0]['text'] - - # compare logprobs for all tokens between - # the current and the first sequence - assert len(vllm_result['logprobs']) == len(vllm_results[0]['logprobs']) - for token_id, logprob, token_id_0, logprob_0 in zip( - vllm_result['token_ids'], vllm_result['logprobs'], - vllm_results[0]['token_ids'], vllm_results[0]['logprobs']): - assert token_id == token_id_0 - assert math.isclose(logprob, logprob_0, rel_tol=0.1) diff --git a/tests/spyre/test_spyre_tensor_parallel.py b/tests/spyre/test_spyre_tensor_parallel.py deleted file mode 100644 index f6d2626fc..000000000 --- a/tests/spyre/test_spyre_tensor_parallel.py +++ /dev/null @@ -1,76 +0,0 @@ -"""Verification of vLLM output by comparing with HF - -Run `python -m pytest tests/spyre/test_spyre_tensor_parallel.py`. -""" - -from typing import List, Tuple - -import pytest -from spyre_util import (compare_results, generate_hf_output, - generate_spyre_vllm_output) - -from vllm import SamplingParams - - -@pytest.mark.parametrize("model", ["/models/llama-194m"]) -@pytest.mark.parametrize("prompts", [[ - "Provide a list of instructions for preparing" - " chicken soup for a family of four.", "Hello", - "What is the weather today like?", "Who are you?" -]]) -@pytest.mark.parametrize("warmup_shapes", [[(64, 20, 4)]] - ) #,[(64,20,8)],[(128,20,4)],[(128,20,8)]]) -# (prompt_length/new_tokens/batch_size) -@pytest.mark.parametrize("tp_size", [2]) -@pytest.mark.parametrize("backend", - ["eager"]) #, "inductor", "sendnn_decoder"]) -def test_output( - model: str, - prompts: List[str], - warmup_shapes: List[Tuple[int, int, int]], - tp_size: int, - backend: str, -) -> None: - ''' - The warmup is based on one or multiple shapes. After the warmup, - one request with the provided prompts is input to vLLM which - is executed in tensor-parallel fashion on Spyres. - The same prompts are also input to HF. The generated output - including text, token ids, and logprobs, is verified to be - identical for vLLM and HF. - - If errors occur, these can be analyzed/debugged by setting - 'DISABLE_ASSERTS = True' in spyre_util.py and by rerunning the - test using 'pytest --capture=no tests/spyre/test_spyre_tensore_parallel.py' - After debugging, DISABLE_ASSERTS should be reset to 'False'. 
- ''' - - max_new_tokens = max([t[1] for t in warmup_shapes]) - - vllm_sampling_params = SamplingParams( - max_tokens=max_new_tokens, - temperature=0, - logprobs=0, # return logprobs of generated tokens only - ignore_eos=True) - - vllm_results = generate_spyre_vllm_output( - model=model, - prompts=prompts, - warmup_shapes=warmup_shapes, - max_model_len=2048, - block_size=2048, - sampling_params=vllm_sampling_params, - tensor_parallel_size=tp_size, - backend=backend) - - hf_results = generate_hf_output(model=model, - prompts=prompts, - max_new_tokens=max_new_tokens) - - compare_results(model=model, - prompts=prompts, - warmup_shapes=warmup_shapes, - tensor_parallel_size=tp_size, - backend=backend, - vllm_results=vllm_results, - hf_results=hf_results) diff --git a/tests/spyre/test_spyre_warmup_shapes.py b/tests/spyre/test_spyre_warmup_shapes.py deleted file mode 100644 index 675b9188f..000000000 --- a/tests/spyre/test_spyre_warmup_shapes.py +++ /dev/null @@ -1,85 +0,0 @@ -"""Verification of Spyre warmup shapes - -Run `python -m pytest tests/spyre/test_spyre_warmup_shapes.py`. -""" - -from typing import List, Tuple - -import pytest -from spyre_util import (compare_results, generate_hf_output, - generate_spyre_vllm_output) - -from vllm import SamplingParams - - -@pytest.mark.parametrize("model", ["/models/llama-194m"]) -@pytest.mark.parametrize("prompts", [ - 7 * [ - "Hello", - "Below is an instruction that describes a task. Write a response that " - "appropriately completes the request. Be polite in your response to " - "the user. Provide a list of instructions for preparing chicken soup" - " for a family of four. Indicate if the weather forecast looks good " - "for today. Explain in a brief summary comprised of at most 50 words" - " what you are." - ] -]) -@pytest.mark.parametrize("warmup_shapes", [[(64, 20, 8), (128, 20, 4)]] - ) # (prompt_length/new_tokens/batch_size) -@pytest.mark.parametrize("backend", - ["eager"]) #, "inductor", "sendnn_decoder"]) -def test_output( - model: str, - prompts: List[str], - warmup_shapes: List[Tuple[int, int, int]], - backend: str, -) -> None: - ''' - The warmup is based on two shapes, that 'overlap' each - other. After the warmup, one request with the provided - prompts is input to vLLM. There should be at least one - prompt corresponding to each of the two warmup shapes. - It is useful to define enough prompts to fill multiple - batches entirely and partially, in order to test the - handling of overlapping warmup shapes also in relation - with the position of a prompt within a batch (not - likely that this will be an issue, but just to be sure). - The same prompts are also input to HF. The generated - output including text, token ids, and logprobs, is - verified to be identical for vLLM and HF. - - If errors occur, these can be analyzed/debugged by setting - 'DISABLE_ASSERTS = True' in spyre_util.py and by rerunning the - test using 'pytest --capture=no tests/spyre/test_spyre_warmup_shapes.py' - After debugging, DISABLE_ASSERTS should be reset to 'False'. 
- ''' - - max_new_tokens = max([t[1] for t in warmup_shapes]) - - vllm_sampling_params = SamplingParams( - max_tokens=max_new_tokens, - temperature=0, - logprobs=0, # return logprobs of generated tokens only - ignore_eos=True) - - vllm_results = generate_spyre_vllm_output( - model=model, - prompts=prompts, - warmup_shapes=warmup_shapes, - max_model_len=2048, - block_size=2048, - sampling_params=vllm_sampling_params, - tensor_parallel_size=1, - backend=backend) - - hf_results = generate_hf_output(model=model, - prompts=prompts, - max_new_tokens=max_new_tokens) - - compare_results(model=model, - prompts=prompts, - warmup_shapes=warmup_shapes, - tensor_parallel_size=1, - backend=backend, - vllm_results=vllm_results, - hf_results=hf_results) diff --git a/vllm/config.py b/vllm/config.py index 63b77ddcb..150192599 100644 --- a/vllm/config.py +++ b/vllm/config.py @@ -3,7 +3,6 @@ import enum import hashlib import json -import operator import sys import warnings from contextlib import contextmanager @@ -560,7 +559,6 @@ def _verify_quantization(self) -> None: "awq_marlin", "fbgemm_fp8", "compressed_tensors", "compressed-tensors", "experts_int8", "quark" ] - spyre_supported_quantization = ["gptq"] if self.quantization is not None: self.quantization = self.quantization.lower() @@ -603,11 +601,6 @@ def _verify_quantization(self) -> None: "%s quantization is not fully " "optimized yet. The speed can be slower than " "non-quantized models.", self.quantization) - if current_platform.is_spyre( - ) and self.quantization not in spyre_supported_quantization: - raise ValueError( - f"{self.quantization} quantization is currently not " - f"supported in Spyre Backend.") def _verify_cuda_graph(self) -> None: if self.max_seq_len_to_capture is None: @@ -1434,6 +1427,9 @@ class SchedulerConfig: chunked_prefill_enabled: bool = field(init=False) + # scheduler class or path + scheduler_cls: Union[str, Type[object]] = "vllm.core.scheduler.Scheduler" + def compute_hash(self) -> str: """ WARNING: Whenever a new field is added to this config, @@ -1492,40 +1488,6 @@ def __post_init__(self) -> None: self.max_num_batched_tokens) self.chunked_prefill_enabled = self.enable_chunked_prefill - from vllm.platforms import current_platform - self.spyre_scheduling_enabled = current_platform.is_spyre() - if self.spyre_scheduling_enabled: - # load warmup shapes and sort by "speed" - wup_prompt_lens = envs.VLLM_SPYRE_WARMUP_PROMPT_LENS or [] - wup_batch_sizes = envs.VLLM_SPYRE_WARMUP_BATCH_SIZES or [] - if len(wup_prompt_lens) != len(wup_batch_sizes): - raise RuntimeError( - "The lists in VLLM_SPYRE_WARMUP_PROMPT_LENS and " - "VLLM_SPYRE_WARMUP_BATCH_SIZES must have equal length") - if self.runner_type == "pooling": - wup_new_tokens = [0] * len(wup_prompt_lens) - else: - wup_new_tokens = envs.VLLM_SPYRE_WARMUP_NEW_TOKENS or [] - if len(wup_new_tokens) != len(wup_prompt_lens): - raise RuntimeError( - "The lists in VLLM_SPYRE_WARMUP_PROMPT_LENS and " - "VLLM_SPYRE_WARMUP_NEW_TOKENS must have equal length") - - print("[SchedulerConfig] VLLM_SPYRE_WARMUP_PROMPT_LENS =", - wup_prompt_lens) - print("[SchedulerConfig] VLLM_SPYRE_WARMUP_NEW_TOKENS =", - wup_new_tokens) - print("[SchedulerConfig] VLLM_SPYRE_WARMUP_BATCH_SIZES =", - wup_batch_sizes) - - self.spyre_warmup_shapes = tuple( - sorted([{ - 'prompt_length': pl, - 'new_tokens': nt, - 'batch_size': bs - } for pl, nt, bs in zip(wup_prompt_lens, wup_new_tokens, - wup_batch_sizes)], - key=operator.itemgetter('batch_size', 'prompt_length'))) self._verify_args() def _verify_args(self) -> None: 
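The `vllm/config.py` hunk above trades the in-tree Spyre warmup-shape handling for a generic `scheduler_cls` hook: the scheduler class (or its import path) now lives in `SchedulerConfig`, and the engine resolves it at construction time (see the `llm_engine.py` hunk further below). A minimal sketch of how an out-of-tree plugin could use the hook; the plugin module and class names are hypothetical:

```python
from vllm.core.scheduler import Scheduler

class SpyrePluginScheduler(Scheduler):
    """Hypothetical scheduler shipped by an out-of-tree plugin (e.g. a
    vllm-spyre package); it can enforce warmup-shape batching constraints
    without any Spyre-specific branches in vllm.core.scheduler."""

    def _schedule_prefills(self, budget, curr_loras, enable_chunking=False):
        # plugin-specific constraints (e.g. warmup-shape filtering) would go here
        return super()._schedule_prefills(budget, curr_loras, enable_chunking)

# Selected either by class object or by import path, which the engine resolves
# with resolve_obj_by_qualname():
#   scheduler_config.scheduler_cls = "vllm_spyre.scheduler.SpyrePluginScheduler"
```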
@@ -1597,7 +1559,7 @@ def __init__(self, device: str = "auto") -> None: self.device_type = device # Some device types require processing inputs on CPU - if self.device_type in ["neuron", "spyre", "openvino"]: + if self.device_type in ["neuron", "openvino"]: self.device = torch.device("cpu") elif self.device_type in ["tpu"]: self.device = None diff --git a/vllm/core/scheduler.py b/vllm/core/scheduler.py index 0834706c7..b3d396f9c 100644 --- a/vllm/core/scheduler.py +++ b/vllm/core/scheduler.py @@ -919,10 +919,6 @@ def _schedule_prefills( ignored_seq_groups: List[SequenceGroup] = [] seq_groups: List[ScheduledSequenceGroup] = [] - if self.scheduler_config.spyre_scheduling_enabled: - applicable_spyre_warmup_shapes = list( - self.scheduler_config.spyre_warmup_shapes) - waiting_queue = self.waiting leftover_waiting_sequences: Deque[SequenceGroup] = deque() @@ -1003,54 +999,6 @@ def _schedule_prefills( ): break - # check if current request can be scheduled based on the applicable - # spyre warmup shapes - if self.scheduler_config.spyre_scheduling_enabled: - max_tokens = 0 - if seq_group.sampling_params is not None and\ - seq_group.sampling_params.max_tokens is not None: - max_tokens = seq_group.sampling_params.max_tokens - updated_spyre_warmup_shapes = [ - shape for shape in applicable_spyre_warmup_shapes - if num_new_tokens <= shape['prompt_length'] - and max_tokens <= shape['new_tokens'] - and len(seq_groups) < shape['batch_size'] - ] - if not updated_spyre_warmup_shapes: - if not seq_groups: - # request was tested against all spyre warmup shapes: - # request cannot be processed - if (seq_group.sampling_params is not None - and seq_group.sampling_params.max_tokens - is not None): - logger.warning( - "No applicable warmup shape exists for " - "combination of prompt length (%d tokens) " - "and maximum number of output tokens to be " - "generated (%d tokens)", num_new_tokens, - seq_group.sampling_params.max_tokens) - else: - logger.warning( - "No applicable warmup shape exists for " - "combination of prompt length (%d tokens) " - "and undefined maximum number of output " - "tokens", num_new_tokens) - for seq in waiting_seqs: - seq.status = SequenceStatus.FINISHED_IGNORED - ignored_seq_groups.append(seq_group) - waiting_queue.popleft() - continue - else: - # request was only tested against spyre warmup shapes - # that remain after processing previous requests in - # waiting queue: request will be evaluated again in - # a future scheduling step - leftover_waiting_sequences.appendleft(seq_group) - waiting_queue.popleft() - continue - else: - applicable_spyre_warmup_shapes = updated_spyre_warmup_shapes - # Can schedule this request. if curr_loras is not None and lora_int_id > 0: curr_loras.add(lora_int_id) @@ -1084,15 +1032,6 @@ def _schedule_prefills( ) budget.add_num_seqs(seq_group.request_id, num_new_seqs) - # Check if number of scheduled requests has reached the maximum - # batch size of the applicable warmup shapes - if self.scheduler_config.spyre_scheduling_enabled and len( - seq_groups) >= max([ - shape['batch_size'] - for shape in applicable_spyre_warmup_shapes - ]): - break - # Queue requests that couldn't be scheduled. waiting_queue.extendleft(leftover_waiting_sequences) if len(seq_groups) > 0: @@ -1130,11 +1069,8 @@ def _schedule_default(self) -> SchedulerOutputs: running_scheduled = SchedulerRunningOutputs.create_empty() swapped_in = SchedulerSwappedInOutputs.create_empty() - # Schedule new prefills only when no requests have been swapped - # and all previous decodes have completed. 
- if not self.swapped and ( - not self.scheduler_config.spyre_scheduling_enabled - or not self.running): + # If any requests are swapped, prioritized swapped requests. + if not self.swapped: prefills = self._schedule_prefills(budget, curr_loras, enable_chunking=False) @@ -1335,13 +1271,8 @@ def _can_append_slots(self, seq_group: SequenceGroup, # chunked-prefill are enabled together. assert self.scheduler_config.is_multi_step and enable_chunking - if self.scheduler_config.spyre_scheduling_enabled: - # heuristic below doesn't make sense when using very large - # blocks - return True - else: - return self.block_manager.can_append_slots( - seq_group=seq_group, num_lookahead_slots=num_lookahead_slots) + return self.block_manager.can_append_slots( + seq_group=seq_group, num_lookahead_slots=num_lookahead_slots) def _allow_async_output_proc(self, seq_group: SequenceGroup) -> bool: # async_output_proc is allowed only when we have a single sequence diff --git a/vllm/engine/arg_utils.py b/vllm/engine/arg_utils.py index 74f184fb3..18363da33 100644 --- a/vllm/engine/arg_utils.py +++ b/vllm/engine/arg_utils.py @@ -37,7 +37,6 @@ "openvino", "tpu", "xpu", - "spyre", "hpu", ] diff --git a/vllm/engine/llm_engine.py b/vllm/engine/llm_engine.py index 88c21f9a6..695d6fa0a 100644 --- a/vllm/engine/llm_engine.py +++ b/vllm/engine/llm_engine.py @@ -17,8 +17,7 @@ from vllm.config import (DecodingConfig, LoRAConfig, ModelConfig, ObservabilityConfig, ParallelConfig, SchedulerConfig, VllmConfig) -from vllm.core.scheduler import (ScheduledSequenceGroup, Scheduler, - SchedulerOutputs) +from vllm.core.scheduler import ScheduledSequenceGroup, SchedulerOutputs from vllm.engine.arg_utils import EngineArgs from vllm.engine.metrics_types import StatLoggerBase, Stats from vllm.engine.output_processor.interfaces import ( @@ -56,7 +55,8 @@ BaseTokenizerGroup, init_tokenizer_from_configs) from vllm.usage.usage_lib import (UsageContext, is_usage_stats_enabled, usage_message) -from vllm.utils import Counter, Device, deprecate_kwargs, weak_bind +from vllm.utils import (Counter, Device, deprecate_kwargs, + resolve_obj_by_qualname, weak_bind) from vllm.version import __version__ as VLLM_VERSION logger = init_logger(__name__) @@ -344,6 +344,12 @@ def get_tokenizer_for_seq(sequence: Sequence) -> AnyTokenizer: # Create the scheduler. # NOTE: the cache_config here have been updated with the numbers of # GPU and CPU blocks, which are profiled in the distributed executor. 
+ + if isinstance(self.vllm_config.scheduler_config.scheduler_cls, str): + Scheduler = resolve_obj_by_qualname( + self.vllm_config.scheduler_config.scheduler_cls) + else: + Scheduler = self.vllm_config.scheduler_config.scheduler_cls self.scheduler = [ Scheduler( self.scheduler_config, self.cache_config, self.lora_config, diff --git a/vllm/envs.py b/vllm/envs.py index c8f4b0fb3..b7b597ea1 100644 --- a/vllm/envs.py +++ b/vllm/envs.py @@ -67,9 +67,6 @@ VLLM_USE_TRITON_AWQ: bool = False VLLM_ALLOW_RUNTIME_LORA_UPDATING: bool = False VLLM_SKIP_P2P_CHECK: bool = False - VLLM_SPYRE_WARMUP_PROMPT_LENS: Optional[List[int]] = None - VLLM_SPYRE_WARMUP_NEW_TOKENS: Optional[List[int]] = None - VLLM_SPYRE_WARMUP_BATCH_SIZES: Optional[List[int]] = None VLLM_DISABLED_KERNELS: List[str] = [] VLLM_USE_V1: bool = False VLLM_ENABLE_V1_MULTIPROCESSING: bool = True @@ -470,41 +467,6 @@ def get_default_config_root(): lambda: float(os.getenv("VLLM_LOG_BATCHSIZE_INTERVAL", "-1")), "VLLM_DISABLE_COMPILE_CACHE": lambda: bool(int(os.getenv("VLLM_DISABLE_COMPILE_CACHE", "0"))), - - # Defines the prompt lengths the Spyre accelerator should be prepared - # for, formatted as comma separated list. - "VLLM_SPYRE_WARMUP_PROMPT_LENS": - lambda: [ - int(p) for p in os.getenv(key='VLLM_SPYRE_WARMUP_PROMPT_LENS', - default='64').split(',') - ], - - # Defines the max output tokens the Spyre accelerator should be prepared - # for, formatted as comma separated list. - "VLLM_SPYRE_WARMUP_NEW_TOKENS": - lambda: [ - int(d) for d in os.getenv(key='VLLM_SPYRE_WARMUP_NEW_TOKENS', - default='20').split(',') - ], - - # Defines the batch sizes the Spyre accelerator should be prepared - # for, formatted as comma separated list. - "VLLM_SPYRE_WARMUP_BATCH_SIZES": - lambda: [ - int(b) for b in os.getenv(key='VLLM_SPYRE_WARMUP_BATCH_SIZES', - default='1').split(',') - ], - - # Defines the backend that torch.compile will use when using Spyre - # Available options: - # - "sendnn_decoder": Compile for execution on Spyre hardware for - # decoder models - # - "sendnn": Compile for execution on Spyre hardware for - # encoder models - # - "inductor": Compile for execution on CPU (for debug and testing) - # - "eager": Skip compile entirely (for debug and testing - "VLLM_SPYRE_DYNAMO_BACKEND": - lambda: os.getenv("VLLM_SPYRE_DYNAMO_BACKEND", "sendnn_decoder"), } # end-env-vars-definition diff --git a/vllm/model_executor/model_loader/spyre.py b/vllm/model_executor/model_loader/spyre.py deleted file mode 100644 index b1479c3c6..000000000 --- a/vllm/model_executor/model_loader/spyre.py +++ /dev/null @@ -1,210 +0,0 @@ -"""Utilities for selecting and loading Spyre models.""" -import sys -from typing import Optional - -import torch -import torch._inductor.config -import torch.distributed as dist -import torch.nn as nn -from fms.models import get_model -from transformers import PretrainedConfig - -import vllm.envs as envs -from vllm.config import ModelConfig, ParallelConfig -from vllm.logger import init_logger -from vllm.model_executor.layers.logits_processor import LogitsProcessor -from vllm.model_executor.layers.sampler import Sampler, SamplerOutput -from vllm.model_executor.sampling_metadata import SamplingMetadata - -try: - from torch_sendnn import torch_sendnn # noqa: F401 -except ImportError: - print("WARNING: Disabled: torch_sendnn") - pass -try: - import backends.dynamo_tracer # noqa: F401 -except ImportError: - print("WARNING: Disabled: dynamo_tracer") - pass - -BACKEND_LIST = ['sendnn_decoder', 'inductor'] - -logger = init_logger(__name__) - - 
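For reference, the warmup-shape convention removed from `envs.py` and `SchedulerConfig` above boils down to three equal-length, comma-separated integer lists that are zipped into shapes and sorted by (batch size, prompt length). A condensed sketch of that parsing, not part of the diff (defaults follow the removed `envs.py` entries):

```python
import os
from operator import itemgetter

def read_spyre_warmup_shapes():
    prompt_lens = [int(v) for v in os.getenv("VLLM_SPYRE_WARMUP_PROMPT_LENS", "64").split(",")]
    new_tokens = [int(v) for v in os.getenv("VLLM_SPYRE_WARMUP_NEW_TOKENS", "20").split(",")]
    batch_sizes = [int(v) for v in os.getenv("VLLM_SPYRE_WARMUP_BATCH_SIZES", "1").split(",")]
    if not (len(prompt_lens) == len(new_tokens) == len(batch_sizes)):
        raise RuntimeError("VLLM_SPYRE_WARMUP_* lists must have equal length")
    shapes = [{"prompt_length": pl, "new_tokens": nt, "batch_size": bs}
              for pl, nt, bs in zip(prompt_lens, new_tokens, batch_sizes)]
    return tuple(sorted(shapes, key=itemgetter("batch_size", "prompt_length")))
```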
-class SpyreCausalLM(nn.Module): - - def __init__( - self, - config: PretrainedConfig, - ) -> None: - super().__init__() - self.config = config - self.logits_processor = LogitsProcessor(config.vocab_size, - logits_as_input=True) - self.sampler = Sampler() - self.past_key_value_states = None - self.dtype = torch.float16 if envs.VLLM_SPYRE_DYNAMO_BACKEND == \ - 'sendnn_decoder' else torch.float32 - # boolean tensor of length batch size with indices: - # True for unfinished sequences and - # False for finished or padded sequences - self.indices = None - - # Lazy initialized - self.model: nn.Module - - def forward( - self, - input_ids: torch.Tensor, - positions: torch.Tensor, - masks: torch.Tensor, - is_prompt: bool, - ) -> torch.Tensor: - - if is_prompt: - self.past_key_value_states = None - - extra_kwargs = {} - if envs.VLLM_SPYRE_DYNAMO_BACKEND != "sendnn_decoder": - # Bug in 2.3.1 fixed in 2.4.1 for SDPA flash - # cpu impl when padding too much - extra_kwargs["attn_algorithm"] = "math" - - output = self.model( - input_ids, - position_ids=positions, - mask=masks, - past_key_value_states=self.past_key_value_states, - use_cache=True, - only_last_token=True, - **extra_kwargs, - ) - - logits, past_key_value_states = output - self.past_key_value_states = past_key_value_states - - # mark dynamic - if self.past_key_value_states is not None: - for layer in self.past_key_value_states: - for tensor in layer: - torch._dynamo.mark_dynamic(tensor, 2) - - # removing finished or padded sequences - logits = logits[self.indices] - - return logits - - def compute_logits(self, hidden_states: torch.Tensor, - sampling_metadata: SamplingMetadata) -> torch.Tensor: - logits = self.logits_processor(None, hidden_states, sampling_metadata) - return logits - - def sample( - self, - logits: torch.Tensor, - sampling_metadata: SamplingMetadata, - ) -> Optional[SamplerOutput]: - next_tokens = self.sampler(logits, sampling_metadata) - return next_tokens - - def load_weights(self, model_config: ModelConfig, max_prompt_length: int, - max_decode_length: int, - distributed_strategy: Optional[str], **kwargs): - - if self.dtype is not model_config.dtype: - logger.info( - "Ignoring user-provided dtype=%s and using dtype=%s instead.", - model_config.dtype, self.dtype) - - if model_config.quantization == "gptq": - - # note, we have to find a better way to package this - # shouldn't it be part of FMS? 
- sys.path.append("/home/senuser/aiu-fms") - - if envs.VLLM_SPYRE_DYNAMO_BACKEND == "sendnn_decoder": - from aiu_as_addon import aiu_adapter, aiu_linear # noqa: F401 - linear_type = "gptq_aiu" - print("Loaded `aiu_as_addon` functionalities") - else: - from cpu_addon import cpu_linear # noqa: F401 - linear_type = "gptq_cpu" - print("Loaded `cpu_addon` functionalities") - - quant_cfg = model_config._parse_quant_hf_config() - - linear_config = { - "linear_type": linear_type, - "group_size": quant_cfg['group_size'], - "desc_act": quant_cfg['desc_act'], - } - data_type = None - model_source = "hf_gptq_aiu" - else: - linear_config = {"linear_type": "torch_linear"} - data_type = self.dtype - model_source = "hf" - - # we can use fused weights unless running on Spyre - fused_weights = envs.VLLM_SPYRE_DYNAMO_BACKEND != "sendnn_decoder" - - self.model = get_model(architecture="hf_configured", - variant=model_config.model, - model_path=model_config.model, - source=model_source, - data_type=data_type, - distributed_strategy=distributed_strategy, - group=dist.group.WORLD, - fused_weights=fused_weights, - linear_config=linear_config) - - compile_mode = "default" - - self.model.eval() - torch.set_grad_enabled(False) - - _target_cache_size = max(int(max_decode_length * 2), - int(max_prompt_length * 2.5)) - if hasattr(torch._dynamo.config, "accumulated_cache_size_limit") and \ - _target_cache_size > torch._dynamo.config.\ - accumulated_cache_size_limit: - _prev = torch._dynamo.config.accumulated_cache_size_limit - torch._dynamo.config.accumulated_cache_size_limit = \ - _target_cache_size - print("NOTICE: Adjusting " - "torch._dynamo.config.accumulated_cache_size_limit" - f" from {_prev} to " - f"{torch._dynamo.config.accumulated_cache_size_limit} " - f"to accommodate prompt size of {max_prompt_length} " - f"and decode tokens of {max_decode_length}") - - if _target_cache_size > torch._dynamo.config.cache_size_limit: - _prev = torch._dynamo.config.cache_size_limit - torch._dynamo.config.cache_size_limit = _target_cache_size - print( - "NOTICE: Adjusting torch._dynamo.config.cache_size_limit from" - f" {_prev} to {torch._dynamo.config.cache_size_limit} to " - f"accommodate prompt size of {max_prompt_length} and " - f"decode tokens of {max_decode_length}") - - if envs.VLLM_SPYRE_DYNAMO_BACKEND in BACKEND_LIST: - self.model = torch.compile(self.model, - mode=compile_mode, - backend=envs.VLLM_SPYRE_DYNAMO_BACKEND) - - -def get_spyre_model(model_config: ModelConfig, parallel_config: ParallelConfig, - max_prompt_length, max_decode_length) -> nn.Module: - - # Create a model instance. - model = SpyreCausalLM(model_config.hf_config) - - # Load the weights from the cached or downloaded files. 
- model.load_weights( - model_config, - max_prompt_length=max_prompt_length, - max_decode_length=max_decode_length, - distributed_strategy="tp" if parallel_config.world_size > 1 else None) - - return model diff --git a/vllm/model_executor/model_loader/spyre_setup.py b/vllm/model_executor/model_loader/spyre_setup.py deleted file mode 100644 index 4ebb2c536..000000000 --- a/vllm/model_executor/model_loader/spyre_setup.py +++ /dev/null @@ -1,145 +0,0 @@ -import json -import os -import sys - -import torch - -# ============================================================== -# Common utilities -# ============================================================== -#------------- -# Discover the world size and my rank (envars set by torchrun) -# https://pytorch.org/docs/stable/elastic/run.html#environment-variables -#------------- -local_rank = int(os.getenv("LOCAL_RANK", 0)) -rank = int(os.getenv("RANK", 0)) -world_rank = rank -world_size = int(os.getenv("WORLD_SIZE", 1)) - -def dprint(text): - print(f"[{rank:2d}/{world_size:2d}]: {text}") - -# ============================================================== -# Common setup -# ============================================================== -def spyre_setup(rank=0, world_size=1, local_rank=0, local_size=1, verbose=False): - # ------------- - # Envar setup for backend - # ------------- - # Environment variable created by the runtime to identify the specific Spyre card that is assigned to this rank - spyre_config_file_envar = "AIU_CONFIG_FILE_" + str(rank) - - # Default to senulator backend unless user specified otherwise - os.environ.setdefault("FLEX_COMPUTE", "SENULATOR") - os.environ.setdefault("FLEX_DEVICE", "MOCK") - - # Each rank needs a unique space to write its binaries - # For both 'export' and '__pycache' - # https://docs.python.org/3/library/sys.html#sys.pycache_prefix - os.environ.setdefault("DEEPRT_EXPORT_DIR", "export") - os.environ.setdefault("DTCOMPILER_EXPORT_DIR", "export") - if world_size > 1: - os.environ["DEEPRT_EXPORT_DIR"] += f"/{rank}" - os.environ["DTCOMPILER_EXPORT_DIR"] += f"/{rank}" - sys.pycache_prefix=os.getenv("DEEPRT_EXPORT_DIR")+"/py-" + str(rank) - os.environ.setdefault("DTCOMPILER_KEEP_EXPORT", "1") - - # Inform Flex of the size of this job - os.environ.setdefault("FLEX_RDMA_WORLD_SIZE", str(world_size)) - os.environ.setdefault("FLEX_RDMA_WORLD_RANK", str(rank)) - os.environ.setdefault("FLEX_RDMA_LOCAL_SIZE", str(world_size)) - os.environ.setdefault("FLEX_RDMA_LOCAL_RANK", str(rank)) - for peer_rank in range(world_size): - pcie_env_str="AIU_WORLD_RANK_"+str(peer_rank) - flex_env_str="FLEX_RDMA_PCI_BUS_ADDR_"+str(peer_rank) - if os.getenv(pcie_env_str) is None: - raise RuntimeError(f"Error: The environment variable {pcie_env_str} is not defined") - if os.getenv(flex_env_str) is None: - raise RuntimeError(f"Error: The environment variable {flex_env_str} is not defined") - if os.getenv("DUMP_MEMMAP") is not None: - if os.getenv("SDSC_REF_DIR") is None: - os.environ["SDSC_REF_DIR"] = os.environ["DEEPRT_EXPORT_DIR"] - else: - os.environ["SDSC_REF_DIR"] += f"/{rank}" - assert ( - os.getenv("DUMP_MEMMAP_DIR") is not None - ), "DUMP_MEMMAP_DIR not set while DUMP_MEMMAP set" - os.environ["DUMP_MEMMAP_DIR"] += f"/{rank}" - os.makedirs( - os.environ["DUMP_MEMMAP_DIR"], exist_ok=True - ) # directory needs to exist - - for peer_rank in range(world_size): - pcie_env_str = "AIU_WORLD_RANK_" + str(peer_rank) - flex_env_str = "FLEX_RDMA_PCI_BUS_ADDR_" + str(peer_rank) - if os.getenv("FLEX_COMPUTE") == "SENULATOR": - if 
os.getenv(pcie_env_str) is not None: - os.environ[flex_env_str] = os.getenv(pcie_env_str) - else: - os.environ[pcie_env_str] = f"0000:{rank:02x}:01.0" - os.environ[flex_env_str] = f"0000:{rank:02x}:01.0" - else: - if os.getenv(flex_env_str) is None: - if os.getenv("PCIDEVICE_IBM_COM_SENTIENT_PF") is not None: - os.environ[pcie_env_str] = os.getenv( - "PCIDEVICE_IBM_COM_SENTIENT_PF" - ) - - if os.getenv(pcie_env_str) is not None: - os.environ[flex_env_str] = os.getenv(pcie_env_str) - else: - raise RuntimeError( - f"[{rank}/{world_size}]: ERROR: {flex_env_str} and {pcie_env_str} were not set for peer {peer_rank}." - ) - if rank == 0 and verbose: - dprint(f"PCI Addr Rank {peer_rank} {pcie_env_str}={os.environ[pcie_env_str]}") - dprint(f"PCI Addr Rank {peer_rank} {flex_env_str}={os.environ[flex_env_str]}") - - if rank == 0 and verbose: - dprint(f"FLEX_COMPUTE=" + os.getenv("FLEX_COMPUTE")) - dprint(f"FLEX_DEVICE=" + os.getenv("FLEX_DEVICE")) - dprint(f"DEEPRT_EXPORT_DIR=" + os.getenv("DEEPRT_EXPORT_DIR")) - dprint(f"DTCOMPILER_EXPORT_DIR=" + os.getenv("DTCOMPILER_EXPORT_DIR")) - if os.getenv(spyre_config_file_envar) is not None: - dprint(f"{spyre_config_file_envar}=" + os.environ[spyre_config_file_envar]) - if os.getenv("SENLIB_DEVEL_CONFIG_FILE") is not None: - dprint(f"SENLIB_DEVEL_CONFIG_FILE=" + os.environ["SENLIB_DEVEL_CONFIG_FILE"]) - if os.getenv(flex_env_str) is not None: - dprint(f"{flex_env_str}=" + os.environ[flex_env_str]) - dprint(f"FLEX_RDMA_LOCAL_RANK=" + os.getenv("FLEX_RDMA_LOCAL_RANK")) - dprint(f"FLEX_RDMA_LOCAL_SIZE=" + os.getenv("FLEX_RDMA_LOCAL_SIZE")) - dprint(f"FLEX_RDMA_WORLD_RANK=" + os.getenv("FLEX_RDMA_WORLD_RANK")) - dprint(f"FLEX_RDMA_WORLD_SIZE=" + os.getenv("FLEX_RDMA_WORLD_SIZE")) - - if os.getenv("FLEX_COMPUTE") == "SENTIENT": - pcie_env_str = "AIU_WORLD_RANK_" + str(rank) - if os.getenv(pcie_env_str) is not None: - device_id = os.getenv(pcie_env_str) - else: - with open(os.getenv(spyre_config_file_envar)) as fd: - data = json.load(fd) - device_id = data["GENERAL"]["sen_bus_id"] - dprint(f"Spyre: Enabled ({device_id})") - else: - dprint(f"Spyre: Disabled (Senulator)") - - -# ============================================================== -# Distributed setup -# ============================================================== -def spyre_dist_setup(rank, world_size, local_rank=-0, local_size=-1, verbose=False): - if local_rank < 0: - local_rank = rank - if local_size < 0: - local_size = world_size - - if os.getenv("TORCHELASTIC_RUN_ID") is None: - os.environ["MASTER_ADDR"] = "localhost" - os.environ["MASTER_PORT"] = "12355" - elif rank == 0 or verbose: - dprint(f"Detected running via torchrun") - - if rank == 0 or verbose: - dprint(f"Parallel Backend: {torch.distributed.get_backend()}") - - spyre_setup(rank, world_size) \ No newline at end of file diff --git a/vllm/platforms/__init__.py b/vllm/platforms/__init__.py index 7f558a29f..6ca95b41d 100644 --- a/vllm/platforms/__init__.py +++ b/vllm/platforms/__init__.py @@ -129,17 +129,6 @@ def openvino_platform_plugin() -> Optional[str]: return "vllm.platforms.openvino.OpenVinoPlatform" if is_openvino else None -def spyre_platform_plugin() -> Optional[str]: - is_spyre = False - try: - from importlib.metadata import version - is_spyre = "spyre" in version("vllm") - except Exception: - pass - - return "vllm.platforms.spyre.SpyrePlatform" if is_spyre else None - - builtin_platform_plugins = { 'tpu': tpu_platform_plugin, 'cuda': cuda_platform_plugin, @@ -149,7 +138,6 @@ def spyre_platform_plugin() -> Optional[str]: 'cpu': 
cpu_platform_plugin, 'neuron': neuron_platform_plugin, 'openvino': openvino_platform_plugin, - 'spyre': spyre_platform_plugin, } diff --git a/vllm/platforms/interface.py b/vllm/platforms/interface.py index 03b5be092..f2ecec320 100644 --- a/vllm/platforms/interface.py +++ b/vllm/platforms/interface.py @@ -47,7 +47,6 @@ class PlatformEnum(enum.Enum): NEURON = enum.auto() OPENVINO = enum.auto() OOT = enum.auto() - SPYRE = enum.auto() UNSPECIFIED = enum.auto() @@ -130,9 +129,6 @@ def is_neuron(self) -> bool: def is_openvino(self) -> bool: return self._enum == PlatformEnum.OPENVINO - def is_spyre(self) -> bool: - return self._enum == PlatformEnum.SPYRE - def is_out_of_tree(self) -> bool: return self._enum == PlatformEnum.OOT diff --git a/vllm/platforms/spyre.py b/vllm/platforms/spyre.py deleted file mode 100644 index 1d4578584..000000000 --- a/vllm/platforms/spyre.py +++ /dev/null @@ -1,68 +0,0 @@ -from typing import TYPE_CHECKING, Optional - -import torch - -from vllm.logger import init_logger - -if TYPE_CHECKING: - from vllm.config import VllmConfig -else: - VllmConfig = None -import vllm.envs as envs - -from .interface import Platform, PlatformEnum - -logger = init_logger(__name__) - - -class SpyrePlatform(Platform): - _enum = PlatformEnum.SPYRE - device_name: str = "spyre" - device_type: str = "spyre" - - @classmethod - def get_device_name(cls, device_id: int = 0) -> str: - return "spyre" - - @classmethod - def is_async_output_supported(cls, enforce_eager: Optional[bool]) -> bool: - """ - Check if the current platform supports async output. - """ - return False - - @classmethod - def check_and_update_config(cls, vllm_config: VllmConfig) -> None: - parallel_config = vllm_config.parallel_config - scheduler_config = vllm_config.scheduler_config - - if scheduler_config.is_multi_step: - raise NotImplementedError - - if parallel_config.worker_cls == "auto": - if envs.VLLM_USE_V1: - raise NotImplementedError - else: - parallel_config.worker_cls = \ - "vllm.worker.spyre_worker.SpyreWorker" - - cache_config = vllm_config.cache_config - if cache_config: - # spyre needs block_size = max_model_len - vllm_config.cache_config.block_size = \ - vllm_config.model_config.max_model_len - - @classmethod - def is_pin_memory_available(cls) -> bool: - logger.warning("Pin memory is not supported on Spyre.") - return False - - @classmethod - def inference_mode(cls): - """A device-specific wrapper of `torch.inference_mode`. - - This wrapper is recommended because some hardware backends such as TPU - do not support `torch.inference_mode`. In such a case, they will fall - back to `torch.no_grad` by overriding this method. 
- """ - return torch.no_grad() diff --git a/vllm/worker/spyre_embedding_model_runner.py b/vllm/worker/spyre_embedding_model_runner.py deleted file mode 100644 index 8ef3d744d..000000000 --- a/vllm/worker/spyre_embedding_model_runner.py +++ /dev/null @@ -1,193 +0,0 @@ -import time -from typing import Dict, Iterable, List, Optional, Tuple - -import torch -from transformers import AutoModel - -import vllm.envs as envs -from vllm.config import (DeviceConfig, ModelConfig, ParallelConfig, - SchedulerConfig) -from vllm.logger import init_logger -from vllm.model_executor.layers.pooler import Pooler, PoolingType -from vllm.model_executor.pooling_metadata import PoolingMetadata -from vllm.pooling_params import PoolingParams -from vllm.sequence import (IntermediateTensors, PoolerOutput, SequenceData, - SequenceGroupMetadata) - -from .spyre_model_runner import ModelInputForSpyre, SpyreModelRunner - -logger = init_logger(__name__) - -BACKEND_LIST = ['sendnn', 'inductor'] - - -class SpyreEmbeddingModelRunner(SpyreModelRunner): - - # Map of request_id -> generator used for seeded random sampling - generators: Dict[str, torch.Generator] = {} - - def __init__( - self, - model_config: ModelConfig, - parallel_config: ParallelConfig, - scheduler_config: SchedulerConfig, - device_config: DeviceConfig, - is_driver_worker: bool, - ): - super().__init__(model_config=model_config, - parallel_config=parallel_config, - scheduler_config=scheduler_config, - device_config=device_config, - is_driver_worker=is_driver_worker) - - pooler_config = model_config.pooler_config - self.pooler = Pooler.from_config_with_defaults( - pooler_config, - pooling_type=PoolingType.CLS, - normalize=True, - softmax=False) - - def load_model(self, prompt_lens: Iterable[int], - num_decode_tokens: Iterable[int]) -> None: - self.model = AutoModel.from_pretrained(self.model_config.model) - self.model.eval() - torch.set_grad_enabled(False) - if envs.VLLM_SPYRE_DYNAMO_BACKEND in BACKEND_LIST: - self.model = torch.compile(self.model, - mode="default", - dynamic=False, - backend=envs.VLLM_SPYRE_DYNAMO_BACKEND) - - @property - def vocab_size(self) -> int: - return self.model.config.vocab_size - - def prepare_input_tensors( - self, - seq_group_metadata_list: List[SequenceGroupMetadata], - finished_requests_ids: Optional[List[str]] = None, - ) -> Tuple[torch.Tensor, torch.Tensor, torch.Tensor, PoolingMetadata]: - # NOTE: We assume that all sequences in the group are all prompts - (input_tokens, input_positions, input_masks, - seq_lens) = self._prepare_prompt(seq_group_metadata_list) - - pooling_metadata = self._prepare_pooling( - seq_group_metadata_list=seq_group_metadata_list, - prompt_lens=seq_lens) - return (input_tokens, input_positions, input_masks, pooling_metadata) - - def _prepare_pooling( - self, - seq_group_metadata_list: List[SequenceGroupMetadata], - prompt_lens: List[int], - ) -> PoolingMetadata: - """Prepare PoolingMetadata for the sequence group metadata list.""" - seq_groups: List[Tuple[List[int], PoolingParams]] = [] - for i, seq_group_metadata in enumerate(seq_group_metadata_list): - seq_ids = list(seq_group_metadata.seq_data.keys()) - pooling_params = seq_group_metadata.pooling_params - seq_groups.append((seq_ids, pooling_params)) - - seq_data: Dict[int, SequenceData] = {} - for seq_group_metadata in seq_group_metadata_list: - seq_data.update(seq_group_metadata.seq_data) - - pooling_metadata = PoolingMetadata( - seq_groups=seq_groups, - seq_data=seq_data, - prompt_lens=prompt_lens, - ) - - return pooling_metadata - - def 
pad_input_ids( - self, - input_ids_list: List[torch.Tensor], - min_pad_length: int = 0, - ) -> Tuple[torch.Tensor, torch.Tensor, torch.Tensor]: - - padded_input_ids_list, mask_list, position_ids_list = self.\ - _prepare_pad_input_ids(input_ids_list, min_pad_length) - - input_ids = torch.stack(padded_input_ids_list) - mask = torch.stack(mask_list) - position_ids = torch.stack(position_ids_list) - - return input_ids, position_ids, mask - - def prepare_model_input( - self, - seq_group_metadata_list: List[SequenceGroupMetadata], - virtual_engine: int = 0, - finished_requests_ids: Optional[List[str]] = None - ) -> ModelInputForSpyre: - - (input_tokens, input_positions, input_masks, - pooling_metadata) = self.prepare_input_tensors( - seq_group_metadata_list, finished_requests_ids) - - return ModelInputForSpyre(input_tokens=input_tokens, - input_positions=input_positions, - input_masks=input_masks, - pooling_metadata=pooling_metadata) - - def execute_model( - self, - model_input: ModelInputForSpyre, - kv_caches: Optional[List[torch.Tensor]] = None, - intermediate_tensors: Optional[IntermediateTensors] = None, - num_steps: int = 1, - **kwargs, - ) -> Optional[List[PoolerOutput]]: - - t0 = time.time() - - outputs = self.model( - input_ids=model_input.input_tokens, - # Let the Embedding layer use it's default - # because the rules can be a bit different - # e.g. For Roberta models the inputs start - # at padding_inx +1 - #position_ids=input_positions, - attention_mask=model_input.input_masks) - hidden_states = outputs["last_hidden_state"] - - unpadded = [] - max_len = hidden_states.shape[1] - - if model_input.pooling_metadata is not None: - for i, seq_len in enumerate( - model_input.pooling_metadata.prompt_lens): - unpadded.append(hidden_states[i, max_len - seq_len:, :]) - - hidden_states = torch.concat(unpadded) - - pooler_output = self.pooler( - hidden_states=hidden_states, - pooling_metadata=model_input.pooling_metadata) - - t1 = time.time() - t0 - print("[spyre_model_runner:execute_model] t_token: %.2fms" % - (t1 * 1000)) - - return [pooler_output] - - def _raw_model_forward( - self, - input_ids: torch.Tensor, - mask: Optional[torch.Tensor] = None, - position_ids: Optional[torch.Tensor] = None, - past_key_value_states: Optional[List[Tuple[torch.Tensor, - torch.Tensor]]] = None, - use_cache: bool = False, - only_last_token: bool = False, - attn_algorithm: Optional[str] = None - ) -> Tuple[torch.Tensor, Optional[List[Tuple[torch.Tensor, - torch.Tensor]]]]: - - hidden_states, _ = self.model( - input_ids=input_ids, - attention_mask=mask, - #position_ids=position_ids - ) - return hidden_states, None diff --git a/vllm/worker/spyre_model_runner.py b/vllm/worker/spyre_model_runner.py deleted file mode 100644 index dbf917bff..000000000 --- a/vllm/worker/spyre_model_runner.py +++ /dev/null @@ -1,421 +0,0 @@ -import time -from dataclasses import dataclass -from typing import (TYPE_CHECKING, Any, Dict, Iterable, List, Optional, Tuple, - Type, TypeVar) - -import torch -from torch import nn - -from vllm.config import (DeviceConfig, ModelConfig, ParallelConfig, - SchedulerConfig) -from vllm.logger import init_logger -from vllm.model_executor import SamplingMetadata -from vllm.model_executor.layers.sampler import SamplerOutput -from vllm.model_executor.model_loader.spyre import get_spyre_model -from vllm.sequence import IntermediateTensors, SequenceGroupMetadata -from vllm.utils import is_pin_memory_available -from vllm.worker.model_runner_base import ( - ModelRunnerBase, ModelRunnerInputBase, - 
_add_sampling_metadata_broadcastable_dict, - _init_sampling_metadata_from_tensor_dict) - -if TYPE_CHECKING: - from vllm.attention.backends.abstract import AttentionBackend - from vllm.model_executor.pooling_metadata import PoolingMetadata - -logger = init_logger(__name__) - -TModelInputForSpyre = TypeVar('TModelInputForSpyre', - bound="ModelInputForSpyre") - - -@dataclass(frozen=True) -class ModelInputForSpyre(ModelRunnerInputBase): - """ - Used by the SpyreModelRunner. - """ - input_tokens: Optional[torch.Tensor] = None - input_positions: Optional[torch.Tensor] = None - input_masks: Optional[torch.Tensor] = None - sampling_metadata: Optional[SamplingMetadata] = None - pooling_metadata: Optional["PoolingMetadata"] = None - is_prompt: Optional[bool] = None - # unused - virtual_engine: Optional[int] = None - - def as_broadcastable_tensor_dict(self) -> Dict[str, Any]: - tensor_dict = { - "input_tokens": self.input_tokens, - "input_positions": self.input_positions, - "input_masks": self.input_masks, - "is_prompt": self.is_prompt, - } - _add_sampling_metadata_broadcastable_dict(tensor_dict, - self.sampling_metadata) - return tensor_dict - - @classmethod - def from_broadcasted_tensor_dict( - cls: Type[TModelInputForSpyre], - tensor_dict: Dict[str, Any], - attn_backend: Optional["AttentionBackend"] = None, - ) -> TModelInputForSpyre: - tensor_dict = _init_sampling_metadata_from_tensor_dict(tensor_dict) - return cls(**tensor_dict) - - -class SpyreModelRunner(ModelRunnerBase[ModelInputForSpyre]): - - def __init__( - self, - model_config: ModelConfig, - parallel_config: ParallelConfig, - scheduler_config: SchedulerConfig, - device_config: DeviceConfig, - is_driver_worker: bool, - ): - self.model_config = model_config - self.parallel_config = parallel_config - self.scheduler_config = scheduler_config - self.device_config = device_config - self.is_driver_worker = is_driver_worker - - self.pad_token_id = 0 - if model_config is not None: - if model_config.hf_config is not None: - self.pad_token_id = getattr(model_config.hf_config, - "pad_token_id", None) or 0 - if model_config.get_sliding_window(): - logger.warning("Sliding window is not supported on Spyre. " - "The model will run without sliding window.") - self.device_config = (device_config - if device_config is not None else DeviceConfig()) - self.device = self.device_config.device - self.pin_memory = is_pin_memory_available() - # position_ids of all the sequences in current batch - self._position_ids: torch.Tensor = None - # attention masks of all the sequences in current batch - self._mask: torch.Tensor = None - # mapping: request id to index in batch - self._req_ids2idx: dict = {} - # Lazy initialization: after load_model. 
- self.model: nn.Module - - def load_model(self, prompt_lens: Iterable[int], - num_decode_tokens: Iterable[int]) -> None: - max_pad_length = max(prompt_lens) - max_decode_length = max(num_decode_tokens) - self.model = get_spyre_model(self.model_config, - parallel_config=self.parallel_config, - max_prompt_length=max_pad_length, - max_decode_length=max_decode_length) - - @property - def vocab_size(self) -> int: - return self.model.model.config.src_vocab_size - - def _prepare_prompt( - self, - seq_group_metadata_list: List[SequenceGroupMetadata], - ) -> Tuple[torch.Tensor, torch.Tensor, torch.Tensor, List[int]]: - assert len(seq_group_metadata_list) > 0 - input_token_list: List[torch.Tensor] = [] - - # find warmup shape to be used for padding and batching - applicable_spyre_warmup_shapes = [ - shape for shape in self.scheduler_config.spyre_warmup_shapes - if len(seq_group_metadata_list) <= shape['batch_size'] - ] - for seq_group_metadata in seq_group_metadata_list: - seq_data = seq_group_metadata.seq_data[list( - seq_group_metadata.seq_data.keys())[0]] - # retrieve initial (unpadded) tokens - prompt_tokens = seq_data.get_token_ids() - new_tokens = seq_group_metadata.sampling_params.max_tokens\ - if seq_group_metadata.sampling_params is not None else 0 - - updated_spyre_warmup_shapes = [ - shape for shape in applicable_spyre_warmup_shapes - if len(prompt_tokens) <= shape['prompt_length'] - and new_tokens <= shape['new_tokens'] - ] - applicable_spyre_warmup_shapes = updated_spyre_warmup_shapes - - assert applicable_spyre_warmup_shapes - - # If multiple warmup shapes apply, the first one is selected. - # For improving performance, the warmup shapes in scheduler_config - # are ordered by "processing speed". - min_pad_length_batch = applicable_spyre_warmup_shapes[0][ - 'prompt_length'] - padded_batch_size = applicable_spyre_warmup_shapes[0]['batch_size'] - - self._req_ids2idx = {} - for idx, seq_group_metadata in enumerate(seq_group_metadata_list): - assert seq_group_metadata.is_prompt - self._req_ids2idx[seq_group_metadata.request_id] = idx - seq_ids = list(seq_group_metadata.seq_data.keys()) - assert len(seq_ids) == 1 - seq_id = seq_ids[0] - - seq_data = seq_group_metadata.seq_data[seq_id] - # retrieve initial (unpadded) tokens - prompt_tokens = seq_data.get_token_ids() - - input_token_list.append( - torch.tensor(prompt_tokens, - dtype=torch.long, - device=torch.device("cpu"))) - - actual_batch_size = len(input_token_list) - self.model.indices = torch.cat([ - torch.ones(actual_batch_size, dtype=torch.bool, device='cpu'), - torch.zeros(padded_batch_size - actual_batch_size, - dtype=torch.bool, - device='cpu') - ]) - - # padding to compiled batch size - while len(input_token_list) < padded_batch_size: - input_token_list.append( - torch.zeros(min_pad_length_batch, - dtype=torch.long, - device=torch.device("cpu"))) - - # get position ids and attention mask - input_tokens, self._position_ids, self._mask = self.pad_input_ids( - input_token_list, min_pad_length=min_pad_length_batch) - - seq_lens = [t.shape[0] for t in input_token_list] - - return input_tokens, self._position_ids, self._mask, seq_lens - - def _prepare_decode( - self, - seq_group_metadata_list: List[SequenceGroupMetadata], - ) -> Tuple[torch.Tensor, torch.Tensor, torch.Tensor]: - assert len(seq_group_metadata_list) > 0 - input_tokens: List[List[int]] = [ - [0] for _ in range(self._position_ids.shape[0]) - ] - - for seq_group_metadata in seq_group_metadata_list: - assert not seq_group_metadata.is_prompt - seq_ids = 
list(seq_group_metadata.seq_data.keys()) - assert len(seq_ids) == 1 - seq_id = seq_ids[0] - - seq_data = seq_group_metadata.seq_data[seq_id] - generation_token = seq_data.get_last_token_id() - input_tokens[self._req_ids2idx[seq_group_metadata.request_id]] = [ - generation_token - ] - - # update position ids and attention mask - self._update_position_ids() - self._update_mask() - - input_tokens = torch.tensor(input_tokens, - dtype=torch.long, - device=self.device) - - return input_tokens, self._position_ids, self._mask - - def _update_position_ids(self) -> None: - """Updating the position ids of all sequences - in a batch. Will be called in decoding phase""" - - self._position_ids = self._position_ids[:, -1] + 1 - self._position_ids = self._position_ids.unsqueeze(-1) - - def _update_mask(self) -> None: - """Updating/extending the attention masks of all - sequences in a batch. Will be called in decoding phase""" - - assert self._mask is not None - masks = self._mask - - masks_new = [] - for mask in masks: - # get the last row of the 3d mask - mask_new = mask[-1:, :] - - # extend the mask one slot - mask_new = torch.cat( - ( - mask_new, - torch.zeros( - 1, 1, dtype=mask_new.dtype, device=mask_new.device), - ), - dim=1, - ) - masks_new.append(mask_new) - - self._mask = torch.stack(masks_new, dim=0) - - def make_model_input_from_broadcasted_tensor_dict( - self, tensor_dict: Dict[str, Any]) -> ModelInputForSpyre: - return ModelInputForSpyre.from_broadcasted_tensor_dict(tensor_dict) - - def prepare_model_input( - self, - seq_group_metadata_list: List[SequenceGroupMetadata], - virtual_engine: int = 0, - finished_requests_ids: Optional[List[str]] = None - ) -> ModelInputForSpyre: - - # NOTE: We assume that all sequences in the group are all prompts or - # all decodes. - is_prompt = seq_group_metadata_list[0].is_prompt - # Prepare input tensors. - if is_prompt: - (input_tokens, input_positions, input_masks, - _) = self._prepare_prompt(seq_group_metadata_list) - seq_lens = [ - input_tokens.shape[1] for i in range(input_tokens.shape[0]) - ] - else: - # updating indices: set indices of newly finished sequences False - if finished_requests_ids: - for seq_id in finished_requests_ids: - self.model.indices[self._req_ids2idx[seq_id]] = False - (input_tokens, input_positions, - input_masks) = self._prepare_decode(seq_group_metadata_list) - seq_lens = [] - - sampling_metadata = SamplingMetadata.prepare( - seq_group_metadata_list, - seq_lens, - # query_lens is not needed if chunked prefill is not - # supported. Since Spyre worker doesn't support chunked prefill - # just use seq_lens instead. - seq_lens, - self.device, - self.pin_memory, - self.get_generators(finished_requests_ids)) - - return ModelInputForSpyre(input_tokens=input_tokens, - input_positions=input_positions, - input_masks=input_masks, - sampling_metadata=sampling_metadata, - is_prompt=is_prompt) - - def execute_model( - self, - model_input: ModelInputForSpyre, - kv_caches: Optional[List[torch.Tensor]] = None, - intermediate_tensors: Optional[IntermediateTensors] = None, - num_steps: int = 1, - **kwargs, - ) -> Optional[List[SamplerOutput]]: - - t0 = time.time() - - if num_steps > 1: - raise ValueError( - "SpyreModelRunner does not support multi-step execution.") - - hidden_states = self.model( - input_ids=model_input.input_tokens, - positions=model_input.input_positions, - masks=model_input.input_masks, - is_prompt=model_input.is_prompt, - ) - - # Only perform sampling in the driver worker. 
- if not self.is_driver_worker: - return [] - - # Compute the logits. - logits = self.model.compute_logits(hidden_states, - model_input.sampling_metadata) - - # Sample the next token. - output = self.model.sample( - logits=logits, - sampling_metadata=model_input.sampling_metadata, - ) - t1 = time.time() - t0 - print("[spyre_model_runner:execute_model] t_token: %.2fms" % - (t1 * 1000)) - - return [output] - - def _prepare_pad_input_ids( - self, - input_ids_list: List[torch.Tensor], - min_pad_length: int = 0, - ) -> Tuple[torch.Tensor, torch.Tensor, torch.Tensor]: - """left side padding implemented as - in fms.utils.generation.pad_input_id""" - max_len = max([min_pad_length] + - [seq.size(0) for seq in input_ids_list]) - padded_input_ids_list = [] - mask_list = [] - position_ids_list = [] - for input_ids_i in input_ids_list: - seq_len = input_ids_i.size(0) - if max_len > seq_len: - print(f"[SpyreModelRunner] INFO: Padding request of length " - f"{seq_len} tokens to {max_len} tokens.") - pads = torch.ones(max_len - seq_len, - dtype=torch.long, - device=input_ids_i.device) * self.pad_token_id - non_pads = torch.ones(seq_len, - dtype=torch.long, - device=input_ids_i.device) - - pos_ids_pads = pads - pos_ids_seq = torch.arange(0, - seq_len, - dtype=torch.long, - device=input_ids_i.device) - - # Setting this to 0, however if 0 is the eos, we will end up - # truncating the output if using truncate_after_eos once this - # workflow works for nested tensor, this can probably be removed - padded_input_ids_list.append(torch.cat((pads, input_ids_i))) - mask_list.append(torch.cat((torch.zeros_like(pads), non_pads))) - position_ids_list.append(torch.cat((pos_ids_pads, pos_ids_seq))) - - return padded_input_ids_list, mask_list, position_ids_list - - def pad_input_ids( - self, - input_ids_list: List[torch.Tensor], - min_pad_length: int = 0, - ) -> Tuple[torch.Tensor, torch.Tensor, torch.Tensor]: - - padded_input_ids_list, mask_list, position_ids_list = self.\ - _prepare_pad_input_ids(input_ids_list, min_pad_length) - - input_ids = torch.stack(padded_input_ids_list) - mask = torch.stack(mask_list).bool() - # this is a causal mask for generation - mask = (mask.unsqueeze(-1) == mask.unsqueeze(-2)).tril() - mask = torch.where(mask.logical_not(), -torch.inf, 0.0) - mask = mask.to(self.model.dtype) - position_ids = torch.stack(position_ids_list) - - return input_ids, position_ids, mask - - def _raw_model_forward( - self, - input_ids: torch.Tensor, - mask: Optional[torch.Tensor] = None, - position_ids: Optional[torch.Tensor] = None, - past_key_value_states: Optional[List[Tuple[torch.Tensor, - torch.Tensor]]] = None, - use_cache: bool = False, - only_last_token: bool = False, - attn_algorithm: Optional[str] = None - ) -> Tuple[torch.Tensor, Optional[List[Tuple[torch.Tensor, - torch.Tensor]]]]: - - return self.model.model(input_ids, - mask=mask, - position_ids=position_ids, - past_key_value_states=past_key_value_states, - use_cache=use_cache, - only_last_token=only_last_token, - attn_algorithm=attn_algorithm) diff --git a/vllm/worker/spyre_worker.py b/vllm/worker/spyre_worker.py deleted file mode 100644 index 9e38ed0e3..000000000 --- a/vllm/worker/spyre_worker.py +++ /dev/null @@ -1,325 +0,0 @@ -"""A Spyre worker class.""" -import json -import os -import platform -import time -from typing import List, Optional, Tuple - -import torch -import torch.distributed as dist - -import vllm.envs as envs -from vllm.config import VllmConfig -from vllm.distributed import (ensure_model_parallel_initialized, -
init_distributed_environment) -from vllm.model_executor import set_random_seed -from vllm.model_executor.model_loader import spyre_setup -from vllm.sequence import ExecuteModelRequest -from vllm.worker.spyre_embedding_model_runner import SpyreEmbeddingModelRunner -from vllm.worker.spyre_model_runner import SpyreModelRunner -from vllm.worker.worker_base import (LocalOrDistributedWorkerBase, - LoraNotSupportedWorkerBase, WorkerBase, - WorkerInput) - - -class SpyreWorker(LoraNotSupportedWorkerBase, LocalOrDistributedWorkerBase): - """A worker class that executes the model on a group of Spyre cores. - """ - - def __init__( - self, - vllm_config: VllmConfig, - local_rank: int, - rank: int, - distributed_init_method: str, - is_driver_worker: bool = False, - ) -> None: - WorkerBase.__init__(self, vllm_config=vllm_config) - self.local_rank = local_rank - self.rank = rank - self.distributed_init_method = distributed_init_method - self.is_driver_worker = is_driver_worker - if self.parallel_config and is_driver_worker: - assert rank % self.parallel_config.tensor_parallel_size == 0, \ - "Driver worker should be rank 0 of tensor parallel group." - if self.model_config.trust_remote_code: - # note: lazy import to avoid importing torch before initializing - from vllm.utils import init_cached_hf_modules - init_cached_hf_modules() - - if self.model_config.task == "embed": - self.model_runner: SpyreModelRunner = SpyreEmbeddingModelRunner( - self.model_config, self.parallel_config, self.scheduler_config, - self.device_config, self.is_driver_worker) - else: - self.model_runner = SpyreModelRunner(self.model_config, - self.parallel_config, - self.scheduler_config, - self.device_config, - self.is_driver_worker) - self._env_initialized = False - - def init_distributed_environment(self) -> None: - """Initialize the distributed environment.""" - - torch._C._distributed_c10d._register_process_group( - "default", dist.group.WORLD) - - if envs.VLLM_SPYRE_DYNAMO_BACKEND in ["sendnn", "sendnn_decoder"]: - spyre_setup.spyre_dist_setup( - rank=self.rank, - world_size=self.parallel_config.world_size, - verbose=True) - - # A small all_reduce for warmup. - torch.distributed.all_reduce(torch.zeros(1).cpu()) - - def init_device(self) -> None: - - if platform.machine() == "s390x": - from torch.serialization import LoadEndianness - torch.serialization.set_default_load_endianness( - LoadEndianness.LITTLE) - - if not self._env_initialized: - - init_distributed_environment( - world_size=self.parallel_config.world_size, - rank=self.rank, - distributed_init_method="env://", - backend="gloo", - ) - - if self.parallel_config.world_size > 1: - self.init_distributed_environment() - elif envs.VLLM_SPYRE_DYNAMO_BACKEND in [ - "sendnn", "sendnn_decoder" - ]: - spyre_setup.spyre_setup(rank=0, world_size=1, verbose=True) - - ensure_model_parallel_initialized( - self.parallel_config.tensor_parallel_size, - self.parallel_config.pipeline_parallel_size, - ) - - self._env_initialized = True - - # Set random seed. 
- set_random_seed(self.model_config.seed) - - def load_model(self): - assert self._env_initialized - - with open(os.path.join(self.model_config.model, 'config.json'), - 'rb') as f: - config = json.load(f) - - restricted_tokens = [] - if tok := config.get("bos_token_id") is not None: - restricted_tokens.append(int(tok)) - if tok := config.get("eos_token_id") is not None: - restricted_tokens.append(int(tok)) - - print("[SpyreWorker] load model...") - # TODO: check additionally if the Spyre card has enough memory - # for all requested model warmups - # printing env variables for debugging purposes - load_model_start_t = time.time() - - wup_prompt_lens, wup_new_tokens = zip( - *[(s["prompt_length"], s["new_tokens"]) - for s in self.scheduler_config.spyre_warmup_shapes]) - - self.model_runner.load_model(prompt_lens=wup_prompt_lens, - num_decode_tokens=wup_new_tokens) - - load_model_end_t = time.time() - load_model_total_t = load_model_end_t - load_model_start_t - print(f"\tload model took {load_model_total_t}s") - - print(f"[SpyreWorker] Start warming up " - f"{len(wup_new_tokens)} " - f"different prompt/decode/batchsize-shape combinations.") - all_warmup_start_t = time.time() - for i, (prompt_len, num_decode_tokens, batch_size) in enumerate([ - (s["prompt_length"], s["new_tokens"], s["batch_size"]) - for s in self.scheduler_config.spyre_warmup_shapes - ]): - if self.model_config.task != "embed": - # TODO: remove if spyre supports - # lower number of output tokens - assert num_decode_tokens >= 3, ( - "VLLM_SPYRE_WARMUP_NEW_TOKENS must be " - "at least 2 (spyre requirement).") - # warmup individual combination - print(f"[SpyreWorker] Warmup {i+1}/" - f"{len(wup_new_tokens)} " - f"prompt/decode/batchsize-shape combinations...") - print(f"[SpyreWorker] Warming up for prompt length {prompt_len}, " - f"decoding {num_decode_tokens} tokens with batch " - f"size {batch_size}") - self._warmup_spyre_fixed_size(prompt_len, num_decode_tokens, - restricted_tokens, batch_size) - all_warmup_end_t = time.time() - all_warmup_total_t = all_warmup_end_t - all_warmup_start_t - print(f"[SpyreWorker] All warmups for " - f"{len(wup_new_tokens)} different " - f"prompt/decode/batchsize-shape combinations finished. " - f"Total warmup time {all_warmup_total_t}s.") - - def _warmup_spyre_fixed_size(self, prompt_len, num_decode_tokens, - special_token_ids, batch_size): - # warmup the model - warmup_start_t = time.time() - # NOTE(ngl): empty tensor causes spyre to hang, so using - # randint without 0 and the eos and bos token - - # Create a list of valid values between 1 (inclusive) and vocab - # size (exclusive) by excluding the eos and bos token ids - # (in special_token_ids) - vocab_size = self.model_runner.vocab_size - valid_token_ids = [ - i for i in range(1, vocab_size) if i not in set(special_token_ids) - ] - # Convert to tensor for sampling - valid_token_ids_tensor = torch.tensor(valid_token_ids, - dtype=torch.long, - device=torch.device("cpu")) - # Sample from the valid token ids - warmup_tokens_tensor = valid_token_ids_tensor[torch.randint( - 0, len(valid_token_ids_tensor), (batch_size, prompt_len))] - - extra_kwargs = {} - if envs.VLLM_SPYRE_DYNAMO_BACKEND not in ["sendnn", "sendnn_decoder"]: - # Bug in 2.3.1 fixed in 2.4.1 for SDPA flash cpu - # impl when padding too much - extra_kwargs["attn_algorithm"] = "math" - - print(f"[SpyreWorker] warmup for prompt length " - f"{prompt_len} and max output tokens {num_decode_tokens}.") - - # 1. 
trace - print("[SpyreWorker] warmup 1/2...") - # TODO: torch_sendnn.CleanGraph() should be necessary? - # warmup 1st forward pass - self._warmup_model_forward_pass(warmup_tokens_tensor, - valid_token_ids_tensor, prompt_len, - num_decode_tokens, batch_size, - extra_kwargs) - - # 2. compile - print("[SpyreWorker] warmup 2/2...") - if envs.VLLM_SPYRE_DYNAMO_BACKEND == "sendnn_decoder": - from torch_sendnn import torch_sendnn - ul_start_time = time.time() - torch_sendnn.update_lazyhandle() - ul_stop_time = time.time() - ul_total_t = ul_stop_time - ul_start_time - print(f"update_lazyhandle() done (duration: {ul_total_t}s)") - - # warmup 2nd forward pass - self._warmup_model_forward_pass(warmup_tokens_tensor, - valid_token_ids_tensor, prompt_len, - num_decode_tokens, batch_size, - extra_kwargs) - - warmup_end_t = time.time() - warmup_total_t = warmup_end_t - warmup_start_t - print("[SpyreWorker] ... warmup finished.") - print(f"\twarmup took {warmup_total_t}s (for prompt length" - f"{prompt_len} and max output tokens {num_decode_tokens})") - - def _warmup_model_forward_pass(self, warmup_tokens_tensor, - valid_token_ids_tensor, prompt_len, - num_decode_tokens, batch_size, - extra_kwargs): - # padding warmup tokens to obtain the - # corresponding position ids and mask - warmup_tokens_pad, self.model_runner._position_ids, \ - self.model_runner._mask = self.model_runner.pad_input_ids( - warmup_tokens_tensor, min_pad_length=prompt_len) - - logits, past_key_value_states = self.model_runner._raw_model_forward( - warmup_tokens_pad, - position_ids=self.model_runner._position_ids, - mask=self.model_runner._mask, - past_key_value_states=None, - use_cache=True, - only_last_token=True, - **extra_kwargs) - # decoding - for i in range(num_decode_tokens - 1): - # sampling next input token from vocab without bos and eos tokens - decode_tokens = valid_token_ids_tensor[torch.randint( - 0, len(valid_token_ids_tensor), (batch_size, 1))] - - # update mask and position_ids - self.model_runner._update_mask() - self.model_runner._update_position_ids() - - if past_key_value_states is not None: - for layer in past_key_value_states: - for tensor in layer: - torch._dynamo.mark_dynamic(tensor, 2) - - logits, past_key_value_states = self.model_runner.\ - _raw_model_forward( - decode_tokens, - position_ids=self.model_runner._position_ids, - mask=self.model_runner._mask, - past_key_value_states=past_key_value_states, - use_cache=True, - only_last_token=True, - **extra_kwargs) - - def determine_num_available_blocks(self) -> Tuple[int, int]: - """Determine the number of available KV blocks. - - Swapping is not yet supported, so always return num_cpu_blocks=0. - - We configure num_gpu_blocks to be equal to max_num_seqs. - """ - # Set the number of GPU blocks to be the same as the maximum number of - # sequences that can be processed in a single batch. This is equivalent - # to schedule without PagedAttention. - num_gpu_blocks = self.scheduler_config.max_num_seqs - - # Swap not yet supported with Spyre backend. - num_cpu_blocks = 0 - - return num_gpu_blocks, num_cpu_blocks - - def initialize_cache(self, num_gpu_blocks: int, - num_cpu_blocks: int) -> None: - """Initialize the KV cache. - """ - - # Different values are not tested. - assert num_cpu_blocks == 0 - assert num_gpu_blocks == self.scheduler_config.max_num_seqs - - self.cache_config.num_gpu_blocks = num_gpu_blocks - self.cache_config.num_cpu_blocks = num_cpu_blocks - - def get_cache_block_size_bytes(self) -> int: - """Determine the size in bytes of a cache block. 
- - This is required for speculative decoding; it is not yet implemented. - """ - raise NotImplementedError - - @property - def do_metadata_broadcast(self) -> bool: - return True - - @property - def kv_cache(self) -> Optional[List[List[torch.Tensor]]]: - return None - - def prepare_worker_input( - self, execute_model_req: ExecuteModelRequest) -> WorkerInput: - return WorkerInput(num_seq_groups=len( - execute_model_req.seq_group_metadata_list), ) - - def execute_worker(self, worker_input: WorkerInput) -> None: - pass
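The removed SpyreModelRunner.pad_input_ids left-pads every prompt in a batch up to a fixed warmup length and derives an additive causal attention mask from the pad/non-pad pattern. Below is a minimal standalone sketch of that construction, assuming a pad token id of 0; the helper name and the toy prompts are illustrative only, not vLLM API.

import torch

def pad_input_ids_sketch(input_ids_list, min_pad_length=0, pad_token_id=0):
    """Left-pad a batch of prompts and build the additive causal mask."""
    max_len = max([min_pad_length] + [seq.size(0) for seq in input_ids_list])

    padded, valid, position_ids = [], [], []
    for ids in input_ids_list:
        n_pads = max_len - ids.size(0)
        pads = torch.full((n_pads,), pad_token_id, dtype=torch.long)
        # pads go on the left, real tokens on the right
        padded.append(torch.cat((pads, ids)))
        # 1 marks a real token, 0 marks padding
        valid.append(torch.cat((torch.zeros(n_pads, dtype=torch.long),
                                torch.ones(ids.size(0), dtype=torch.long))))
        # position ids restart at 0 where the real tokens begin
        position_ids.append(torch.cat((torch.zeros(n_pads, dtype=torch.long),
                                       torch.arange(ids.size(0)))))

    input_ids = torch.stack(padded)
    position_ids = torch.stack(position_ids)

    valid = torch.stack(valid).bool()
    # attention allowed only between positions with the same pad/non-pad status,
    # restricted to the lower triangle for causal generation
    mask = (valid.unsqueeze(-1) == valid.unsqueeze(-2)).tril()
    # convert to an additive mask: 0.0 where allowed, -inf elsewhere
    mask = torch.where(mask.logical_not(), -torch.inf, 0.0)

    return input_ids, position_ids, mask

prompts = [torch.tensor([11, 12, 13]), torch.tensor([21, 22])]
ids, pos, mask = pad_input_ids_sketch(prompts, min_pad_length=4)
print(ids)         # tensor([[ 0, 11, 12, 13], [ 0,  0, 21, 22]])
print(mask.shape)  # torch.Size([2, 4, 4])

Allowed (causal, non-pad) positions contribute 0.0 to the attention scores, while everything else is set to -inf so softmax assigns it zero weight; the removed runner additionally cast the mask to the model dtype before handing it to the FMS model.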