Skip to content

Conversation

@wuhang2014
Copy link
Collaborator

@wuhang2014 wuhang2014 commented Sep 15, 2025

Purpose

Test Plan

Test Result


Essential Elements of an Effective PR Description Checklist
  • The purpose of the PR, such as "Fix some issue (link existing issues this PR will resolve)".
  • The test plan, such as providing test command.
  • The test results, such as pasting the results comparison before and after, or e2e results
  • (Optional) The necessary documentation update, such as updating supported_models.md and examples for a new model.
  • (Optional) Release notes update. If your change is user facing, please update the release notes draft in the Google Doc.

Signed-off-by: wuhang <[email protected]>
@github-actions
Copy link

👋 Hi! Thank you for contributing to the vLLM project.

💬 Join our developer Slack at https://slack.vllm.ai to discuss your PR in #pr-reviews, coordinate on features in #feat- channels, or join special interest groups in #sig- channels.

Just a reminder: PRs would not trigger full CI run by default. Instead, it would only run fastcheck CI which starts running only a small and essential subset of CI tests to quickly catch errors. You can run other CI tests on top of those by going to your fastcheck build on Buildkite UI (linked in the PR checks section) and unblock them. If you do not have permission to unblock, ping simon-mo or khluu to add you in our Buildkite org.

Once the PR is approved and ready to go, your PR reviewer(s) can run CI to test the changes comprehensively before merging.

To run CI, PR reviewers can either: Add ready label to the PR or enable auto-merge.

🚀

filename)
continue
ec_cache = safetensors.torch.load_file(
filename)["ec_cache"].cuda()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

npu()

Copy link
Collaborator

@hsliuustc0106 hsliuustc0106 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please check my reviews

detail={"error": "Internal server error", "message": str(e)},
) from e

# import time
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

del

except Exception:
return False

async def check_decode():
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

check_decode()
shall we change to check_prefill() and check_collocated_pd()

@@ -0,0 +1,36 @@
#!/usr/bin/env bash
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what's this bash file used for?

connector_cls = cls.get_connector_class(ec_transfer_config)
logger.info("Creating v1 connector with name: %s and engine_id: %s",
connector_cls.__name__, ec_transfer_config.engine_id)
# NOTE(Kuntai): v1 connector is explicitly separated into two roles.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

del

request_id: str = ""
input_ids: list[int] = None

# @staticmethod
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

del

logger.warning("Encoder cache file %s does not exist",
filename)
continue
ec_cache = safetensors.torch.load_file(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ec_cache = safetensors.torch.load_file( filename)["ec_cache"].cuda()

No cleanup mechanism for failed operations
No memory management for large encoder caches
No timeout for file operations

logger.debug(transfer_config)
logger.debug("Shared storage path is %s", self._storage_path)

def start_load_caches(self, **kwargs) -> None:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

def start_load_ec(self, **kwargs) -> None:
    """Start loading the EC cache from the connector buffer to worker encoder_cache"""
    metadata: ECConnectorMetadata = self._get_connector_metadata()
    assert isinstance(metadata, ECSharedStorageConnectorMetadata)
    if metadata is None:
        raise RuntimeError("Connector metadata is None - cannot proceed with cache loading")
    
    encoder_cache = kwargs.get("encoder_cache")
    if encoder_cache is None:
        raise ValueError("encoder_cache is required in kwargs")
    
    try:
        for mm_data in metadata.mm_datas:
            for input_id in mm_data.input_ids:
                if input_id in encoder_cache.get(mm_data.request_id, {}):
                    continue
                
                filename = self._generate_filename_debug(f"{mm_data.request_id}_{input_id}")
                if not os.path.exists(filename):
                    logger.warning("Encoder cache file %s does not exist", filename)
                    continue
                
                # Add proper error handling for file operations
                try:
                    data = safetensors.torch.load_file(filename)
                    if "ec_cache" not in data:
                        logger.error("Invalid cache file format: %s", filename)
                        continue
                    
                    ec_cache = data["ec_cache"].cuda()
                    
                    if mm_data.request_id not in encoder_cache:
                        encoder_cache[mm_data.request_id] = {}
                    encoder_cache[mm_data.request_id][input_id] = ec_cache
                    
                    logger.debug("Success load encoder cache for request_id %s, input_id %d",
                                mm_data.request_id, input_id)
                except Exception as e:
                    logger.error("Failed to load cache from %s: %s", filename, e)
                    continue
    except Exception as e:
        logger.error("Error in start_load_caches: %s", e)
        raise

"Success load encoder cache for request_id %s, input_id %d",
mm_data.request_id, input_id)

