[AFD]AFD implementation for dsv3 #3447
base: main
Conversation
Code Review
This pull request introduces a preliminary implementation of Attention/FFN Decoupling (AFD) for DeepSeek V3 models on Ascend NPUs. The changes are extensive, adding new FFN workers, model runners, and communication utilities, and modifying the core model and forward context to support the decoupled architecture. While the implementation lays the groundwork for AFD, there are several critical issues that need to be addressed, including hardcoded network configurations and a syntax error due to duplicated fields. Additionally, there are high-severity issues related to performance and best practices, such as an inefficient tensor parallelism implementation and setting environment variables within the code. These issues will need to be resolved to make the implementation robust, configurable, and performant.
ffn.py
Outdated
    #TODO:remove hard code
    init_method = 'tcp://127.0.0.1:29505'
The init_method for process group initialization is hardcoded with a local IP address and port. This is also the case on lines 94 and 101. This makes the script inflexible and difficult to use in a real distributed environment. These values should be parameterized or read from a configuration file to allow for different network setups.
    new_default_group = init_process_group(
        init_method='tcp://127.0.0.1:29500',
        backend='gloo',
        rank=rank,
        world_size=world_size,
        group_name="new_hccl"
    )
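For illustration, one way to make the rendezvous address configurable instead of hardcoded (a rough sketch; the flag and environment variable names `AFD_MASTER_ADDR` / `AFD_MASTER_PORT` are made up for this example, not part of the PR):

```python
import argparse
import os


def resolve_afd_init_method() -> str:
    # Hypothetical helper: take the rendezvous endpoint from CLI flags or the
    # environment, falling back to a local default only for single-node runs.
    parser = argparse.ArgumentParser()
    parser.add_argument("--afd-master-addr",
                        default=os.getenv("AFD_MASTER_ADDR", "127.0.0.1"))
    parser.add_argument("--afd-master-port",
                        default=os.getenv("AFD_MASTER_PORT", "29505"))
    args, _ = parser.parse_known_args()
    return f"tcp://{args.afd_master_addr}:{args.afd_master_port}"
```

The resolved string could then be passed to every init_process_group call in place of the literal addresses above.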
    mm_features: Optional[list[MultiModalFeatureSpec]] = None
    # for back-compatibility, will be removed in next major release
    mm_kwargs: Optional[list[MultiModalKwargsItem]] = None
    mm_positions: Optional[list[PlaceholderRange]] = None
    mm_hashes: Optional[list[PlaceholderRange]] = None
ffn.py
Outdated
    from vllm_ascend.distributed.afd_communicators import send_object,recv_object,FFNNeedForwardData

    import os
    os.environ["PYTORCH_NPU_ALLOC_CONF"] = "max_split_size_mb:256"
Setting environment variables within a library or script is generally considered bad practice as it can have unintended side effects on other parts of the application or other libraries. It's better to have the user set this environment variable before running the application. Please move this to the documentation or a startup script.
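If a default has to stay in code for now, a less intrusive compromise (just a sketch of the pattern, not a recommendation to keep it) is to apply it only when the user has not already configured the allocator:

```python
import os

# setdefault never overrides a value the user exported before launch,
# so an external PYTORCH_NPU_ALLOC_CONF setting always wins.
os.environ.setdefault("PYTORCH_NPU_ALLOC_CONF", "max_split_size_mb:256")
```

Note that, either way, the setting generally needs to be in place before the NPU allocator is first used to have any effect.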
    if afd_metadata:
        # Padding for AFD
        num_input_tokens = num_input_tokens
        (num_pad_afd, afd_tokens_start_loc,
         afd_tokens_lens) = self.get_afd_padding(
             afd_metadata.afd_tokens_start_loc,
             afd_metadata.afd_tokens_lens)
        afd_metadata.afd_tokens_start_loc = afd_tokens_start_loc
        afd_metadata.afd_tokens_lens = afd_tokens_lens
        num_input_tokens += num_pad_afd
        num_tokens_across_dp = None
    tp_world_size = get_tensor_model_parallel_world_size()
    if tp_world_size > 1:
        # All-gather hidden states from all TP ranks
        gathered_hidden_states = tensor_model_parallel_all_gather(
            hidden_states, dim=0)
        ffn_output = self.model.compute_ffn_output(current_layer_idx,
                                                   gathered_hidden_states)
        # Extract the output corresponding to current rank
        start_idx = hidden_states.shape[
            0] * get_tensor_model_parallel_rank()
        end_idx = start_idx + hidden_states.shape[0]
        rank_ffn_output = ffn_output[start_idx:end_idx, :]
    else:
        # Single TP case
        rank_ffn_output = self.model.compute_ffn_output(
            current_layer_idx, hidden_states)

    return rank_ffn_output
The tensor-parallelism handling in _execute_eager_mode looks inefficient. It all-gathers the input hidden_states, computes the FFN output over the full batch (which likely ends with an all_reduce in the final RowParallelLinear layer), and then manually slices out the current rank's portion. The all_reduce followed by slicing is redundant communication: a reduce_scatter in the final layer of the FFN computation would directly produce each rank's slice of the output, avoiding the full all_reduce and the manual slicing. Since this is a hot path, the extra communication could significantly impact performance.
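To illustrate the suggested pattern, here is a rough sketch using plain torch.distributed collectives; compute_partial_ffn stands in for an FFN whose final row-parallel projection skips its internal all-reduce and returns per-rank partial sums. All names and the skip-reduce assumption are illustrative, not the PR's actual API:

```python
import torch
import torch.distributed as dist


def ffn_forward_with_reduce_scatter(hidden_states: torch.Tensor,
                                    compute_partial_ffn,
                                    tp_group) -> torch.Tensor:
    """Return this rank's slice of the FFN output without a full all-reduce."""
    world_size = dist.get_world_size(group=tp_group)
    if world_size == 1:
        return compute_partial_ffn(hidden_states)

    # Gather every rank's tokens once so the FFN runs over the full batch.
    gathered = torch.empty(world_size * hidden_states.shape[0],
                           hidden_states.shape[1],
                           dtype=hidden_states.dtype,
                           device=hidden_states.device)
    dist.all_gather_into_tensor(gathered, hidden_states, group=tp_group)

    # Per-rank partial sums over the whole batch (no all-reduce inside).
    partial = compute_partial_ffn(gathered)

    # reduce_scatter sums the partials across ranks and hands each rank only
    # its own token slice, replacing the all-reduce + manual slicing above.
    output = torch.empty_like(hidden_states)
    dist.reduce_scatter_tensor(output, partial, group=tp_group)
    return output
```

Compared with all-gather + all-reduce + slicing, this should roughly halve the collective traffic in the hot path, at the cost of plumbing a "skip the final reduce" option through the FFN layers.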
👋 Hi! Thank you for contributing to the vLLM Ascend project. The following points will speed up your PR merge:
If CI fails, you can run linting and testing checks locally according to Contributing and Testing.
This pull request has conflicts; please resolve them before we can evaluate the pull request.
    def is_moe_weight(self,name):
        if "shared_experts" in name or "experts" in name or "gate" in name \
                or "up" in name or "down" in name:
            return True
        return False

    def is_common_weight(self,name):
        if "lm_head" in name or "model.norm.weight" in name or "embed_tokens" in name \
                or "input_layernorm" in name or "post_attention_layernorm" in name:
            return True
        return False
These methods could be moved to another file, such as xxx_utils.py.
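For example (file placement and names here are only a suggestion), the checks could become module-level helpers in a shared utility module:

```python
# e.g. vllm_ascend/worker/afd_weight_utils.py (hypothetical location)
_MOE_WEIGHT_KEYWORDS = ("shared_experts", "experts", "gate", "up", "down")
_COMMON_WEIGHT_KEYWORDS = ("lm_head", "model.norm.weight", "embed_tokens",
                           "input_layernorm", "post_attention_layernorm")


def is_moe_weight(name: str) -> bool:
    # True if the parameter name matches any of the MoE-related keywords above.
    return any(keyword in name for keyword in _MOE_WEIGHT_KEYWORDS)


def is_common_weight(name: str) -> bool:
    # True if the parameter name matches any of the "common" keywords above.
    return any(keyword in name for keyword in _COMMON_WEIGHT_KEYWORDS)
```

Any worker that needs this classification could then import the helpers instead of carrying the substring checks as methods.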
Looking through the code, I noticed this import statement:

    from vllm.distributed.afd_transfer.afd_connector import (
        AFDConnectorBase, AFDConnectorFactory, AFDConnectorMetadata
    )

However, I couldn't find the afd_transfer directory or module in the repository. Specifically, I'm trying to locate where AFDConnectorFactory is implemented. Could you please point me to where this factory class is defined?
We no longer allow changes to model files. Please use another way or contribute to vLLM directly.
What this PR does / why we need it?
This PR corresponds to the RFC vllm-project#22799 and is a follow-up to vllm-project#25162 and Oliver-ss/vllm#2.
This is the preliminary implementation of DeepSeek V2 Lite AFD for Ascend. It currently only supports the P2P connector and the DeepSeek V2 Lite model.
Later, we are going to support the following features:
How was this patch tested?
Use the following scripts for testing:
online_attn.sh
ffn.sh