import gzip
import json
import os
import queue
import shutil
import threading
import time
from datetime import timedelta
from pathlib import Path
from typing import Any, Dict

import yaml

from ucm.logger import init_logger

logger = init_logger(__name__)


class AsyncUCMOperDB:
    """Asynchronous UCM operation log writer (supports log compression + auto-cleanup)."""

    def _load_config(self, config_path: str) -> Dict[str, Any]:
        """Load configuration from a YAML file."""
        try:
            with open(config_path, "r") as f:
                config = yaml.safe_load(f)
            if config is None:
                logger.warning(f"Config file {config_path} is empty, using defaults")
                return {}
            return config
        except FileNotFoundError:
            logger.warning(f"Config file {config_path} not found, using defaults")
            return {}
        except yaml.YAMLError as e:
            logger.error(f"Error parsing YAML config file {config_path}: {e}")
            return {}

    def __init__(
        self,
        config_path: str = "",
    ):
        if not config_path:
            current_file_path = Path(__file__)
            config_path = str(current_file_path.parent / "metrics_configs.yaml")
        config = self._load_config(config_path)
        oper_config = config.get("operation_db", {})
        if not oper_config:
            logger.warning(
                f"No 'operation_db' section found in config file {config_path}, "
                "falling back to default settings!"
            )

        # Basic configuration
        self.log_dir = Path(oper_config.get("log_dir", "/vllm-workspace/ucm_logs"))
        self.log_name = oper_config.get("log_name", "ucm_operation")
        self.max_file_size = oper_config.get("max_file_size", 104857600)  # 100 MiB
        self.batch_size = oper_config.get("batch_size", 100)
        self.flush_interval = oper_config.get("flush_interval", 5.0)
        self.encoding = oper_config.get("encoding", "utf-8")

        # Compression configuration
        self.compress_rotated = oper_config.get("compress_rotated", True)
        self.compress_level = oper_config.get("compress_level", 6)  # gzip level (1-9)

        # Auto-cleanup configuration (cleanup when any condition is met)
        self.max_log_files = oper_config.get("max_log_files", 30)
        self.max_log_days = oper_config.get("max_log_days", 7)
        self.max_log_total_size = oper_config.get("max_log_total_size", 1073741824)  # 1 GiB
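        # The keys above are read from the "operation_db" section of
        # metrics_configs.yaml. A minimal sketch of that section (illustrative
        # values only; every key is optional and falls back to the default in
        # the corresponding .get() call):
        #
        #   operation_db:
        #     enabled: true
        #     log_dir: /vllm-workspace/ucm_logs
        #     log_name: ucm_operation
        #     max_file_size: 104857600        # bytes per active log file
        #     batch_size: 100
        #     flush_interval: 5.0             # seconds
        #     compress_rotated: true
        #     compress_level: 6
        #     max_log_files: 30
        #     max_log_days: 7
        #     max_log_total_size: 1073741824  # bytes across all retained logs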

        # Initialize log directory
        self.log_dir.mkdir(parents=True, exist_ok=True)

        # Thread-safe queue
        self.log_queue = queue.Queue(maxsize=10000)  # Max cache: 10000 entries
        # is_running doubles as the enable switch: logging stays off unless
        # operation_db.enabled is set to true in the config.
        self.is_running = oper_config.get("enabled", False)
        self.current_file_path = self.log_dir / f"{self.log_name}.log"
        self.latest_compress_file = self.log_dir / f"{self.log_name}.log.gz"
        self.batch_buffer = []

        # Start background thread (write + compression + cleanup)
        self.write_thread = threading.Thread(target=self._async_write_loop, daemon=True)
        self.write_thread.start()

    def _get_compress_files(self) -> list[tuple[Path, int]]:
        """Get all compressed logs as (file, numeric rotation suffix) pairs."""
        compress_files = []

        if self.latest_compress_file.exists():
            compress_files.append((self.latest_compress_file, 0))

        compress_pattern = f"{self.log_name}.log.gz.*"
        for file in self.log_dir.glob(compress_pattern):
            suffix = file.suffixes[-1].lstrip(".")
            if suffix.isdigit():
                compress_files.append((file, int(suffix)))
        return compress_files

    def _roll_old_compress(self) -> None:
        """Shift existing compressed logs (.gz -> .gz.1 -> .gz.2 ...) to make room for a new archive."""
        if not self.latest_compress_file.exists():
            return
        all_compress = self._get_compress_files()

        # Drop the oldest archive (largest numeric suffix) if the retention limit is reached
        if len(all_compress) >= self.max_log_files:
            oldest_file, _ = max(all_compress, key=lambda item: item[1])
            oldest_file.unlink(missing_ok=True)
            all_compress = self._get_compress_files()

        # Rename numbered archives in descending suffix order so the renames never collide
        numbered_files = sorted(
            (f for f, s in all_compress if s > 0),
            key=lambda f: int(f.suffixes[-1].lstrip(".")),
            reverse=True,
        )
        for file in numbered_files:
            current_suffix = int(file.suffixes[-1].lstrip("."))
            new_file = file.with_suffix(f".{current_suffix + 1}")
            try:
                file.rename(new_file)
            except PermissionError as e:
                logger.error(
                    f"Failed to rename compressed log file from {file} to {new_file}: {e}"
                )
                continue

        old_compress_new_name = self.latest_compress_file.with_suffix(".gz.1")
        try:
            self.latest_compress_file.rename(old_compress_new_name)
        except PermissionError as e:
            logger.error(
                f"Failed to rename compressed log file from {self.latest_compress_file} "
                f"to {old_compress_new_name}: {e}"
            )
            raise

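    # Rotation pipeline at a glance (file names derived from the defaults above;
    # an illustration of the resulting on-disk layout, not additional behaviour):
    #
    #   ucm_operation.log        <- active file appended to by _flush_batch
    #   ucm_operation.log.1      <- transient file produced by _check_file_rotate
    #   ucm_operation.log.gz     <- newest archive written by _compress_file
    #   ucm_operation.log.gz.1   <- older archives shifted by _roll_old_compress
    #   ucm_operation.log.gz.2
    #   ...
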
    def _compress_file(self, source_file: Path) -> None:
        """Compress the given log file with gzip; delete the original after successful compression."""
        if not source_file.exists() or not self.compress_rotated:
            return

        try:
            self._roll_old_compress()

            with source_file.open("rb") as f_in, gzip.open(
                self.latest_compress_file, "wb", compresslevel=self.compress_level
            ) as f_out:
                shutil.copyfileobj(f_in, f_out)

            source_file.unlink(missing_ok=True)
        except Exception as e:
            logger.error(f"Failed to compress db log file! Exception: {e}")

    def _list_log_files(self) -> list[Path]:
        """Get all log files (the uncompressed .log plus compressed .log.gz* archives), oldest first."""
        log_files = list(self.log_dir.glob(f"{self.log_name}.log")) + list(
            self.log_dir.glob(f"{self.log_name}.log.gz*")
        )
        # Sort by inode change time (st_ctime), oldest -> newest
        log_files.sort(key=lambda x: x.stat().st_ctime)
        return log_files

    def _clean_old_logs(self) -> None:
        """Auto-clean old logs (the first matching condition below triggers deletion)."""
        log_files = self._list_log_files()
        if not log_files:
            return

        # Condition 1: clean by file count (retain the newest max_log_files files)
        if self.max_log_files and len(log_files) > self.max_log_files:
            files_to_delete = log_files[: -self.max_log_files]  # Delete the oldest files
            for file in files_to_delete:
                self._delete_log_file(file)
            return  # Stop after one condition to avoid duplicate deletion

        # Condition 2: clean by age (retain logs from the last max_log_days days)
        if self.max_log_days:
            expire_time = (
                time.time() - timedelta(days=self.max_log_days).total_seconds()
            )
            files_to_delete = [f for f in log_files if f.stat().st_ctime < expire_time]
            for file in files_to_delete:
                self._delete_log_file(file)
            return

        # Condition 3: clean by total size (keep the total at or below max_log_total_size)
        if self.max_log_total_size:
            total_size = sum(f.stat().st_size for f in log_files)
            if total_size > self.max_log_total_size:
                # Delete the oldest files first until the total size fits
                current_total = total_size
                for file in log_files:
                    if current_total <= self.max_log_total_size:
                        break
                    current_total -= file.stat().st_size
                    self._delete_log_file(file)

    def _delete_log_file(self, file_path: Path) -> None:
        """Delete a log file (fault-tolerant)."""
        try:
            file_path.unlink()
        except Exception as e:
            logger.error(f"Failed to delete log {file_path.name}: {str(e)}")

    def _check_file_rotate(self) -> None:
        """Check file size; trigger rotation + compression + cleanup if needed."""
        if (
            not os.path.exists(self.current_file_path)
            or os.path.getsize(self.current_file_path) < self.max_file_size
        ):
            return

        # 1. Rotate: rename the active file to <name>.log.1 (replacing any stale one)
        rotate_file = self.current_file_path.with_suffix(".log.1")
        rotate_file.unlink(missing_ok=True)
        self.current_file_path.rename(rotate_file)

        # 2. Compress the rotated file asynchronously (does not block writes)
        threading.Thread(
            target=self._compress_file, args=(rotate_file,), daemon=True
        ).start()
        # 3. Auto-clean old logs
        threading.Thread(target=self._clean_old_logs, daemon=True).start()

    def _flush_batch(self) -> None:
        """Batch-write buffered logs to the active file."""
        if not self.batch_buffer:
            return

        self._check_file_rotate()  # Check whether rotation is needed before writing

        try:
            with open(self.current_file_path, "a", encoding=self.encoding) as f:
                for log_entry in self.batch_buffer:
                    f.write(json.dumps(log_entry, ensure_ascii=False) + "\n")
            self.batch_buffer.clear()
        except Exception as e:
            logger.error(f"Failed to write logs: {str(e)}")

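    # Each flushed entry ends up as one JSON line in ucm_operation.log, e.g.
    # (field names other than "timestamp"/"op_type" are caller-supplied and
    # shown here only as an example):
    #
    #   {"timestamp": 1717490000.123, "op_type": "dump", "request_id": "req-0001"}
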
    def _async_write_loop(self) -> None:
        """Background write loop."""
        last_flush_time = time.time()
        last_cleanup_time = time.time()
        while self.is_running:
            try:
                # Get a log entry from the queue (1 second timeout)
                log_entry = self.log_queue.get(timeout=1.0)
                self.batch_buffer.append(log_entry)

                # Flush if either the batch size or the flush interval is reached
                if (
                    len(self.batch_buffer) >= self.batch_size
                    or (time.time() - last_flush_time) >= self.flush_interval
                ):
                    self._flush_batch()
                    last_flush_time = time.time()

                self.log_queue.task_done()
            except queue.Empty:
                # Force a flush when the queue is idle and the interval has elapsed
                if (time.time() - last_flush_time) >= self.flush_interval:
                    self._flush_batch()
                    last_flush_time = time.time()
                # Periodic cleanup (roughly every 30 seconds)
                if (time.time() - last_cleanup_time) >= 30:
                    self._clean_old_logs()
                    last_cleanup_time = time.time()
            except Exception as e:
                logger.error(f"Log thread exception: {str(e)}")

    def log_operation(self, operation_data: Dict[str, Any]) -> None:
        """Record an operation log entry (non-blocking)."""
        if not self.is_running:
            return

        default_data = {
            "timestamp": time.time(),
            "op_type": "None",
        }
        log_entry = {**default_data, **operation_data}

        try:
            self.log_queue.put_nowait(log_entry)
        except queue.Full:
            logger.error(
                f"Log queue is full, dropping one log: {log_entry.get('request_id')}"
            )

    def stop(self) -> None:
        """Stop the logger (flush remaining data, then run a final cleanup)."""
        self.is_running = False
        self.write_thread.join(timeout=10.0)

        # Drain anything still queued, then flush whatever remains in the buffer
        while True:
            try:
                self.batch_buffer.append(self.log_queue.get_nowait())
            except queue.Empty:
                break
        self._flush_batch()

        # Final cleanup of old logs
        self._clean_old_logs()

    def __del__(self):
        self.stop()
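

# ---------------------------------------------------------------------------
# Minimal usage sketch (illustration only, not part of the module's API surface):
# records are only written when operation_db.enabled is true in the config; the
# "request_id"/"elapsed_ms" fields below are example payload keys, and callers
# may pass any JSON-serializable dictionary.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    oper_db = AsyncUCMOperDB()
    oper_db.log_operation(
        {
            "op_type": "dump",
            "request_id": "req-0001",
            "elapsed_ms": 12.5,
        }
    )
    oper_db.stop()  # flushes the remaining buffer and runs a final cleanup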