
feat(fine-tuning): add Prometheus monitoring and LoRA serving for fine-tuned models#369

Open

zeyuyuyu wants to merge 16 commits into main from
feat/fine-tuning-monitoring-and-serving

Conversation

zeyuyuyu (Collaborator) commented Feb 24, 2026

Summary

  • Prometheus monitoring for the fine-tuning service: tracks task creation, state transitions, storage operations, settlement events, HTTP request metrics, and unique users
  • LoRA inference serving: automatically discovers finished fine-tuning tasks and serves their LoRA adapters via vLLM with OpenAI-compatible API, including EIP-191 signature authentication and model ownership enforcement
  • Multi-tier LoRA caching: implements a four-tier caching system for LoRA adapters with automatic offloading and on-demand restore
  • Blocking wait mode: clients can set wait_for_model: true to block until a cold-storage model finishes loading instead of polling HTTP 202 (inspired by ServerlessLLM)
  • GPU memory utilization config: new gpuMemoryUtilization parameter for vLLM to coexist with other GPU processes
  • Bug fix: vLLM process group cleanup on shutdown to prevent orphaned GPU-holding processes

Architecture Overview

System Components

+------------------------------------------------------------------+
|                         User Request                             |
|              (POST /v1/serving/v1/chat/completions)              |
|              Authorization: Bearer <EIP-191 signature>           |
+-------------------------------+----------------------------------+
                                |
                                v
+------------------------------------------------------------------+
|                    Go Proxy (serving/proxy.go)                   |
|                                                                  |
|  1. Verify EIP-191 signature -> recover user Ethereum address    |
|  2. Look up model by name -> check ownership (address match)    |
|  3. Check model.State -> route accordingly:                      |
|     +-------------------------------------------------------+    |
|     | active   -> RecordAccess() -> forward request to vLLM |    |
|     | archived -> RestoreModel() -> see wait mode below     |    |
|     | loading  -> see wait mode below                       |    |
|     +-------------------------------------------------------+    |
|     For archived/loading models:                                  |
|       wait_for_model=false (default) -> HTTP 202 "retry shortly"  |
|       wait_for_model=true  -> block until active, then forward    |
+-------------------------------+----------------------------------+
                                | (active path only)
                                v
+------------------------------------------------------------------+
|                    vLLM Server (:8000)                            |
|                                                                  |
|  Base model loaded permanently on GPU                            |
|  LoRA adapters managed by vLLM's internal LRU:                   |
|                                                                  |
|  +----------+    +----------+    +----------------------+        |
|  | GPU (hot)|<-->|CPU (warm)|<-->| Disk (cool)          |        |
|  | --max-   |    |--max-cpu-|    | filesystem_resolver   |        |
|  |  loras   |    |  loras   |    | auto-discovers from   |        |
|  |          |    |          |    | symlink directory      |        |
|  +----------+    +----------+    +----------------------+        |
|                                                                  |
|  All three tiers are transparent to the user -- vLLM handles     |
|  GPU<->CPU<->Disk transitions automatically with zero user impact|
+------------------------------------------------------------------+

+------------------------------------------------------------------+
|                   Cold Storage Tier (Go code)                    |
|                                                                  |
|  offloadLoop (model_cache.go):                                   |
|    - Runs every 1 minute                                         |
|    - If model.LastAccessedAt < now - offloadAfterMinutes:        |
|        -> delete local files + symlink                           |
|        -> set state = archived                                   |
|        -> model only exists in 0G decentralized storage          |
|                                                                  |
|  RestoreModel (triggered by user request):                       |
|    - Download from 0G Storage -> recreate files + symlink        |
|    - state: archived -> loading -> active                        |
|    - vLLM filesystem_resolver auto-discovers the new symlink     |
+------------------------------------------------------------------+

Multi-tier Caching: How Each Tier Works

| Tier | Where | Managed By | Activation Trigger | User-visible Latency |
|------|-------|------------|--------------------|----------------------|
| GPU (hot) | GPU VRAM | vLLM LRU (`--max-loras`) | Already loaded; immediate inference | ~0ms (no extra latency) |
| CPU (warm) | Host RAM | vLLM LRU (`--max-cpu-loras`) | vLLM auto-copies weights from CPU to GPU when a request arrives | ~10-50ms (transparent to user) |
| Disk (cool) | Local disk | vLLM `lora_filesystem_resolver` plugin | vLLM reads adapter files from symlink directory, loads into GPU | ~100-500ms (transparent to user) |
| 0G Storage (cold) | Decentralized storage | Broker Go code (`model_cache.go`) | User request triggers async download; returns HTTP 202 with `"status": "loading"` | Seconds to minutes (user must retry) |

Key insight: The first three tiers (GPU, CPU, Disk) are entirely managed by vLLM's internal LRU cache and are invisible to the user -- requests always succeed immediately. Only the fourth tier (0G Storage) requires user-side retry logic.

Request Flow: LoRA on Disk (the common case)

When a LoRA adapter is on disk but not in GPU/CPU memory (state = active):

1. User sends POST /v1/serving/v1/chat/completions
   Headers: Authorization: Bearer 0x<EIP-191 signature>
   Body: {"model": "ft-Qwen2-5-0-5B-abc12345-678", "messages": [...]}

2. Proxy verifies signature -> recovers 0x1234...abcd
   Proxy checks: model exists? user is owner? -> yes

3. model.State == active -> RecordAccess() updates LastAccessedAt
   Proxy forwards request body to vLLM at http://localhost:8000/v1/chat/completions

4. vLLM receives request for model "ft-Qwen2-5-0-5B-abc12345-678"
   - Not in GPU cache? Check CPU cache
   - Not in CPU cache? filesystem_resolver finds symlink in lora-modules/ directory
   - Reads adapter_model.safetensors from disk
   - Loads LoRA weights onto GPU (possibly evicting LRU adapter)
   - Runs inference with base_model + LoRA adapter

