Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions .github/workflows/_e2e_nightly_multi_node.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,6 @@ jobs:
pip config set global.index-url https://mirrors.tuna.tsinghua.edu.cn/pypi/web/simple
pip install jinja2-cli

#apt-get update -y && apt-get install -y git curl

- name: Install kubectl
run: |
# Install kubectl
Expand All @@ -112,6 +110,8 @@ jobs:
run: |
# prepare for lws entrypoint scripts
install -D tests/e2e/nightly/multi_node/scripts/run.sh /root/.cache/tests/run.sh
# clear log directory
rm -fr $RESULT_FILE

- name: Clear resources
run: |
Expand Down Expand Up @@ -263,5 +263,5 @@ jobs:
- name: Post process
if: always()
run: |
kubectl get pods -n $NAMESPACE
kubectl delete -f ./lws.yaml
kubectl get pods -n $NAMESPACE --ignore-not-found=true
kubectl delete -f ./lws.yaml --ignore-not-found=true || true
6 changes: 0 additions & 6 deletions .github/workflows/vllm_ascend_test_nightly_a3.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,6 @@ jobs:
- name: multi-node-qwenw8a8-2node
config_file_path: Qwen3-235B-W8A8.yaml
size: 2
- name: multi-node-glm-2node
config_file_path: GLM-4_5.yaml
size: 2
- name: multi-node-dpsk3.2-exp-2node
config_file_path: DeepSeek-V3_2-Exp-bf16.yaml
size: 2
Expand Down Expand Up @@ -134,9 +131,6 @@ jobs:
- name: deepseek3_2-exp-w8a8
os: linux-aarch64-a3-16
tests: tests/e2e/nightly/models/test_deepseek_v3_2_exp_w8a8.py
- name: glm-4-5
os: linux-aarch64-a3-16
tests: tests/e2e/nightly/models/test_glm4_5.py
uses: ./.github/workflows/_e2e_nightly_single_node.yaml
with:
vllm: v0.12.0
Expand Down
1 change: 1 addition & 0 deletions examples/disaggregated_prefill_v1/gen_ranktable.sh
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ if [ -n "$LOCAL_DEVICE_IDS" ]; then
fi

if [[ -n "${GEN_RANKTABLE}" || ! -e ${PWD}/ranktable.json ]]; then
timeout 180s \
GLOO_SOCKET_IFNAME=$NETWORK_CARD_NAME torchrun \
--nproc_per_node 1 \
--nnodes ${NNODES} \
Expand Down
115 changes: 80 additions & 35 deletions tests/e2e/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import contextlib
import gc
import json
import logging
import os
import shlex
import subprocess
Expand All @@ -35,6 +36,7 @@
import torch
from modelscope import snapshot_download # type: ignore[import-untyped]
from PIL import Image
from requests.exceptions import RequestException
from torch import nn
from transformers import (AutoConfig, AutoModelForCausalLM, AutoTokenizer,
BatchEncoding, BatchFeature)
Expand Down Expand Up @@ -70,6 +72,7 @@
PromptImageInput = _PromptMultiModalInput[Image.Image]
PromptAudioInput = _PromptMultiModalInput[Tuple[np.ndarray, int]]
PromptVideoInput = _PromptMultiModalInput[np.ndarray]
logger = logging.getLogger(__name__)

_TEST_DIR = os.path.dirname(__file__)

