Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions decentralized-api/apiconfig/constants.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package apiconfig

// Timing constants used by onboarding UX helpers in the admin layer.
// Kept in apiconfig (not admin) so other components — including the
// selfcheck mode — can reference the same defaults without importing
// the admin package.
const (
// DefaultBlockTimeSeconds is the assumed average chain block time
// used to translate block-distance to seconds for human-facing timing.
DefaultBlockTimeSeconds = 6.0

// AutoTestMinSecondsBeforePoC is the minimum slack required before
// the next PoC to trigger a pre-PoC self-test on an MLnode. Below
// this slack we avoid disrupting an MLnode that may need to load
// models for the upcoming PoC.
AutoTestMinSecondsBeforePoC int64 = 3600

// OnlineAlertLeadSeconds is the lead time before the next PoC at
// which we switch UX guidance from "safe to be offline" to
// "must be online now".
OnlineAlertLeadSeconds int64 = 600
)
7 changes: 6 additions & 1 deletion decentralized-api/broker/node_worker_commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,12 @@ func (c InferenceUpNodeCommand) Execute(ctx context.Context, worker *NodeWorker)
result.Succeeded = false
result.Error = "No epoch models available for this node"
result.FinalStatus = types.HardwareNodeStatus_FAILED
logging.Error(result.Error, types.Nodes, "node_id", worker.nodeId)
// Info, not Error: this state is normal when the participant
// has not yet joined the active set for the current epoch.
// A genuine misconfiguration shows up downstream as a
// FAILED node status; logging at Error here only spams
// healthy onboarding flows.
logging.Info(result.Error+" (participant may not be active in current epoch)", types.Nodes, "node_id", worker.nodeId)
Comment on lines +138 to +143
return result
}

Expand Down
178 changes: 178 additions & 0 deletions decentralized-api/internal/server/admin/mlnode_tester.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
package admin

import (
"context"
"decentralized-api/apiconfig"
"decentralized-api/mlnodeclient"
"errors"
"fmt"
"sync"
"time"
)

// TestResultStatus is the outcome of a one-shot MLnode validation.
type TestResultStatus string

const (
TestSuccess TestResultStatus = "SUCCESS"
TestFailed TestResultStatus = "FAILED"
)

// TestResult is the report returned by MLNodeTester.Run for one node.
// Reported back to the HTTP caller verbatim; not stored on broker
// state. Each per-model load time is recorded in LoadMs to help
// operators spot slow model loads.
type TestResult struct {
NodeId string `json:"node_id"`
Status TestResultStatus `json:"status"`
FailingModel string `json:"failing_model,omitempty"`
Error string `json:"error,omitempty"`
LoadMs map[string]int64 `json:"load_ms,omitempty"`
HealthMs int64 `json:"health_ms,omitempty"`
StartedAt time.Time `json:"started_at"`
DurationMs int64 `json:"duration_ms"`
}

// ErrTestInProgress is returned by MLNodeTester.Run when a test is
// already running for the same node id. Callers should translate this
// to HTTP 409 Conflict.
var ErrTestInProgress = errors.New("mlnode test already in progress for this node")

// ErrNodeNotFound is returned by MLNodeTester.Run when the requested
// node id is not present in the configured node list. Callers should
// translate this to HTTP 404 Not Found.
var ErrNodeNotFound = errors.New("node not configured")

// MLNodeTester runs one-shot validation against a configured MLnode
// without involving the broker. It is safe for concurrent use; a
// per-node mutex prevents two tests running against the same node.
type MLNodeTester struct {
configManager *apiconfig.ConfigManager
factory mlnodeclient.ClientFactory

mu sync.Mutex
inFlight map[string]bool
lastTests map[string]*TestResult
}

func NewMLNodeTester(cm *apiconfig.ConfigManager, factory mlnodeclient.ClientFactory) *MLNodeTester {
return &MLNodeTester{
configManager: cm,
factory: factory,
inFlight: map[string]bool{},
lastTests: map[string]*TestResult{},
}
}