5. Response streamed back through Proxy to user
   (Next request for the same model hits GPU cache -- near-zero overhead)

Request Flow: LoRA Archived in 0G Storage

When a LoRA adapter has been offloaded (state = archived):

1. User sends chat completion request for an archived model

2. Proxy checks model.State == archived
   -> Calls RestoreModel() which:
     a. Sets state = loading
     b. Launches goroutine to download from 0G Storage
   -> Returns HTTP 202:
     {"status": "loading", "model": "ft-...", "error": "Model is being loaded..."}

3. Background: download completes -> recreate symlink -> state = active

4. User retries the same request
   -> model.State == active -> normal inference flow (see above)

Automatic Model Discovery

The Manager runs a background poller (pollFinishedTasks) that:

  1. Queries the database for tasks with progress = "Finished" or "UserAcknowledged"
  2. Checks if the task's output LoRA directory exists on disk
  3. Creates a symlink in the loraModulesDir pointing to the LoRA output
  4. Registers the model in the in-memory map with state active
  5. vLLM's filesystem_resolver automatically discovers the new symlink

This means fine-tuning output is automatically available for inference with zero manual intervention.

vLLM Process Group Cleanup (Bug Fix)

vLLM 0.16.0+ uses a multi-process architecture:

Main process (APIServer)
  +-- EngineCore (holds GPU memory)
       +-- resource_tracker

Previously, Manager.Stop() only sent SIGINT to the main process, leaving EngineCore alive as an orphan still holding GPU memory. This caused GPU OOM errors for other users sharing the machine.

Fix: Start vLLM with Setpgid: true to create an independent process group, then kill the entire group (syscall.Kill(-pid, SIGTERM)) on shutdown.

Authentication

Users authenticate via EIP-191 personal signatures:

  1. Sign the message "0g-serving-inference-auth" with their Ethereum private key
  2. Include the signature as Authorization: Bearer 0x<signature>
  3. Proxy recovers the signer's address and enforces model ownership

Configuration

serving:
  enable: true
  baseModelPath: "/path/to/base-model"       # HuggingFace model directory
  inferenceGpuIds: "0"                        # CUDA_VISIBLE_DEVICES
  vllmPort: 8000                              # vLLM listening port
  maxLoraRank: 64                             # Maximum LoRA rank supported
  maxLoraModules: 16                          # GPU LoRA cache slots (--max-loras)
  maxCpuLoras: 32                             # CPU LoRA cache slots (--max-cpu-loras)
  loraModulesDir: "/tmp/lora-modules"         # Symlink directory for filesystem_resolver
  inputPrice: "10000000"                      # Price per input token (for contract)
  outputPrice: "10000000"                     # Price per output token (for contract)
  offloadAfterMinutes: 60                     # Inactivity threshold for cold-storage offload
  enableColdStorage: false                    # Enable 0G Storage offload/restore

Test Results

Unit Tests: 17/17 passed

Covers model registration, state tracking, access recording, offload/restore logic, ownership verification, and the full offload-restore cycle.

E2E Tests: 13/13 passed (NVIDIA H20 GPU)

Ran on server with Qwen2.5-0.5B-Instruct base model + 3 LoRA adapters:

| Test Case | Result |
|-----------|--------|
| Health endpoint returns correct tier counts | PASS |
| List all served models (3 active) | PASS |
| List models for authenticated user | PASS |
| Unauthorized request returns 401 | PASS |
| Chat completion with LoRA adapter 0 | PASS |
| Chat completion with LoRA adapter 1 | PASS |
| Chat completion with LoRA adapter 2 | PASS |
| Non-existent model returns 404 | PASS |
| Wrong user returns 403 | PASS |
| Streaming (SSE) chat completion (49 chunks) | PASS |
| Chat completion with wait_for_model=true on active model | PASS |
| Health endpoint shows model_load_timeout_sec | PASS |
| 3 concurrent requests to different adapters | PASS |

Updated Unit Tests: 23/23 passed

Including 6 new tests for WaitForModel:

  • TestWaitForModelAlreadyActive — instant return for active models
  • TestWaitForModelRestoreCompletes — blocks then returns after restore
  • TestWaitForModelTimeout — respects timeout deadline
  • TestWaitForModelContextCancelled — respects context cancellation
  • TestWaitForModelMultipleWaiters — 3 concurrent waiters all wake up
  • TestWaitForModelRestoreFails — returns Archived state on failure

Key observations:

  • vLLM filesystem_resolver dynamically loaded LoRA adapters on first request
  • All 3 adapters served concurrently from a single base model
  • Streaming responses worked correctly through the Go reverse proxy
  • wait_for_model=true works transparently for already-active models
  • Channel-based notification wakes all concurrent waiters simultaneously

Files Changed

| File | Description |
|------|-------------|
| serving/manager.go | vLLM lifecycle, LoRA registration, process group cleanup |
| serving/proxy.go | HTTP API, auth, routing, streaming proxy |
| serving/model_cache.go | Cold-storage offload/restore, state machine |
| serving/registry.go | On-chain service auto-registration |
| serving/serving_test.go | 23 unit tests (including 6 for WaitForModel) |
| cmd/e2e-serving-test/main.go | E2E test program |
| db/test_helpers.go | DB test utilities |
| config/config.go | ServingConfig fields |
| cmd/server/main.go | Serving module integration |
| monitor/ | Prometheus metrics |
| docs/LORA_SERVING.md | Design document |