Expand Down Expand Up @@ -161,22 +164,17 @@ def __init__(self,
max_wait_seconds = max_wait_seconds or 1800
if self.disaggregated_prefill:
assert proxy_port is not None, "for disaggregated_prefill, proxy port must be provided"
self._wait_for_server_pd(proxy_port=proxy_port,
timeout=max_wait_seconds)
self._wait_for_server_pd(timeout=max_wait_seconds)
else:
self._wait_for_server(url=self.url_for("health"),
timeout=max_wait_seconds)
self._wait_for_multiple_servers(
[(self.host, self.url_for("health"))],
timeout=max_wait_seconds)

def __enter__(self):
return self

def __exit__(self, exc_type, exc_value, traceback):
self.proc.terminate()
try:
self.proc.wait(8)
except subprocess.TimeoutExpired:
# force kill if needed
self.proc.kill()
self._terminate_server()

def _poll(self) -> Optional[int]:
"""Subclasses override this method to customize process polling"""
Expand All @@ -201,48 +199,95 @@ def hang_until_terminated(self, url) -> None:
finally:
if isinstance(client, httpx.Client):
client.close()
self._terminate_server()

def _wait_for_server_pd(self, proxy_port: int, timeout: float):
def _wait_for_server_pd(self, timeout: float):
# Wait for all api_server nodes ready
assert self.nodes_info is not None, "cluster info must be provided"
for node_info in self.nodes_info:
if node_info.headless:
continue
proxy_port = self.proxy_port

url_health = f"http://{node_info.ip}:{node_info.server_port}/health"
self._wait_for_server(url=url_health, timeout=timeout)
def url_health(ip: str, port: int) -> str:
return f"http://{ip}:{port}/health"

targets = [(node_info.ip,
url_health(node_info.ip, node_info.server_port))
for node_info in self.nodes_info if not node_info.headless]

# Wait for proxy ready
master_node = self.nodes_info[0]
url_proxy = f"http://{master_node.ip}:{proxy_port}/healthcheck"
self._wait_for_server(url=url_proxy, timeout=timeout)

def _wait_for_server(self, *, url: str, timeout: float):
# run health check
# Wait for master node proxy first
self._wait_for_multiple_servers([(master_node.ip, url_proxy)],
timeout=timeout)

# Then wait for all api_server nodes
self._wait_for_multiple_servers(targets=targets, timeout=timeout)

def _wait_for_multiple_servers(self, targets, timeout: float):
"""
targets: List[(node_ip, url)]
"""
start = time.time()
client = requests

# track readiness
ready = {node_ip: False for node_ip, _ in targets}

# polling loop
while True:
try:
if client.get(url).status_code == 200:
break
except Exception:
# this exception can only be raised by requests.get,
# which means the server is not ready yet.
# the stack trace is not useful, so we suppress it
# by using `raise from None`.
result = self._poll()
if result is not None and result != 0:
raise RuntimeError("Server exited unexpectedly.") from None

time.sleep(5)
if time.time() - start > timeout:
raise RuntimeError(
"Server failed to start in time.") from None
all_ready = True

for node_ip, url in targets:
if ready[node_ip]:
continue # already ready

try:
resp = client.get(url)
if resp.status_code == 200:
ready[node_ip] = True
logger.info(f"[READY] Node {node_ip} is ready.")
else:
all_ready = False
logger.info(f"[WAIT] {url}: HTTP {resp.status_code}")
except RequestException:
all_ready = False
logger.info(f"[WAIT] {url}: connection failed")

# underlying process died?
result = self._poll()
if result is not None and result != 0:
raise RuntimeError(
f"Server at {node_ip} exited unexpectedly."
) from None

# if all nodes ready, exit
if all_ready:
break

# check timeout
if time.time() - start > timeout:
not_ready_nodes = [n for n, ok in ready.items() if not ok]
self._terminate_server()
raise RuntimeError(
f"Timeout: these nodes did not become ready: {not_ready_nodes}"
) from None

time.sleep(5)

@property
def url_root(self) -> str:
return f"http://{self.host}:{self.port}"

def _terminate_server(self) -> None:
"""Subclasses override this method to customize server process termination"""
self.proc.terminate()
try:
self.proc.wait(8)
except subprocess.TimeoutExpired:
# force kill if needed
self.proc.kill()

def url_for(self, *parts: str) -> str:
return self.url_root + "/" + "/".join(parts)

Expand Down
50 changes: 0 additions & 50 deletions tests/e2e/nightly/multi_node/config/models/GLM-4_5.yaml

This file was deleted.

4 changes: 2 additions & 2 deletions tests/e2e/nightly/multi_node/config/multi_node_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ def __str__(self):
return (f"NodeInfo:\n"
f" index={self.index}\n"
f" ip={self.ip}\n"
f" server_port={self.server_port}\n"
f" headless={self.headless}")
f" headless={self.headless}\n"
f" server_port={self.server_port}")


class MultiNodeConfig:
Expand Down
2 changes: 1 addition & 1 deletion tests/e2e/nightly/multi_node/scripts/lws.yaml.jinja2
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ spec:
- name: VLLM_ASCEND_REMOTE_URL
value: {{ vllm_ascend_remote_url | default("https://github.com/vllm-project/vllm-ascend.git") }}
- name: RESULT_FILE_PATH
value: {{ result_file_path | default("/root/.cache/tests/ret/test_result.txt") }}
value: {{ result_file_path | default("/root/.cache/tests/ret") }}
- name: FAIL_TAG
value: {{ fail_tag | default("FAIL_TAG") }}
command:
Expand Down
9 changes: 2 additions & 7 deletions tests/e2e/nightly/multi_node/scripts/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -127,19 +127,14 @@ kill_npu_processes() {
run_tests_with_log() {
set +e
kill_npu_processes
BASENAME=$(basename "$CONFIG_YAML_PATH" .yaml)
# each worker should have log file
LOG_FILE="${RESULT_FILE_PATH}/${BASENAME}_worker_${LWS_WORKER_INDEX}.log"
mkdir -p ${RESULT_FILE_PATH}
pytest -sv tests/e2e/nightly/multi_node/test_multi_node.py 2>&1 | tee $LOG_FILE
ret=${PIPESTATUS[0]}
pytest -sv tests/e2e/nightly/multi_node/test_multi_node.py
ret=$?
set -e
if [ "$LWS_WORKER_INDEX" -eq 0 ]; then
if [ $ret -eq 0 ]; then
print_success "All tests passed!"
else
print_failure "Some tests failed!"
mv LOG_FILE error_${LOG_FILE}
fi
fi
}
Expand Down
Loading