def save_caches(self, **kwargs) -> None:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

def save_ec(self, **kwargs) -> None:
    """Start saving the EC cache for each mm_datas from encoder cache"""
    if not self.is_producer:
        return
    
    encoder_cache = kwargs.get("encoder_cache")
    if encoder_cache is None:
        raise ValueError("encoder_cache is required in kwargs")
    
    mm_hash = kwargs.get("mm_hash")
    request_id = kwargs.get("request_id")
    input_id = kwargs.get("input_id")
    
    try:
        if mm_hash:
            if mm_hash not in encoder_cache:
                raise ValueError(f"mm_hash {mm_hash} not found in encoder_cache")
            filename = self._generate_filename_debug(mm_hash)
            ec_cache = encoder_cache[mm_hash]
        else:
            if request_id is None or input_id is None:
                raise ValueError("request_id and input_id are required when mm_hash is not provided")
            
            if request_id not in encoder_cache or input_id not in encoder_cache[request_id]:
                raise ValueError(f"Cache not found for request_id={request_id}, input_id={input_id}")
            
            filename = self._generate_filename_debug(f"{request_id}_{input_id}")
            ec_cache = encoder_cache[request_id][input_id]
        
        tensors = {"ec_cache": ec_cache.detach().cpu()}
        safetensors.torch.save_file(tensors, filename)
        
        logger.debug("Save cache successful for mm_hash=%s, request_id=%s, input_id=%s",
                    mm_hash, request_id, input_id)
    except Exception as e:
        logger.error("Failed to save cache: %s", e)
        raise

result = []
request_id = request.request_id
for input_id in range(len(request.mm_positions)):
if self._found_match_for_mm_data(f"{request_id}_{input_id}"):
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the input arg for this function should be mm_hash

self.mm_datas.append(MMMeta.make_mm_meta(request_id, input_ids))


class ECSharedStorageConnector(ECConnectorBase):
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

def __del__(self):
    """Cleanup resources on destruction."""
    try:
        # Clean up any temporary files or resources
        if hasattr(self, '_temp_files'):
            for temp_file in self._temp_files:
                if os.path.exists(temp_file):
                    os.remove(temp_file)
    except Exception as e:
        logger.warning(f"Error during cleanup: {e}")

encoder_cache[mm_data.request_id][input_id] = ec_cache
logger.debug(
"Success load encoder cache for request_id %s, input_id %d",
mm_data.request_id, input_id)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

def __del__(self, filename):
    """Cleanup resources on destruction."""
    try:
          if os.path.exists(filename):
                 os.remove(filename)
    except Exception as e:
        logger.warning(f"Error during cleanup: {e}")

del self._cache[filename]

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will the delete operations for each (req_id, input_id) take long time? if so, maybe we should del files for each req_id

Signed-off-by: wuhang <[email protected]>
Signed-off-by: wuhang <[email protected]>
Signed-off-by: wuhang <[email protected]>
Signed-off-by: wuhang <[email protected]>
Signed-off-by: wuhang <[email protected]>
Signed-off-by: wuhang <[email protected]>
Signed-off-by: wuhang <[email protected]>
Signed-off-by: wuhang <[email protected]>
@hsliuustc0106 hsliuustc0106 changed the title epd shared storage epd shared storage v0.10.0 Sep 30, 2025
@hsliuustc0106
Copy link
Collaborator

shall we close this PR?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants