Skip to content

Commit e956a06

Browse files
committed
[Fix] remove files
1 parent 271e2b6 commit e956a06

File tree

1 file changed

+78
-0
lines changed

1 file changed

+78
-0
lines changed

ucm/integration/vllm/ucm_connector.py

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,17 @@
11
import hashlib
22
import itertools
3+
import json
34
import os
45
import pickle
56
from dataclasses import dataclass, field
7+
import queue
8+
import threading
9+
import time
610
from typing import TYPE_CHECKING, Callable, List, Optional
711

12+
from sympy import Dict
813
import torch
14+
from transformers import Any
915
from vllm.config import VllmConfig
1016
from vllm.distributed.kv_transfer.kv_connector.v1.base import (
1117
KVConnectorBase_V1,
@@ -159,6 +165,64 @@ def __init__(self, vllm_config: "VllmConfig", role: KVConnectorRole):
159165
config["kv_block_size"] / 1024 / 1024,
160166
config["io_size"] / 1024,
161167
)
168+
self.record_oper: bool = self.launch_config.get("record_oper", False)
169+
if self.record_oper:
170+
self.write_thread = threading.Thread(target=self._async_record_loop, daemon=True)
171+
self.write_thread.start()
172+
173+
def log_operation(self, operation_data: dict) -> None:
    """Queue one operation record for asynchronous persistence (non-blocking).

    Merges *operation_data* over a default record (``timestamp``,
    ``op_type``, ``block_size``) and enqueues the result for the background
    writer thread started in ``__init__``.  Never blocks the caller: if the
    queue is full — or the writer thread has not created it yet — the entry
    is dropped and an error is logged.

    Args:
        operation_data: Fields to record; overrides the defaults.
            Typically includes ``op_type`` and ``blocks`` (see callers),
            and optionally ``request_id`` used in drop messages.
    """
    # NOTE(review): the file-level `from sympy import Dict` and
    # `from transformers import Any` imports look wrong — the annotation
    # here almost certainly wants `from typing import Dict, Any`.
    default_data = {
        "timestamp": time.time(),
        "op_type": "None",
        "block_size": self.block_size,
    }
    log_entry = {**default_data, **operation_data}

    # self.log_queue is created lazily by _async_record_loop on the writer
    # thread; guard against the startup race where a log arrives first.
    log_queue = getattr(self, "log_queue", None)
    if log_queue is None:
        logger.error(
            f"Log queue not ready, dropping one log: {log_entry.get('request_id')}"
        )
        return

    try:
        log_queue.put_nowait(log_entry)
    except queue.Full:
        logger.error(
            f"Log queue is full, dropping one log: {log_entry.get('request_id')}"
        )
189+
190+
def _async_record_loop(self) -> None:
    """Writer-thread loop: drain the log queue and append JSON lines in batches.

    Runs forever as a daemon thread (started in ``__init__`` when
    ``record_oper`` is enabled).  Entries are buffered and flushed to
    ``record_oper_path`` either when the buffer reaches
    ``record_oper_batch_size`` entries or when ``record_oper_flush_interval``
    seconds have elapsed since the last flush — including while the queue
    is idle, so trailing entries are not stranded in memory.
    """
    # NOTE(review): creating the queue here races with log_operation() on
    # other threads; ideally it would be constructed in __init__ before
    # this thread starts.
    self.log_queue = queue.Queue(maxsize=10000)  # Max cache: 10000 entries
    # NOTE(review): default path looks like a directory name but is opened
    # as a regular file below — confirm the intended default.
    log_path = self.launch_config.get("record_oper_path", "/vllm-workspace/ucm_logs")
    batch_size = self.launch_config.get("record_oper_batch_size", 100)
    flush_interval = self.launch_config.get("record_oper_flush_interval", 5.0)
    batch_buffer: list = []
    last_flush_time = time.time()

    while True:
        should_flush = False
        try:
            # Wake at least once per second so time-based flushes happen
            # even when no new entries arrive.
            log_entry = self.log_queue.get(timeout=1.0)
            batch_buffer.append(log_entry)
            self.log_queue.task_done()
            if (
                len(batch_buffer) >= batch_size
                or (time.time() - last_flush_time) >= flush_interval
            ):
                should_flush = True
        except queue.Empty:
            # Original code only reset the timer here, leaving buffered
            # entries unwritten while the queue was idle — flush them too.
            if batch_buffer and (time.time() - last_flush_time) >= flush_interval:
                should_flush = True
        except Exception as e:
            logger.error(f"Log thread exception: {str(e)}")

        if should_flush and batch_buffer:
            last_flush_time = time.time()
            try:
                with open(log_path, "a", encoding="utf-8") as f:
                    # Fix: original iterated `self.batch_buffer`, which does
                    # not exist (the buffer is this local) -> AttributeError.
                    for entry in batch_buffer:
                        f.write(json.dumps(entry, ensure_ascii=False) + "\n")
            except OSError as e:
                # Keep the writer thread alive if the log file is unwritable.
                logger.error(f"Failed to write operation log: {e}")
            batch_buffer.clear()
224+
225+
162226

163227
def generate_hash(self, block_size: int, request: "Request") -> list[str]:
164228
token_ids = request.all_token_ids
@@ -465,6 +529,13 @@ def start_load_kv(self, forward_context: "ForwardContext", **kwargs) -> None:
465529
request_to_task[request_id] = self.store.load(
466530
ucm_total_block_ids, ucm_offsets, dst_tensor_addr
467531
)
532+
if self.record_oper:
533+
self.log_operation(
534+
{
535+
"op_type": "load",
536+
"blocks": ucm_block_ids,
537+
}
538+
)
468539
else:
469540
request_to_task[request_id] = None
470541
req_broadcast_addr[request_id] = dst_tensor_addr
@@ -527,6 +598,13 @@ def wait_for_save(self) -> None:
527598
request_to_task[request_id] = self.store.dump(
528599
ucm_total_block_ids, ucm_offsets, dst_tensor_addr
529600
)
601+
if self.record_oper:
602+
self.log_operation(
603+
{
604+
"op_type": "dump",
605+
"blocks": ucm_block_ids,
606+
}
607+
)
530608
request_to_blocks[request_id] = ucm_block_ids
531609

532610
for request_id, task in request_to_task.items():

0 commit comments

Comments
 (0)