Wuying Created Local Users and others added 6 commits February 11, 2026 18:20
- Finalizer now always encrypts locally first (TEE backup)
- Then attempts upload to 0G Storage (unless skipStorageUpload=true)
- If storage upload fails, falls back to local hash gracefully
- Encrypted file is always retained for TEE download fallback
- Removes old encryptAndUploadModel (split into encrypt + upload)

Co-authored-by: Cursor <cursoragent@cursor.com>
Previously DownloadFromStorage always assumed ZIP format, causing
"zip: not a valid zip file" when CLI uploads raw JSONL to 0G Storage.

Changes:
- Detect file type via magic bytes before attempting unzip
- For non-ZIP files (raw JSONL), move directly to target path
- Convert raw JSONL to HuggingFace DatasetDict format after download
  (required by token counter and training executor)

Co-authored-by: Cursor <cursoragent@cursor.com>
- DownloadFromStorage: detect file type via magic bytes, handle non-ZIP
- Clean up existing target before rename on retry
- Convert raw JSONL to HF DatasetDict format after download
- Auto-calculate fee when fee=0 (broker-calculated fee mode)
- Verify signature with original fee to avoid mismatch

Co-authored-by: Cursor <cursoragent@cursor.com>
- [HIGH] Use passed ctx instead of context.Background() in Download call
  for proper cancellation/timeout propagation
- [HIGH] Add defer cleanup for temp download file to prevent disk leaks
  on early error returns
- [MEDIUM] Improve isZipFile() to return errors on I/O failures instead
  of silently treating them as non-ZIP files

Co-authored-by: Cursor <cursoragent@cursor.com>
- Upgrade 0g-storage-client v1.2.1 → v1.2.2 to fix ABI incompatibility
  with testnet Flow contract (was causing `execution reverted; data: 0x`)
- Remove deprecated WithRoutines() calls (now set internally via
  UploaderConfig in v1.2.2)
- Fix uploadModel() to return raw 32-byte hash (hash.Bytes()) instead of
  hex string bytes ([]byte(hash.Hex())), which caused double-encoding
  via hexutil.Encode and prevented client from downloading model via
  0G Storage

Tested: Full flow verified with Qwen2.5-0.5B and Qwen3-32B (1.1GB model)
— broker upload, client download from 0G Storage, decrypt, LoRA verified.

Co-authored-by: Cursor <cursoragent@cursor.com>
…e-tuned models

Implements two features for the fine-tuning service:

