Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion vllm/disaggregated/disagg_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,9 +96,17 @@ async def _abort_handler(self, req: GenerationRequest):
self.engine.abort(request_id=req.request_id)

async def _heartbeat_handler(self, req: HeartbeatRequest):
    """Answer a heartbeat request with the engine's current health status.

    Runs the engine health check and sends a ``HeartbeatResponse`` back to
    the proxy carrying ``"OK"`` when the check passes and ``"FAIL"`` when
    it raises. A failed check is logged but never propagated, so the
    heartbeat reply is always sent.

    Args:
        req: The incoming heartbeat request; its ``request_id`` is echoed
            in the response.
    """
    # Local import keeps this block self-contained.
    import inspect

    try:
        result = self.engine.check_health()
        # check_health may be a coroutine function; without awaiting it the
        # body never runs and a dead engine would still be reported as "OK".
        if inspect.isawaitable(result):
            await result
        status = "OK"
    except Exception:
        status = "FAIL"
        logger.exception("Check health Failed.")

    msg = (ResponseType.HEARTBEAT,
           self.encoder.encode(
               HeartbeatResponse(request_id=req.request_id,
                                 status=status)))
    await self.to_proxy.send_multipart(msg, copy=False)

async def _generate(
Expand Down
20 changes: 20 additions & 0 deletions vllm/disaggregated/proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,12 @@ def __init__(
health_check_interval=10,
health_threshold=3,
):
self._check_type("enable_health_monitor", enable_health_monitor, bool)
self._check_positive_int("health_check_interval",
health_check_interval)
self._check_positive_int("health_threshold", health_threshold)
self._check_subclass("router", router, RoutingInterface)

self.queues: dict[str, asyncio.Queue] = {}

self.encoder = msgspec.msgpack.Encoder()
Expand Down Expand Up @@ -498,6 +504,20 @@ async def get_vllm_config(self) -> VllmConfig:
async def reset_mm_cache(self) -> None:
    """Unsupported operation here; always raises ``NotImplementedError``."""
    raise NotImplementedError

def _check_type(self, name, value, expected_type):
if not isinstance(value, expected_type):
raise TypeError(f"{name} must be {expected_type.__name__}, ",
f"got {type(value).__name__}")

def _check_positive_int(self, name, value):
if not isinstance(value, int) or value <= 0:
raise ValueError(f"{name} must be a positive integer")

def _check_subclass(self, name, value, base_class):
if not isinstance(value, type) or not issubclass(value, base_class):
raise TypeError(
f"{name} must be a subclass of {base_class.__name__}")


def _has_mm_data(prompt: PromptType) -> bool:
if isinstance(prompt, dict):
Expand Down
2 changes: 2 additions & 0 deletions vllm/v1/engine/async_llm.py
Original file line number Diff line number Diff line change
Expand Up @@ -486,6 +486,8 @@ async def do_log_stats(

async def check_health(self) -> None:
    """Raise the stored engine error if the engine is in an errored state.

    Returns normally when ``self.errored`` is falsy.

    Raises:
        Whatever exception ``self.dead_error`` holds, when ``self.errored``
        is truthy.
    """
    logger.debug("Called check_health.")
    if not self.errored:
        return
    raise self.dead_error

async def start_profile(self) -> None:
await self.engine_core.profile_async(True)
Expand Down
Loading