diff --git a/decentralized-api/apiconfig/constants.go b/decentralized-api/apiconfig/constants.go new file mode 100644 index 0000000000..c6b7b71444 --- /dev/null +++ b/decentralized-api/apiconfig/constants.go @@ -0,0 +1,22 @@ +package apiconfig + +// Timing constants used by onboarding UX helpers in the admin layer. +// Kept in apiconfig (not admin) so other components — including the +// selfcheck mode — can reference the same defaults without importing +// the admin package. +const ( + // DefaultBlockTimeSeconds is the assumed average chain block time + // used to translate block-distance to seconds for human-facing timing. + DefaultBlockTimeSeconds = 6.0 + + // AutoTestMinSecondsBeforePoC is the minimum slack required before + // the next PoC to trigger a pre-PoC self-test on an MLnode. Below + // this slack we avoid disrupting an MLnode that may need to load + // models for the upcoming PoC. + AutoTestMinSecondsBeforePoC int64 = 3600 + + // OnlineAlertLeadSeconds is the lead time before the next PoC at + // which we switch UX guidance from "safe to be offline" to + // "must be online now". + OnlineAlertLeadSeconds int64 = 600 +) diff --git a/decentralized-api/broker/node_worker_commands.go b/decentralized-api/broker/node_worker_commands.go index 2bcfbfba6d..6f30549b93 100644 --- a/decentralized-api/broker/node_worker_commands.go +++ b/decentralized-api/broker/node_worker_commands.go @@ -135,7 +135,12 @@ func (c InferenceUpNodeCommand) Execute(ctx context.Context, worker *NodeWorker) result.Succeeded = false result.Error = "No epoch models available for this node" result.FinalStatus = types.HardwareNodeStatus_FAILED - logging.Error(result.Error, types.Nodes, "node_id", worker.nodeId) + // Info, not Error: this state is normal when the participant + // has not yet joined the active set for the current epoch. + // A genuine misconfiguration shows up downstream as a + // FAILED node status; logging at Error here only spams + // healthy onboarding flows. + logging.Info(result.Error+" (participant may not be active in current epoch)", types.Nodes, "node_id", worker.nodeId) return result } diff --git a/decentralized-api/internal/server/admin/mlnode_tester.go b/decentralized-api/internal/server/admin/mlnode_tester.go new file mode 100644 index 0000000000..556c64c8e7 --- /dev/null +++ b/decentralized-api/internal/server/admin/mlnode_tester.go @@ -0,0 +1,178 @@ +package admin + +import ( + "context" + "decentralized-api/apiconfig" + "decentralized-api/mlnodeclient" + "errors" + "fmt" + "sync" + "time" +) + +// TestResultStatus is the outcome of a one-shot MLnode validation. +type TestResultStatus string + +const ( + TestSuccess TestResultStatus = "SUCCESS" + TestFailed TestResultStatus = "FAILED" +) + +// TestResult is the report returned by MLNodeTester.Run for one node. +// Reported back to the HTTP caller verbatim; not stored on broker +// state. Each per-model load time is recorded in LoadMs to help +// operators spot slow model loads. +type TestResult struct { + NodeId string `json:"node_id"` + Status TestResultStatus `json:"status"` + FailingModel string `json:"failing_model,omitempty"` + Error string `json:"error,omitempty"` + LoadMs map[string]int64 `json:"load_ms,omitempty"` + HealthMs int64 `json:"health_ms,omitempty"` + StartedAt time.Time `json:"started_at"` + DurationMs int64 `json:"duration_ms"` +} + +// ErrTestInProgress is returned by MLNodeTester.Run when a test is +// already running for the same node id. Callers should translate this +// to HTTP 409 Conflict. +var ErrTestInProgress = errors.New("mlnode test already in progress for this node") + +// ErrNodeNotFound is returned by MLNodeTester.Run when the requested +// node id is not present in the configured node list. Callers should +// translate this to HTTP 404 Not Found. +var ErrNodeNotFound = errors.New("node not configured") + +// MLNodeTester runs one-shot validation against a configured MLnode +// without involving the broker. It is safe for concurrent use; a +// per-node mutex prevents two tests running against the same node. +type MLNodeTester struct { + configManager *apiconfig.ConfigManager + factory mlnodeclient.ClientFactory + + mu sync.Mutex + inFlight map[string]bool + lastTests map[string]*TestResult +} + +func NewMLNodeTester(cm *apiconfig.ConfigManager, factory mlnodeclient.ClientFactory) *MLNodeTester { + return &MLNodeTester{ + configManager: cm, + factory: factory, + inFlight: map[string]bool{}, + lastTests: map[string]*TestResult{}, + } +} + +// LastResult returns the most recent TestResult for nodeId, or nil +// if no test has been recorded for that node yet. +func (t *MLNodeTester) LastResult(nodeId string) *TestResult { + t.mu.Lock() + defer t.mu.Unlock() + if r, ok := t.lastTests[nodeId]; ok { + return r + } + return nil +} + +// IsRunning reports whether a test is currently in flight for nodeId. +func (t *MLNodeTester) IsRunning(nodeId string) bool { + t.mu.Lock() + defer t.mu.Unlock() + return t.inFlight[nodeId] +} + +// Run validates the MLnode behind nodeId: for each configured model, +// it loads the model and measures load time; then it runs a single +// inference-health check. Models that load successfully are torn down +// via Stop on completion. Returns ErrNodeNotFound or ErrTestInProgress +// before launching the test. +func (t *MLNodeTester) Run(ctx context.Context, nodeId string) (*TestResult, error) { + cfg, ok := t.findNode(nodeId) + if !ok { + return nil, ErrNodeNotFound + } + + t.mu.Lock() + if t.inFlight[nodeId] { + t.mu.Unlock() + return nil, ErrTestInProgress + } + t.inFlight[nodeId] = true + t.mu.Unlock() + + defer func() { + t.mu.Lock() + delete(t.inFlight, nodeId) + t.mu.Unlock() + }() + + result := t.runOnce(ctx, cfg) + + t.mu.Lock() + t.lastTests[nodeId] = result + t.mu.Unlock() + return result, nil +} + +func (t *MLNodeTester) findNode(nodeId string) (apiconfig.InferenceNodeConfig, bool) { + for _, n := range t.configManager.GetNodes() { + if n.Id == nodeId { + return n, true + } + } + return apiconfig.InferenceNodeConfig{}, false +} + +func (t *MLNodeTester) runOnce(ctx context.Context, cfg apiconfig.InferenceNodeConfig) *TestResult { + started := time.Now() + result := &TestResult{ + NodeId: cfg.Id, + Status: TestSuccess, + LoadMs: map[string]int64{}, + StartedAt: started, + } + defer func() { + result.DurationMs = time.Since(started).Milliseconds() + }() + + pocUrl := fmt.Sprintf("http://%s:%d%s", cfg.Host, cfg.PoCPort, cfg.PoCSegment) + inferenceUrl := fmt.Sprintf("http://%s:%d%s", cfg.Host, cfg.InferencePort, cfg.InferenceSegment) + client := t.factory.CreateClient(pocUrl, inferenceUrl) + + // Best-effort cleanup at the end so a successful test leaves the + // MLnode idle, not stuck in inference mode. + defer func() { + stopCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + _ = client.Stop(stopCtx) + }() + + for modelId, modelCfg := range cfg.Models { + modelStart := time.Now() + if err := client.InferenceUp(ctx, modelId, modelCfg.Args); err != nil { + result.Status = TestFailed + result.FailingModel = modelId + result.Error = err.Error() + result.LoadMs[modelId] = time.Since(modelStart).Milliseconds() + return result + } + result.LoadMs[modelId] = time.Since(modelStart).Milliseconds() + } + + healthStart := time.Now() + ok, err := client.InferenceHealth(ctx) + result.HealthMs = time.Since(healthStart).Milliseconds() + if err != nil { + result.Status = TestFailed + result.Error = "health check error: " + err.Error() + return result + } + if !ok { + result.Status = TestFailed + result.Error = "health check returned not ok" + return result + } + + return result +} diff --git a/decentralized-api/internal/server/admin/mlnode_tester_test.go b/decentralized-api/internal/server/admin/mlnode_tester_test.go new file mode 100644 index 0000000000..3d9b094293 --- /dev/null +++ b/decentralized-api/internal/server/admin/mlnode_tester_test.go @@ -0,0 +1,135 @@ +package admin + +import ( + "context" + "decentralized-api/apiconfig" + "decentralized-api/mlnodeclient" + "errors" + "os" + "testing" + + "github.com/knadh/koanf/providers/file" +) + +func newTesterConfig(t *testing.T, nodes []apiconfig.InferenceNodeConfig) *apiconfig.ConfigManager { + t.Helper() + tmpFile, err := os.CreateTemp("", "tester-config-*.yaml") + if err != nil { + t.Fatal(err) + } + t.Cleanup(func() { os.Remove(tmpFile.Name()) }) + if _, err := tmpFile.Write([]byte("nodes: []")); err != nil { + t.Fatal(err) + } + tmpFile.Close() + + cm := &apiconfig.ConfigManager{ + KoanProvider: file.Provider(tmpFile.Name()), + WriterProvider: apiconfig.NewFileWriteCloserProvider(tmpFile.Name()), + } + if err := cm.Load(); err != nil { + t.Fatalf("Load: %v", err) + } + if len(nodes) > 0 { + if err := cm.SetNodes(nodes); err != nil { + t.Fatalf("SetNodes: %v", err) + } + } + return cm +} + +func TestMLNodeTester_NodeNotFound(t *testing.T) { + cm := newTesterConfig(t, nil) + factory := mlnodeclient.NewMockClientFactory() + tester := NewMLNodeTester(cm, factory) + + _, err := tester.Run(context.Background(), "missing") + if !errors.Is(err, ErrNodeNotFound) { + t.Fatalf("got %v, want ErrNodeNotFound", err) + } +} + +func TestMLNodeTester_SuccessRecordsResult(t *testing.T) { + cm := newTesterConfig(t, []apiconfig.InferenceNodeConfig{{ + Id: "node1", + Host: "test-host", + PoCPort: 8080, + InferencePort: 5000, + Models: map[string]apiconfig.ModelConfig{ + "Qwen2.5-7B-Instruct": {Args: []string{"--quantization", "fp8"}}, + }, + }}) + factory := mlnodeclient.NewMockClientFactory() + + // Pre-create the mock client at the same pocUrl the tester will + // use, and configure it to report healthy. + mockClient := factory.CreateClient("http://test-host:8080", "http://test-host:5000").(*mlnodeclient.MockClient) + mockClient.InferenceIsHealthy = true + + tester := NewMLNodeTester(cm, factory) + result, err := tester.Run(context.Background(), "node1") + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if result.Status != TestSuccess { + t.Fatalf("got %q, want %q (error=%q)", result.Status, TestSuccess, result.Error) + } + if _, ok := result.LoadMs["Qwen2.5-7B-Instruct"]; !ok { + t.Fatalf("LoadMs missing for model: %+v", result.LoadMs) + } + if got := tester.LastResult("node1"); got == nil || got.Status != TestSuccess { + t.Fatalf("LastResult not recorded: %+v", got) + } +} + +func TestMLNodeTester_ModelLoadFailure(t *testing.T) { + cm := newTesterConfig(t, []apiconfig.InferenceNodeConfig{{ + Id: "node1", + Host: "test-host", + PoCPort: 8080, + InferencePort: 5000, + Models: map[string]apiconfig.ModelConfig{ + "broken-model": {}, + }, + }}) + factory := mlnodeclient.NewMockClientFactory() + mockClient := factory.CreateClient("http://test-host:8080", "http://test-host:5000").(*mlnodeclient.MockClient) + mockClient.InferenceUpError = errors.New("OOM") + + tester := NewMLNodeTester(cm, factory) + result, err := tester.Run(context.Background(), "node1") + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if result.Status != TestFailed { + t.Fatalf("got %q, want %q", result.Status, TestFailed) + } + if result.FailingModel != "broken-model" { + t.Errorf("FailingModel=%q, want broken-model", result.FailingModel) + } + if result.Error == "" { + t.Errorf("Error empty, expected OOM message") + } +} + +func TestMLNodeTester_RejectsConcurrent(t *testing.T) { + cm := newTesterConfig(t, []apiconfig.InferenceNodeConfig{{ + Id: "node1", + Host: "test-host", + PoCPort: 8080, + InferencePort: 5000, + Models: map[string]apiconfig.ModelConfig{"m": {}}, + }}) + factory := mlnodeclient.NewMockClientFactory() + tester := NewMLNodeTester(cm, factory) + + // Simulate an ongoing test, then verify a concurrent Run rejects. + tester.mu.Lock() + tester.inFlight["node1"] = true + tester.mu.Unlock() + + _, err := tester.Run(context.Background(), "node1") + if !errors.Is(err, ErrTestInProgress) { + t.Fatalf("got %v, want ErrTestInProgress", err) + } +} diff --git a/decentralized-api/internal/server/admin/node_handlers.go b/decentralized-api/internal/server/admin/node_handlers.go index ba7b093a76..2429d7d9c2 100644 --- a/decentralized-api/internal/server/admin/node_handlers.go +++ b/decentralized-api/internal/server/admin/node_handlers.go @@ -1,23 +1,112 @@ package admin import ( + "context" "decentralized-api/apiconfig" "decentralized-api/broker" + "decentralized-api/chainphase" "decentralized-api/logging" + "errors" "fmt" "net/http" + "time" "github.com/labstack/echo/v4" "github.com/productscience/inference/x/inference/types" ) +// NodeWithOnboarding is the admin GET /nodes response item. It wraps +// the broker's view of a node with onboarding UX fields that are +// derived at the handler layer (not stored on broker state). +type NodeWithOnboarding struct { + broker.NodeResponse + Onboarding *OnboardingStatus `json:"onboarding,omitempty"` +} + +// OnboardingStatus aggregates everything the admin UI needs to show +// human-friendly setup state without inspecting raw broker internals. +type OnboardingStatus struct { + ParticipantState string `json:"participant_state"` + MLNodeState MLNodeOnboardingState `json:"mlnode_state"` + Timing *TimingInfo `json:"timing,omitempty"` + UserMessage string `json:"user_message,omitempty"` + Guidance string `json:"guidance,omitempty"` +} + func (s *Server) getNodes(ctx echo.Context) error { nodes, err := s.nodeBroker.GetNodes() if err != nil { logging.Error("Error getting nodes", types.Nodes, "error", err) return err } - return ctx.JSON(http.StatusOK, nodes) + + enriched := make([]NodeWithOnboarding, len(nodes)) + for i, n := range nodes { + enriched[i] = NodeWithOnboarding{ + NodeResponse: n, + Onboarding: s.computeOnboarding(n), + } + } + return ctx.JSON(http.StatusOK, enriched) +} + +// computeOnboarding derives onboarding UX fields for one node from +// the broker's NodeResponse + cached participant activity + the +// chain phase tracker. It performs no chain RPC calls and does not +// mutate broker state. Returns nil if neither timing nor activity +// is known yet (e.g. chain not yet synced and tracker not started). +func (s *Server) computeOnboarding(n broker.NodeResponse) *OnboardingStatus { + var epochState *chainphase.EpochState + if s.phaseTracker != nil { + epochState = s.phaseTracker.GetCurrentEpochState() + } + timing := ComputeTiming(epochState) + if timing == nil && s.activityTracker == nil { + return nil + } + + active := false + if s.activityTracker != nil { + active = s.activityTracker.IsActive() + } + // EpochMLNodes already-populated also implies activity for this + // participant in the current epoch; treat as a secondary signal. + if !active && len(n.State.EpochMLNodes) > 0 { + active = true + } + + participantState := DeriveParticipantState(active) + + var seconds int64 + if timing != nil { + seconds = timing.SecondsUntilNextPoC + } + + testFailed := n.State.FailureReason != "" && + n.State.CurrentStatus == types.HardwareNodeStatus_FAILED + mlState, _ := DeriveMLNodeState(OnboardingStateInputs{ + ParticipantActive: active, + IsTesting: false, + TestFailed: testFailed, + SecondsUntilNextPoC: seconds, + }) + + var userMsg, guidance string + if active { + userMsg = BuildMLNodeMessage(mlState, seconds, "") + guidance = BuildParticipantMessage(participantState) + } else { + userMsg = BuildParticipantMessage(participantState) + guidance = BuildInactiveGuidance(seconds) + } + + return &OnboardingStatus{ + ParticipantState: string(participantState), + MLNodeState: mlState, + Timing: timing, + UserMessage: userMsg, + Guidance: guidance, + } } func (s *Server) deleteNode(ctx echo.Context) error { @@ -187,6 +276,41 @@ func (s *Server) addNode(newNode apiconfig.InferenceNodeConfig) (apiconfig.Infer return *node, nil } +// postNodeTest handles POST /admin/v1/nodes/:id/test by running a +// synchronous one-shot validation against the configured MLnode. +// Does not mutate broker state — the response carries the result +// for the caller to display. +// +// Status codes: +// 200 — test completed (body contains TestResult, possibly with status=FAILED) +// 404 — node id not in configManager +// 409 — a test is already running for this node +func (s *Server) postNodeTest(c echo.Context) error { + nodeId := c.Param("id") + if nodeId == "" { + return c.JSON(http.StatusBadRequest, map[string]string{"error": "node id is required"}) + } + if s.tester == nil { + return c.JSON(http.StatusInternalServerError, map[string]string{"error": "tester not initialized"}) + } + + // Per-call timeout: model load + health probe is expected to + // complete well under this. Clients can retry on timeout. + ctx, cancel := context.WithTimeout(c.Request().Context(), 5*time.Minute) + defer cancel() + + result, err := s.tester.Run(ctx, nodeId) + switch { + case errors.Is(err, ErrNodeNotFound): + return c.JSON(http.StatusNotFound, map[string]string{"error": err.Error(), "node_id": nodeId}) + case errors.Is(err, ErrTestInProgress): + return c.JSON(http.StatusConflict, map[string]string{"error": err.Error(), "node_id": nodeId}) + case err != nil: + return c.JSON(http.StatusInternalServerError, map[string]string{"error": err.Error()}) + } + return c.JSON(http.StatusOK, result) +} + // enableNode handles POST /admin/v1/nodes/:id/enable func (s *Server) enableNode(c echo.Context) error { nodeId := c.Param("id") diff --git a/decentralized-api/internal/server/admin/onboarding_state_manager.go b/decentralized-api/internal/server/admin/onboarding_state_manager.go new file mode 100644 index 0000000000..11706d21b2 --- /dev/null +++ b/decentralized-api/internal/server/admin/onboarding_state_manager.go @@ -0,0 +1,62 @@ +package admin + +import ( + "decentralized-api/apiconfig" +) + +// MLNodeOnboardingState describes the user-visible onboarding state of +// an individual MLnode as derived at the API layer. The broker does +// not track or mutate this state — it is computed on each admin read +// from broker outputs and chain state. +type MLNodeOnboardingState string + +const ( + MLNodeState_WAITING_FOR_POC MLNodeOnboardingState = "WAITING_FOR_POC" + MLNodeState_TESTING MLNodeOnboardingState = "TESTING" + MLNodeState_TEST_FAILED MLNodeOnboardingState = "TEST_FAILED" +) + +// ParticipantState describes whether the participant the API is +// running on behalf of has joined the active set in the current epoch. +type ParticipantState string + +const ( + ParticipantState_INACTIVE_WAITING ParticipantState = "INACTIVE_WAITING" + ParticipantState_ACTIVE_PARTICIPATING ParticipantState = "ACTIVE_PARTICIPATING" +) + +// OnboardingStateInputs aggregates everything required to derive +// onboarding state for one MLnode. All fields are inputs computed +// elsewhere — none of these are mutated by the helpers in this file. +type OnboardingStateInputs struct { + ParticipantActive bool + IsTesting bool + TestFailed bool + SecondsUntilNextPoC int64 +} + +// DeriveParticipantState reports whether the participant is in the +// active set right now. +func DeriveParticipantState(active bool) ParticipantState { + if active { + return ParticipantState_ACTIVE_PARTICIPATING + } + return ParticipantState_INACTIVE_WAITING +} + +// DeriveMLNodeState computes the MLnode onboarding state for a node +// given the inputs. Returns the state and whether the user should be +// alerted to bring the MLnode online ("alert" implies the PoC window +// is approaching or active). +func DeriveMLNodeState(in OnboardingStateInputs) (state MLNodeOnboardingState, alert bool) { + switch { + case in.TestFailed: + return MLNodeState_TEST_FAILED, true + case in.IsTesting: + return MLNodeState_TESTING, true + case in.SecondsUntilNextPoC <= apiconfig.OnlineAlertLeadSeconds: + return MLNodeState_WAITING_FOR_POC, true + default: + return MLNodeState_WAITING_FOR_POC, false + } +} diff --git a/decentralized-api/internal/server/admin/onboarding_state_manager_test.go b/decentralized-api/internal/server/admin/onboarding_state_manager_test.go new file mode 100644 index 0000000000..8c4029e6cb --- /dev/null +++ b/decentralized-api/internal/server/admin/onboarding_state_manager_test.go @@ -0,0 +1,93 @@ +package admin + +import ( + "decentralized-api/apiconfig" + "strings" + "testing" +) + +func TestDeriveParticipantState(t *testing.T) { + if got := DeriveParticipantState(true); got != ParticipantState_ACTIVE_PARTICIPATING { + t.Errorf("active=true: got %q want %q", got, ParticipantState_ACTIVE_PARTICIPATING) + } + if got := DeriveParticipantState(false); got != ParticipantState_INACTIVE_WAITING { + t.Errorf("active=false: got %q want %q", got, ParticipantState_INACTIVE_WAITING) + } +} + +func TestDeriveMLNodeState(t *testing.T) { + tests := []struct { + name string + in OnboardingStateInputs + wantState MLNodeOnboardingState + wantAlert bool + }{ + { + name: "test failed takes precedence", + in: OnboardingStateInputs{TestFailed: true, IsTesting: true, SecondsUntilNextPoC: 99999}, + wantState: MLNodeState_TEST_FAILED, + wantAlert: true, + }, + { + name: "testing in progress", + in: OnboardingStateInputs{IsTesting: true, SecondsUntilNextPoC: 99999}, + wantState: MLNodeState_TESTING, + wantAlert: true, + }, + { + name: "waiting, plenty of time -> no alert", + in: OnboardingStateInputs{SecondsUntilNextPoC: apiconfig.OnlineAlertLeadSeconds + 1}, + wantState: MLNodeState_WAITING_FOR_POC, + wantAlert: false, + }, + { + name: "waiting, within alert lead -> alert", + in: OnboardingStateInputs{SecondsUntilNextPoC: apiconfig.OnlineAlertLeadSeconds}, + wantState: MLNodeState_WAITING_FOR_POC, + wantAlert: true, + }, + } + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + gotState, gotAlert := DeriveMLNodeState(tc.in) + if gotState != tc.wantState || gotAlert != tc.wantAlert { + t.Errorf("got (%q, %v) want (%q, %v)", gotState, gotAlert, tc.wantState, tc.wantAlert) + } + }) + } +} + +func TestBuildMLNodeMessage(t *testing.T) { + got := BuildMLNodeMessage(MLNodeState_TEST_FAILED, 0, "Qwen2.5-7B") + if !strings.Contains(got, "Qwen2.5-7B") { + t.Errorf("failing model not surfaced: %q", got) + } + near := BuildMLNodeMessage(MLNodeState_WAITING_FOR_POC, apiconfig.OnlineAlertLeadSeconds-1, "") + if !strings.Contains(near, "must be online") { + t.Errorf("alert message missing online directive: %q", near) + } + far := BuildMLNodeMessage(MLNodeState_WAITING_FOR_POC, apiconfig.OnlineAlertLeadSeconds+3600, "") + if !strings.Contains(far, "safe to be offline") { + t.Errorf("non-alert message missing offline guidance: %q", far) + } +} + +func TestFormatShortDuration(t *testing.T) { + tests := []struct { + seconds int64 + want string + }{ + {0, "0s"}, + {-1, "0s"}, + {45, "45s"}, + {120, "2m"}, + {125, "2m 5s"}, + {3600, "1h"}, + {3660, "1h 1m"}, + } + for _, tc := range tests { + if got := formatShortDuration(tc.seconds); got != tc.want { + t.Errorf("formatShortDuration(%d) = %q, want %q", tc.seconds, got, tc.want) + } + } +} diff --git a/decentralized-api/internal/server/admin/server.go b/decentralized-api/internal/server/admin/server.go index 55b378da97..b3d01351f4 100644 --- a/decentralized-api/internal/server/admin/server.go +++ b/decentralized-api/internal/server/admin/server.go @@ -3,10 +3,13 @@ package admin import ( "decentralized-api/apiconfig" "decentralized-api/broker" + "decentralized-api/chainphase" cosmos_client "decentralized-api/cosmosclient" "decentralized-api/internal/server/middleware" pserver "decentralized-api/internal/server/public" "decentralized-api/internal/validation" + "decentralized-api/mlnodeclient" + "decentralized-api/participant" "decentralized-api/payloadstorage" "cosmossdk.io/x/feegrant" @@ -26,14 +29,17 @@ import ( ) type Server struct { - e *echo.Echo - nodeBroker *broker.Broker - configManager *apiconfig.ConfigManager - recorder cosmos_client.CosmosMessageClient - validator *validation.InferenceValidator - cdc *codec.ProtoCodec - blockQueue *pserver.BridgeQueue - payloadStorage payloadstorage.PayloadStorage + e *echo.Echo + nodeBroker *broker.Broker + configManager *apiconfig.ConfigManager + recorder cosmos_client.CosmosMessageClient + validator *validation.InferenceValidator + cdc *codec.ProtoCodec + blockQueue *pserver.BridgeQueue + payloadStorage payloadstorage.PayloadStorage + phaseTracker *chainphase.ChainPhaseTracker + activityTracker *participant.ActivityTracker + tester *MLNodeTester } func NewServer( @@ -42,20 +48,26 @@ func NewServer( configManager *apiconfig.ConfigManager, validator *validation.InferenceValidator, blockQueue *pserver.BridgeQueue, - payloadStorage payloadstorage.PayloadStorage) *Server { + payloadStorage payloadstorage.PayloadStorage, + phaseTracker *chainphase.ChainPhaseTracker, + activityTracker *participant.ActivityTracker, + mlnodeFactory mlnodeclient.ClientFactory) *Server { cdc := getCodec() e := echo.New() e.HTTPErrorHandler = middleware.TransparentErrorHandler s := &Server{ - e: e, - nodeBroker: nodeBroker, - configManager: configManager, - recorder: recorder, - validator: validator, - cdc: cdc, - blockQueue: blockQueue, - payloadStorage: payloadStorage, + e: e, + nodeBroker: nodeBroker, + configManager: configManager, + recorder: recorder, + validator: validator, + cdc: cdc, + blockQueue: blockQueue, + payloadStorage: payloadStorage, + phaseTracker: phaseTracker, + activityTracker: activityTracker, + tester: NewMLNodeTester(configManager, mlnodeFactory), } e.Use(middleware.LoggingMiddleware) @@ -67,6 +79,7 @@ func NewServer( g.PUT("nodes/:id", s.createNewNode) g.GET("nodes/upgrade-status", s.getUpgradeStatus) g.POST("nodes/version-status", s.postVersionStatus) + g.POST("nodes/:id/test", s.postNodeTest) g.GET("nodes", s.getNodes) g.DELETE("nodes/:id", s.deleteNode) g.POST("nodes/:id/enable", s.enableNode) diff --git a/decentralized-api/internal/server/admin/server_test.go b/decentralized-api/internal/server/admin/server_test.go index c6b17ee895..b1e80cac96 100644 --- a/decentralized-api/internal/server/admin/server_test.go +++ b/decentralized-api/internal/server/admin/server_test.go @@ -135,8 +135,10 @@ func setupTestServer(t *testing.T) (*Server, *apiconfig.ConfigManager, *mlnodecl // 4. Broker nodeBroker := broker.NewBroker(bridge, phaseTracker, mockParticipant, "", mockClientFactory, configManager) - // 5. Server - s := NewServer(mockCosmos, nodeBroker, configManager, nil, nil, nil) + // 5. Server — phaseTracker is the real instance the broker uses; + // activityTracker left nil so tests don't depend on chain queries. + // Reuse the broker's mock factory so tester tests can stub clients. + s := NewServer(mockCosmos, nodeBroker, configManager, nil, nil, nil, phaseTracker, nil, mockClientFactory) return s, configManager, mockClientFactory } diff --git a/decentralized-api/internal/server/admin/status_reporter.go b/decentralized-api/internal/server/admin/status_reporter.go new file mode 100644 index 0000000000..0c8e4b070c --- /dev/null +++ b/decentralized-api/internal/server/admin/status_reporter.go @@ -0,0 +1,52 @@ +package admin + +import ( + "decentralized-api/apiconfig" +) + +// BuildMLNodeMessage produces the human-facing one-liner shown to the +// operator for the MLnode's current onboarding state. +// +// failingModel is included in the TEST_FAILED message when non-empty; +// the WAITING_FOR_POC message branches on whether the next PoC is +// inside the online-alert window. +func BuildMLNodeMessage(state MLNodeOnboardingState, secondsUntilNextPoC int64, failingModel string) string { + switch state { + case MLNodeState_TESTING: + return "Testing MLnode configuration - model loading in progress" + case MLNodeState_TEST_FAILED: + if failingModel == "" { + return "MLnode test failed" + } + return "MLnode test failed: model '" + failingModel + "' could not be loaded" + case MLNodeState_WAITING_FOR_POC: + if secondsUntilNextPoC <= apiconfig.OnlineAlertLeadSeconds { + return "PoC starting soon (in " + formatShortDuration(secondsUntilNextPoC) + + ") - MLnode must be online now" + } + return "Waiting for next PoC cycle (starts in " + formatShortDuration(secondsUntilNextPoC) + + ") - safe to be offline until " + formatShortDuration(apiconfig.OnlineAlertLeadSeconds) + " before PoC" + } + return "" +} + +// BuildParticipantMessage produces the participant-level guidance line. +func BuildParticipantMessage(state ParticipantState) string { + switch state { + case ParticipantState_ACTIVE_PARTICIPATING: + return "Participant is in active set and participating" + case ParticipantState_INACTIVE_WAITING: + return "Participant not yet active - model assignment will occur after joining active set" + } + return "" +} + +// BuildInactiveGuidance is shown alongside BuildParticipantMessage when +// the participant is not yet active, explaining what the system will +// do automatically. +func BuildInactiveGuidance(secondsUntilNextPoC int64) string { + if secondsUntilNextPoC > apiconfig.AutoTestMinSecondsBeforePoC { + return "MLnode will be tested automatically when there is more than 1 hour until next PoC" + } + return "" +} diff --git a/decentralized-api/internal/server/admin/timing_calculator.go b/decentralized-api/internal/server/admin/timing_calculator.go new file mode 100644 index 0000000000..8244e00925 --- /dev/null +++ b/decentralized-api/internal/server/admin/timing_calculator.go @@ -0,0 +1,74 @@ +package admin + +import ( + "decentralized-api/apiconfig" + "decentralized-api/chainphase" + "strconv" + + "github.com/productscience/inference/x/inference/types" +) + +// TimingInfo is the JSON shape returned to admins for one node, +// describing where the chain sits in the PoC cycle. +type TimingInfo struct { + CurrentPhase string `json:"current_phase"` + BlocksUntilNextPoC int64 `json:"blocks_until_next_poc"` + SecondsUntilNextPoC int64 `json:"seconds_until_next_poc"` + ShouldBeOnline bool `json:"should_be_online"` +} + +// ComputeTiming derives TimingInfo from the current epoch state. +// Returns nil when the chain phase tracker has not yet synced (the +// admin handler then omits the timing block entirely rather than +// reporting zeros). +func ComputeTiming(es *chainphase.EpochState) *TimingInfo { + if es.IsNilOrNotSynced() { + return nil + } + currentHeight := es.CurrentBlock.Height + nextPoC := es.LatestEpoch.NextPoCStart() + blocks := nextPoC - currentHeight + if blocks < 0 { + blocks = 0 + } + seconds := int64(float64(blocks) * apiconfig.DefaultBlockTimeSeconds) + return &TimingInfo{ + CurrentPhase: string(es.CurrentPhase), + BlocksUntilNextPoC: blocks, + SecondsUntilNextPoC: seconds, + ShouldBeOnline: shouldBeOnline(es.CurrentPhase, seconds), + } +} + +func shouldBeOnline(phase types.EpochPhase, secondsUntilNextPoC int64) bool { + switch phase { + case types.PoCGeneratePhase, types.PoCGenerateWindDownPhase, + types.PoCValidatePhase, types.PoCValidateWindDownPhase: + return true + } + return secondsUntilNextPoC <= apiconfig.OnlineAlertLeadSeconds +} + +// formatShortDuration renders a non-negative seconds count as a compact +// human string such as "2h 15m", "8m", or "45s". Zero or negative +// values render as "0s". +func formatShortDuration(seconds int64) string { + if seconds <= 0 { + return "0s" + } + h := seconds / 3600 + m := (seconds % 3600) / 60 + s := seconds % 60 + switch { + case h > 0 && m > 0: + return strconv.FormatInt(h, 10) + "h " + strconv.FormatInt(m, 10) + "m" + case h > 0: + return strconv.FormatInt(h, 10) + "h" + case m > 0 && s > 0: + return strconv.FormatInt(m, 10) + "m " + strconv.FormatInt(s, 10) + "s" + case m > 0: + return strconv.FormatInt(m, 10) + "m" + default: + return strconv.FormatInt(s, 10) + "s" + } +} diff --git a/decentralized-api/main.go b/decentralized-api/main.go index c48c592727..49630661c6 100644 --- a/decentralized-api/main.go +++ b/decentralized-api/main.go @@ -17,6 +17,7 @@ import ( "decentralized-api/payloadstorage" "decentralized-api/poc" "decentralized-api/poc/artifacts" + "decentralized-api/selfcheck" "decentralized-api/statsstorage" "net" @@ -60,6 +61,10 @@ func main() { if len(os.Args) >= 2 && os.Args[1] == "pre-upgrade" { os.Exit(1) } + if len(os.Args) >= 2 && os.Args[1] == "selfcheck" { + runSelfcheck() + return + } configManager, err := apiconfig.LoadDefaultConfigManager() if err != nil { @@ -103,6 +108,11 @@ func main() { logging.Error("Failed to get participant info", types.Participants, "error", err) return } + // Background tracker for "is this participant in the current + // active set?" — admin handlers read its cached answer so they + // don't trigger N+1 chain RPCs per request. Started below once + // the cancellable context is available. + activityTracker := participant.NewActivityTracker(recorder, participantInfo.GetAddress(), 30*time.Second) chainBridge := broker.NewBrokerChainBridgeImpl(recorder, configManager.GetChainNodeConfig().Url) nodeBroker := broker.NewBroker(chainBridge, chainPhaseTracker, participantInfo, configManager.GetApiConfig().PoCCallbackUrl, &mlnodeclient.HttpClientFactory{}, configManager) @@ -150,6 +160,10 @@ func main() { // Start periodic config auto-flush of dynamic data to DB configManager.StartAutoFlush(ctx, 60*time.Second) + // Start the participant activity tracker so admin handlers can + // answer "is this participant active?" without a per-request RPC. + activityTracker.Start(ctx) + // Optional off-chain inference stats storage (PostgreSQL-backed when PGHOST is configured). statsStore, err := statsstorage.NewStatsStorage(ctx) if err != nil { @@ -260,7 +274,7 @@ func main() { addr = fmt.Sprintf(":%v", configManager.GetApiConfig().AdminServerPort) logging.Info("start admin server on addr", types.Server, "addr", addr) - adminServer := adminserver.NewServer(recorder, nodeBroker, configManager, validator, blockQueue, payloadStore) + adminServer := adminserver.NewServer(recorder, nodeBroker, configManager, validator, blockQueue, payloadStore, chainPhaseTracker, activityTracker, &mlnodeclient.HttpClientFactory{}) adminServer.Start(addr) nmGrpcPort := configManager.GetApiConfig().NodeManagerGrpcPort @@ -299,6 +313,26 @@ func main() { os.Exit(1) // Exit with an error for cosmovisor to restart the process } +// runSelfcheck drives a one-shot PoC-wiring assertion suite against a +// fresh broker backed by a mocked chain. Prints the report to stderr +// and exits 0 on PASS, 1 on FAIL. Intended for operator confidence +// before joining the network: "does my participant binary react to +// epoch events as expected?" +func runSelfcheck() { + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + report, err := selfcheck.Run(ctx) + if err != nil { + fmt.Fprintf(os.Stderr, "selfcheck: setup error: %v\n", err) + os.Exit(1) + } + fmt.Fprint(os.Stderr, report.String()) + if !report.Pass { + os.Exit(1) + } + os.Exit(0) +} + func returnStatus(configManager *apiconfig.ConfigManager) { height := configManager.GetHeight() status := map[string]interface{}{ diff --git a/decentralized-api/participant/activity_tracker.go b/decentralized-api/participant/activity_tracker.go new file mode 100644 index 0000000000..4965e2f897 --- /dev/null +++ b/decentralized-api/participant/activity_tracker.go @@ -0,0 +1,111 @@ +package participant + +import ( + "context" + "decentralized-api/cosmosclient" + "decentralized-api/logging" + "sync/atomic" + "time" + + "github.com/productscience/inference/x/inference/types" +) + +// ActivityTracker caches whether the current participant is part of +// the active epoch group. The result is refreshed periodically by a +// background goroutine so that admin HTTP handlers can call +// IsActive() without making any chain RPC calls per request. +// +// "Active" means the participant has at least one MLnode assigned to +// some model subgroup in the current epoch group data. +type ActivityTracker struct { + queryClient queryClient + address string + interval time.Duration + + active atomic.Bool +} + +// queryClient is a narrow interface over cosmosclient.CosmosMessageClient +// covering only what the tracker needs. Defined to make unit testing +// straightforward. +type queryClient interface { + NewInferenceQueryClient() types.QueryClient +} + +// NewActivityTracker constructs a tracker but does not start it. Call +// Start to launch the background refresher. +func NewActivityTracker(client cosmosclient.CosmosMessageClient, address string, interval time.Duration) *ActivityTracker { + if interval <= 0 { + interval = 30 * time.Second + } + return &ActivityTracker{ + queryClient: client, + address: address, + interval: interval, + } +} + +// IsActive returns the most recently observed activity status. False +// until the first successful refresh completes. +func (t *ActivityTracker) IsActive() bool { + return t.active.Load() +} + +// Start launches a goroutine that refreshes IsActive() every interval +// until ctx is cancelled. Performs one immediate refresh before +// returning so IsActive() is meaningful for the first admin request. +func (t *ActivityTracker) Start(ctx context.Context) { + t.refresh(ctx) + go func() { + ticker := time.NewTicker(t.interval) + defer ticker.Stop() + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + t.refresh(ctx) + } + } + }() +} + +func (t *ActivityTracker) refresh(ctx context.Context) { + active, err := t.query(ctx) + if err != nil { + // Chain may not be ready at startup; keep the previous value + // rather than flipping to false on transient errors. + logging.Debug("ActivityTracker refresh failed", types.Participants, "error", err) + return + } + if t.active.Swap(active) != active { + logging.Info("Participant active status changed", types.Participants, "active", active) + } +} + +func (t *ActivityTracker) query(ctx context.Context) (bool, error) { + q := t.queryClient.NewInferenceQueryClient() + parentResp, err := q.CurrentEpochGroupData(ctx, &types.QueryCurrentEpochGroupDataRequest{}) + if err != nil { + return false, err + } + if parentResp == nil { + return false, nil + } + epochIndex := parentResp.EpochGroupData.EpochIndex + for _, modelId := range parentResp.EpochGroupData.SubGroupModels { + subResp, err := q.EpochGroupData(ctx, &types.QueryGetEpochGroupDataRequest{ + EpochIndex: epochIndex, + ModelId: modelId, + }) + if err != nil || subResp == nil { + continue + } + for _, w := range subResp.EpochGroupData.ValidationWeights { + if w.MemberAddress == t.address && len(w.MlNodes) > 0 { + return true, nil + } + } + } + return false, nil +} diff --git a/decentralized-api/selfcheck/config.go b/decentralized-api/selfcheck/config.go new file mode 100644 index 0000000000..60d8e2e35e --- /dev/null +++ b/decentralized-api/selfcheck/config.go @@ -0,0 +1,35 @@ +package selfcheck + +import ( + "decentralized-api/apiconfig" + "os" + + "github.com/knadh/koanf/providers/file" +) + +// newInMemoryConfig builds a ConfigManager backed by a tmpfile, used +// only during selfcheck. Returns the manager plus a cleanup func that +// removes the tmpfile. +func newInMemoryConfig() (*apiconfig.ConfigManager, func(), error) { + tmp, err := os.CreateTemp("", "selfcheck-config-*.yaml") + if err != nil { + return nil, nil, err + } + if _, err := tmp.Write([]byte("nodes: []")); err != nil { + tmp.Close() + os.Remove(tmp.Name()) + return nil, nil, err + } + tmp.Close() + + cm := &apiconfig.ConfigManager{ + KoanProvider: file.Provider(tmp.Name()), + WriterProvider: apiconfig.NewFileWriteCloserProvider(tmp.Name()), + } + if err := cm.Load(); err != nil { + os.Remove(tmp.Name()) + return nil, nil, err + } + cleanup := func() { os.Remove(tmp.Name()) } + return cm, cleanup, nil +} diff --git a/decentralized-api/selfcheck/evaluator.go b/decentralized-api/selfcheck/evaluator.go new file mode 100644 index 0000000000..2427c0084e --- /dev/null +++ b/decentralized-api/selfcheck/evaluator.go @@ -0,0 +1,137 @@ +package selfcheck + +import ( + "decentralized-api/broker" + "fmt" + "strings" + "time" +) + +// CheckResult is the outcome of one assertion stage. +type CheckResult struct { + Name string `json:"name"` + Pass bool `json:"pass"` + Details string `json:"details,omitempty"` +} + +// Report is the aggregate outcome of a selfcheck run. Pass is true iff +// every stage's Pass is true. +type Report struct { + Pass bool `json:"pass"` + Stages []CheckResult `json:"stages"` +} + +// String renders the report as a human-readable summary suitable for +// printing to stderr at the end of a CLI selfcheck invocation. +func (r Report) String() string { + var b strings.Builder + if r.Pass { + b.WriteString("selfcheck: PASS\n") + } else { + b.WriteString("selfcheck: FAIL\n") + } + for _, s := range r.Stages { + mark := "PASS" + if !s.Pass { + mark = "FAIL" + } + fmt.Fprintf(&b, " [%s] %s", mark, s.Name) + if s.Details != "" { + fmt.Fprintf(&b, " — %s", s.Details) + } + b.WriteString("\n") + } + return b.String() +} + +// Evaluator runs assertions against a broker driven through a +// synthetic PoC cycle by EventDriver. It is intentionally minimal: +// each stage queries broker state via the public GetNodes API and +// reports pass/fail. A full PoC artifact simulation is out of scope. +type Evaluator struct { + Broker *broker.Broker + Bridge *MockChainBridge + NodeId string + Timeout time.Duration +} + +// NewEvaluator constructs an Evaluator with a sensible default +// per-stage poll timeout. +func NewEvaluator(b *broker.Broker, bridge *MockChainBridge, nodeId string) *Evaluator { + return &Evaluator{ + Broker: b, + Bridge: bridge, + NodeId: nodeId, + Timeout: 5 * time.Second, + } +} + +// AssertNodeRegistered waits until the broker reports the synthetic +// node as registered. Pass iff GetNodes returns an entry for NodeId +// within Timeout. +func (e *Evaluator) AssertNodeRegistered() CheckResult { + deadline := time.Now().Add(e.Timeout) + for time.Now().Before(deadline) { + nodes, err := e.Broker.GetNodes() + if err != nil { + return CheckResult{Name: "node-registered", Pass: false, Details: err.Error()} + } + for _, n := range nodes { + if n.Node.Id == e.NodeId { + return CheckResult{Name: "node-registered", Pass: true} + } + } + time.Sleep(50 * time.Millisecond) + } + return CheckResult{Name: "node-registered", Pass: false, Details: "timeout waiting for node"} +} + +// AssertEpochModelsPopulated checks that after a chain refresh, the +// broker has populated EpochMLNodes for the synthetic node — i.e. the +// chain-bridge wiring is working end-to-end through the broker. +func (e *Evaluator) AssertEpochModelsPopulated() CheckResult { + deadline := time.Now().Add(e.Timeout) + for time.Now().Before(deadline) { + nodes, err := e.Broker.GetNodes() + if err != nil { + return CheckResult{Name: "epoch-models-populated", Pass: false, Details: err.Error()} + } + for _, n := range nodes { + if n.Node.Id != e.NodeId { + continue + } + if len(n.State.EpochMLNodes) > 0 { + return CheckResult{Name: "epoch-models-populated", Pass: true} + } + } + time.Sleep(50 * time.Millisecond) + } + return CheckResult{Name: "epoch-models-populated", Pass: false, Details: "EpochMLNodes empty"} +} + +// AssertHardwareDiffSubmitted checks that the broker reported its +// node hardware up to the (mocked) chain via SubmitHardwareDiff. +func (e *Evaluator) AssertHardwareDiffSubmitted() CheckResult { + deadline := time.Now().Add(e.Timeout) + for time.Now().Before(deadline) { + if len(e.Bridge.SubmittedDiffsSnapshot()) > 0 { + return CheckResult{Name: "hardware-diff-submitted", Pass: true} + } + time.Sleep(50 * time.Millisecond) + } + return CheckResult{ + Name: "hardware-diff-submitted", Pass: false, + Details: "broker did not submit hardware diff within timeout", + } +} + +// Combine aggregates stage results into a final Report. +func (e *Evaluator) Combine(stages ...CheckResult) Report { + r := Report{Pass: true, Stages: stages} + for _, s := range stages { + if !s.Pass { + r.Pass = false + } + } + return r +} diff --git a/decentralized-api/selfcheck/event_driver.go b/decentralized-api/selfcheck/event_driver.go new file mode 100644 index 0000000000..d12f65e77d --- /dev/null +++ b/decentralized-api/selfcheck/event_driver.go @@ -0,0 +1,70 @@ +package selfcheck + +import ( + "decentralized-api/broker" + "decentralized-api/chainphase" + + "github.com/productscience/inference/x/inference/types" +) + +// EventDriver advances a ChainPhaseTracker through a synthetic PoC +// cycle and triggers the broker commands that would normally be +// queued by the production OnNewBlockDispatcher. +// +// It does NOT simulate the full PoC artifact protocol — only the +// chain-side events the broker reacts to: +// - epoch group data refresh (Inference phase boundary) +// - PoC start trigger (StartPocCommand) +// - validation start (InitValidateCommand) +// - inference-up resume (InferenceUpAllCommand) +// +// The Evaluator inspects broker state between transitions to judge +// whether the broker is behaving correctly. This is enough to catch +// onboarding regressions where the broker fails to react to common +// epoch events. +type EventDriver struct { + Bridge *MockChainBridge + PhaseTracker *chainphase.ChainPhaseTracker + Broker *broker.Broker + Epoch *types.Epoch + EpochParams *types.EpochParams +} + +// PushBlock updates the phase tracker to the given block height and +// returns the resulting EpochState. The hash is synthetic. +func (d *EventDriver) PushBlock(height int64) *chainphase.EpochState { + d.PhaseTracker.Update( + chainphase.BlockInfo{Height: height, Hash: "selfcheck-blk"}, + d.Epoch, + d.EpochParams, + true, + nil, + ) + return d.PhaseTracker.GetCurrentEpochState() +} + +// RefreshEpochData updates the broker's per-node epoch view from the +// (mocked) chain. This mirrors what OnNewBlockDispatcher does at the +// start of every phase transition. +func (d *EventDriver) RefreshEpochData() error { + es := d.PhaseTracker.GetCurrentEpochState() + return d.Broker.UpdateNodeWithEpochData(es) +} + +// TriggerStartPoC queues the StartPocCommand the broker would normally +// receive from OnNewBlockDispatcher when the chain crosses into the +// PoC generate phase. +func (d *EventDriver) TriggerStartPoC() error { + return d.Broker.QueueMessage(broker.NewStartPocCommand()) +} + +// TriggerInitValidate queues the InitValidateCommand for the validation +// phase transition. +func (d *EventDriver) TriggerInitValidate() error { + return d.Broker.QueueMessage(broker.NewInitValidateCommand()) +} + +// TriggerInferenceUpAll resumes inference at end of validation. +func (d *EventDriver) TriggerInferenceUpAll() error { + return d.Broker.QueueMessage(broker.NewInferenceUpAllCommand()) +} diff --git a/decentralized-api/selfcheck/mock_chain_bridge.go b/decentralized-api/selfcheck/mock_chain_bridge.go new file mode 100644 index 0000000000..ee72d31c50 --- /dev/null +++ b/decentralized-api/selfcheck/mock_chain_bridge.go @@ -0,0 +1,146 @@ +package selfcheck + +import ( + "decentralized-api/broker" + "sync" + + "github.com/productscience/inference/x/inference/types" +) + +// MockChainBridge implements broker.BrokerChainBridge with hardcoded +// responses that describe a single-participant, single-node, single-model +// network. It is the foundation for running the real broker in +// isolation during selfcheck — no chain RPC calls leave the process. +// +// All methods are safe for concurrent use. +type MockChainBridge struct { + // ParticipantAddress is "self" — the participant address treated + // as active in epoch group data. Must match what NewBroker is + // constructed with via participant.CurrenParticipantInfo. + ParticipantAddress string + + // ModelId is the single governance model used in the synthetic + // epoch group data and reported by GetGovernanceModels. + ModelId string + + // NodeId is the MLnode id assigned to ParticipantAddress in the + // epoch group data. The selfcheck registers this id into the + // broker so EpochMLNodes lookups succeed. + NodeId string + + // EpochIndex is the epoch index reported by epoch group responses. + EpochIndex uint64 + + // SubmittedDiffs records every SubmitHardwareDiff call so the + // Evaluator can assert the broker reported its hardware on startup. + mu sync.Mutex + SubmittedDiffs []*types.MsgSubmitHardwareDiff +} + +var _ broker.BrokerChainBridge = (*MockChainBridge)(nil) + +func (m *MockChainBridge) GetHardwareNodes() (*types.QueryHardwareNodesResponse, error) { + return &types.QueryHardwareNodesResponse{ + Nodes: &types.HardwareNodes{HardwareNodes: nil}, + }, nil +} + +func (m *MockChainBridge) SubmitHardwareDiff(diff *types.MsgSubmitHardwareDiff) error { + m.mu.Lock() + defer m.mu.Unlock() + m.SubmittedDiffs = append(m.SubmittedDiffs, diff) + return nil +} + +func (m *MockChainBridge) GetBlockHash(height int64) (string, error) { + return "selfcheck-fake-hash", nil +} + +func (m *MockChainBridge) GetGovernanceModels() (*types.QueryModelsAllResponse, error) { + return &types.QueryModelsAllResponse{ + Model: []types.Model{{Id: m.ModelId}}, + }, nil +} + +// GetCurrentEpochGroupData returns a parent epoch group that points at +// the single model subgroup. TotalWeight is non-zero so the v0.2.13 +// broker considers the epoch populated. +func (m *MockChainBridge) GetCurrentEpochGroupData() (*types.QueryCurrentEpochGroupDataResponse, error) { + return &types.QueryCurrentEpochGroupDataResponse{ + EpochGroupData: types.EpochGroupData{ + EpochIndex: m.EpochIndex, + SubGroupModels: []string{m.ModelId}, + TotalWeight: 1, + }, + }, nil +} + +// GetEpochGroupDataByModelId returns: +// - The parent group (modelId="") with the subgroup pointer. +// - The model-specific subgroup with one ValidationWeights entry +// assigning ParticipantAddress's NodeId to the model. +func (m *MockChainBridge) GetEpochGroupDataByModelId(epochIndex uint64, modelId string) (*types.QueryGetEpochGroupDataResponse, error) { + if modelId == "" { + return &types.QueryGetEpochGroupDataResponse{ + EpochGroupData: types.EpochGroupData{ + EpochIndex: epochIndex, + SubGroupModels: []string{m.ModelId}, + TotalWeight: 1, + }, + }, nil + } + return &types.QueryGetEpochGroupDataResponse{ + EpochGroupData: types.EpochGroupData{ + EpochIndex: epochIndex, + ModelSnapshot: &types.Model{Id: modelId}, + ValidationWeights: []*types.ValidationWeight{ + { + MemberAddress: m.ParticipantAddress, + Weight: 1, + MlNodes: []*types.MLNodeInfo{ + {NodeId: m.NodeId}, + }, + }, + }, + }, + }, nil +} + +// GetPreservedNodesSnapshot returns an empty snapshot — selfcheck does +// not exercise preserved-node restoration. Added for v0.2.13 broker +// interface compatibility. +func (m *MockChainBridge) GetPreservedNodesSnapshot() (*types.QueryPreservedNodesSnapshotResponse, error) { + return &types.QueryPreservedNodesSnapshotResponse{}, nil +} + +func (m *MockChainBridge) GetParams() (*types.QueryParamsResponse, error) { + return &types.QueryParamsResponse{ + Params: types.Params{ + EpochParams: m.epochParams(), + }, + }, nil +} + +// epochParams produces a deliberately short PoC cycle (a handful of +// blocks per phase) so the EventDriver can step through a full epoch +// in under a second of simulated time. +func (m *MockChainBridge) epochParams() *types.EpochParams { + return &types.EpochParams{ + EpochLength: 100, + PocStageDuration: 10, + PocExchangeDuration: 5, + PocValidationDelay: 1, + PocValidationDuration: 10, + SetNewValidatorsDelay: 1, + } +} + +// SubmittedDiffsSnapshot returns a snapshot of recorded diffs so the +// Evaluator can inspect them without racing the broker. +func (m *MockChainBridge) SubmittedDiffsSnapshot() []*types.MsgSubmitHardwareDiff { + m.mu.Lock() + defer m.mu.Unlock() + out := make([]*types.MsgSubmitHardwareDiff, len(m.SubmittedDiffs)) + copy(out, m.SubmittedDiffs) + return out +} diff --git a/decentralized-api/selfcheck/run.go b/decentralized-api/selfcheck/run.go new file mode 100644 index 0000000000..55839f3702 --- /dev/null +++ b/decentralized-api/selfcheck/run.go @@ -0,0 +1,146 @@ +// Package selfcheck provides a standalone mode for the api binary +// that exercises the broker against a mocked chain to verify the +// PoC lifecycle wiring is intact, without touching production code +// paths. It is the implementation of the approach suggested by +// reviewers on PR #866: keep the broker untouched, drive it with +// synthetic events, and observe outcomes from a separate evaluator. +// +// See proposals/onboarding-clarity-v1/README.md for product context. +package selfcheck + +import ( + "context" + "decentralized-api/apiconfig" + "decentralized-api/broker" + "decentralized-api/chainphase" + "decentralized-api/mlnodeclient" + "fmt" + "os" + "time" + + "github.com/productscience/inference/x/inference/types" +) + +const ( + selfParticipantAddress = "selfcheck-participant" + selfNodeId = "selfcheck-node-1" + selfModelId = "selfcheck-model" +) + +// Run executes the selfcheck and returns a Report. The Report's Pass +// field is the overall outcome. An error is only returned if setup +// itself failed (e.g. broker could not be constructed); per-stage +// assertion failures are surfaced via Report.Pass=false. +// +// Run blocks until completion (a few seconds at most) or until ctx is +// cancelled. +func Run(ctx context.Context) (Report, error) { + // Disable the production "enforced model id" gate — selfcheck uses + // a synthetic model id and otherwise registration would be + // rejected. Tests in broker_test.go set this same env var. + if os.Getenv("ENFORCED_MODEL_ID") == "" { + os.Setenv("ENFORCED_MODEL_ID", "disabled") + } + + bridge := &MockChainBridge{ + ParticipantAddress: selfParticipantAddress, + ModelId: selfModelId, + NodeId: selfNodeId, + EpochIndex: 1, + } + + phaseParams, err := bridge.GetParams() + if err != nil { + return Report{}, fmt.Errorf("mock GetParams: %w", err) + } + + // Anchor the synthetic epoch so phase math is consistent. + const pocStart int64 = 1000 + epoch := &types.Epoch{Index: 1, PocStartBlockHeight: pocStart} + phaseTracker := &chainphase.ChainPhaseTracker{} + phaseTracker.UpdateEpochParams(*phaseParams.Params.EpochParams) + + // Configure a single fake node in the in-memory config manager so + // the broker reads it as a known node. We use a real + // ConfigManager wired to a tmpfile so SetNodes round-trips cleanly. + cm, cleanup, err := newInMemoryConfig() + if err != nil { + return Report{}, fmt.Errorf("newInMemoryConfig: %w", err) + } + defer cleanup() + if err := cm.SetNodes([]apiconfig.InferenceNodeConfig{{ + Id: selfNodeId, + Host: "127.0.0.1", + InferencePort: 18080, + PoCPort: 18080, + MaxConcurrent: 1, + Models: map[string]apiconfig.ModelConfig{selfModelId: {}}, + }}); err != nil { + return Report{}, fmt.Errorf("SetNodes: %w", err) + } + + mockFactory := mlnodeclient.NewMockClientFactory() + participantInfo := &staticParticipant{addr: selfParticipantAddress} + nodeBroker := broker.NewBroker( + bridge, + phaseTracker, + participantInfo, + "http://selfcheck-callback", + mockFactory, + cm, + ) + + // Load the configured node into the broker. + for _, n := range cm.GetNodes() { + ch := nodeBroker.LoadNodeToBroker(&n) + if ch != nil { + <-ch + } + } + + driver := &EventDriver{ + Bridge: bridge, + PhaseTracker: phaseTracker, + Broker: nodeBroker, + Epoch: epoch, + EpochParams: phaseParams.Params.EpochParams, + } + driver.PushBlock(pocStart - 50) // Inference phase, well before PoC + + if err := driver.RefreshEpochData(); err != nil { + return Report{}, fmt.Errorf("RefreshEpochData: %w", err) + } + + // Give the broker's goroutines a beat to process queued commands. + if err := waitOrCtx(ctx, 200*time.Millisecond); err != nil { + return Report{}, err + } + + // nodeSyncWorker runs on a 60s ticker; trigger immediately so + // AssertHardwareDiffSubmitted has something to observe within + // selfcheck's short window. + _ = nodeBroker.QueueMessage(broker.NewSyncNodesCommand()) + + ev := NewEvaluator(nodeBroker, bridge, selfNodeId) + registered := ev.AssertNodeRegistered() + epochPopulated := ev.AssertEpochModelsPopulated() + hwSubmitted := ev.AssertHardwareDiffSubmitted() + + return ev.Combine(registered, epochPopulated, hwSubmitted), nil +} + +func waitOrCtx(ctx context.Context, d time.Duration) error { + select { + case <-time.After(d): + return nil + case <-ctx.Done(): + return ctx.Err() + } +} + +// staticParticipant is a CurrenParticipantInfo whose values never +// change — enough for selfcheck where the participant is hardcoded. +type staticParticipant struct{ addr string } + +func (p *staticParticipant) GetAddress() string { return p.addr } +func (p *staticParticipant) GetPubKey() string { return "selfcheck-pubkey" } diff --git a/decentralized-api/selfcheck/run_test.go b/decentralized-api/selfcheck/run_test.go new file mode 100644 index 0000000000..29acaaf80a --- /dev/null +++ b/decentralized-api/selfcheck/run_test.go @@ -0,0 +1,32 @@ +package selfcheck + +import ( + "context" + "testing" + "time" +) + +// TestRunPasses asserts that the selfcheck reports PASS on a healthy +// broker driven by the synthetic chain. Doubles as a regression guard +// against changes to broker construction that would break the public +// API the selfcheck depends on. +func TestRunPasses(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second) + defer cancel() + + report, err := Run(ctx) + if err != nil { + t.Fatalf("Run setup error: %v", err) + } + if !report.Pass { + t.Fatalf("selfcheck did not pass:\n%s", report.String()) + } + if len(report.Stages) == 0 { + t.Fatal("expected at least one stage in report") + } + for _, s := range report.Stages { + if !s.Pass { + t.Errorf("stage %q failed: %s", s.Name, s.Details) + } + } +} diff --git a/proposals/onboarding-clarity-v1/README.md b/proposals/onboarding-clarity-v1/README.md new file mode 100644 index 0000000000..0000ee3da3 --- /dev/null +++ b/proposals/onboarding-clarity-v1/README.md @@ -0,0 +1,154 @@ +# Onboarding Clarity Enhancement + +## Overview + +This proposal addresses critical user experience issues during node onboarding by implementing clear state reporting, proactive testing, and intelligent waiting period management. The current onboarding process leaves users confused about system status, with unclear error messages and no guidance on expected waiting periods or next steps. + +## Problem Statement + +### Current Onboarding Issues + +**Unclear Waiting States**: When users install and start nodes, there is no clear indication that nothing will happen until the next Proof-of-Compute (PoC) cycle begins. Users see their nodes running but receive no feedback about the expected waiting period or what will happen next. + +**Confusing Error Messages**: The API node shows "there is no model for ml node" messages even when the participant is not yet active, creating confusion about whether the setup is correct. + +**Lack of Proactive Testing**: New MLnodes are not tested before participating in PoC, leading to failures during critical consensus periods that could have been detected earlier. + +**Poor Restart Handling**: When API nodes restart, they don't properly check if participants are part of the active participant set, leading to inconsistent behavior. + +**Insufficient Status Information**: Users cannot determine: +- How long until the next PoC cycle +- Whether their MLnode is properly configured +- If they can safely turn off servers during waiting periods +- When they should be online for participation + +## Proposed Solution + +**Better Status Messages:** +- When MLnode registered, the latest and most visible log should be "waiting for PoC" +- Tell users exactly when next PoC starts +- Tell users when they should bring the server online, and when they can safely turn off servers temporarily +- Show: Info message "Waiting for next PoC cycle (starts in 2h 15m) - you can safely turn off the server and restart it 10 minutes before PoC" + +**Proactive Testing:** +- When a new MLnode is registered (and there's >1 hour until PoC), automatically test it +- Test: model loading, health check, inference request +- Only show "waiting for PoC" if the test passes +- If test fails, show clear error with specific problem + +**Better Restart Handling:** +- When API node restarts, check if participant is actually in active set +- Don't show confusing error messages if participant isn't active yet +- Instead of: Error "there is no model for ml node" +- Show: Info message "Participant not yet active - model assignment pending (normal for new participants)" + +### Enhanced State Management System + +The proposal introduces a comprehensive state management system that provides clear feedback at every stage of the onboarding and participation lifecycle. + +#### New MLnode States + +**Enhanced State Enumeration**: +- `WAITING_FOR_POC` - MLnode is configured and waiting for next PoC cycle +- `TESTING` - MLnode is undergoing pre-PoC validation testing +- `TEST_FAILED` - MLnode failed validation testing + +**Timing Guidance Through Messages**: +The same `WAITING_FOR_POC` state provides different user messages based on timing: +- "Waiting for next PoC cycle (starts in 2h 15m) - you can safely turn off the server and restart it 10 minutes before PoC" +- "PoC starting soon (in 8 minutes) - MLnode must be online now" + +#### New API Node Participant States + +**Participant Status Tracking**: +- `INACTIVE_WAITING` - Participant registered but not yet in active set +- `ACTIVE_PARTICIPATING` - Participant is in active set and participating in current epoch + +### Proactive MLnode Testing System + +#### Pre-PoC Validation Flow + +**Testing Trigger Conditions**: +- New MLnode registered OR configuration changes detected, when more than 1 hour until next PoC +- Manual testing request through admin interface + +**Testing Process**: +1. **Model Loading Test**: MLnode switches to "testing" state and performs similar operations as in "inference" state - load configured models and verify successful loading +2. **Health Check**: Perform inference health check to ensure model is functional +3. **Response Validation**: Send test inference request and validate response +4. **Performance Baseline**: Record loading time and response time metrics + +**Test Result Actions**: +- **Success**: Switch MLnode to `WAITING_FOR_POC` state +- **Failure**: Switch to `TEST_FAILED` state with detailed error reporting in both MLnode and API node logs + +### Intelligent Timing System + +#### PoC Schedule Awareness + +**Timing Calculations**: +- Calculate time until next PoC cycle using epoch parameters +- Determine safe offline windows (more than 10 minutes until PoC) +- Provide countdown timers for user interfaces +- Alert users when they should be online + +**Existing Epoch Structure Integration**: +- Use existing `Epoch` structure with PoC start block height and upcoming epoch information +- Leverage existing `EpochParams` with `PocStageDuration`, `PocValidationDuration`, and other timing parameters +- Utilize current `chainphase.EpochState` and block height tracking for accurate PoC timing calculations + +### Enhanced Logging and Status Reporting + +#### User-Friendly Status Messages + +**Clear State Communications**: +- "Waiting for next PoC cycle (starts in 2h 15m) - you can safely turn off the server and restart it 10 minutes before PoC" +- "Testing MLnode configuration - model loading in progress" +- "MLnode test failed: model 'Qwen/Qwen2.5-7B-Instruct' could not be loaded" +- "PoC starting soon (in 8 minutes) - MLnode must be online now" + +**Contextual Error Messages**: +- Suppress "no model for ml node" messages when participant is inactive +- Show clear explanations: "Participant not yet active - model assignment will occur after joining active set" +- Provide actionable guidance: "MLnode will be tested automatically when there is more than 1 hour until next PoC" + +#### Enhanced Logging Categories + +**Extend Existing Logging System**: +- Enhance existing `types.Nodes` logging with onboarding state transitions +- Add testing-specific logs within existing `types.Nodes` category +- Use existing `types.Participants` for participant status changes +- Integrate timing guidance into existing log categories rather than creating new ones + +### Implementation Architecture + +#### API Node Enhancements + +**New Components for Admin Server** (`decentralized-api/internal/server/admin/`): +- `OnboardingStateManager` - Centralized state tracking for onboarding process +- `MLnodeTestingOrchestrator` - Manages pre-PoC testing workflows +- `TimingCalculator` - Computes PoC schedules and safe offline windows +- `StatusReporter` - Generates user-friendly status messages + +**MLnode Server** (`decentralized-api/internal/server/mlnode/server.go`): +- No changes needed - existing PoC batch endpoints remain the same + +#### Integration Points + +**Broker Integration**: +- Enhance `NodeState` structure with new onboarding-specific fields +- Modify `RegisterNode` command to trigger testing when appropriate +- Update status query results to include timing information + +**Chain Integration**: +- Query active participant status during startup +- Monitor epoch transitions for timing calculations +- Track participant weight changes for status updates + +### Conclusion + +This proposal addresses critical gaps in the Gonka network's onboarding experience by providing clear state communication, proactive testing, and intelligent timing guidance. The implementation focuses on user experience improvements while maintaining system reliability and security. + +The modular architecture allows for incremental deployment and easy maintenance, while the comprehensive testing approach ensures that configuration issues are caught early rather than during critical consensus periods. This results in a more reliable network and significantly improved user experience for node operators. + +The enhanced status reporting and timing guidance eliminate confusion about waiting periods and provide clear actionable information, transforming the onboarding process from a frustrating guessing game into a transparent, guided experience.