1. Prometheus Monitoring (#368):
   - Add fine-tuning specific metrics: task lifecycle, storage operations,
     settlement, HTTP requests, and unique users tracking
   - Configurable via `monitor.enable` in config
   - Instrument worker phases, storage client, settlement, and task handler

2. Serve Fine-tuned Models to End-users (#367):
   - Add LoRA serving module with vLLM integration for shared base model
     serving with multiple concurrent LoRA adapters
   - Auto-discover finished tasks and register LoRA adapters for inference
   - Private key authentication with model ownership access control
   - GPU isolation via CUDA_VISIBLE_DEVICES for inference vs training
   - OpenAI-compatible API endpoints with streaming support
   - Health endpoint and automatic adapter lifecycle management

Co-authored-by: Cursor <cursoragent@cursor.com>
zeyuyuyu (Collaborator, Author) commented:

@claude

claude bot commented Feb 25, 2026

Claude finished @zeyuyuyu's task in 2m 29s.


Code Review Complete ✅

I've completed a comprehensive review of the Prometheus monitoring and LoRA serving features. Overall, this is a well-structured implementation with good separation of concerns. Below are my findings organized by severity.


🔴 Critical Issues

1. [CRITICAL] Global Environment Variable Mutation

File: internal/serving/manager.go:113

if m.config.InferenceGPUIDs != "" {
    os.Setenv("CUDA_VISIBLE_DEVICES", m.config.InferenceGPUIDs)
}

Issue: Setting CUDA_VISIBLE_DEVICES globally affects the entire process, including training tasks. This violates GPU isolation between training and inference.

Fix: Set environment variables only for the vLLM subprocess:

cmd := exec.CommandContext(ctx, "vllm", args...)
cmd.Env = append(os.Environ(), "CUDA_VISIBLE_DEVICES="+m.config.InferenceGPUIDs)

🟠 High Priority Issues

2. [HIGH] Goroutine Leak in Monitor

File: monitor/monitor.go:164-184

go processUniqueUsers()

Issue: The processUniqueUsers goroutine has no way to exit. It will leak when the monitor is stopped or the application shuts down.

Fix: Pass a context and handle cancellation:

func Init(serverName string, ctx context.Context) {
    // ...
    go processUniqueUsers(ctx)
}

func processUniqueUsers(ctx context.Context) {
    // ...
    for {
        select {
        case <-ctx.Done():
            return
        case userAddress := <-uniqueUsersChan:
            // ... process user
        }
    }
}

3. [HIGH] Panic in Metrics Server

File: monitor/monitor.go:286-289

func StartMetricsServer(address string) {
    // ...
    if err := r.Run(address); err != nil {
        panic(err)  // ❌ Should return error instead
    }
}

Issue: Using panic for operational errors violates Go best practices. A failure to bind the metrics listener should be returned to the caller rather than crashing the whole process.

Recommendation: This function appears unused in the codebase (metrics are served via main HTTP server). Consider removing it or properly returning errors.

4. [HIGH] vLLM Process Cleanup Missing

File: internal/serving/manager.go:118-139

cmd := exec.CommandContext(ctx, "vllm", args...)
// ...
if err := cmd.Start(); err != nil {
    return  // Process started but not tracked for cleanup
}

Issue: If the vLLM process starts but encounters errors later, it may not be properly killed.

Fix: Add proper cleanup in Manager:

func (m *Manager) Stop() error {
    m.mu.Lock()
    defer m.mu.Unlock()
    
    if m.vllmProcess != nil && m.vllmProcess.Process != nil {
        if err := m.vllmProcess.Process.Kill(); err != nil {
            return errors.Wrap(err, "kill vLLM process")
        }
    }
    return nil
}

Then call this from main.go shutdown handler.

5. [HIGH] Unchecked Error in Proxy Response

File: internal/serving/proxy.go:177-179

respBody, _ := io.ReadAll(resp.Body)  // Error ignored
c.Writer.Write(respBody)

Issue: Ignoring read errors can cause incomplete responses to be sent to users.

Fix:

respBody, err := io.ReadAll(resp.Body)
if err != nil {
    p.logger.Errorf("failed to read response: %v", err)
    c.JSON(http.StatusBadGateway, gin.H{"error": "Failed to read backend response"})
    return
}
c.Writer.Write(respBody)

6. [HIGH] Unchecked File Removal Errors

Files:

  • internal/serving/manager.go:256 (pruneStaleModels)
  • internal/serving/manager.go:306 (UnregisterModel)

os.Remove(destDir)  // Error ignored

Issue: Failing to remove symlinks can cause disk space issues and stale state.

Fix:

if err := os.Remove(destDir); err != nil && !os.IsNotExist(err) {
    m.logger.Warnf("failed to remove model symlink %s: %v", destDir, err)
}

🟡 Medium Priority Issues

7. [MEDIUM] Missing Pagination for Serving Tasks

File: internal/db/service.go:261-272

func (d *DB) GetFinishedTasksForServing() ([]Task, error) {
    var tasks []Task
    ret := d.db.Where("progress IN ?", servableStates).Order("created_at DESC").Find(&tasks)
    // No LIMIT clause

Issue: This query can return unlimited rows, causing memory issues as tasks accumulate.

Fix: Add pagination or a reasonable limit:

ret := d.db.Where("progress IN ?", servableStates).
    Order("created_at DESC").
    Limit(1000).  // Or make configurable
    Find(&tasks)

8. [MEDIUM] Model Name Collisions Possible

File: internal/serving/manager.go:369-381

func (m *Manager) makeModelName(baseModel string, taskID uuid.UUID) string {
    shortBase := baseModel
    if len(shortBase) > 16 {
        shortBase = shortBase[:16]  // Truncation can cause collisions
    }
    // ...
    return fmt.Sprintf("ft-%s-%s", shortBase, taskID.String()[:8])  // Only 8 chars of UUID
}

Issue: Using only 8 characters of the UUID significantly increases collision probability. Two tasks with the same base model prefix could collide.

Fix: Use the full UUID or increase to 12-16 characters:

return fmt.Sprintf("ft-%s-%s", shortBase, taskID.String()[:12])

9. [MEDIUM] Hardcoded HTTP Timeouts

Files:

  • internal/serving/manager.go:65-67 (10s timeout)
  • internal/serving/proxy.go:30 (no timeout)

Issue: The proxy's HTTP client has no timeout, which can cause goroutine leaks if backend hangs.

Fix:

client: &http.Client{
    Timeout: 5 * time.Minute,  // Allow for long inference requests
    Transport: &http.Transport{
        MaxIdleConns:        100,
        IdleConnTimeout:     90 * time.Second,
        DisableCompression:  false,
    },
},

10. [MEDIUM] Channel Buffer Size Arbitrary

File: monitor/monitor.go:163

uniqueUsersChan = make(chan string, 10000)  // Why 10000?

Issue: Hardcoded buffer size with no documentation or configurability.

Recommendation: Either make it configurable via config or document the sizing rationale.

11. [MEDIUM] No Context Propagation in Health Check

File: internal/serving/manager.go:142-165

func (m *Manager) waitForVLLMReady(ctx context.Context) {
    // ...
    resp, err := m.httpClient.Get(endpoint + "/health")  // No context

Fix:

req, _ := http.NewRequestWithContext(ctx, "GET", endpoint+"/health", nil)
resp, err := m.httpClient.Do(req)

🔵 Low Priority / Best Practices

12. [LOW] Registry Contract Registration is No-Op

File: internal/serving/registry.go:86-100

func (r *Registry) registerOnContract(ctx context.Context, model *ServedModel) error {
    // ... logs only, no actual contract call
    return nil
}

Issue: The comment says "Full inference contract registration would require a separate inference contract instance, which is out of scope." This is confusing.

Fix: Either:

  1. Implement the registration, or
  2. Remove the Registry entirely and just track locally, or
  3. Add a TODO comment explaining the future implementation plan

13. [LOW] Streaming Could Handle Errors Better

File: internal/serving/proxy.go:154-175

for {
    n, readErr := resp.Body.Read(buf)
    if n > 0 {
        if _, writeErr := c.Writer.Write(buf[:n]); writeErr != nil {
            p.logger.Warnf("stream write error: %v", writeErr)
            return  // Abrupt termination
        }

Recommendation: Consider sending an error SSE event before returning to inform the client of the failure.

14. [LOW] Task State Poller Not Tracked

File: cmd/server/main.go:270

go startTaskStatePoller(ctx, svc.db, logger)

Issue: Goroutine is launched but not tracked in any sync.WaitGroup for graceful shutdown.

Recommendation: While context cancellation handles this, consider using a WaitGroup for cleaner shutdown:

var wg sync.WaitGroup
wg.Add(1)
go func() {
    defer wg.Done()
    startTaskStatePoller(ctx, svc.db, logger)
}()
// ... on shutdown ...
wg.Wait()

⚪ Nitpicks

15. [nit] Missing godoc for Exported Functions

File: monitor/monitor.go

Several exported functions lack documentation:

  • RecordTaskCreated() (line 215)
  • RecordTaskCompleted() (line 221)
  • RecordTaskFailed() (line 227)
  • etc.

Fix: Add godoc comments following Go conventions:

// RecordTaskCreated increments the counter for total tasks created.
func RecordTaskCreated() {
    if TasksCreatedTotal != nil {
        TasksCreatedTotal.Inc()
    }
}

16. [nit] Inconsistent Error Wrapping

File: internal/serving/manager.go

Some functions use errors.Wrap() (line 82, 274, 278) while others return plain errors (line 79, 268, 302).

Recommendation: Be consistent with error wrapping for better debugging:

return errors.New("LoRA serving is not enabled")  // Instead of plain errors.New

✅ Good Practices Observed

Excellent work on these aspects:

  1. Monitoring Integration: Clean separation between business logic and metrics recording. The defer pattern in storage client (lines 106-107, 185-186) is excellent.

  2. Error Handling in Settlement: The trySettle function (settlement.go:115-141) properly records metrics on both success and failure paths. Well done!

  3. Database Queries: Good use of GORM with proper error handling. The CountTasksByState function (db/service.go:274-310) properly initializes all states to 0 before populating - nice attention to detail.

  4. Configuration Structure: The config package is well-organized with proper validation in validateCustomizedModels (config.go:265-305).

  5. Context Propagation: Most functions properly accept and use context.Context for cancellation.

  6. Authentication: The signature verification in proxy.go (lines 44-88) follows Ethereum standards correctly.


📝 Summary

Overall Assessment: This is a solid implementation with good architecture. The main concerns are around resource management (goroutine leaks, process cleanup) and some edge cases in error handling.

Priority Recommendations:

  1. Fix the CUDA_VISIBLE_DEVICES global mutation (#1) - Critical for production
  2. Fix goroutine leaks in monitor (#2) and add vLLM cleanup (#4)
  3. Add pagination to GetFinishedTasksForServing (#7)
  4. Add timeout to proxy HTTP client (#9)
  5. Address all unchecked errors (#5, #6)

Testing Recommendations:

  • Add integration tests for vLLM lifecycle management
  • Test serving module with multiple concurrent LoRA adapters
  • Test monitoring metrics under load
  • Test graceful shutdown with serving module enabled

Documentation Needed:

  • Document GPU isolation strategy between training and inference
  • Document the model naming scheme and collision handling
  • Add godoc comments for all exported monitor functions

Wuying Created Local Users and others added 3 commits February 25, 2026 16:45
…toring-and-serving

Co-authored-by: Cursor <cursoragent@cursor.com>

# Conflicts:
#	api/fine-tuning/config/config.go
#	api/fine-tuning/internal/handler/handler.go
#	api/fine-tuning/internal/storage/client.go
- Add context cancellation to monitor.processUniqueUsers goroutine to
  prevent leaks on shutdown
- Add Manager.Stop() method to gracefully terminate vLLM process
- Use http.NewRequestWithContext for health check to respect cancellation
- Add HTTP client timeout (5min) to proxy for inference requests
- Check os.RemoveAll errors when pruning/unregistering model directories
- Check c.Writer.Write error in non-streaming proxy responses
- Increase model name UUID suffix from 8 to 12 chars for uniqueness
- Add godoc comments to all exported monitor functions
- Remove unused promhttp import from monitor package
- Add missing ProviderStake and SupportedPredefinedModels config fields
  from origin/main merge

Co-authored-by: Cursor <cursoragent@cursor.com>
…anup duplicate Stop

- Add godoc comments to all exported types and functions in serving package
- Add .Limit(1000) to GetFinishedTasksForServing query to prevent unbounded results
- Remove duplicate Stop() method (keep the graceful signal-based one)
- Rewrite registerOnContract TODO comment for clarity

Co-authored-by: Cursor <cursoragent@cursor.com>
zeyuyuyu (Collaborator, Author) commented:

@claude

claude bot commented Feb 25, 2026

Claude finished @zeyuyuyu's task in 4m 9s.


Code Review Complete ✅

I've completed a thorough review of the Prometheus monitoring and LoRA serving implementation. This is a well-implemented feature with solid architecture and good attention to detail. I can see that many issues from a previous review were addressed (great job!). Below are my findings.


🟢 Excellent Improvements from Previous Review

I noticed many critical issues were fixed:

  • CUDA_VISIBLE_DEVICES now properly set via cmd.Env (not global mutation)
  • Goroutine leak in monitor fixed with context handling
  • vLLM cleanup properly implemented with Stop() method
  • HTTP client timeouts added (5 minutes for proxy)
  • Error handling improved in proxy and manager
  • Pagination added to GetFinishedTasksForServing (LIMIT 1000)
  • Model name collision risk reduced (now uses 12 chars instead of 8)
  • Context propagation in health checks

🔴 High Priority Issues

1. [HIGH] Temporary Directory for LoRA Modules May Not Persist

Files: internal/serving/manager.go:59-60, config/config.go:231

// manager.go
if loraDir == "" {
    loraDir = "/tmp/lora-modules"  // ❌ /tmp may be cleared on reboot
}

// config.go
LoraModulesDir: "/tmp/lora-modules",  // ❌ Not persistent

Issue: Using /tmp for LoRA module symlinks means:

  • Symlinks are lost after system reboot
  • Auto-discovery will fail to find models after restart
  • Users may experience unexpected downtime

Fix: Use a persistent directory:

if loraDir == "" {
    loraDir = filepath.Join(utils.GetDataDir(), "lora-modules")  // Persistent
}

// Or in config.go
LoraModulesDir: "./data/lora-modules",

Recommendation: Document this in deployment docs and make it configurable with a sensible default outside /tmp.


2. [HIGH] GetVLLMModels Doesn't Accept Context

File: internal/serving/manager.go:420-441

func (m *Manager) GetVLLMModels() ([]string, error) {
    resp, err := m.httpClient.Get(m.GetVLLMEndpoint() + "/v1/models")  // ❌ No context
    if err != nil {
        return nil, err
    }

Issue: This function doesn't accept a context parameter, making it impossible to:

  • Cancel long-running requests
  • Propagate timeouts from caller
  • Follow Go best practices for network operations

Fix:

func (m *Manager) GetVLLMModels(ctx context.Context) ([]string, error) {
    req, err := http.NewRequestWithContext(ctx, http.MethodGet, m.GetVLLMEndpoint()+"/v1/models", nil)
    if err != nil {
        return nil, err
    }
    
    resp, err := m.httpClient.Do(req)
    if err != nil {
        return nil, err
    }
    defer resp.Body.Close()
    // ... rest of the code
}

🟡 Medium Priority Issues

3. [MEDIUM] Model Name Collision Risk Still Exists

File: internal/serving/manager.go:405-417

func (m *Manager) makeModelName(baseModel string, taskID uuid.UUID) string {
    shortBase := baseModel
    if len(shortBase) > 16 {
        shortBase = shortBase[:16]  // Truncation
    }
    // ...
    return fmt.Sprintf("ft-%s-%s", shortBase, taskID.String()[:12])  // Only 12 chars of UUID
}

Issue: While improved from 8 chars, taking only 12 characters of the UUID string still leaves collision probability:

  • taskID.String() is the canonical 8-4-4-4-12 form, so the first 12 characters include a hyphen, leaving only 11 hex digits = 44 bits (~17.6 trillion combinations)
  • Birthday paradox: ~50% collision probability after roughly sqrt(2^44), about 4 million models with the same base prefix
  • Not catastrophic, but avoidable

Fix: Use more UUID characters or the full UUID:

return fmt.Sprintf("ft-%s-%s", shortBase, taskID.String()[:18])  // 16 hex digits = 64 bits
// Or just use the full UUID (no collision risk):
return fmt.Sprintf("ft-%s-%s", shortBase, taskID.String())

Rationale: Model names are internal identifiers, not user-facing. Length is less important than uniqueness.


4. [MEDIUM] Hardcoded Database Query Limit

File: internal/db/service.go:267

ret := d.db.Where("progress IN ?", servableStates).
    Order("created_at DESC").
    Limit(1000).  // ❌ Hardcoded
    Find(&tasks)

Issue: The 1000-task limit is hardcoded, which:

  • May be too small for large deployments
  • May be too large for resource-constrained environments
  • Cannot be adjusted without code changes

Fix: Make it configurable:

// In config/config.go
type ServingConfig struct {
    // ... existing fields ...
    MaxDiscoverableTasks int `yaml:"maxDiscoverableTasks"`
}

// With default in GetConfig():
MaxDiscoverableTasks: 1000,

// In db/service.go:
func (d *DB) GetFinishedTasksForServing(limit int) ([]Task, error) {
    if limit <= 0 {
        limit = 1000  // Sensible default
    }
    var tasks []Task
    ret := d.db.Where("progress IN ?", servableStates).
        Order("created_at DESC").
        Limit(limit).
        Find(&tasks)
    // ...
}

5. [MEDIUM] Streaming Error Handling Could Be More Graceful

File: internal/serving/proxy.go:166-182

for {
    n, readErr := resp.Body.Read(buf)
    if n > 0 {
        if _, writeErr := c.Writer.Write(buf[:n]); writeErr != nil {
            p.logger.Warnf("stream write error: %v", writeErr)
            return  // ❌ Abrupt termination, client may not know why
        }
        c.Writer.Flush()
    }
    if readErr != nil {
        if readErr != io.EOF {
            p.logger.Warnf("stream read error: %v", readErr)
        }
        return
    }
}

Issue: When a streaming write fails, the connection is abruptly closed without sending an error event to the client.

Recommendation: For SSE streams, consider sending an error event before closing:

if _, writeErr := c.Writer.Write(buf[:n]); writeErr != nil {
    p.logger.Warnf("stream write error: %v", writeErr)
    // Try to send error event (may fail if connection is broken, but worth trying)
    errorEvent := "data: {\"error\": \"Stream interrupted\"}\n\n"  // plain string; Sprintf with no verbs is unnecessary
    c.Writer.Write([]byte(errorEvent))
    c.Writer.Flush()
    return
}

This is a nice-to-have rather than critical, as clients should handle unexpected disconnections anyway.


🔵 Low Priority / Best Practices

6. [LOW] Missing Godoc Comments for Exported Functions

File: monitor/monitor.go

Exported functions lack documentation:

  • RecordTaskCreated() (line 224)
  • RecordTaskCompleted() (line 231)
  • RecordTaskFailed() (line 238)
  • RecordPhaseDuration() (line 245)
  • UpdateTaskStateGauge() (line 252)
  • RecordStorageUpload() (line 262)
  • RecordStorageDownload() (line 275)
  • RecordSettlement() (line 288)

Fix: Add godoc comments following Go conventions:

// RecordTaskCreated increments the counter for total fine-tuning tasks created.
// This metric tracks task creation across all users and includes both successful
// and failed task initialization attempts.
func RecordTaskCreated() {
    if TasksCreatedTotal != nil {
        TasksCreatedTotal.Inc()
    }
}

// RecordTaskCompleted increments the counter for successfully completed fine-tuning tasks.
// A task is considered completed when it reaches the Finished state after successful settlement.
func RecordTaskCompleted() {
    if TasksCompletedTotal != nil {
        TasksCompletedTotal.Inc()
    }
}

7. [LOW] Arbitrary Channel Buffer Size

File: monitor/monitor.go:165

uniqueUsersChan = make(chan string, 10000)  // Why 10000?

Issue: The buffer size of 10,000 has no documented rationale.

Recommendation: Either:

  1. Make it configurable:

    type MonitorConfig struct {
        Enable            bool   `yaml:"enable"`
        EventAddress      string `yaml:"eventAddress"`
        UniqueUserBuffer  int    `yaml:"uniqueUserBuffer"`  // New
    }
    
    // With default:
    UniqueUserBuffer: 10000,
  2. Or document the sizing rationale:

    // Buffer size of 10000 assumes peak of 10k task creations before the
    // processUniqueUsers goroutine can drain the channel. At 100 tasks/sec,
    // this provides 100 seconds of burst capacity.
    uniqueUsersChan = make(chan string, 10000)

8. [LOW] Registry Module is Essentially a No-Op

File: internal/serving/registry.go:94-107

func (r *Registry) registerOnContract(ctx context.Context, model *ServedModel) error {
    r.logger.Infof("registering fine-tuned model on contract: name=%s, ...", model.ModelName)
    
    // TODO: When fine-tuning and inference brokers share a contract interface, this method
    // should call contract.AddOrUpdateService() to register the LoRA model...
    
    r.logger.Infof("model %s marked as registered for inference serving...", model.ModelName)
    
    return nil  // ❌ Does nothing
}

Issue: The entire Registry module is a placeholder that:

  • Consumes CPU cycles (polls every 60 seconds)
  • Maintains state that's never used on-chain
  • Adds complexity without benefit

Recommendation: Either:

  1. Remove it entirely until the contract integration is ready:

    // In main.go, comment out:
    // registry := serving.NewRegistry(...)
    // registry.Start(ctx)
  2. Or add a clear feature flag:

    type ServingConfig struct {
        // ... existing fields ...
        EnableContractRegistration bool `yaml:"enableContractRegistration"`
    }
    
    // In registry.go:
    func (r *Registry) Start(ctx context.Context) {
        if !r.config.EnableContractRegistration {
            r.logger.Info("contract registration is disabled")
            return
        }
        go r.syncLoop(ctx)
    }
  3. Or document prominently in the struct:

    // Registry periodically synchronises the set of served LoRA models with the
    // inference contract.
    //
    // NOTE: Contract registration is currently NOT IMPLEMENTED (see registerOnContract).
    // This component only tracks serving state locally until fine-tuning and inference
    // brokers share a unified contract interface. The syncLoop runs but has no effect
    // on the blockchain.
    type Registry struct {
        // ...
    }

9. [LOW] Task State Poller Not Tracked for Clean Shutdown

File: cmd/server/main.go:270

go startTaskStatePoller(ctx, svc.db, logger)

Issue: While the goroutine handles context cancellation, it's not tracked in a sync.WaitGroup, so there's no guarantee it finishes before the main function exits.

Recommendation: Use a WaitGroup for cleaner shutdown:

var wg sync.WaitGroup

// ... in runApplication:
if cfg.Monitor.Enable {
    monitor.Init(cfg.Service.ServingUrl, ctx)
    engine.GET("/metrics", gin.WrapH(promhttp.Handler()))
    engine.Use(monitor.TrackMetrics())
    
    wg.Add(1)
    go func() {
        defer wg.Done()
        startTaskStatePoller(ctx, svc.db, logger)
    }()
}

// ... before return in runApplication:
logger.Info("waiting for background tasks to finish...")
wg.Wait()

This ensures metrics are fully flushed before shutdown.


10. [nit] Inconsistent Error Wrapping

Examples:

  • manager.go:87: errors.Wrap(err, "create lora modules directory")
  • manager.go:293: errors.New("LoRA serving is not enabled") (plain error)
  • manager.go:329: errors.New("model not found: " + modelName) (plain error with concatenation)

Recommendation: Be consistent with error wrapping for better debugging:

return errors.New("LoRA serving is not enabled")    // static message
return fmt.Errorf("model not found: %s", modelName) // formatted message

Use errors.New for static messages and fmt.Errorf for messages carrying dynamic detail. Pick one style and stick to it.


⚪ Nitpicks

11. [nit] Variable Naming in makeModelName

File: internal/serving/manager.go:405-417

func (m *Manager) makeModelName(baseModel string, taskID uuid.UUID) string {
    shortBase := baseModel
    if len(shortBase) > 16 {
        shortBase = shortBase[:16]
    }
    shortBase = strings.Map(func(r rune) rune {  // ❓ What does this do?
        if (r >= 'a' && r <= 'z') || (r >= 'A' && r <= 'Z') || (r >= '0' && r <= '9') || r == '-' || r == '_' {
            return r
        }
        return '-'
    }, shortBase)
    return fmt.Sprintf("ft-%s-%s", shortBase, taskID.String()[:12])
}

Recommendation: Add a comment explaining the sanitization:

func (m *Manager) makeModelName(baseModel string, taskID uuid.UUID) string {
    shortBase := baseModel
    if len(shortBase) > 16 {
        shortBase = shortBase[:16]
    }
    
    // Sanitize model name: replace non-alphanumeric chars (except - and _) with hyphens
    // This ensures vLLM-compatible model identifiers
    shortBase = strings.Map(func(r rune) rune {
        if (r >= 'a' && r <= 'z') || (r >= 'A' && r <= 'Z') || (r >= '0' && r <= '9') || r == '-' || r == '_' {
            return r
        }
        return '-'
    }, shortBase)
    
    return fmt.Sprintf("ft-%s-%s", shortBase, taskID.String()[:12])
}

✅ Excellent Practices Observed

Well Done! 👏

  1. Monitoring Integration: Clean separation between business logic and metrics. The defer pattern in storage client is excellent.

  2. Error Handling in Settlement: The trySettle function (settlement.go:115-141) properly records metrics on both success and failure paths.

  3. Context Propagation: Excellent use of context throughout the codebase. Almost all network operations and long-running tasks properly accept and use context.Context.

  4. Database Queries: Good use of GORM with proper error handling. The CountTasksByState function properly initializes all states to 0 before populating.

  5. Resource Cleanup: The Stop() method in manager properly handles vLLM process cleanup. File handles are closed with defer.

  6. Authentication: The signature verification in proxy.go follows Ethereum standards correctly with proper recovery of public keys.

  7. Concurrency Safety: Proper use of sync.RWMutex in Manager for thread-safe access to the servedModels map.

  8. Configuration Structure: Well-organized with sensible defaults and proper validation.


📝 Summary

Overall Assessment: This is a solid, production-ready implementation with good architecture and attention to detail. The monitoring integration is clean, and the serving module provides a valuable feature for users.

Priority Recommendations:

  1. Fix #1 (persistent LoRA directory) - Important for production deployments
  2. Fix #2 (context in GetVLLMModels) - Standard Go practice
  3. Consider #8 (remove/disable Registry) - Reduces complexity until needed
  4. Address #4 (configurable query limit) - Important for scalability
  5. Add godoc comments (#6) - Improves maintainability

Testing Recommendations:

  • ✅ Unit tests: Build, vet, and existing tests pass
  • 🔲 Manual test: Verify /metrics endpoint returns fine-tuning metrics
  • 🔲 Manual test: Test vLLM integration with auto-registration of finished task LoRA adapters
  • 🔲 Manual test: Verify ownership auth — non-owner gets 403 on inference requests
  • 🔲 Manual test: Test system reboot recovery (LoRA modules persist and auto-discover)
  • 🔲 Load test: Verify behavior with >1000 finished tasks (pagination)
  • 🔲 Integration test: Test vLLM lifecycle management (start, health check, stop, restart)
  • 🔲 Integration test: Test multiple concurrent LoRA adapters with different users

Documentation Needed:

  • Document the persistent storage requirement for LoRA modules in deployment guide
  • Document the model naming scheme and expected uniqueness
  • Add API documentation for new serving endpoints (/v1/serving/*)
  • Document GPU isolation strategy (training vs inference GPUs)

Review Checklist

  • Read and understand PR changes
  • Review monitoring implementation (monitor.go)
  • Review serving module (manager.go, proxy.go, registry.go)
  • Review modified files for integration
  • Check against CLAUDE.md standards (Go best practices, blockchain security, error handling, resource management, testing)
  • Verify previous review issues were addressed
  • Compile and post comprehensive review

Recommendation: ✅ Approve with minor revisions. Address high-priority issues (#1, #2) before merging to production, but the code is solid enough for staging deployment.

Wuying Created Local Users and others added 7 commits February 25, 2026 18:54
- Add comment explaining uniqueUsersChan buffer sizing rationale
- Add godoc + sanitization comment to makeModelName
- Use http.NewRequestWithContext in GetVLLMModels for proper context propagation
- Use fmt.Errorf for string-formatted errors (consistent error style)
- Add startup log to Registry clarifying local-only tracking
- Wrap task state poller goroutine in sync.WaitGroup for clean shutdown

Co-authored-by: Cursor <cursoragent@cursor.com>
…e offload/restore

Adds a four-tier caching system for LoRA adapters:
- GPU (hot): vLLM native LRU via --max-loras
- CPU (warm): vLLM native via --max-cpu-loras
- Disk (cool): dynamic loading via lora_filesystem_resolver plugin
- 0G Storage (cold): automatic offload of inactive adapters and on-demand restore

Key changes:
- New model_cache.go with ModelState enum, StorageDownloader interface,
  offload loop, and async restore logic
- Manager tracks LastAccessedAt/State/OutputRootHash per model
- vLLM startup includes --max-cpu-loras and filesystem resolver env vars
- Proxy returns HTTP 202 with loading status for archived/loading models
- Health endpoint reports cache tier statistics
- Config adds MaxCpuLoras, OffloadAfterMinutes, EnableColdStorage fields

Made-with: Cursor
…caching

17 tests covering:
- Model registration, state tracking, and access recording
- Offload logic (stale models, skip without hash, skip recent)
- Restore logic (async download, idempotent for active/loading)
- Unregister, prune, ownership checks, model name generation
- Full offload→restore cycle end-to-end

Made-with: Cursor
Covers architecture, multi-tier caching strategy (GPU→CPU→Disk→0G Storage),
API reference, configuration guide, authentication, deployment, and testing
results from end-to-end validation on NVIDIA H20.

Made-with: Cursor
…ry leak

vLLM 0.16.0 uses a multi-process architecture (APIServer + EngineCore).
Previously Stop() only sent SIGINT to the main process, leaving EngineCore
as an orphan still holding GPU memory. This caused GPU OOM for other users.

Fix: start vLLM with Setpgid=true and kill the entire process group
(-pid) with SIGTERM/SIGKILL on shutdown.

Also adds an E2E serving test program that validates the full flow:
MySQL -> Manager -> vLLM -> Proxy -> authenticated inference requests.
All 11 test cases passed on H20 GPU with 3 concurrent LoRA adapters.

Made-with: Cursor
Instead of always returning HTTP 202 when a model is archived/loading,
clients can now set wait_for_model=true in the request body to block
until the model is ready, similar to ServerlessLLM's approach.

- Add WaitForModel with channel-based notification mechanism
- Add configurable ModelLoadTimeoutSeconds (default 300s)
- Add E2E test cases for wait_for_model feature
- Add 6 new unit tests covering wait scenarios

Made-with: Cursor
Allows configuring --gpu-memory-utilization for vLLM to coexist with
other GPU processes on shared machines.

Made-with: Cursor