// LastResult returns the most recent TestResult for nodeId, or nil
// if no test has been recorded for that node yet.
func (t *MLNodeTester) LastResult(nodeId string) *TestResult {
t.mu.Lock()
defer t.mu.Unlock()
if r, ok := t.lastTests[nodeId]; ok {
return r
}
return nil
}

// IsRunning reports whether a test is currently in flight for nodeId.
func (t *MLNodeTester) IsRunning(nodeId string) bool {
t.mu.Lock()
defer t.mu.Unlock()
return t.inFlight[nodeId]
}

// Run validates the MLnode behind nodeId: for each configured model,
// it loads the model and measures load time; then it runs a single
// inference-health check. Models that load successfully are torn down
// via Stop on completion. Returns ErrNodeNotFound or ErrTestInProgress
// before launching the test.
func (t *MLNodeTester) Run(ctx context.Context, nodeId string) (*TestResult, error) {
cfg, ok := t.findNode(nodeId)
if !ok {
return nil, ErrNodeNotFound
}

t.mu.Lock()
if t.inFlight[nodeId] {
t.mu.Unlock()
return nil, ErrTestInProgress
}
t.inFlight[nodeId] = true
t.mu.Unlock()

defer func() {
t.mu.Lock()
delete(t.inFlight, nodeId)
t.mu.Unlock()
}()

result := t.runOnce(ctx, cfg)

t.mu.Lock()
t.lastTests[nodeId] = result
t.mu.Unlock()
return result, nil
}

func (t *MLNodeTester) findNode(nodeId string) (apiconfig.InferenceNodeConfig, bool) {
for _, n := range t.configManager.GetNodes() {
if n.Id == nodeId {
return n, true
}
}
return apiconfig.InferenceNodeConfig{}, false
}

func (t *MLNodeTester) runOnce(ctx context.Context, cfg apiconfig.InferenceNodeConfig) *TestResult {
started := time.Now()
result := &TestResult{
NodeId: cfg.Id,
Status: TestSuccess,
LoadMs: map[string]int64{},
StartedAt: started,
}
defer func() {
result.DurationMs = time.Since(started).Milliseconds()
}()

pocUrl := fmt.Sprintf("http://%s:%d%s", cfg.Host, cfg.PoCPort, cfg.PoCSegment)
inferenceUrl := fmt.Sprintf("http://%s:%d%s", cfg.Host, cfg.InferencePort, cfg.InferenceSegment)
client := t.factory.CreateClient(pocUrl, inferenceUrl)

// Best-effort cleanup at the end so a successful test leaves the
// MLnode idle, not stuck in inference mode.
defer func() {
stopCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
_ = client.Stop(stopCtx)
}()

for modelId, modelCfg := range cfg.Models {
modelStart := time.Now()
if err := client.InferenceUp(ctx, modelId, modelCfg.Args); err != nil {
result.Status = TestFailed
result.FailingModel = modelId
result.Error = err.Error()
result.LoadMs[modelId] = time.Since(modelStart).Milliseconds()
return result
}
result.LoadMs[modelId] = time.Since(modelStart).Milliseconds()
}
Comment on lines +151 to +161

healthStart := time.Now()
ok, err := client.InferenceHealth(ctx)
result.HealthMs = time.Since(healthStart).Milliseconds()
if err != nil {
result.Status = TestFailed
result.Error = "health check error: " + err.Error()
return result
}
if !ok {
result.Status = TestFailed
result.Error = "health check returned not ok"
return result
}

return result
}
135 changes: 135 additions & 0 deletions decentralized-api/internal/server/admin/mlnode_tester_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
package admin

import (
"context"
"decentralized-api/apiconfig"
"decentralized-api/mlnodeclient"
"errors"
"os"
"testing"

"github.com/knadh/koanf/providers/file"
)

