Commit c5852e9

[Feat] Support record operations
1 parent 0986b89 commit c5852e9

File tree

3 files changed: +334 -0

ucm/integration/vllm/ucm_connector.py
ucm/shared/metrics/async_op_db.py
ucm/shared/metrics/metrics_configs.yaml

ucm/integration/vllm/ucm_connector.py

Lines changed: 19 additions & 0 deletions
@@ -18,6 +18,7 @@
 from vllm.v1.request import Request
 
 from ucm.logger import init_logger
+from ucm.shared.metrics.async_op_db import AsyncUCMOperDB
 from ucm.store.factory import UcmConnectorFactory
 from ucm.store.ucmstore import Task, UcmKVStoreBase
 from ucm.utils import Config
@@ -159,6 +160,8 @@ def __init__(self, vllm_config: "VllmConfig", role: KVConnectorRole):
             config["kv_block_size"] / 1024 / 1024,
             config["io_size"] / 1024,
         )
+        is_first_rank = role == KVConnectorRole.WORKER and get_tp_group().is_first_rank
+        self.oper_db = AsyncUCMOperDB() if is_first_rank else None
 
     def generate_hash(self, block_size: int, request: "Request") -> list[str]:
         token_ids = request.all_token_ids
@@ -465,6 +468,14 @@ def start_load_kv(self, forward_context: "ForwardContext", **kwargs) -> None:
                 request_to_task[request_id] = self.store.load(
                     ucm_total_block_ids, ucm_offsets, dst_tensor_addr
                 )
+                if self.oper_db:
+                    self.oper_db.log_operation(
+                        {
+                            "op_type": "load",
+                            "blocks": ucm_block_ids,
+                            "block_size": self.block_size,
+                        }
+                    )
             else:
                 request_to_task[request_id] = None
             req_broadcast_addr[request_id] = dst_tensor_addr
@@ -527,6 +538,14 @@ def wait_for_save(self) -> None:
             request_to_task[request_id] = self.store.dump(
                 ucm_total_block_ids, ucm_offsets, dst_tensor_addr
             )
+            if self.oper_db:
+                self.oper_db.log_operation(
+                    {
+                        "op_type": "dump",
+                        "blocks": ucm_block_ids,
+                        "block_size": self.block_size,
+                    }
+                )
             request_to_blocks[request_id] = ucm_block_ids
 
         for request_id, task in request_to_task.items():
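Each hook above passes a partial record to log_operation; the logger (defined next) stamps a timestamp and a default op_type onto it, then enqueues it for a background writer that appends one JSON object per line to ucm_operation.log. A sketch of the resulting line for a load, with illustrative values (the block hashes come from generate_hash and are shortened here):

{"timestamp": 1718000000.123, "op_type": "load", "blocks": ["1f3a...", "9c2e..."], "block_size": 128}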

ucm/shared/metrics/async_op_db.py

Lines changed: 297 additions & 0 deletions
@@ -0,0 +1,297 @@
import gzip
import json
import os
import queue
import shutil
import threading
import time
from datetime import timedelta
from pathlib import Path
from typing import Any, Dict

import yaml

from ucm.logger import init_logger

logger = init_logger(__name__)


class AsyncUCMOperDB:
    """Asynchronous UCM operation log writer (supports log compression + auto-cleanup)."""

    def _load_config(self, config_path: str) -> Dict[str, Any]:
        """Load configuration from a YAML file."""
        try:
            with open(config_path, "r") as f:
                config = yaml.safe_load(f)
            if config is None:
                logger.warning(f"Config file {config_path} is empty, using defaults")
                return {}
            return config
        except FileNotFoundError:
            logger.warning(f"Config file {config_path} not found, using defaults")
            return {}
        except yaml.YAMLError as e:
            logger.error(f"Error parsing YAML config file {config_path}: {e}")
            return {}
    def __init__(
        self,
        config_path: str = "",
    ):
        if not config_path:
            current_file_path = Path(__file__)
            config_path = current_file_path.parent / "metrics_configs.yaml"
        config = self._load_config(config_path)
        oper_config = config.get("operation_db", {})
        if not oper_config:
            logger.warning(
                f"Found no operation db configuration in config_path {config_path}, "
                "will use default configurations instead!"
            )

        # Basic configuration
        self.log_dir = Path(oper_config.get("log_dir", "/vllm-workspace/ucm_logs"))
        self.log_name = oper_config.get("log_name", "ucm_operation")
        self.max_file_size = oper_config.get("max_file_size", 104857600)
        self.batch_size = oper_config.get("batch_size", 100)
        self.flush_interval = oper_config.get("flush_interval", 5.0)
        self.encoding = oper_config.get("encoding", "utf-8")

        # Compression configuration
        self.compress_rotated = oper_config.get("compress_rotated", True)
        self.compress_level = oper_config.get(
            "compress_level", 6
        )  # gzip compression level (1-9)

        # Auto-cleanup configuration (cleanup runs when any condition is met)
        self.max_log_files = oper_config.get("max_log_files", 30)
        self.max_log_days = oper_config.get("max_log_days", 7)
        self.max_log_total_size = oper_config.get("max_log_total_size", 1073741824)

        # Initialize log directory
        self.log_dir.mkdir(parents=True, exist_ok=True)

        # Thread-safe queue
        self.log_queue = queue.Queue(maxsize=10000)  # Max cache: 10000 entries
        self.is_running = oper_config.get("enabled", False)
        self.current_file_path = self.log_dir / f"{self.log_name}.log"
        self.latest_compress_file = self.log_dir / f"{self.log_name}.log.gz"
        self.batch_buffer = []

        # Start background thread (write + compression + cleanup)
        self.write_thread = threading.Thread(target=self._async_write_loop, daemon=True)
        self.write_thread.start()
    def _get_compress_files(self) -> list[tuple[Path, int]]:
        """Get all compressed logs as (path, rotation suffix) pairs, newest first."""
        compress_files = []

        if self.latest_compress_file.exists():
            compress_files.append((self.latest_compress_file, 0))

        compress_pattern = f"{self.log_name}.log.gz.*"
        for file in self.log_dir.glob(compress_pattern):
            suffix = file.suffixes[-1].lstrip(".")
            if suffix.isdigit():
                compress_files.append((file, int(suffix)))
        # A lower rotation suffix means a newer archive; sort so the oldest is last
        compress_files.sort(key=lambda pair: pair[1])
        return compress_files

    def _roll_old_compress(self) -> None:
        """Shift old compressed logs one rotation slot back (.gz -> .gz.1 -> .gz.2 ...)."""
        if not self.latest_compress_file.exists():
            return
        all_compress = self._get_compress_files()
        current_count = len(all_compress)

        if current_count >= self.max_log_files:
            oldest_file, _ = all_compress[-1]  # Highest suffix is the oldest archive
            oldest_file.unlink(missing_ok=True)
            all_compress = self._get_compress_files()

        numbered_files = [f for f, s in all_compress if s > 0]

        # Rename from the highest suffix downwards so target names are never occupied
        for file in reversed(numbered_files):
            current_suffix = int(file.suffixes[-1].lstrip("."))
            new_suffix = current_suffix + 1
            new_file = file.with_suffix(f".{new_suffix}")
            try:
                file.rename(new_file)
            except PermissionError:
                logger.error(
                    f"Failed to rename compressed log file from {file} to {new_file}"
                )
                continue

        old_compress_new_name = self.latest_compress_file.with_suffix(".gz.1")
        try:
            self.latest_compress_file.rename(old_compress_new_name)
        except PermissionError:
            logger.error(
                f"Failed to rename compressed log file from "
                f"{self.latest_compress_file} to {old_compress_new_name}"
            )
            raise
    def _compress_file(self, source_file: Path) -> None:
        """Compress the given log file (gzip); delete the original on success."""
        if not source_file.exists() or not self.compress_rotated:
            return

        try:
            self._roll_old_compress()

            with source_file.open("rb") as f_in, gzip.open(
                self.latest_compress_file, "wb", compresslevel=self.compress_level
            ) as f_out:
                shutil.copyfileobj(f_in, f_out)

            source_file.unlink(missing_ok=True)
        except Exception as e:
            logger.error(f"Failed to compress db log file! Exception {e}")
    def _list_log_files(self) -> list[Path]:
        """Get all log files (uncompressed .log and compressed .log.gz*), sorted by creation time ascending."""
        log_files = list(self.log_dir.glob(f"{self.log_name}.log")) + list(
            self.log_dir.glob(f"{self.log_name}.log.gz*")
        )
        # Sort by file creation time (oldest -> newest)
        log_files.sort(key=lambda x: x.stat().st_ctime)
        return log_files

    def _clean_old_logs(self) -> None:
        """Auto-clean old logs (delete when any one condition is met)."""
        log_files = self._list_log_files()
        if not log_files:
            return

        # Condition 1: clean by file count (retain the newest N files)
        if self.max_log_files and len(log_files) > self.max_log_files:
            files_to_delete = log_files[: -self.max_log_files]  # Delete the oldest files
            for file in files_to_delete:
                self._delete_log_file(file)
            return  # Stop after one condition fires to avoid duplicate deletion

        # Condition 2: clean by age (retain logs from the most recent N days)
        if self.max_log_days:
            expire_time = (
                time.time() - timedelta(days=self.max_log_days).total_seconds()
            )
            files_to_delete = [f for f in log_files if f.stat().st_ctime < expire_time]
            for file in files_to_delete:
                self._delete_log_file(file)
            return

        # Condition 3: clean by total size (keep the total under the limit)
        if self.max_log_total_size:
            total_size = sum(f.stat().st_size for f in log_files)
            if total_size > self.max_log_total_size:
                # Delete the oldest files first until the total size fits
                current_total = total_size
                for file in log_files:
                    if current_total <= self.max_log_total_size:
                        break
                    current_total -= file.stat().st_size
                    self._delete_log_file(file)

    def _delete_log_file(self, file_path: Path) -> None:
        """Delete a log file (fault-tolerant)."""
        try:
            file_path.unlink()
        except Exception as e:
            logger.error(f"Failed to delete log {file_path.name}: {e}")
    def _check_file_rotate(self) -> None:
        """Check file size; trigger rotation + compression + cleanup if needed."""
        if (
            not os.path.exists(self.current_file_path)
            or os.path.getsize(self.current_file_path) < self.max_file_size
        ):
            return

        # 1. Rotate the current file out of the way
        rotate_file = self.current_file_path.with_suffix(".1")
        rotate_file.unlink(missing_ok=True)
        self.current_file_path.rename(rotate_file)

        # 2. Compress the rotated file asynchronously (does not block writes)
        threading.Thread(
            target=self._compress_file, args=(rotate_file,), daemon=True
        ).start()
        # 3. Auto-clean old logs
        threading.Thread(target=self._clean_old_logs, daemon=True).start()
    def _flush_batch(self) -> None:
        """Batch-write buffered logs to file."""
        if not self.batch_buffer:
            return

        self._check_file_rotate()  # Check whether rotation is needed before writing

        try:
            with open(self.current_file_path, "a", encoding=self.encoding) as f:
                for log_entry in self.batch_buffer:
                    f.write(json.dumps(log_entry, ensure_ascii=False) + "\n")
            self.batch_buffer.clear()
        except Exception as e:
            logger.error(f"Failed to write logs: {e}")
    def _async_write_loop(self) -> None:
        """Background write loop."""
        last_flush_time = time.time()
        last_clean_time = time.time()
        while self.is_running:
            try:
                # Get a log entry from the queue (1 second timeout)
                log_entry = self.log_queue.get(timeout=1.0)
                self.batch_buffer.append(log_entry)

                # Flush when the batch is full or the flush interval has elapsed
                if (
                    len(self.batch_buffer) >= self.batch_size
                    or (time.time() - last_flush_time) >= self.flush_interval
                ):
                    self._flush_batch()
                    last_flush_time = time.time()

                self.log_queue.task_done()
            except queue.Empty:
                # Force a flush when the queue stays empty past the interval
                if (time.time() - last_flush_time) >= self.flush_interval:
                    self._flush_batch()
                    last_flush_time = time.time()
                # Periodic cleanup (every 30 seconds)
                if (time.time() - last_clean_time) >= 30:
                    self._clean_old_logs()
                    last_clean_time = time.time()
            except Exception as e:
                logger.error(f"Log thread exception: {e}")
    def log_operation(self, operation_data: Dict[str, Any]) -> None:
        """Record an operation log entry (non-blocking)."""
        if not self.is_running:
            return

        default_data = {
            "timestamp": time.time(),
            "op_type": "None",
        }
        log_entry = {**default_data, **operation_data}

        try:
            self.log_queue.put_nowait(log_entry)
        except queue.Full:
            logger.error(
                f"Log queue is full, dropping one log: {log_entry.get('request_id')}"
            )
    def stop(self) -> None:
        """Stop the logger (ensure remaining data is written and cleaned up)."""
        self.is_running = False
        self.write_thread.join(timeout=10.0)
        self._flush_batch()  # Flush remaining data

        # Final cleanup of old logs
        self._clean_old_logs()

    def __del__(self):
        self.stop()
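A minimal standalone usage sketch of the logger, assuming the metrics_configs.yaml below sits next to the module and has enabled: True (log_operation is a no-op otherwise); the field values here are illustrative:

from ucm.shared.metrics.async_op_db import AsyncUCMOperDB

db = AsyncUCMOperDB()  # picks up operation_db settings from metrics_configs.yaml
db.log_operation({"op_type": "load", "blocks": ["blk-0", "blk-1"], "block_size": 128})
db.stop()  # flushes the buffer and runs a final cleanup pass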
ucm/shared/metrics/metrics_configs.yaml

Lines changed: 18 additions & 0 deletions
@@ -0,0 +1,18 @@
# Prometheus Metrics Configuration
# This file defines which metrics should be enabled and their configurations
operation_db:
  enabled: True

  log_dir: "/vllm-workspace/ucm_logs"
  log_name: "ucm_operation"
  max_file_size: 104857600  # Max single file size: 100MB (100 * 1024 * 1024); exceeding it triggers rotation
  batch_size: 100  # Batch write threshold
  flush_interval: 5.0  # Force flush interval (5 seconds)
  encoding: "utf-8"
  # Compression related configuration
  compress_rotated: True  # Enable rotated file compression (gzip by default)
  compress_level: 6  # Compression level (1=fastest, 9=smallest, 6=balanced)
  # Auto-cleanup related configuration
  max_log_files: 30  # Max number of log files to retain (including compressed)
  max_log_days: 7  # Max days to retain logs
  max_log_total_size: 1073741824  # Max total log size: 1GB (1024 * 1024 * 1024)
