From 72166578f621af83a6d95d9af81e3fe418cbe53e Mon Sep 17 00:00:00 2001 From: xly Date: Sun, 30 Jun 2024 22:01:43 +0100 Subject: [PATCH 01/11] add serverless-llm loader --- vllm/config.py | 1 + vllm/executor/distributed_gpu_executor.py | 11 ++ vllm/model_executor/model_loader/loader.py | 155 ++++++++++++++++++++- vllm/worker/model_runner.py | 14 ++ vllm/worker/worker.py | 12 ++ 5 files changed, 192 insertions(+), 1 deletion(-) diff --git a/vllm/config.py b/vllm/config.py index d9e4a619ee0..9d257858b6f 100644 --- a/vllm/config.py +++ b/vllm/config.py @@ -498,6 +498,7 @@ class LoadFormat(str, enum.Enum): TENSORIZER = "tensorizer" SHARDED_STATE = "sharded_state" BITSANDBYTES = "bitsandbytes" + SERVERLESS_LLM = "serverless_llm" @dataclass diff --git a/vllm/executor/distributed_gpu_executor.py b/vllm/executor/distributed_gpu_executor.py index f7c608af1ad..e9c09c6f6e1 100644 --- a/vllm/executor/distributed_gpu_executor.py +++ b/vllm/executor/distributed_gpu_executor.py @@ -113,6 +113,17 @@ def save_sharded_state( path=path, pattern=pattern, max_size=max_size) + + def save_serverless_llm_state( + self, + path: str, + pattern: Optional[str] = None, + max_size: Optional[int] = None, + ) -> None: + self._run_workers("save_serverless_llm_state", + path=path, + pattern=pattern, + max_size=max_size) @abstractmethod def _driver_execute_model( diff --git a/vllm/model_executor/model_loader/loader.py b/vllm/model_executor/model_loader/loader.py index 06de2fcc1cc..f633ea24b28 100644 --- a/vllm/model_executor/model_loader/loader.py +++ b/vllm/model_executor/model_loader/loader.py @@ -418,7 +418,6 @@ def save_model( tensorizer_config=tensorizer_config, ) - class ShardedStateLoader(BaseModelLoader): """ Model loader that directly loads each worker's model state dict, which @@ -576,6 +575,157 @@ def save_model( os.path.join(path, filename), ) +class ServerlessLLMLoader(BaseModelLoader): + # DEFAULT_PATTERN = "model-rank-{rank}-part-{part}.safetensors" + + def __init__(self, load_config: LoadConfig): + super().__init__(load_config) + extra_config = ({} if load_config.model_loader_extra_config is None + else load_config.model_loader_extra_config.copy()) + # self.pattern = extra_config.pop("pattern", self.DEFAULT_PATTERN) + if extra_config: + raise ValueError(f"Unexpected extra config keys for load format " + f"{load_config.load_format}: " + f"{load_config.model_loader_extra_config.keys()}") + + @staticmethod + def _filter_subtensors( + tensors: Dict[str, torch.Tensor]) -> Dict[str, torch.Tensor]: + """ + Filter out all tensors that share the same memory or a subset of the + memory of another tensor. + """ + same_storage_groups = collections.defaultdict(list) + for key, tensor in tensors.items(): + if tensor.numel(): + ptr = tensor.untyped_storage().data_ptr() + same_storage_groups[tensor.device, ptr].append((key, tensor)) + + def get_end_ptr(tensor: torch.Tensor) -> int: + return tensor.view(-1)[-1].data_ptr() + tensor.element_size() + + result = {} + for group in same_storage_groups.values(): + for k, t in group: + a, b = t.data_ptr(), get_end_ptr(t) + for k2, t2 in group: + if not t2.is_contiguous(): + continue + a2, b2 = t2.data_ptr(), get_end_ptr(t2) + if a < a2 or b2 < b: + continue + if a2 < a or b < b2 or not t.is_contiguous(): + break # t2 covers strictly more memory than t. + if k2 < k: + # Same tensors, keep the one with the smaller key. 
+ break + else: + result[k] = t + return result + + def load_model(self, *, model_config: ModelConfig, + device_config: DeviceConfig, + lora_config: Optional[LoRAConfig], + vision_language_config: Optional[VisionLanguageConfig], + parallel_config: ParallelConfig, + scheduler_config: SchedulerConfig, + cache_config: CacheConfig) -> nn.Module: + + from serverless_llm_store import load_dict + from serverless_llm_store.client import SllmStoreClient + from serverless_llm_store._C import ( + save_tensors, + restore_tensors, + allocate_cuda_memory, + get_cuda_memory_handles, + get_device_uuid_map, + ) + + from vllm.distributed import get_tensor_model_parallel_rank + + assert os.path.isdir(model_config.model) + + local_model_path = model_config.model + rank = get_tensor_model_parallel_rank() + + # client = SllmStoreClient("localhost:8073") + + sllm_state_dict = load_dict(os.path.join(local_model_path, f"rank_{rank}", device_config.device)) + + with set_default_torch_dtype(model_config.dtype): + with torch.device(device_config.device): + model = _initialize_model(model_config, self.load_config, + lora_config, vision_language_config, + cache_config) + state_dict = self._filter_subtensors(model.state_dict()) + + + for key in sllm_state_dict: + tensor = sllm_state_dict[key] + # If loading with LoRA enabled, additional padding may + # be added to certain parameters. We only load into a + # narrowed view of the parameter data. + param_data = state_dict[key].data + param_shape = state_dict[key].shape + for dim, size in enumerate(tensor.shape): + if size < param_shape[dim]: + param_data = param_data.narrow(dim, 0, size) + if tensor.shape != param_shape: + logger.warning( + "loading tensor of shape %s into " + "parameter '%s' of shape %s", tensor.shape, + key, param_shape) + param_data.copy_(tensor) + state_dict.pop(key) + if state_dict: + raise ValueError( + f"Missing keys {tuple(state_dict)} in loaded state!") + return model.eval() + + @staticmethod + def save_model( + model: torch.nn.Module, + path: str, + pattern: Optional[str] = None, + max_size: Optional[int] = None, + ) -> None: + from vllm.distributed import get_tensor_model_parallel_rank + from serverless_llm_store._C import save_tensors + + # if pattern is None: + # pattern = ShardedStateLoader.DEFAULT_PATTERN + rank = get_tensor_model_parallel_rank() + state_dict = ShardedStateLoader._filter_subtensors(model.state_dict()) + + # move all tensors to CPU + for key, tensor in state_dict.items(): + state_dict[key] = tensor.cpu() + + tensor_names = list(state_dict.keys()) + tensor_data_index = {} + for name, param in state_dict.items(): + param_storage = param.untyped_storage() + data_ptr = param_storage.data_ptr() + size = param_storage.size() + tensor_data_index[name] = (data_ptr, size) + + print(tensor_data_index) + rank_path = os.path.join(path, f"rank_{rank}") + if not os.path.exists(rank_path): + os.makedirs(rank_path) + # save tensors + tensor_offsets = save_tensors(tensor_names, tensor_data_index, rank_path) + + # create tensor index + tensor_index = {} + for name, param in state_dict.items(): + # name: offset, size + tensor_index[name] = (tensor_offsets[name], tensor_data_index[name][1], tuple(param.shape), tuple(param.stride()), str(param.dtype)) + + # save tensor index + with open(os.path.join(rank_path, "tensor_index.json"), "w") as f: + json.dump(tensor_index, f) + class BitsAndBytesModelLoader(BaseModelLoader): """Model loader to load model weights with BitAndBytes quantization.""" @@ -826,6 +976,9 @@ def get_model_loader(load_config: 
LoadConfig) -> BaseModelLoader: if load_config.load_format == LoadFormat.SHARDED_STATE: return ShardedStateLoader(load_config) + + if load_config.load_format == LoadFormat.SERVERLESS_LLM: + return ServerlessLLMLoader(load_config) if load_config.load_format == LoadFormat.BITSANDBYTES: return BitsAndBytesModelLoader(load_config) diff --git a/vllm/worker/model_runner.py b/vllm/worker/model_runner.py index 476e9ba3bb4..02bbceca44f 100644 --- a/vllm/worker/model_runner.py +++ b/vllm/worker/model_runner.py @@ -222,6 +222,20 @@ def save_sharded_state( pattern=pattern, max_size=max_size, ) + + def save_serverless_llm_state( + self, + path: str, + pattern: Optional[str] = None, + max_size: Optional[int] = None, + ) -> None: + from vllm.model_executor.model_loader.loader import ServerlessLLMLoader + ServerlessLLMLoader.save_model( + self.model, + path, + pattern=pattern, + max_size=max_size, + ) def save_tensorized_model( self, diff --git a/vllm/worker/worker.py b/vllm/worker/worker.py index 7a378a862d0..b4c73f432f8 100644 --- a/vllm/worker/worker.py +++ b/vllm/worker/worker.py @@ -132,6 +132,18 @@ def save_sharded_state( pattern=pattern, max_size=max_size, ) + + def save_serverless_llm_state( + self, + path: str, + pattern: Optional[str] = None, + max_size: Optional[int] = None, + ) -> None: + self.model_runner.save_serverless_llm_state( + path, + pattern=pattern, + max_size=max_size, + ) def save_tensorized_model( self, From f9d1fc788686904a8479cea30ee48e46d187fa64 Mon Sep 17 00:00:00 2001 From: xly Date: Sun, 30 Jun 2024 23:38:27 +0100 Subject: [PATCH 02/11] custom load api --- vllm/model_executor/model_loader/loader.py | 71 +++++++++++++++------- 1 file changed, 49 insertions(+), 22 deletions(-) diff --git a/vllm/model_executor/model_loader/loader.py b/vllm/model_executor/model_loader/loader.py index f633ea24b28..352d16dd268 100644 --- a/vllm/model_executor/model_loader/loader.py +++ b/vllm/model_executor/model_loader/loader.py @@ -642,15 +642,24 @@ def load_model(self, *, model_config: ModelConfig, ) from vllm.distributed import get_tensor_model_parallel_rank + import uuid assert os.path.isdir(model_config.model) local_model_path = model_config.model + local_model_path = os.path.join(local_model_path, f"rank_{rank}") rank = get_tensor_model_parallel_rank() - # client = SllmStoreClient("localhost:8073") + tensor_index_path = os.path.join(local_model_path, "tensor_index.json") + with open(tensor_index_path, "r") as f: + tensor_index = json.load(f) - sllm_state_dict = load_dict(os.path.join(local_model_path, f"rank_{rank}", device_config.device)) + device_uuid_map = get_device_uuid_map() + device_uuid = device_uuid_map[device_config.device] + replica_uuid = str(uuid.uuid4()) + client = SllmStoreClient("localhost:8073") + + # sllm_state_dict = load_dict(os.path.join(local_model_path, f"rank_{rank}", device_config.device)) with set_default_torch_dtype(model_config.dtype): with torch.device(device_config.device): @@ -659,27 +668,45 @@ def load_model(self, *, model_config: ModelConfig, cache_config) state_dict = self._filter_subtensors(model.state_dict()) + memory_ptrs = {device_uuid: []} + tensor_copy_chunks = {device_uuid: []} + for name, param in state_dict.items(): + data_ptr = param.untyped_storage().data_ptr() + memory_ptrs[device_uuid].append(data_ptr) + + offset, size = tensor_index[name] + tensor_copy_chunks[device_uuid].append( + (offset, size, 0) + ) + cuda_memory_handles = get_cuda_memory_handles(memory_ptrs) - for key in sllm_state_dict: - tensor = sllm_state_dict[key] - # If loading with 
LoRA enabled, additional padding may - # be added to certain parameters. We only load into a - # narrowed view of the parameter data. - param_data = state_dict[key].data - param_shape = state_dict[key].shape - for dim, size in enumerate(tensor.shape): - if size < param_shape[dim]: - param_data = param_data.narrow(dim, 0, size) - if tensor.shape != param_shape: - logger.warning( - "loading tensor of shape %s into " - "parameter '%s' of shape %s", tensor.shape, - key, param_shape) - param_data.copy_(tensor) - state_dict.pop(key) - if state_dict: - raise ValueError( - f"Missing keys {tuple(state_dict)} in loaded state!") + ret = client.load_into_gpu( + local_model_path, + replica_uuid, + tensor_copy_chunks, + cuda_memory_handles, + ) + client.confirm_model_loaded(model_name_or_path, replica_uuid) + # for key in sllm_state_dict: + # tensor = sllm_state_dict[key] + # # If loading with LoRA enabled, additional padding may + # # be added to certain parameters. We only load into a + # # narrowed view of the parameter data. + # param_data = state_dict[key].data + # param_shape = state_dict[key].shape + # for dim, size in enumerate(tensor.shape): + # if size < param_shape[dim]: + # param_data = param_data.narrow(dim, 0, size) + # if tensor.shape != param_shape: + # logger.warning( + # "loading tensor of shape %s into " + # "parameter '%s' of shape %s", tensor.shape, + # key, param_shape) + # param_data.copy_(tensor) + # state_dict.pop(key) + # if state_dict: + # raise ValueError( + # f"Missing keys {tuple(state_dict)} in loaded state!") return model.eval() @staticmethod From 925277154a540d03dadddacfb03bdc35a448f347 Mon Sep 17 00:00:00 2001 From: Leyang Xue Date: Tue, 2 Jul 2024 16:45:04 +0100 Subject: [PATCH 03/11] save load sllm --- examples/load_sllm_state.py | 51 +++++++++++++ examples/save_load_sllm_state.sh | 9 +++ examples/save_sllm_state.py | 85 ++++++++++++++++++++++ vllm/model_executor/model_loader/loader.py | 46 +++++++----- 4 files changed, 172 insertions(+), 19 deletions(-) create mode 100644 examples/load_sllm_state.py create mode 100644 examples/save_load_sllm_state.sh create mode 100644 examples/save_sllm_state.py diff --git a/examples/load_sllm_state.py b/examples/load_sllm_state.py new file mode 100644 index 00000000000..5a172cf512f --- /dev/null +++ b/examples/load_sllm_state.py @@ -0,0 +1,51 @@ +""" +Saves each worker's model state dict directly to a checkpoint, which enables a +fast load path for large tensor-parallel models where each worker only needs to +read its own shard rather than the entire checkpoint. + +Example usage: + +python save_sharded_state.py \ + --model /path/to/load \ + --quantization deepspeedfp \ + --tensor-parallel-size 8 \ + --output /path/to/save + +Then, the model can be loaded with + +llm = LLM( + model="/path/to/save", + load_format="sharded_state", + quantization="deepspeedfp", + tensor_parallel_size=8, +) +""" +import argparse +import dataclasses +import os +import shutil +from pathlib import Path + +from vllm import LLM, EngineArgs + +parser = argparse.ArgumentParser() +EngineArgs.add_cli_args(parser) +parser.add_argument("--output", + "-o", + required=True, + type=str, + help="path to output checkpoint") + +if __name__ == "__main__": + args = parser.parse_args() + # main(args) + + llm = LLM( + model=args.output, + load_format="serverless_llm", + tensor_parallel_size=2, + ) + + input_text = "Hello, world!" 
+ + print(llm.generate(input_text)) diff --git a/examples/save_load_sllm_state.sh b/examples/save_load_sllm_state.sh new file mode 100644 index 00000000000..a33520ef03f --- /dev/null +++ b/examples/save_load_sllm_state.sh @@ -0,0 +1,9 @@ +CUDA_VISIBLE_DEVICES=0,1 python save_sllm_state.py \ + --model /mnt/raid0sata1/huggingface/hub/models--facebook--opt-125m/snapshots/27dcfa74d334bc871f3234de431e71c6eeba5dd6 \ + --tensor-parallel-size 4 \ + --output /mnt/raid0nvme1/xly/test_data/vllm/opt-125m + +CUDA_VISIBLE_DEVICES=0,1 python load_sllm_state.py \ + --model /home/fuji/.cache/huggingface/hub/models--facebook--opt-1.3b/snapshots/3f5c25d0bc631cb57ac65913f76e22c2dfb61d62 \ + --tensor-parallel-size 2 \ + --output /home/fuji/sllm_models/opt-1.3b \ No newline at end of file diff --git a/examples/save_sllm_state.py b/examples/save_sllm_state.py new file mode 100644 index 00000000000..c74ff684d30 --- /dev/null +++ b/examples/save_sllm_state.py @@ -0,0 +1,85 @@ +""" +Saves each worker's model state dict directly to a checkpoint, which enables a +fast load path for large tensor-parallel models where each worker only needs to +read its own shard rather than the entire checkpoint. + +Example usage: + +python save_sharded_state.py \ + --model /path/to/load \ + --quantization deepspeedfp \ + --tensor-parallel-size 8 \ + --output /path/to/save + +Then, the model can be loaded with + +llm = LLM( + model="/path/to/save", + load_format="sharded_state", + quantization="deepspeedfp", + tensor_parallel_size=8, +) +""" +import argparse +import dataclasses +import os +import shutil +from pathlib import Path + +from vllm import LLM, EngineArgs + +parser = argparse.ArgumentParser() +EngineArgs.add_cli_args(parser) +parser.add_argument("--output", + "-o", + required=True, + type=str, + help="path to output checkpoint") +parser.add_argument("--file-pattern", + type=str, + help="string pattern of saved filenames") +parser.add_argument("--max-file-size", + type=str, + default=5 * 1024**3, + help="max size (in bytes) of each safetensors file") + + +def main(args): + engine_args = EngineArgs.from_cli_args(args) + if engine_args.enable_lora: + raise ValueError("Saving with enable_lora=True is not supported!") + model_path = engine_args.model + if not Path(model_path).is_dir(): + raise ValueError("model path must be a local directory") + # Create LLM instance from arguments + llm = LLM(**dataclasses.asdict(engine_args)) + # Prepare output directory + Path(args.output).mkdir(exist_ok=True) + # Dump worker states to output directory + model_executor = llm.llm_engine.model_executor + model_executor.save_serverless_llm_state(path=args.output, + pattern=args.file_pattern, + max_size=args.max_file_size) + # Copy metadata files to output directory + for file in os.listdir(model_path): + if os.path.splitext(file)[1] not in (".bin", ".pt", ".safetensors"): + if os.path.isdir(os.path.join(model_path, file)): + shutil.copytree(os.path.join(model_path, file), + os.path.join(args.output, file)) + else: + shutil.copy(os.path.join(model_path, file), args.output) + +from vllm.distributed import get_tensor_model_parallel_rank +if __name__ == "__main__": + args = parser.parse_args() + main(args) + + # llm = LLM( + # model=args.output, + # load_format="serverless_llm", + # tensor_parallel_size=2, + # ) + + # input_text = "Hello, world!" 
+ + # print(llm.generate(input_text)) diff --git a/vllm/model_executor/model_loader/loader.py b/vllm/model_executor/model_loader/loader.py index 352d16dd268..f8c96ccf648 100644 --- a/vllm/model_executor/model_loader/loader.py +++ b/vllm/model_executor/model_loader/loader.py @@ -630,13 +630,10 @@ def load_model(self, *, model_config: ModelConfig, parallel_config: ParallelConfig, scheduler_config: SchedulerConfig, cache_config: CacheConfig) -> nn.Module: - - from serverless_llm_store import load_dict + print("Loading model") from serverless_llm_store.client import SllmStoreClient from serverless_llm_store._C import ( - save_tensors, restore_tensors, - allocate_cuda_memory, get_cuda_memory_handles, get_device_uuid_map, ) @@ -645,17 +642,19 @@ def load_model(self, *, model_config: ModelConfig, import uuid assert os.path.isdir(model_config.model) + + rank = get_tensor_model_parallel_rank() local_model_path = model_config.model local_model_path = os.path.join(local_model_path, f"rank_{rank}") - rank = get_tensor_model_parallel_rank() + tensor_index_path = os.path.join(local_model_path, "tensor_index.json") with open(tensor_index_path, "r") as f: tensor_index = json.load(f) device_uuid_map = get_device_uuid_map() - device_uuid = device_uuid_map[device_config.device] + device_uuid = device_uuid_map[rank] replica_uuid = str(uuid.uuid4()) client = SllmStoreClient("localhost:8073") @@ -668,25 +667,34 @@ def load_model(self, *, model_config: ModelConfig, cache_config) state_dict = self._filter_subtensors(model.state_dict()) - memory_ptrs = {device_uuid: []} - tensor_copy_chunks = {device_uuid: []} - for name, param in state_dict.items(): + memory_ptrs = {rank: []} + tensor_copy_chunks = {rank: []} + for idx, (name, param) in enumerate(state_dict.items()): data_ptr = param.untyped_storage().data_ptr() - memory_ptrs[device_uuid].append(data_ptr) + memory_ptrs[rank].append(data_ptr) + offset, size, _, _, _ = tensor_index[name] - offset, size = tensor_index[name] - tensor_copy_chunks[device_uuid].append( - (offset, size, 0) + tensor_copy_chunks[rank].append( + (offset, size, 0, idx) ) cuda_memory_handles = get_cuda_memory_handles(memory_ptrs) + model_name = local_model_path.split("/")[-2:][0] + "/" + local_model_path.split("/")[-1] + + ret = client.load_into_cpu(model_name) + if not ret or ret == False: + raise ValueError(f"Failed to load model {model_name} into CPU") + + ret = client.load_into_gpu( - local_model_path, + model_name, replica_uuid, - tensor_copy_chunks, - cuda_memory_handles, - ) - client.confirm_model_loaded(model_name_or_path, replica_uuid) + {device_uuid: tensor_copy_chunks[rank]}, + {device_uuid: cuda_memory_handles[rank]} + ) + if not ret or ret == False: + raise ValueError(f"Failed to load model {model_name} into GPU") + client.confirm_model_loaded(model_name, replica_uuid) # for key in sllm_state_dict: # tensor = sllm_state_dict[key] # # If loading with LoRA enabled, additional padding may @@ -737,7 +745,7 @@ def save_model( tensor_data_index[name] = (data_ptr, size) print(tensor_data_index) - rank_path = os.path.join(path, f"rank_{rank}") + rank_path = path + f"rank_{rank}" if not os.path.exists(rank_path): os.makedirs(rank_path) # save tensors From cbd772c324ec2ca6377df8255d563900f18b6a64 Mon Sep 17 00:00:00 2001 From: Leyang Xue Date: Fri, 5 Jul 2024 11:04:58 +0100 Subject: [PATCH 04/11] early load cpu & single gpu save --- vllm/executor/gpu_executor.py | 10 +++++++ vllm/model_executor/model_loader/loader.py | 35 ++++------------------ 2 files changed, 16 insertions(+), 29 
deletions(-) diff --git a/vllm/executor/gpu_executor.py b/vllm/executor/gpu_executor.py index 3ad201f4757..d71da2bac69 100644 --- a/vllm/executor/gpu_executor.py +++ b/vllm/executor/gpu_executor.py @@ -106,6 +106,16 @@ def check_health(self) -> None: # GPUExecutor will always be healthy as long as # it's running. return + + def save_serverless_llm_state( + self, + path: str, + pattern: Optional[str] = None, + max_size: Optional[int] = None, + ) -> None: + self.driver_worker.save_serverless_llm_state( + path=path, pattern=pattern, max_size=max_size + ) class GPUExecutorAsync(GPUExecutor, ExecutorAsyncBase): diff --git a/vllm/model_executor/model_loader/loader.py b/vllm/model_executor/model_loader/loader.py index f8c96ccf648..40658f8e9d9 100644 --- a/vllm/model_executor/model_loader/loader.py +++ b/vllm/model_executor/model_loader/loader.py @@ -647,7 +647,10 @@ def load_model(self, *, model_config: ModelConfig, local_model_path = model_config.model local_model_path = os.path.join(local_model_path, f"rank_{rank}") - + model_name = local_model_path.split("/")[-2:][0] + "/" + local_model_path.split("/")[-1] + ret = client.load_into_cpu(model_name) + if not ret or ret == False: + raise ValueError(f"Failed to load model {model_name} into CPU") tensor_index_path = os.path.join(local_model_path, "tensor_index.json") with open(tensor_index_path, "r") as f: @@ -674,17 +677,11 @@ def load_model(self, *, model_config: ModelConfig, memory_ptrs[rank].append(data_ptr) offset, size, _, _, _ = tensor_index[name] + # every tensor has its own base address, so GPU offset is always 0 tensor_copy_chunks[rank].append( (offset, size, 0, idx) ) cuda_memory_handles = get_cuda_memory_handles(memory_ptrs) - model_name = local_model_path.split("/")[-2:][0] + "/" + local_model_path.split("/")[-1] - - ret = client.load_into_cpu(model_name) - if not ret or ret == False: - raise ValueError(f"Failed to load model {model_name} into CPU") - - ret = client.load_into_gpu( model_name, @@ -695,26 +692,6 @@ def load_model(self, *, model_config: ModelConfig, if not ret or ret == False: raise ValueError(f"Failed to load model {model_name} into GPU") client.confirm_model_loaded(model_name, replica_uuid) - # for key in sllm_state_dict: - # tensor = sllm_state_dict[key] - # # If loading with LoRA enabled, additional padding may - # # be added to certain parameters. We only load into a - # # narrowed view of the parameter data. 
- # param_data = state_dict[key].data - # param_shape = state_dict[key].shape - # for dim, size in enumerate(tensor.shape): - # if size < param_shape[dim]: - # param_data = param_data.narrow(dim, 0, size) - # if tensor.shape != param_shape: - # logger.warning( - # "loading tensor of shape %s into " - # "parameter '%s' of shape %s", tensor.shape, - # key, param_shape) - # param_data.copy_(tensor) - # state_dict.pop(key) - # if state_dict: - # raise ValueError( - # f"Missing keys {tuple(state_dict)} in loaded state!") return model.eval() @staticmethod @@ -730,7 +707,7 @@ def save_model( # if pattern is None: # pattern = ShardedStateLoader.DEFAULT_PATTERN rank = get_tensor_model_parallel_rank() - state_dict = ShardedStateLoader._filter_subtensors(model.state_dict()) + state_dict = ServerlessLLMLoader._filter_subtensors(model.state_dict()) # move all tensors to CPU for key, tensor in state_dict.items(): From c54898c41f940de3154599441dfb66776fbcaf83 Mon Sep 17 00:00:00 2001 From: Leyang Xue Date: Mon, 8 Jul 2024 13:57:51 +0100 Subject: [PATCH 05/11] update loader --- vllm/model_executor/model_loader/loader.py | 72 +++++++++++----------- 1 file changed, 35 insertions(+), 37 deletions(-) diff --git a/vllm/model_executor/model_loader/loader.py b/vllm/model_executor/model_loader/loader.py index 40658f8e9d9..429d6f2d3d7 100644 --- a/vllm/model_executor/model_loader/loader.py +++ b/vllm/model_executor/model_loader/loader.py @@ -633,9 +633,9 @@ def load_model(self, *, model_config: ModelConfig, print("Loading model") from serverless_llm_store.client import SllmStoreClient from serverless_llm_store._C import ( - restore_tensors, get_cuda_memory_handles, get_device_uuid_map, + get_device_ptrs_from_mem_handles, ) from vllm.distributed import get_tensor_model_parallel_rank @@ -643,14 +643,16 @@ def load_model(self, *, model_config: ModelConfig, assert os.path.isdir(model_config.model) + client = SllmStoreClient("localhost:8073") rank = get_tensor_model_parallel_rank() local_model_path = model_config.model local_model_path = os.path.join(local_model_path, f"rank_{rank}") - model_name = local_model_path.split("/")[-2:][0] + "/" + local_model_path.split("/")[-1] + model_name = "/".join(local_model_path.split("/")[-2:]) + ret = client.load_into_cpu(model_name) if not ret or ret == False: - raise ValueError(f"Failed to load model {model_name} into CPU") + raise ValueError(f"Failed to load model {model_name} into CPU") tensor_index_path = os.path.join(local_model_path, "tensor_index.json") with open(tensor_index_path, "r") as f: @@ -659,7 +661,6 @@ def load_model(self, *, model_config: ModelConfig, device_uuid_map = get_device_uuid_map() device_uuid = device_uuid_map[rank] replica_uuid = str(uuid.uuid4()) - client = SllmStoreClient("localhost:8073") # sllm_state_dict = load_dict(os.path.join(local_model_path, f"rank_{rank}", device_config.device)) @@ -672,16 +673,38 @@ def load_model(self, *, model_config: ModelConfig, memory_ptrs = {rank: []} tensor_copy_chunks = {rank: []} + + # idx = 0 + # for name, param in model.named_parameters(recurse=True): + # if not name in state_dict: + # continue + # data_ptr = param.data_ptr() + # memory_ptrs[rank].append(data_ptr) + + # offset, size, _, _, _ = tensor_index[name] + # tensor_copy_chunks[rank].append((offset, size, 0, idx)) + # idx += 1 + # print(f"Loading tensor {name} with offset {offset} and size {size}, device {param.device}, {hex(data_ptr)}, {idx}") + for idx, (name, param) in enumerate(state_dict.items()): data_ptr = param.untyped_storage().data_ptr() 
memory_ptrs[rank].append(data_ptr) offset, size, _, _, _ = tensor_index[name] - # every tensor has its own base address, so GPU offset is always 0 - tensor_copy_chunks[rank].append( - (offset, size, 0, idx) - ) - cuda_memory_handles = get_cuda_memory_handles(memory_ptrs) + tensor_copy_chunks[rank].append((offset, size, 0, idx)) + print(f"Loading tensor {name} with offset {offset} and size {size}, device {param.device}, {hex(data_ptr)}, {idx}") + + cuda_memory_handles = get_cuda_memory_handles(memory_ptrs) + # device_ptrs = get_device_ptrs_from_mem_handles(cuda_memory_handles) + + # for k, ptr in enumerate(device_ptrs[rank]): + # assert hex(ptr) == hex(memory_ptrs[rank][k]), f"Memory ptrs do not match: {hex(ptr)} != {hex(memory_ptrs[rank][k])}" + # cuda_memory_handles = { + # rank: [ + # get_cuda_memory_handles({rank: ptr})[rank] + # for ptr in memory_ptrs[rank] + # ] + # } ret = client.load_into_gpu( model_name, @@ -702,41 +725,16 @@ def save_model( max_size: Optional[int] = None, ) -> None: from vllm.distributed import get_tensor_model_parallel_rank - from serverless_llm_store._C import save_tensors + from serverless_llm_store import save_dict - # if pattern is None: - # pattern = ShardedStateLoader.DEFAULT_PATTERN rank = get_tensor_model_parallel_rank() state_dict = ServerlessLLMLoader._filter_subtensors(model.state_dict()) # move all tensors to CPU for key, tensor in state_dict.items(): state_dict[key] = tensor.cpu() - - tensor_names = list(state_dict.keys()) - tensor_data_index = {} - for name, param in state_dict.items(): - param_storage = param.untyped_storage() - data_ptr = param_storage.data_ptr() - size = param_storage.size() - tensor_data_index[name] = (data_ptr, size) - - print(tensor_data_index) - rank_path = path + f"rank_{rank}" - if not os.path.exists(rank_path): - os.makedirs(rank_path) - # save tensors - tensor_offsets = save_tensors(tensor_names, tensor_data_index, rank_path) - - # create tensor index - tensor_index = {} - for name, param in state_dict.items(): - # name: offset, size - tensor_index[name] = (tensor_offsets[name], tensor_data_index[name][1], tuple(param.shape), tuple(param.stride()), str(param.dtype)) - - # save tensor index - with open(os.path.join(rank_path, "tensor_index.json"), "w") as f: - json.dump(tensor_index, f) + + save_dict(state_dict, os.path.join(path, f"rank_{rank}")) class BitsAndBytesModelLoader(BaseModelLoader): From 2b9fe5b24b8f4fbd399bc4e4559ecddd21bd2e11 Mon Sep 17 00:00:00 2001 From: Leyang Xue Date: Tue, 9 Jul 2024 15:40:04 +0100 Subject: [PATCH 06/11] vllm loader new API --- vllm/model_executor/model_loader/loader.py | 166 ++++++++++++++------- 1 file changed, 114 insertions(+), 52 deletions(-) diff --git a/vllm/model_executor/model_loader/loader.py b/vllm/model_executor/model_loader/loader.py index 429d6f2d3d7..a7bc5b02de5 100644 --- a/vllm/model_executor/model_loader/loader.py +++ b/vllm/model_executor/model_loader/loader.py @@ -632,90 +632,152 @@ def load_model(self, *, model_config: ModelConfig, cache_config: CacheConfig) -> nn.Module: print("Loading model") from serverless_llm_store.client import SllmStoreClient + from serverless_llm_store import load_into_cpu_non_blocking, load_into_gpu_non_blocking, wait_dict_loaded from serverless_llm_store._C import ( get_cuda_memory_handles, get_device_uuid_map, - get_device_ptrs_from_mem_handles, + allocate_cuda_memory, ) from vllm.distributed import get_tensor_model_parallel_rank + from accelerate import dispatch_model, init_empty_weights import uuid assert 
os.path.isdir(model_config.model) - client = SllmStoreClient("localhost:8073") + # client = SllmStoreClient("localhost:8073") rank = get_tensor_model_parallel_rank() local_model_path = model_config.model local_model_path = os.path.join(local_model_path, f"rank_{rank}") - model_name = "/".join(local_model_path.split("/")[-2:]) - ret = client.load_into_cpu(model_name) - if not ret or ret == False: - raise ValueError(f"Failed to load model {model_name} into CPU") + # model name is everything after models + model_name = local_model_path.split("models/")[1] + storage_path = local_model_path.split("models/")[0] + if storage_path.endswith("/"): + storage_path = os.path.join(storage_path, "models") + else: + storage_path = storage_path + "models" + device_map = {"": rank} + + load_into_cpu_non_blocking(model_name, device_map, storage_path) + replica_uuid, sllm_state_dict, device_map = load_into_gpu_non_blocking(model_name, device_map, storage_path) - tensor_index_path = os.path.join(local_model_path, "tensor_index.json") - with open(tensor_index_path, "r") as f: - tensor_index = json.load(f) + # tensor_index_path = os.path.join(local_model_path, "tensor_index.json") + # with open(tensor_index_path, "r") as f: + # tensor_index = json.load(f) - device_uuid_map = get_device_uuid_map() - device_uuid = device_uuid_map[rank] - replica_uuid = str(uuid.uuid4()) + # device_uuid_map = get_device_uuid_map() + # device_uuid = device_uuid_map[rank] + # replica_uuid = str(uuid.uuid4()) # sllm_state_dict = load_dict(os.path.join(local_model_path, f"rank_{rank}", device_config.device)) with set_default_torch_dtype(model_config.dtype): with torch.device(device_config.device): model = _initialize_model(model_config, self.load_config, - lora_config, vision_language_config, - cache_config) + lora_config, vision_language_config, + cache_config) + model = model.eval() + + # set all parameters to meta device state_dict = self._filter_subtensors(model.state_dict()) + key_list = list(state_dict.keys()) - memory_ptrs = {rank: []} - tensor_copy_chunks = {rank: []} + for key, param in model.named_parameters(recurse=True): + if key in key_list: + param.data = torch.empty(1, device=torch.device("cuda")) - # idx = 0 - # for name, param in model.named_parameters(recurse=True): - # if not name in state_dict: - # continue - # data_ptr = param.data_ptr() - # memory_ptrs[rank].append(data_ptr) + # model = dispatch_model(model, {"": torch.device("meta")}) + torch.cuda.empty_cache() + + wait_dict_loaded(model_name, replica_uuid) + + for key, param in model.named_parameters(recurse=True): + if key in key_list: + tensor = sllm_state_dict[key] + # param_data = param.data + # param_shape = param.shape + # print(f"{param_shape=}, {param.device=}") + # for dim, size in enumerate(tensor.shape): + # if size < param_shape[dim]: + # param_data = param_data.narrow(dim, 0, size) + # if tensor.shape != param_shape: + # logger.warning( + # "loading tensor of shape %s into " + # "parameter '%s' of shape %s", tensor.shape, key, param_shape) + # param_data.copy_(tensor) + param.data = tensor + state_dict.pop(key) + if state_dict: + raise ValueError( + f"Missing keys {tuple(state_dict)} in loaded state!") + + + + # tensor_meta_index = {} + # tensor_data_index = {} + # for name, (offset, size, shape, stride, dtype) in tensor_index.items(): + # tensor_meta_index[name] = (shape, stride, dtype) + # tensor_data_index[name] = (offset, size) + + # total_memory_size = 0 + # tensor_offsets = [] + # tensor_chunks = [] + # for name in state_dict.keys(): + # 
cpu_offsets, memory_size = tensor_index[name] + # tensor_chunks.append((cpu_offsets, size, total_memory_size, 0)) + # tensor_offsets.append(total_memory_size) + # total_memory_size += memory_size + # cuda_memory_ptrs = allocate_cuda_memory(total_memory_size) + # cuda_memory_handles = get_cuda_memory_handles(cuda_memory_ptrs) + + # memory_ptrs = {rank: []} + # tensor_copy_chunks = {rank: []} + + # # idx = 0 + # # for name, param in model.named_parameters(recurse=True): + # # if not name in state_dict: + # # continue + # # data_ptr = param.data.untyped_storage().data_ptr() + # # memory_ptrs[rank].append(data_ptr) + + # # offset, size, _, _, _ = tensor_index[name] + # # tensor_copy_chunks[rank].append((offset, size, 0, idx)) + # # idx += 1 + # # print(f"Loading tensor {name} with offset {offset} and size {size}, device {param.device}, {hex(data_ptr)}, {idx}") + + # for idx, (name, param) in enumerate(state_dict.items()): + # # data_ptr = param.untyped_storage().data_ptr() + # data_ptr = param.view(-1)[-1].data_ptr() + # memory_ptrs[rank].append(data_ptr) # offset, size, _, _, _ = tensor_index[name] + # # every tensor has its own base address, so GPU offset is always 0 # tensor_copy_chunks[rank].append((offset, size, 0, idx)) - # idx += 1 # print(f"Loading tensor {name} with offset {offset} and size {size}, device {param.device}, {hex(data_ptr)}, {idx}") - - for idx, (name, param) in enumerate(state_dict.items()): - data_ptr = param.untyped_storage().data_ptr() - memory_ptrs[rank].append(data_ptr) - offset, size, _, _, _ = tensor_index[name] - # every tensor has its own base address, so GPU offset is always 0 - tensor_copy_chunks[rank].append((offset, size, 0, idx)) - print(f"Loading tensor {name} with offset {offset} and size {size}, device {param.device}, {hex(data_ptr)}, {idx}") - cuda_memory_handles = get_cuda_memory_handles(memory_ptrs) - # device_ptrs = get_device_ptrs_from_mem_handles(cuda_memory_handles) + # cuda_memory_handles = get_cuda_memory_handles(memory_ptrs) - # for k, ptr in enumerate(device_ptrs[rank]): - # assert hex(ptr) == hex(memory_ptrs[rank][k]), f"Memory ptrs do not match: {hex(ptr)} != {hex(memory_ptrs[rank][k])}" - # cuda_memory_handles = { - # rank: [ - # get_cuda_memory_handles({rank: ptr})[rank] - # for ptr in memory_ptrs[rank] - # ] - # } + # # for k, ptr in enumerate(device_ptrs[rank]): + # # assert hex(ptr) == hex(memory_ptrs[rank][k]), f"Memory ptrs do not match: {hex(ptr)} != {hex(memory_ptrs[rank][k])}" + # # cuda_memory_handles = { + # # rank: [ + # # get_cuda_memory_handles({rank: ptr})[rank] + # # for ptr in memory_ptrs[rank] + # # ] + # # } - ret = client.load_into_gpu( - model_name, - replica_uuid, - {device_uuid: tensor_copy_chunks[rank]}, - {device_uuid: cuda_memory_handles[rank]} - ) - if not ret or ret == False: - raise ValueError(f"Failed to load model {model_name} into GPU") - client.confirm_model_loaded(model_name, replica_uuid) - return model.eval() + # ret = client.load_into_gpu( + # model_name, + # replica_uuid, + # {device_uuid: tensor_copy_chunks[rank]}, + # {device_uuid: cuda_memory_handles[rank]} + # ) + # if not ret or ret == False: + # raise ValueError(f"Failed to load model {model_name} into GPU") + # client.confirm_model_loaded(model_name, replica_uuid) + return model @staticmethod def save_model( @@ -732,7 +794,7 @@ def save_model( # move all tensors to CPU for key, tensor in state_dict.items(): - state_dict[key] = tensor.cpu() + state_dict[key] = tensor.cpu().contiguous() save_dict(state_dict, os.path.join(path, f"rank_{rank}")) From 
3fd5452c591370728618feba1f0a17b148a50017 Mon Sep 17 00:00:00 2001 From: Leyang Xue Date: Wed, 10 Jul 2024 01:01:18 +0100 Subject: [PATCH 07/11] set empty tensor --- examples/load_sllm_state.py | 9 ++++++-- examples/save_sllm_state.py | 7 ++++++ vllm/model_executor/model_loader/loader.py | 27 ++++++++++++++-------- 3 files changed, 31 insertions(+), 12 deletions(-) diff --git a/examples/load_sllm_state.py b/examples/load_sllm_state.py index 5a172cf512f..e3dbdb824d0 100644 --- a/examples/load_sllm_state.py +++ b/examples/load_sllm_state.py @@ -43,9 +43,14 @@ llm = LLM( model=args.output, load_format="serverless_llm", - tensor_parallel_size=2, + # load_format="sharded_state", + gpu_memory_utilization=0.9, + distributed_executor_backend="mp", + max_model_len = 512, + tensor_parallel_size=args.tensor_parallel_size, + # num_gpu_blocks_override=128, ) - input_text = "Hello, world!" + input_text = "Explain thread and process in python." print(llm.generate(input_text)) diff --git a/examples/save_sllm_state.py b/examples/save_sllm_state.py index c74ff684d30..4070cb7d7fe 100644 --- a/examples/save_sllm_state.py +++ b/examples/save_sllm_state.py @@ -46,12 +46,19 @@ def main(args): engine_args = EngineArgs.from_cli_args(args) + engine_args.distributed_executor_backend = "mp" + engine_args.gpu_memory_utilization = 0.4 + engine_args.max_seq_len_to_capture = 512 + engine_args.max_model_len = 512 + engine_args.max_num_seqs = 1 + engine_args.num_gpu_blocks_override = 128 if engine_args.enable_lora: raise ValueError("Saving with enable_lora=True is not supported!") model_path = engine_args.model if not Path(model_path).is_dir(): raise ValueError("model path must be a local directory") # Create LLM instance from arguments + print(dataclasses.asdict(engine_args)) llm = LLM(**dataclasses.asdict(engine_args)) # Prepare output directory Path(args.output).mkdir(exist_ok=True) diff --git a/vllm/model_executor/model_loader/loader.py b/vllm/model_executor/model_loader/loader.py index a7bc5b02de5..acea1ab5cc6 100644 --- a/vllm/model_executor/model_loader/loader.py +++ b/vllm/model_executor/model_loader/loader.py @@ -14,6 +14,7 @@ import torch from huggingface_hub import HfApi, hf_hub_download from torch import nn +import gc from vllm.config import (CacheConfig, DeviceConfig, LoadConfig, LoadFormat, LoRAConfig, ModelConfig, ParallelConfig, @@ -622,6 +623,7 @@ def get_end_ptr(tensor: torch.Tensor) -> int: else: result[k] = t return result + def load_model(self, *, model_config: ModelConfig, device_config: DeviceConfig, @@ -630,9 +632,9 @@ def load_model(self, *, model_config: ModelConfig, parallel_config: ParallelConfig, scheduler_config: SchedulerConfig, cache_config: CacheConfig) -> nn.Module: - print("Loading model") from serverless_llm_store.client import SllmStoreClient from serverless_llm_store import load_into_cpu_non_blocking, load_into_gpu_non_blocking, wait_dict_loaded + from serverless_llm_store import load_dict from serverless_llm_store._C import ( get_cuda_memory_handles, get_device_uuid_map, @@ -660,9 +662,6 @@ def load_model(self, *, model_config: ModelConfig, storage_path = storage_path + "models" device_map = {"": rank} - load_into_cpu_non_blocking(model_name, device_map, storage_path) - replica_uuid, sllm_state_dict, device_map = load_into_gpu_non_blocking(model_name, device_map, storage_path) - # tensor_index_path = os.path.join(local_model_path, "tensor_index.json") # with open(tensor_index_path, "r") as f: # tensor_index = json.load(f) @@ -672,26 +671,34 @@ def load_model(self, *, model_config: 
ModelConfig, # replica_uuid = str(uuid.uuid4()) # sllm_state_dict = load_dict(os.path.join(local_model_path, f"rank_{rank}", device_config.device)) - with set_default_torch_dtype(model_config.dtype): - with torch.device(device_config.device): + # with torch.device(device_config.device): + with torch.device("cpu"): model = _initialize_model(model_config, self.load_config, lora_config, vision_language_config, cache_config) model = model.eval() - # set all parameters to meta device state_dict = self._filter_subtensors(model.state_dict()) key_list = list(state_dict.keys()) for key, param in model.named_parameters(recurse=True): if key in key_list: - param.data = torch.empty(1, device=torch.device("cuda")) - + param.data = torch.empty(1, device="cuda") + # print(f"{param.shape=}, {param.device=}") + gc.collect() + # for key in state_dict: + # state_dict[key] = torch.empty(1, device="cuda") # model = dispatch_model(model, {"": torch.device("meta")}) - torch.cuda.empty_cache() + + + # print cuda free memory + # print("Memory reserved ", torch.cuda.memory_reserved() / 1024 / 1024 / 1024, "GB") + load_into_cpu_non_blocking(model_name, device_map, storage_path) + replica_uuid, sllm_state_dict, device_map = load_into_gpu_non_blocking(model_name, device_map, storage_path) wait_dict_loaded(model_name, replica_uuid) + # sllm_state_dict = load_dict(model_name, device_map, storage_path) for key, param in model.named_parameters(recurse=True): if key in key_list: From 623a35e73fa5098656157af76724394c25d9781b Mon Sep 17 00:00:00 2001 From: Leyang Xue Date: Mon, 22 Jul 2024 14:11:35 +0100 Subject: [PATCH 08/11] clean up loader --- vllm/model_executor/model_loader/loader.py | 111 +-------------------- 1 file changed, 2 insertions(+), 109 deletions(-) diff --git a/vllm/model_executor/model_loader/loader.py b/vllm/model_executor/model_loader/loader.py index acea1ab5cc6..c1ec03e6404 100644 --- a/vllm/model_executor/model_loader/loader.py +++ b/vllm/model_executor/model_loader/loader.py @@ -632,22 +632,11 @@ def load_model(self, *, model_config: ModelConfig, parallel_config: ParallelConfig, scheduler_config: SchedulerConfig, cache_config: CacheConfig) -> nn.Module: - from serverless_llm_store.client import SllmStoreClient - from serverless_llm_store import load_into_cpu_non_blocking, load_into_gpu_non_blocking, wait_dict_loaded - from serverless_llm_store import load_dict - from serverless_llm_store._C import ( - get_cuda_memory_handles, - get_device_uuid_map, - allocate_cuda_memory, - ) - + from serverless_llm_store import load_dict_single_device from vllm.distributed import get_tensor_model_parallel_rank - from accelerate import dispatch_model, init_empty_weights - import uuid assert os.path.isdir(model_config.model) - # client = SllmStoreClient("localhost:8073") rank = get_tensor_model_parallel_rank() local_model_path = model_config.model @@ -660,17 +649,7 @@ def load_model(self, *, model_config: ModelConfig, storage_path = os.path.join(storage_path, "models") else: storage_path = storage_path + "models" - device_map = {"": rank} - - # tensor_index_path = os.path.join(local_model_path, "tensor_index.json") - # with open(tensor_index_path, "r") as f: - # tensor_index = json.load(f) - - # device_uuid_map = get_device_uuid_map() - # device_uuid = device_uuid_map[rank] - # replica_uuid = str(uuid.uuid4()) - # sllm_state_dict = load_dict(os.path.join(local_model_path, f"rank_{rank}", device_config.device)) with set_default_torch_dtype(model_config.dtype): # with torch.device(device_config.device): with 
torch.device("cpu"): @@ -685,105 +664,19 @@ def load_model(self, *, model_config: ModelConfig, for key, param in model.named_parameters(recurse=True): if key in key_list: param.data = torch.empty(1, device="cuda") - # print(f"{param.shape=}, {param.device=}") gc.collect() - # for key in state_dict: - # state_dict[key] = torch.empty(1, device="cuda") - # model = dispatch_model(model, {"": torch.device("meta")}) - - # print cuda free memory - # print("Memory reserved ", torch.cuda.memory_reserved() / 1024 / 1024 / 1024, "GB") - load_into_cpu_non_blocking(model_name, device_map, storage_path) - replica_uuid, sllm_state_dict, device_map = load_into_gpu_non_blocking(model_name, device_map, storage_path) - - wait_dict_loaded(model_name, replica_uuid) - # sllm_state_dict = load_dict(model_name, device_map, storage_path) + sllm_state_dict = load_dict_single_device(model_name, storage_path) for key, param in model.named_parameters(recurse=True): if key in key_list: tensor = sllm_state_dict[key] - # param_data = param.data - # param_shape = param.shape - # print(f"{param_shape=}, {param.device=}") - # for dim, size in enumerate(tensor.shape): - # if size < param_shape[dim]: - # param_data = param_data.narrow(dim, 0, size) - # if tensor.shape != param_shape: - # logger.warning( - # "loading tensor of shape %s into " - # "parameter '%s' of shape %s", tensor.shape, key, param_shape) - # param_data.copy_(tensor) param.data = tensor state_dict.pop(key) if state_dict: raise ValueError( f"Missing keys {tuple(state_dict)} in loaded state!") - - - # tensor_meta_index = {} - # tensor_data_index = {} - # for name, (offset, size, shape, stride, dtype) in tensor_index.items(): - # tensor_meta_index[name] = (shape, stride, dtype) - # tensor_data_index[name] = (offset, size) - - # total_memory_size = 0 - # tensor_offsets = [] - # tensor_chunks = [] - # for name in state_dict.keys(): - # cpu_offsets, memory_size = tensor_index[name] - # tensor_chunks.append((cpu_offsets, size, total_memory_size, 0)) - # tensor_offsets.append(total_memory_size) - # total_memory_size += memory_size - - # cuda_memory_ptrs = allocate_cuda_memory(total_memory_size) - # cuda_memory_handles = get_cuda_memory_handles(cuda_memory_ptrs) - - # memory_ptrs = {rank: []} - # tensor_copy_chunks = {rank: []} - - # # idx = 0 - # # for name, param in model.named_parameters(recurse=True): - # # if not name in state_dict: - # # continue - # # data_ptr = param.data.untyped_storage().data_ptr() - # # memory_ptrs[rank].append(data_ptr) - - # # offset, size, _, _, _ = tensor_index[name] - # # tensor_copy_chunks[rank].append((offset, size, 0, idx)) - # # idx += 1 - # # print(f"Loading tensor {name} with offset {offset} and size {size}, device {param.device}, {hex(data_ptr)}, {idx}") - - # for idx, (name, param) in enumerate(state_dict.items()): - # # data_ptr = param.untyped_storage().data_ptr() - # data_ptr = param.view(-1)[-1].data_ptr() - # memory_ptrs[rank].append(data_ptr) - # offset, size, _, _, _ = tensor_index[name] - # # every tensor has its own base address, so GPU offset is always 0 - # tensor_copy_chunks[rank].append((offset, size, 0, idx)) - # print(f"Loading tensor {name} with offset {offset} and size {size}, device {param.device}, {hex(data_ptr)}, {idx}") - - # cuda_memory_handles = get_cuda_memory_handles(memory_ptrs) - - # # for k, ptr in enumerate(device_ptrs[rank]): - # # assert hex(ptr) == hex(memory_ptrs[rank][k]), f"Memory ptrs do not match: {hex(ptr)} != {hex(memory_ptrs[rank][k])}" - # # cuda_memory_handles = { - # # rank: [ - # # 
get_cuda_memory_handles({rank: ptr})[rank] - # # for ptr in memory_ptrs[rank] - # # ] - # # } - - # ret = client.load_into_gpu( - # model_name, - # replica_uuid, - # {device_uuid: tensor_copy_chunks[rank]}, - # {device_uuid: cuda_memory_handles[rank]} - # ) - # if not ret or ret == False: - # raise ValueError(f"Failed to load model {model_name} into GPU") - # client.confirm_model_loaded(model_name, replica_uuid) return model @staticmethod From 4e788b0d344557578345e15efaa22ff143798a4a Mon Sep 17 00:00:00 2001 From: future-xy Date: Wed, 11 Sep 2024 20:20:35 +0100 Subject: [PATCH 09/11] feat: use save and load model api --- vllm/model_executor/model_loader/loader.py | 35 +++++++++++++++------- 1 file changed, 24 insertions(+), 11 deletions(-) diff --git a/vllm/model_executor/model_loader/loader.py b/vllm/model_executor/model_loader/loader.py index c1ec03e6404..1cbb39c83d5 100644 --- a/vllm/model_executor/model_loader/loader.py +++ b/vllm/model_executor/model_loader/loader.py @@ -576,6 +576,7 @@ def save_model( os.path.join(path, filename), ) + class ServerlessLLMLoader(BaseModelLoader): # DEFAULT_PATTERN = "model-rank-{rank}-part-{part}.safetensors" @@ -624,7 +625,6 @@ def get_end_ptr(tensor: torch.Tensor) -> int: result[k] = t return result - def load_model(self, *, model_config: ModelConfig, device_config: DeviceConfig, lora_config: Optional[LoRAConfig], @@ -632,7 +632,7 @@ def load_model(self, *, model_config: ModelConfig, parallel_config: ParallelConfig, scheduler_config: SchedulerConfig, cache_config: CacheConfig) -> nn.Module: - from serverless_llm_store import load_dict_single_device + from serverless_llm_store.torch import load_dict from vllm.distributed import get_tensor_model_parallel_rank assert os.path.isdir(model_config.model) @@ -641,14 +641,24 @@ def load_model(self, *, model_config: ModelConfig, local_model_path = model_config.model local_model_path = os.path.join(local_model_path, f"rank_{rank}") + + def remove_prefix(path, prefix): + # Normalize the paths to ensure consistency across different platforms + path = os.path.normpath(path) + prefix = os.path.normpath(prefix) + + # Check if the path starts with the prefix + if path.startswith(prefix): + # Return the path without the prefix + return path[len(prefix):].lstrip(os.sep) + + # Return the original path if the prefix doesn't exist + return path - # model name is everything after models - model_name = local_model_path.split("models/")[1] - storage_path = local_model_path.split("models/")[0] - if storage_path.endswith("/"): - storage_path = os.path.join(storage_path, "models") - else: - storage_path = storage_path + "models" + # vLLM needs a local model path to read model config but + # ServerlessLLM Store requires a global model path as the model ID + storage_path = os.getenv("STORAGE_PATH", "./models") + model_path = remove_prefix(local_model_path, storage_path) with set_default_torch_dtype(model_config.dtype): # with torch.device(device_config.device): @@ -666,7 +676,10 @@ def load_model(self, *, model_config: ModelConfig, param.data = torch.empty(1, device="cuda") gc.collect() - sllm_state_dict = load_dict_single_device(model_name, storage_path) + device_id = torch.cuda.current_device() + device_map = {"": device_id} + # Note: storage path is already included in the local model path + sllm_state_dict = load_dict(model_path, device_map) for key, param in model.named_parameters(recurse=True): if key in key_list: @@ -687,7 +700,7 @@ def save_model( max_size: Optional[int] = None, ) -> None: from vllm.distributed import 
get_tensor_model_parallel_rank - from serverless_llm_store import save_dict + from serverless_llm_store.torch import save_dict rank = get_tensor_model_parallel_rank() state_dict = ServerlessLLMLoader._filter_subtensors(model.state_dict()) From c5e6af3982210965c2cc805f82a43f7bcfc70d85 Mon Sep 17 00:00:00 2001 From: future-xy Date: Sun, 15 Sep 2024 11:27:41 +0100 Subject: [PATCH 10/11] fix: create path --- vllm/model_executor/model_loader/loader.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/vllm/model_executor/model_loader/loader.py b/vllm/model_executor/model_loader/loader.py index 1cbb39c83d5..d46a05e83bf 100644 --- a/vllm/model_executor/model_loader/loader.py +++ b/vllm/model_executor/model_loader/loader.py @@ -708,6 +708,9 @@ def save_model( # move all tensors to CPU for key, tensor in state_dict.items(): state_dict[key] = tensor.cpu().contiguous() + + if not os.path.exists(path): + os.makedirs(path) save_dict(state_dict, os.path.join(path, f"rank_{rank}")) From 242ab3f17798cc348330125512aeb3bb7410bf1a Mon Sep 17 00:00:00 2001 From: future-xy Date: Sun, 15 Sep 2024 11:39:00 +0100 Subject: [PATCH 11/11] fix: rank path --- vllm/model_executor/model_loader/loader.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/vllm/model_executor/model_loader/loader.py b/vllm/model_executor/model_loader/loader.py index d46a05e83bf..eb50af261b7 100644 --- a/vllm/model_executor/model_loader/loader.py +++ b/vllm/model_executor/model_loader/loader.py @@ -709,10 +709,11 @@ def save_model( for key, tensor in state_dict.items(): state_dict[key] = tensor.cpu().contiguous() - if not os.path.exists(path): - os.makedirs(path) + save_path = os.path.join(path, f"rank_{rank}") + if not os.path.exists(save_path): + os.makedirs(save_path) - save_dict(state_dict, os.path.join(path, f"rank_{rank}")) + save_dict(state_dict, save_path) class BitsAndBytesModelLoader(BaseModelLoader):
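
Usage sketch for the series as a whole, following the example scripts added in patch 03 (examples/save_sllm_state.py, examples/load_sllm_state.py, examples/save_load_sllm_state.sh). This is a minimal sketch, not part of the patches: the model path, output directory, and tensor-parallel size are placeholders, the output directory is assumed to sit under the store's STORAGE_PATH (the loader strips that prefix to derive the model name), and a serverless-llm store server is assumed to be running so the loader can fetch tensors through it.

    import dataclasses

    from vllm import LLM, EngineArgs

    # 1) Save: dump each worker's filtered state dict in ServerlessLLM format
    #    (one rank_{i} sub-directory per tensor-parallel rank), mirroring
    #    examples/save_sllm_state.py.
    engine_args = EngineArgs(
        model="/path/to/local/hf/snapshot",  # must be a local directory
        tensor_parallel_size=2,
    )
    llm = LLM(**dataclasses.asdict(engine_args))
    llm.llm_engine.model_executor.save_serverless_llm_state(
        path="./models/opt-1.3b")  # keep this under the store's STORAGE_PATH

    # 2) Load: point vLLM at the saved checkpoint and select the new format,
    #    mirroring examples/load_sllm_state.py.
    llm = LLM(
        model="./models/opt-1.3b",
        load_format="serverless_llm",
        tensor_parallel_size=2,
    )
    print(llm.generate("Hello, world!"))

In practice the save and load steps run as separate processes, as in examples/save_load_sllm_state.sh. The example save script also copies the model's non-weight files (config, tokenizer) next to the saved tensors so that the load step can read the model config from the same directory it loads weights from.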