func newTesterConfig(t *testing.T, nodes []apiconfig.InferenceNodeConfig) *apiconfig.ConfigManager {
t.Helper()
tmpFile, err := os.CreateTemp("", "tester-config-*.yaml")
if err != nil {
t.Fatal(err)
}
t.Cleanup(func() { os.Remove(tmpFile.Name()) })
if _, err := tmpFile.Write([]byte("nodes: []")); err != nil {
t.Fatal(err)
}
tmpFile.Close()

cm := &apiconfig.ConfigManager{
KoanProvider: file.Provider(tmpFile.Name()),
WriterProvider: apiconfig.NewFileWriteCloserProvider(tmpFile.Name()),
}
if err := cm.Load(); err != nil {
t.Fatalf("Load: %v", err)
}
if len(nodes) > 0 {
if err := cm.SetNodes(nodes); err != nil {
t.Fatalf("SetNodes: %v", err)
}
}
return cm
}

func TestMLNodeTester_NodeNotFound(t *testing.T) {
cm := newTesterConfig(t, nil)
factory := mlnodeclient.NewMockClientFactory()
tester := NewMLNodeTester(cm, factory)

_, err := tester.Run(context.Background(), "missing")
if !errors.Is(err, ErrNodeNotFound) {
t.Fatalf("got %v, want ErrNodeNotFound", err)
}
}

func TestMLNodeTester_SuccessRecordsResult(t *testing.T) {
cm := newTesterConfig(t, []apiconfig.InferenceNodeConfig{{
Id: "node1",
Host: "test-host",
PoCPort: 8080,
InferencePort: 5000,
Models: map[string]apiconfig.ModelConfig{
"Qwen2.5-7B-Instruct": {Args: []string{"--quantization", "fp8"}},
},
}})
factory := mlnodeclient.NewMockClientFactory()

// Pre-create the mock client at the same pocUrl the tester will
// use, and configure it to report healthy.
mockClient := factory.CreateClient("http://test-host:8080", "http://test-host:5000").(*mlnodeclient.MockClient)
mockClient.InferenceIsHealthy = true

tester := NewMLNodeTester(cm, factory)
result, err := tester.Run(context.Background(), "node1")
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if result.Status != TestSuccess {
t.Fatalf("got %q, want %q (error=%q)", result.Status, TestSuccess, result.Error)
}
if _, ok := result.LoadMs["Qwen2.5-7B-Instruct"]; !ok {
t.Fatalf("LoadMs missing for model: %+v", result.LoadMs)
}
if got := tester.LastResult("node1"); got == nil || got.Status != TestSuccess {
t.Fatalf("LastResult not recorded: %+v", got)
}
}

func TestMLNodeTester_ModelLoadFailure(t *testing.T) {
cm := newTesterConfig(t, []apiconfig.InferenceNodeConfig{{
Id: "node1",
Host: "test-host",
PoCPort: 8080,
InferencePort: 5000,
Models: map[string]apiconfig.ModelConfig{
"broken-model": {},
},
}})
factory := mlnodeclient.NewMockClientFactory()
mockClient := factory.CreateClient("http://test-host:8080", "http://test-host:5000").(*mlnodeclient.MockClient)
mockClient.InferenceUpError = errors.New("OOM")

tester := NewMLNodeTester(cm, factory)
result, err := tester.Run(context.Background(), "node1")
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if result.Status != TestFailed {
t.Fatalf("got %q, want %q", result.Status, TestFailed)
}
if result.FailingModel != "broken-model" {
t.Errorf("FailingModel=%q, want broken-model", result.FailingModel)
}
if result.Error == "" {
t.Errorf("Error empty, expected OOM message")
}
}

func TestMLNodeTester_RejectsConcurrent(t *testing.T) {
cm := newTesterConfig(t, []apiconfig.InferenceNodeConfig{{
Id: "node1",
Host: "test-host",
PoCPort: 8080,
InferencePort: 5000,
Models: map[string]apiconfig.ModelConfig{"m": {}},
}})
factory := mlnodeclient.NewMockClientFactory()
tester := NewMLNodeTester(cm, factory)

// Simulate an ongoing test, then verify a concurrent Run rejects.
tester.mu.Lock()
tester.inFlight["node1"] = true
tester.mu.Unlock()

_, err := tester.Run(context.Background(), "node1")
if !errors.Is(err, ErrTestInProgress) {
t.Fatalf("got %v, want ErrTestInProgress", err)
}
}
Loading
Loading