diff --git a/docs/admin/administration.md b/docs/admin/administration.md index 7d1d85db8d9..78c5d316b7f 100644 --- a/docs/admin/administration.md +++ b/docs/admin/administration.md @@ -49,6 +49,8 @@ severity, message, description, and cause. | engine\_died| STATE\_CHANGE| ERROR| DAOS engine exited exited unexpectedly: | Indicates engine instance unexpectedly. describes the exit state returned from exited daos\_engine process.| N/A | | engine\_asserted| STATE\_CHANGE| ERROR| TBD| Indicates engine instance threw a runtime assertion, causing a crash. | An unexpected internal state resulted in assert failure. | | engine\_clock\_drift| INFO\_ONLY | ERROR| clock drift detected| Indicates CART comms layer has detected clock skew between engines.| NTP may not be syncing clocks across DAOS system. | +| engine\_self\_terminated| INFO\_ONLY| NOTICE| excluded rank self terminated detected| Indicates that a DAOS engine rank has performed a self-termination due to having been excluded from the system's group map. The rank is automatically restarted by the control plane with rate-limiting (default: 5 minute minimum delay between restarts per rank) to prevent restart storms. | An engine was found to be in a transient non-functional state and excluded from the group map. The control plane monitors for this event and automatically restarts the affected engine so it can rejoin the system. Restarts are rate-limited per rank using the `engine_auto_restart_min_delay` configuration parameter. | +| engine\_join\_failed| INFO\_ONLY| ERROR | DAOS engine (rank ) was not allowed to join the system | Join operation failed for the given engine instance ID and rank (if assigned). | Reason should be provided in the extended info field of the event data. | | pool\_corruption\_detected| INFO\_ONLY| ERROR | Data corruption detected| Indicates a corruption in pool data has been detected. The event fields will contain pool and container UUIDs. | A corruption was found by the checksum scrubber. | | pool\_rebuild\_started| INFO\_ONLY| NOTICE | Pool rebuild started.| Indicates a pool rebuild has started. The event data field contains pool map version and pool operation identifier. | When a pool rank becomes unavailable a rebuild will be triggered. | | pool\_rebuild\_finished| INFO\_ONLY| NOTICE| Pool rebuild finished.| Indicates a pool rebuild has finished successfully. The event data field includes the pool map version and pool operation identifier. | N/A| @@ -69,7 +71,6 @@ severity, message, description, and cause. | device\_plugged| INFO\_ONLY| NOTICE| Detected hot plugged device: | Indicates device was physically inserted into host. | NVMe SSD physically added to host. | | device\_replace| INFO\_ONLY| NOTICE or ERROR| Replaced device: with device: [failed: ] | Indicates that a faulty device was replaced with a new device and if the operation failed. The old and new device IDs as well as any non-zero return code are specified in the event data. | Device was replaced using DMG nvme replace command. | | system\_fabric\_provider\_changed| INFO\_ONLY| NOTICE| System fabric provider has changed: -> | Indicates that the system-wide fabric provider has been updated. No other specific information is included in event data.| A system-wide fabric provider change has been intentionally applied to all joined ranks.| -| engine\_join\_failed| INFO\_ONLY| ERROR | DAOS engine (rank ) was not allowed to join the system | Join operation failed for the given engine instance ID and rank (if assigned). 
| Reason should be provided in the extended info field of the event data. | | device\_link\_speed\_changed| INFO\_ONLY| NOTICE or WARNING| NVMe PCIe device at port-: link speed changed to (max )| Indicates that an NVMe device link speed has changed. The negotiated and maximum device link speeds are indicated in the event message field and the severity is set to warning if the negotiated speed is not at maximum capability (and notice level severity if at maximum). No other specific information is included in the event data.| Either device link speed was previously downgraded and has returned to maximum or link speed has downgraded to a value that is less than its maximum capability.| | device\_link\_width\_changed| INFO\_ONLY| NOTICE or WARNING| NVMe PCIe device at port-: link width changed to (max )| Indicates that an NVMe device link width has changed. The negotiated and maximum device link widths are indicated in the event message field and the severity is set to warning if the negotiated width is not at maximum capability (and notice level severity if at maximum). No other specific information is included in the event data.| Either device link width was previously downgraded and has returned to maximum or link width has downgraded to a value that is less than its maximum capability.| | device\_led\_set| INFO\_ONLY| NOTICE| LED on device set to state | Indicates that the LED state has been changed on a device. Device identifier and LED state are specified in the event message.| LED control command was issued to change device LED state for visual identification or fault indication.| @@ -1007,6 +1008,94 @@ specified on the command line: If the ranks were excluded from pools (e.g., unclean shutdown), they will need to be reintegrated. Please see the pool operation section for more information. +### Engine Auto-Restart + +DAOS automatically restarts engines that self-terminate after being excluded from +the system. This feature improves system availability by recovering from transient +failures without administrator intervention. + +#### How It Works + +When an engine is excluded (e.g., due to network issues detected by SWIM), the +engine detects the exclusion and performs a self-termination. The control plane +monitors for these events and automatically restarts the affected engine after +clearing the exclusion state, allowing it to rejoin the system. + +The automatic restart includes rate-limiting to prevent restart storms. By default, +an engine must wait 5 minutes between automatic restarts. + +#### Configuration + +Control auto-restart behavior in `daos_server.yml`: + +```yaml +# Disable automatic restart (default: enabled) +disable_engine_auto_restart: false + +# Minimum delay between automatic restarts per rank (default: 300 seconds) +engine_auto_restart_min_delay: 300 +``` + +#### Manual Operations + +Manual `dmg system stop` and `dmg system start` operations are never affected by +the rate-limiting mechanism. Administrators can always immediately stop and start +ranks regardless of recent automatic restart activity. + +```bash +# Manual operations always work immediately +$ dmg system stop --ranks=0,1,2 +$ dmg system start --ranks=0,1,2 +``` + +When you manually stop or start ranks, the restart history for those ranks is +automatically cleared, ensuring no delays from previous automatic restarts. 
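To make the relationship between the two settings explicit, the sketch below shows how they could be expected to resolve into an effective restart policy. It is a simplified, illustrative reading of the behaviour this patch adds in `src/control/server/instance_restart.go` (the 300-second default and the disable check); the `serverCfg` type and the `effectiveRestartPolicy` helper are illustrative names only, not part of the patch.

```go
package main

import (
	"fmt"
	"time"
)

// serverCfg is an illustrative stand-in for the two YAML options added by
// this patch: disable_engine_auto_restart and engine_auto_restart_min_delay.
type serverCfg struct {
	DisableEngineAutoRestart  bool
	EngineAutoRestartMinDelay int // seconds; 0 means "use the default"
}

// effectiveRestartPolicy mirrors how the patch interprets the options: the
// disable flag suppresses automatic restarts entirely, and a zero or unset
// delay falls back to the 300-second default.
func effectiveRestartPolicy(cfg serverCfg) (enabled bool, minDelay time.Duration) {
	if cfg.DisableEngineAutoRestart {
		return false, 0
	}
	secs := 300 // default minimum delay between restarts per rank
	if cfg.EngineAutoRestartMinDelay > 0 {
		secs = cfg.EngineAutoRestartMinDelay
	}
	return true, time.Duration(secs) * time.Second
}

func main() {
	enabled, delay := effectiveRestartPolicy(serverCfg{EngineAutoRestartMinDelay: 120})
	fmt.Printf("auto-restart enabled: %v, minimum delay: %s\n", enabled, delay)
}
```

Note that the 300-second default only applies when `engine_auto_restart_min_delay` is unset or zero; negative values are rejected by the config validation added in this patch.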
+ +#### Monitoring + +The `engine_self_terminated` RAS event is logged when an engine self-terminates +and triggers an automatic restart: + +``` +&&& RAS EVENT id: [engine_self_terminated] ... msg: [excluded rank self terminated detected] +``` + +Use `dmg system query` to check rank status and incarnation numbers. The +incarnation number increments each time a rank restarts, helping track restart +events: + +```bash +$ dmg system query --ranks=0 +Rank UUID Control Address Fault Domain State Reason Incarnation +---- ---- --------------- ------------- ----- ------ ----------- +0 12345678-1234-1234-1234-123456789012 10.0.0.1:10001 /node1 Joined 3 +``` + +#### Best Practices + +- **Leave enabled**: Automatic restart improves availability for transient failures +- **Adjust timing**: For frequent exclusions, consider increasing `engine_auto_restart_min_delay` +- **Monitor events**: Watch for repeated `engine_self_terminated` events indicating persistent issues +- **Manual control**: Use `dmg system stop/start` for maintenance without worrying about delays + +#### Troubleshooting + +**Problem**: Rank keeps self-terminating and restarting + +**Solution**: Investigate root cause: +1. Check network connectivity (SWIM may be detecting real failures) +2. Review engine logs for errors +3. Verify hardware health +4. Consider disabling auto-restart temporarily for investigation + +**Problem**: Need immediate restart but recently auto-restarted + +**Solution**: Use manual operations (not affected by rate-limiting): +```bash +$ dmg system stop --ranks=X +$ dmg system start --ranks=X +``` + ### Storage Reformat To reformat the system after a controlled shutdown, run the command: @@ -1052,15 +1141,15 @@ the storage server has not changed the old rank can be "reused" by formatting us An examples workflow would be: -- `daos_server` is running and PMem NVDIMM fails causing an engine to enter excluded state. -- `daos_server` is stopped, storage server powered down, faulty PMem NVDIMM is replaced. -- After powering up storage server, `daos_server scm prepare` command is used to repair PMem. -- Storage server is rebooted after running `daos_server scm prepare` and command is run again. -- Now PMem is intact, clear with `wipefs -a /dev/pmemX` where "X" refers to the repaired PMem ID. -- `daos_server` can be started again. On start-up repaired engine prompts for "SCM format required". -- Run `dmg storage format --replace` to rejoin with existing rank (if --replace isn't used, a new - rank will be created). -- Formatted engine will join using the existing (old) rank which is mapped to the engine's hardware. +1. `daos_server` is running and PMem NVDIMM fails causing an engine to enter excluded state. +2. `daos_server` is stopped, storage server powered down, faulty PMem NVDIMM is replaced. +3. After powering up storage server, `daos_server scm prepare` command is used to repair PMem. +4. Storage server is rebooted after running `daos_server scm prepare` and command is run again. +5. Now PMem is intact, clear with `wipefs -a /dev/pmemX` where "X" refers to the repaired PMem ID. +6. `daos_server` can be started again. On start-up repaired engine prompts for "SCM format required". +7. Run `dmg storage format --replace` to rejoin with existing rank (if --replace isn't used, a new + rank will be created). +8. Formatted engine will join using the existing (old) rank which is mapped to the engine's hardware. !!! note `dmg storage format --replace` can be used to replace a rank in `AdminExcluded` state. 
The diff --git a/docs/overview/fault.md b/docs/overview/fault.md index 49d7e400e1b..accf6cf3bbe 100644 --- a/docs/overview/fault.md +++ b/docs/overview/fault.md @@ -84,3 +84,68 @@ can now read from the rebuilt object shards. This rebuild process is executed online while applications continue accessing and updating objects. + +### Engine Self-Termination and Automatic Restart + +A DAOS engine may be excluded from the group map, for example because of +inactivity. When an engine becomes aware of its removal from the +group map, it will self-terminate to protect data integrity and system stability. + +When an engine self-terminates, it raises an `engine_self_terminated` RAS event +(INFO_ONLY, NOTICE severity) containing the rank and incarnation information. +The control plane automatically handles this event by: + +1. Detecting the engine self-termination event through the RAS event system +2. Identifying the engine instance associated with the rank +3. Waiting for the engine process to fully stop +4. Automatically restarting the engine to rejoin the system + +This automatic restart mechanism is implemented in all control servers to ensure +local engine recovery happens regardless of management service leadership state. +The restarted engine will rejoin the system with a new incarnation number and +resume normal operations. + +This self-healing mechanism allows DAOS to automatically recover system +membership after transient engine failures without administrator +intervention, improving overall system availability. + +#### Rate Limiting + +To prevent restart storms and ensure system stability, automatic engine restarts +are rate-limited on a per-rank basis. By default, a minimum delay of 300 seconds +(5 minutes) is enforced between consecutive restart attempts for the same rank. + +When an engine self-terminates within the minimum delay period, the control plane +schedules a deferred restart that will automatically trigger when the delay expires. +If multiple self-termination events occur for the same rank during the delay period +(this would be unexpected), only the most recent event triggers a deferred restart. +This ensures the engine is restarted exactly once after the delay, regardless of +how many self-termination events occur. + +The rate-limiting interval can be customized by setting the +`engine_auto_restart_min_delay` configuration option (in seconds) in the +`daos_server.yml` file. For example: + +```yaml +engine_auto_restart_min_delay: 600 # 10 minutes between restarts +``` + +This protection mechanism prevents scenarios where: +- Repeated transient failures cause excessive restart cycling +- A misconfigured engine continuously self-terminates +- Cascading failures overwhelm the control plane with restart requests + +#### Disabling Automatic Restart + +The automatic restart behavior can be completely disabled by setting the +`disable_engine_auto_restart` configuration option to `true` in the +`daos_server.yml` file: + +```yaml +disable_engine_auto_restart: true +``` + +When auto-restart is disabled, engines that self-terminate will not be +automatically restarted by the control plane, requiring manual intervention +to restart the affected engine instances. This setting may be useful for +debugging scenarios or when custom external restart management is preferred.
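As a rough illustration of the deferred-restart behaviour described above, the sketch below shows a per-rank rate limiter that either restarts immediately or schedules a single deferred restart, replacing any previously pending timer. It is a simplified approximation of the `engineRestartManager` introduced later in this patch (`canRestartNow`, `processRestartRequest`, `setPendingRestart` in `src/control/server/instance_restart.go`); the `rateLimiter` type and the `restart` callback are illustrative stand-ins, not the real manager.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// rateLimiter is a simplified stand-in for the restart manager added by this
// patch: it tracks the last restart time and one pending timer per rank.
type rateLimiter struct {
	mu          sync.Mutex
	minDelay    time.Duration
	lastRestart map[uint32]time.Time
	pending     map[uint32]*time.Timer
}

// request either restarts the rank immediately or schedules a deferred
// restart; a newer event replaces any previously scheduled timer, so the
// rank is restarted exactly once when the delay expires.
func (rl *rateLimiter) request(rank uint32, restart func()) {
	rl.mu.Lock()
	defer rl.mu.Unlock()

	last, seen := rl.lastRestart[rank]
	if !seen || time.Since(last) >= rl.minDelay {
		// No recent restart for this rank: restart immediately.
		rl.lastRestart[rank] = time.Now()
		restart()
		return
	}

	// Rate limited: keep only the most recent deferred restart.
	if t, ok := rl.pending[rank]; ok {
		t.Stop()
	}
	remaining := rl.minDelay - time.Since(last)
	rl.pending[rank] = time.AfterFunc(remaining, func() {
		rl.mu.Lock()
		delete(rl.pending, rank)
		rl.lastRestart[rank] = time.Now()
		rl.mu.Unlock()
		restart()
	})
}

func main() {
	rl := &rateLimiter{
		minDelay:    2 * time.Second,
		lastRestart: map[uint32]time.Time{},
		pending:     map[uint32]*time.Timer{},
	}
	rl.request(1, func() { fmt.Println("rank 1 restarted immediately") })
	rl.request(1, func() { fmt.Println("rank 1 restarted after deferred delay") })
	time.Sleep(3 * time.Second) // wait for the deferred restart to fire
}
```

In the real manager the restart is additionally gated on the engine process having fully stopped, and both the pending timer and the restart history are cleared when ranks are manually stopped or started.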
diff --git a/src/control/cmd/dmg/auto_test.go b/src/control/cmd/dmg/auto_test.go index bec414701d2..b5f25558f0b 100644 --- a/src/control/cmd/dmg/auto_test.go +++ b/src/control/cmd/dmg/auto_test.go @@ -606,6 +606,7 @@ mgmt_svc_replicas: - hostX:10002 fault_cb: "" hyperthreads: false +disable_engine_auto_restart: false ` ) diff --git a/src/control/events/ras.go b/src/control/events/ras.go index 902a8559e58..bf7446fe4bd 100644 --- a/src/control/events/ras.go +++ b/src/control/events/ras.go @@ -49,6 +49,7 @@ const ( RASUnknownEvent RASID = C.RAS_UNKNOWN_EVENT RASEngineFormatRequired RASID = C.RAS_ENGINE_FORMAT_REQUIRED // notice RASEngineDied RASID = C.RAS_ENGINE_DIED // error + RASEngineSelfTerminated RASID = C.RAS_ENGINE_SELF_TERMINATED // notice RASPoolRepsUpdate RASID = C.RAS_POOL_REPS_UPDATE // info RASSwimRankAlive RASID = C.RAS_SWIM_RANK_ALIVE // info RASSwimRankDead RASID = C.RAS_SWIM_RANK_DEAD // info diff --git a/src/control/lib/control/event.go b/src/control/lib/control/event.go index d316a4e7cb1..7934b921424 100644 --- a/src/control/lib/control/event.go +++ b/src/control/lib/control/event.go @@ -1,5 +1,6 @@ // // (C) Copyright 2021-2024 Intel Corporation. +// (C) Copyright 2026 Hewlett Packard Enterprise Development LP // // SPDX-License-Identifier: BSD-2-Clause-Patent // @@ -170,7 +171,8 @@ func newEventLogger(logBasic logging.Logger, newSyslogger newSysloggerFn) *Event } // NewEventLogger returns an initialized EventLogger capable of writing to the -// supplied logger in addition to syslog. +// supplied logger in addition to syslog. Should only be used in production code, +// use MockEventLogger in unit tests. func NewEventLogger(log logging.Logger) *EventLogger { return newEventLogger(log, syslog.NewLogger) } diff --git a/src/control/lib/control/mocks.go b/src/control/lib/control/mocks.go index 83fd078c8ac..bfb417acb50 100644 --- a/src/control/lib/control/mocks.go +++ b/src/control/lib/control/mocks.go @@ -1,6 +1,6 @@ // // (C) Copyright 2020-2024 Intel Corporation. -// (C) Copyright 2025 Hewlett Packard Enterprise Development LP +// (C) Copyright 2025-2026 Hewlett Packard Enterprise Development LP // // SPDX-License-Identifier: BSD-2-Clause-Patent // @@ -30,6 +30,7 @@ import ( "github.com/daos-stack/daos/src/control/common/test" "github.com/daos-stack/daos/src/control/lib/hostlist" "github.com/daos-stack/daos/src/control/lib/ranklist" + "github.com/daos-stack/daos/src/control/logging" "github.com/daos-stack/daos/src/control/server/config" "github.com/daos-stack/daos/src/control/server/engine" "github.com/daos-stack/daos/src/control/server/storage" @@ -945,3 +946,10 @@ func MockHostFabricMap(t *testing.T, scans ...*MockFabricScan) HostFabricMap { return hfm } + +// MockEventLogger returns EventLogger reference that has no syslog handlers registered. 
+func MockEventLogger(logBasic logging.Logger) *EventLogger { + return &EventLogger{ + log: logBasic, + } +} diff --git a/src/control/server/config/server.go b/src/control/server/config/server.go index 9c7a18041c6..77b9980025b 100644 --- a/src/control/server/config/server.go +++ b/src/control/server/config/server.go @@ -99,7 +99,9 @@ type Server struct { Path string `yaml:"-"` // path to config file // Behavior flags - AutoFormat bool `yaml:"-"` + AutoFormat bool `yaml:"-"` + DisableEngineAutoRestart bool `yaml:"disable_engine_auto_restart"` + EngineAutoRestartMinDelay int `yaml:"engine_auto_restart_min_delay,omitempty"` deprecatedParams `yaml:",inline"` } @@ -362,6 +364,18 @@ func (cfg *Server) WithTelemetryPort(port int) *Server { return cfg } +// WithDisableEngineAutoRestart enables or disables automatic engine restarts on self-termination. +func (cfg *Server) WithDisableEngineAutoRestart(disabled bool) *Server { + cfg.DisableEngineAutoRestart = disabled + return cfg +} + +// WithEngineAutoRestartMinDelay sets minimum time between automatic engine restarts. +func (cfg *Server) WithEngineAutoRestartMinDelay(secs uint) *Server { + cfg.EngineAutoRestartMinDelay = int(secs) + return cfg +} + // DefaultServer creates a new instance of configuration struct // populated with defaults. func DefaultServer() *Server { @@ -837,6 +851,11 @@ func (cfg *Server) Validate(log logging.Logger) (err error) { return FaultConfigSysRsvdZero } + if cfg.EngineAutoRestartMinDelay < 0 { + return errors.Errorf("engine_auto_restart_min_delay must be >= 0 (got %d)", + cfg.EngineAutoRestartMinDelay) + } + // A config without engines is valid when initially discovering hardware prior to adding // per-engine sections with device allocations. if len(cfg.Engines) == 0 { diff --git a/src/control/server/config/server_test.go b/src/control/server/config/server_test.go index 163f7d4de49..b68ccd0265b 100644 --- a/src/control/server/config/server_test.go +++ b/src/control/server/config/server_test.go @@ -267,6 +267,8 @@ func TestServerConfig_Constructed(t *testing.T) { WithSystemRamReserved(5). WithAllowNumaImbalance(true). WithAllowTHP(true). + WithDisableEngineAutoRestart(true). + WithEngineAutoRestartMinDelay(120). WithKernelConfigPath("/host/boot/config") // add engines explicitly to test functionality applied in WithEngines() diff --git a/src/control/server/ctl_check_test.go b/src/control/server/ctl_check_test.go index 1a998e6eca9..ab766988378 100644 --- a/src/control/server/ctl_check_test.go +++ b/src/control/server/ctl_check_test.go @@ -116,7 +116,7 @@ func TestServer_ControlService_CheckEngineRepair(t *testing.T) { t.Fatalf("setup error - wrong type for Engine (%T)", e) } - setupTestEngine(t, srv, uint32(i), rankNums[i]) + setupTestEngine(t, srv, rankNums[i]) drpcCfg := new(mockDrpcClientConfig) drpcCfg.ConnectError = tc.drpcErr diff --git a/src/control/server/ctl_ranks_rpc.go b/src/control/server/ctl_ranks_rpc.go index aad15ece736..5fcb092f3ea 100644 --- a/src/control/server/ctl_ranks_rpc.go +++ b/src/control/server/ctl_ranks_rpc.go @@ -1,6 +1,6 @@ // // (C) Copyright 2020-2024 Intel Corporation. -// (C) Copyright 2025 Hewlett Packard Enterprise Development LP +// (C) Copyright 2025-2026 Hewlett Packard Enterprise Development LP // // SPDX-License-Identifier: BSD-2-Clause-Patent // @@ -153,6 +153,21 @@ func (svc *ControlService) memberStateResults(instances []Engine, tgtState syste return results, nil } +// Clear restart history for manually stopped ranks on this server. 
This prevents rate-limiting +// from interfering with manual operations and vice versa. +func clearRankRestartHistory(mgr *engineRestartManager, instances []Engine) { + ranks := make([]ranklist.Rank, 0, len(instances)) + for _, ei := range instances { + rank, err := ei.GetRank() + if err == nil { + ranks = append(ranks, rank) + } + } + if len(ranks) > 0 { + mgr.clearRankRestartHistory(ranks) + } +} + // StopRanks implements the method defined for the Management Service. // // Stop data-plane instance(s) managed by control-plane identified by unique @@ -206,6 +221,10 @@ func (svc *ControlService) StopRanks(ctx context.Context, req *ctlpb.RanksReq) ( return nil, err } + // clear state history for stopped ranks, instances have already been filtered by + // FilterInstancesByRankSet() to match req.GetRanks() + clearRankRestartHistory(svc.restartMgr, instances) + return resp, nil } @@ -319,6 +338,10 @@ func (svc *ControlService) StartRanks(ctx context.Context, req *ctlpb.RanksReq) return nil, err } + // clear state history for started ranks, instances have already been filtered by + // FilterInstancesByRankSet() to match req.GetRanks() + clearRankRestartHistory(svc.restartMgr, instances) + return resp, nil } diff --git a/src/control/server/ctl_ranks_rpc_test.go b/src/control/server/ctl_ranks_rpc_test.go index 848618cfa21..8057325710f 100644 --- a/src/control/server/ctl_ranks_rpc_test.go +++ b/src/control/server/ctl_ranks_rpc_test.go @@ -76,17 +76,15 @@ func checkUnorderedRankResults(t *testing.T, expResults, gotResults []*sharedpb. } } -func setupTestEngine(t *testing.T, srv *EngineInstance, idx, rank uint32, stopped ...bool) { +func setupTestEngine(t *testing.T, ei *EngineInstance, rank uint32, stopped ...bool) { + ei._superblock.Rank = ranklist.NewRankPtr(rank) + trc := &engine.TestRunnerConfig{} if len(stopped) == 0 || !stopped[0] { trc.Running.SetTrue() - srv.ready.SetTrue() + ei.ready.SetTrue() } - srv.runner = engine.NewTestRunner(trc, engine.MockConfig()) - srv.setIndex(idx) - - srv._superblock.Rank = new(ranklist.Rank) - *srv._superblock.Rank = ranklist.Rank(rank) + ei.runner = engine.NewTestRunner(trc, engine.MockConfig()) } func TestServer_CtlSvc_PrepShutdownRanks(t *testing.T) { @@ -207,7 +205,7 @@ func TestServer_CtlSvc_PrepShutdownRanks(t *testing.T) { continue } - setupTestEngine(t, srv, uint32(i), uint32(i+1), tc.instancesStopped) + setupTestEngine(t, srv, uint32(i+1), tc.instancesStopped) cfg := new(mockDrpcClientConfig) if tc.drpcRet != nil { diff --git a/src/control/server/ctl_svc.go b/src/control/server/ctl_svc.go index a70348f8854..9417723fafe 100644 --- a/src/control/server/ctl_svc.go +++ b/src/control/server/ctl_svc.go @@ -1,5 +1,6 @@ // // (C) Copyright 2018-2024 Intel Corporation. 
+// (C) Copyright 2026 Hewlett Packard Enterprise Development LP // // SPDX-License-Identifier: BSD-2-Clause-Patent // @@ -19,10 +20,11 @@ import ( type ControlService struct { ctlpb.UnimplementedCtlSvcServer StorageControlService - harness *EngineHarness - srvCfg *config.Server - events *events.PubSub - fabric *hardware.FabricScanner + harness *EngineHarness + srvCfg *config.Server + events *events.PubSub + fabric *hardware.FabricScanner + restartMgr *engineRestartManager } // NewControlService returns ControlService to be used as gRPC control service diff --git a/src/control/server/instance_restart.go b/src/control/server/instance_restart.go new file mode 100644 index 00000000000..a47605fdc0d --- /dev/null +++ b/src/control/server/instance_restart.go @@ -0,0 +1,246 @@ +// +// (C) Copyright 2026 Hewlett Packard Enterprise Development LP +// +// SPDX-License-Identifier: BSD-2-Clause-Patent +// + +package server + +import ( + "context" + "sync" + "time" + + "github.com/daos-stack/daos/src/control/lib/ranklist" + "github.com/daos-stack/daos/src/control/logging" + "github.com/daos-stack/daos/src/control/server/config" +) + +const ( + // engineRestartMaxQueueSz is the maximum number of engine restart requests to be held in a + // channel at any one time. Additional requests will be dropped during exclusion storm. + engineRestartMaxQueueSz = 100 + + // defaultEngineAutoRestartMinDelay is the minimum number of seconds between automatic engine + // restarts that are triggered when engine_self_terminated RAS events are received. + defaultEngineAutoRestartMinDelay = 300 // 5 minutes +) + +// engineRestartRequest represents a request to restart an engine instance. +type engineRestartRequest struct { + rank ranklist.Rank + instance Engine +} + +// engineRestartManager manages engine restart requests with rate limiting. +type engineRestartManager struct { + log logging.Logger + cfg *config.Server + requestChan chan engineRestartRequest + stopChan chan struct{} + lastRestart map[ranklist.Rank]time.Time + pendingRestart map[ranklist.Rank]*time.Timer + mu sync.RWMutex +} + +// getMinDelay returns the configured minimum delay between restarts. +func (mgr *engineRestartManager) getMinDelay() time.Duration { + minDelay := defaultEngineAutoRestartMinDelay + if mgr.cfg.EngineAutoRestartMinDelay > 0 { + minDelay = mgr.cfg.EngineAutoRestartMinDelay + } + return time.Duration(minDelay) * time.Second +} + +// canRestartNow checks if a rank can be restarted immediately. +// Returns true if restart can proceed, false and delay duration if rate limited. +func (mgr *engineRestartManager) canRestartNow(rank ranklist.Rank) (bool, time.Duration) { + mgr.mu.RLock() + defer mgr.mu.RUnlock() + + lastRestart, hasRestarted := mgr.lastRestart[rank] + if !hasRestarted { + return true, 0 + } + + minDelay := mgr.getMinDelay() + elapsed := time.Since(lastRestart) + if elapsed >= minDelay { + return true, 0 + } + + remaining := minDelay - elapsed + return false, remaining +} + +// recordRestartTime records when a rank was restarted. +func (mgr *engineRestartManager) recordRestartTime(rank ranklist.Rank) { + mgr.mu.Lock() + defer mgr.mu.Unlock() + + mgr.lastRestart[rank] = time.Now() + mgr.log.Debugf("last restart recorded") +} + +// clearPendingRestart removes a pending restart timer for a rank. +func (mgr *engineRestartManager) clearPendingRestart(rank ranklist.Rank) { + mgr.mu.Lock() + defer mgr.mu.Unlock() + + delete(mgr.pendingRestart, rank) +} + +// setPendingRestart stores a pending restart timer for a rank. 
+func (mgr *engineRestartManager) setPendingRestart(rank ranklist.Rank, timer *time.Timer) { + mgr.mu.Lock() + defer mgr.mu.Unlock() + + // Cancel any existing timer + if existingTimer, exists := mgr.pendingRestart[rank]; exists { + existingTimer.Stop() + mgr.log.Debugf("cancelled existing pending restart timer for rank %d", rank) + } + + mgr.pendingRestart[rank] = timer +} + +// waitForEngineStopped polls until the engine instance is stopped. +func waitForEngineStopped(ctx context.Context, instances []Engine) error { + pollFn := func(e Engine) bool { return !e.IsStarted() } + return pollInstanceState(ctx, instances, pollFn) +} + +// performRestart executes the restart after waiting for the engine to stop. +func (mgr *engineRestartManager) performRestart(ctx context.Context, rank ranklist.Rank, instance Engine) { + defer mgr.clearPendingRestart(rank) + + // Wait for engine to stop + instances := []Engine{instance} + if err := waitForEngineStopped(ctx, instances); err != nil { + mgr.log.Errorf("rank %d did not stop before restart: %s", rank, err) + return + } + + mgr.log.Noticef("restart manager is restarting rank %d", rank) + instance.requestStart(ctx) + + // Record restart time and clear pending state on exit (deferred) + mgr.recordRestartTime(rank) + mgr.log.Debugf("recording rank %d", rank) +} + +// processRestartRequest handles a single restart request with rate limiting. +func (mgr *engineRestartManager) processRestartRequest(ctx context.Context, req engineRestartRequest) { + rank := req.rank + instance := req.instance + + mgr.log.Debugf("processing restart request for rank %d", rank) + + canRestart, delay := mgr.canRestartNow(rank) + if !canRestart { + mgr.log.Noticef("rank %d restart rate limited: will restart in %s", + rank, delay.Round(time.Second)) + + // Schedule deferred restart + timer := time.AfterFunc(delay, func() { + mgr.log.Noticef("deferred restart triggered for rank %d after rate-limit delay", rank) + mgr.performRestart(ctx, rank, instance) + }) + + // Overwrite any existing pending restart + mgr.setPendingRestart(rank, timer) + return + } + + // Can restart immediately + mgr.performRestart(ctx, rank, instance) +} + +// requestRestart submits a restart request to the manager. +func (mgr *engineRestartManager) requestRestart(rank ranklist.Rank, instance Engine) { + req := engineRestartRequest{ + rank: rank, + instance: instance, + } + + select { + case mgr.requestChan <- req: + mgr.log.Debugf("restart request queued for rank %d", rank) + default: + mgr.log.Errorf("restart request channel full, dropping request for rank %d", rank) + } +} + +// start begins processing restart requests. Function to be called once on server start-up. +func (mgr *engineRestartManager) start(ctx context.Context) { + mgr.log.Debug("engine restart manager started") + go func() { + for { + select { + case <-ctx.Done(): + mgr.log.Debug("engine restart manager context cancelled") + return + case <-mgr.stopChan: + mgr.log.Debug("engine restart manager stopped") + return + case req := <-mgr.requestChan: + mgr.processRestartRequest(ctx, req) + } + } + }() +} + +// clearRankRestartHistory clears the restart history for specific ranks. +// This is called when ranks are manually stopped or started to ensure +// manual operations don't interfere with automatic restart rate limiting. 
+func (mgr *engineRestartManager) clearRankRestartHistory(ranks []ranklist.Rank) { + if mgr == nil || len(ranks) == 0 { + return + } + + mgr.mu.Lock() + defer mgr.mu.Unlock() + + for _, rank := range ranks { + // Cancel any pending restart for this rank + if timer, exists := mgr.pendingRestart[rank]; exists { + timer.Stop() + delete(mgr.pendingRestart, rank) + mgr.log.Debugf("cancelled pending restart for rank %d during manual operation", rank) + } + + // Clear restart history for this rank + if _, exists := mgr.lastRestart[rank]; exists { + delete(mgr.lastRestart, rank) + mgr.log.Debugf("cleared restart history for rank %d (manual operation)", rank) + } + } +} + +// stop shuts down the restart manager. Function to be called once on server shutdown. +func (mgr *engineRestartManager) stop() { + mgr.log.Debug("stopping engine restart manager") + mgr.mu.Lock() + defer mgr.mu.Unlock() + + // Cancel all pending restart timers + for rank, timer := range mgr.pendingRestart { + timer.Stop() + mgr.log.Debugf("cancelled pending restart for rank %d", rank) + } + mgr.pendingRestart = make(map[ranklist.Rank]*time.Timer) + + close(mgr.stopChan) +} + +// newEngineRestartManager creates a new restart manager. +func newEngineRestartManager(log logging.Logger, cfg *config.Server) *engineRestartManager { + return &engineRestartManager{ + log: log, + cfg: cfg, + requestChan: make(chan engineRestartRequest, engineRestartMaxQueueSz), + stopChan: make(chan struct{}), + lastRestart: make(map[ranklist.Rank]time.Time), + pendingRestart: make(map[ranklist.Rank]*time.Timer), + } +} diff --git a/src/control/server/instance_restart_test.go b/src/control/server/instance_restart_test.go new file mode 100644 index 00000000000..689bacec53a --- /dev/null +++ b/src/control/server/instance_restart_test.go @@ -0,0 +1,750 @@ +// +// (C) Copyright 2026 Hewlett Packard Enterprise Development LP +// +// SPDX-License-Identifier: BSD-2-Clause-Patent +// + +package server + +import ( + "context" + "strings" + "testing" + "time" + + "github.com/daos-stack/daos/src/control/common/test" + "github.com/daos-stack/daos/src/control/lib/ranklist" + "github.com/daos-stack/daos/src/control/logging" + "github.com/daos-stack/daos/src/control/server/config" +) + +// Test helper functions + +func setupTestLogger(t *testing.T) (logging.Logger, *logging.LogBuffer) { + t.Helper() + log, buf := logging.NewTestLogger(t.Name()) + t.Cleanup(func() { + test.ShowBufferOnFailure(t, buf) + }) + + return log, buf +} + +func getTestLogger(t *testing.T, loggers []logging.Logger) logging.Logger { + t.Helper() + var log logging.Logger + + switch len(loggers) { + case 0: + log, _ = setupTestLogger(t) + case 1: + log = loggers[0] + default: + t.Fatal("multiple loggers provided, want one") + } + + return log +} + +func setupTestManager(t *testing.T, cfg *config.Server, loggers ...logging.Logger) *engineRestartManager { + t.Helper() + log := getTestLogger(t, loggers) + if cfg == nil { + cfg = &config.Server{} + } + + return newEngineRestartManager(log, cfg) +} + +func setupTestHarness(t *testing.T, rankStr string, loggers ...logging.Logger) (*EngineInstance, ranklist.Rank) { + t.Helper() + log := getTestLogger(t, loggers) + harness := NewEngineHarness(log) + setupAddTestEngine(t, log, harness, false) + + instances, err := harness.FilterInstancesByRankSet(rankStr) + if err != nil || len(instances) == 0 { + t.Fatalf("failed to get instance: %v", err) + } + + rank, err := ranklist.ParseRanks(rankStr) + if err != nil || len(rank) != 1 { + t.Fatalf("failed to parse rank: 
%v", err) + } + + return instances[0].(*EngineInstance), rank[0] +} + +func startInstanceConsumer(ctx context.Context, instance *EngineInstance) { + go func() { + select { + case <-ctx.Done(): + case <-instance.startRequested: + } + }() +} + +func waitForRestartRecorded(ctx context.Context, t *testing.T, mgr *engineRestartManager, rank ranklist.Rank) bool { + recorded := make(chan struct{}) + go func() { + ticker := time.NewTicker(100 * time.Millisecond) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + mgr.mu.RLock() + _, exists := mgr.lastRestart[rank] + mgr.mu.RUnlock() + + if exists { + close(recorded) + return + } + } + } + }() + + select { + case <-ctx.Done(): + return false + case <-recorded: + return true + } +} + +func TestServer_EngineRestartManager_GetMinDelay(t *testing.T) { + for name, tc := range map[string]struct { + configDelay int + expDelay time.Duration + }{ + "default delay": { + configDelay: 0, + expDelay: 300 * time.Second, + }, + "custom delay": { + configDelay: 60, + expDelay: 60 * time.Second, + }, + "long delay": { + configDelay: 600, + expDelay: 600 * time.Second, + }, + } { + t.Run(name, func(t *testing.T) { + mgr := setupTestManager(t, &config.Server{ + EngineAutoRestartMinDelay: tc.configDelay, + }) + + gotDelay := mgr.getMinDelay() + if gotDelay != tc.expDelay { + t.Errorf("expected delay %s, got %s", tc.expDelay, gotDelay) + } + }) + } +} + +func TestServer_EngineRestartManager_CanRestartNow(t *testing.T) { + for name, tc := range map[string]struct { + lastRestartAge time.Duration + minDelay int + expCanRestart bool + }{ + "no previous restart": { + lastRestartAge: 0, + minDelay: 60, + expCanRestart: true, + }, + "enough time elapsed": { + lastRestartAge: 70 * time.Second, + minDelay: 60, + expCanRestart: true, + }, + "not enough time elapsed": { + lastRestartAge: 50 * time.Second, + minDelay: 60, + expCanRestart: false, + }, + "exactly minimum delay": { + lastRestartAge: 60 * time.Second, + minDelay: 60, + expCanRestart: true, + }, + } { + t.Run(name, func(t *testing.T) { + mgr := setupTestManager(t, &config.Server{ + EngineAutoRestartMinDelay: tc.minDelay, + }) + testRank := ranklist.Rank(1) + + // Set last restart time if test case specifies + if tc.lastRestartAge > 0 { + mgr.lastRestart[testRank] = time.Now().Add(-tc.lastRestartAge) + } + + canRestart, remaining := mgr.canRestartNow(testRank) + + if canRestart != tc.expCanRestart { + t.Errorf("expected canRestart=%v, got %v", tc.expCanRestart, + canRestart) + } + + if tc.expCanRestart && remaining != 0 { + t.Errorf("expected no remaining delay when can restart, got %s", + remaining) + } + + if !tc.expCanRestart && remaining <= 0 { + t.Errorf("expected positive remaining delay when cannot restart, "+ + "got %s", remaining) + } + }) + } +} + +func TestServer_EngineRestartManager_RecordRestartTime(t *testing.T) { + mgr := setupTestManager(t, nil) + testRank := ranklist.Rank(1) + + beforeRecord := time.Now() + mgr.recordRestartTime(testRank) + afterRecord := time.Now() + + recordedTime, exists := mgr.lastRestart[testRank] + if !exists { + t.Fatal("restart time not recorded") + } + + if recordedTime.Before(beforeRecord) || recordedTime.After(afterRecord) { + t.Errorf("recorded time %s outside expected range [%s, %s]", + recordedTime, beforeRecord, afterRecord) + } +} + +func TestServer_EngineRestartManager_SetPendingRestart(t *testing.T) { + mgr := setupTestManager(t, nil) + testRank := ranklist.Rank(1) + + // Set initial timer + timer1 := time.NewTimer(10 * 
time.Second) + mgr.setPendingRestart(testRank, timer1) + + if len(mgr.pendingRestart) != 1 { + t.Fatalf("expected 1 pending restart, got %d", len(mgr.pendingRestart)) + } + + // Set another timer for same rank (should cancel previous) + timer2 := time.NewTimer(5 * time.Second) + mgr.setPendingRestart(testRank, timer2) + + if len(mgr.pendingRestart) != 1 { + t.Fatalf("expected 1 pending restart after replacement, got %d", + len(mgr.pendingRestart)) + } + + if mgr.pendingRestart[testRank] != timer2 { + t.Error("pending restart timer not updated to new timer") + } + + // Cleanup + timer2.Stop() +} + +func TestServer_EngineRestartManager_ClearPendingRestart(t *testing.T) { + mgr := setupTestManager(t, nil) + testRank := ranklist.Rank(1) + + // Set a timer + timer := time.NewTimer(10 * time.Second) + defer timer.Stop() + mgr.pendingRestart[testRank] = timer + + // Clear it + mgr.clearPendingRestart(testRank) + + if len(mgr.pendingRestart) != 0 { + t.Errorf("expected no pending restarts after clear, got %d", + len(mgr.pendingRestart)) + } +} + +func TestServer_EngineRestartManager_RequestRestart(t *testing.T) { + mgr := setupTestManager(t, nil) + testRank := ranklist.Rank(1) + + // Create mock instance + mockInstance := &MockInstance{ + cfg: MockInstanceConfig{ + GetRankResp: testRank, + }, + } + + mgr.requestRestart(testRank, mockInstance) + + // Should receive request on channel + select { + case req := <-mgr.requestChan: + if !req.rank.Equals(testRank) { + t.Errorf("expected rank %d, got %d", testRank, req.rank) + } + if req.instance != mockInstance { + t.Error("expected mock instance in request") + } + case <-time.After(1 * time.Second): + t.Fatal("timeout waiting for restart request") + } +} + +func TestServer_EngineRestartManager_RequestRestart_ChannelFull(t *testing.T) { + log, buf := setupTestLogger(t) + mgr := setupTestManager(t, nil, log) + testRank := ranklist.Rank(1) + + mockInstance := &MockInstance{ + cfg: MockInstanceConfig{ + GetRankResp: testRank, + }, + } + + // Fill the channel + for i := 0; i < engineRestartMaxQueueSz; i++ { + mgr.requestRestart(ranklist.Rank(i), mockInstance) + } + + // Next request should be dropped + mgr.requestRestart(testRank, mockInstance) + + // Should see error in log + logOutput := buf.String() + if !strings.Contains(logOutput, "channel full") && + !strings.Contains(logOutput, "dropping request") { + t.Error("expected channel full error in log") + } +} + +func TestServer_EngineRestartManager_ProcessRestartRequest_Immediate(t *testing.T) { + ctx := test.Context(t) + instance, testRank := setupTestHarness(t, "1") + mgr := setupTestManager(t, &config.Server{ + EngineAutoRestartMinDelay: 10, + }) + + startInstanceConsumer(ctx, instance) + + req := engineRestartRequest{ + rank: testRank, + instance: instance, + } + + // Process request (no previous restart, should be immediate) + mgr.processRestartRequest(ctx, req) + + // Verify restart time recorded + mgr.mu.Lock() + _, recorded := mgr.lastRestart[testRank] + mgr.mu.Unlock() + + if !recorded { + t.Error("expected restart time to be recorded") + } + + // Verify no pending restart + mgr.mu.Lock() + _, pending := mgr.pendingRestart[testRank] + mgr.mu.Unlock() + + if pending { + t.Error("expected no pending restart for immediate restart") + } +} + +func TestServer_EngineRestartManager_ProcessRestartRequest_Deferred(t *testing.T) { + log, buf := setupTestLogger(t) + ctx := test.Context(t) + instance, testRank := setupTestHarness(t, "1", log) + mgr := setupTestManager(t, &config.Server{ + 
EngineAutoRestartMinDelay: 2, // 2 seconds for fast test + }, log) + + // Record a recent restart + mgr.lastRestart[testRank] = time.Now() + + req := engineRestartRequest{ + rank: testRank, + instance: instance, + } + + // Process request (should be deferred due to rate limiting) + mgr.processRestartRequest(ctx, req) + + // Verify pending restart was set + mgr.mu.RLock() + timer, pending := mgr.pendingRestart[testRank] + mgr.mu.RUnlock() + + if !pending { + t.Fatal("expected pending restart to be set") + } + + // Cleanup + if timer != nil { + timer.Stop() + } + mgr.clearPendingRestart(testRank) + + // Verify log message + logOutput := buf.String() + if !strings.Contains(logOutput, "rate limited") && + !strings.Contains(logOutput, "will restart in") { + t.Error("expected rate limited message in log") + } +} + +func TestServer_EngineRestartManager_Stop(t *testing.T) { + mgr := setupTestManager(t, nil) + + // Add some pending restarts + timer1 := time.NewTimer(10 * time.Second) + timer2 := time.NewTimer(10 * time.Second) + mgr.pendingRestart[ranklist.Rank(1)] = timer1 + mgr.pendingRestart[ranklist.Rank(2)] = timer2 + + // Stop should cancel all timers + mgr.stop() + + if len(mgr.pendingRestart) != 0 { + t.Errorf("expected all pending restarts cleared, got %d", + len(mgr.pendingRestart)) + } + + // Verify stopChan is closed + select { + case <-mgr.stopChan: + // Expected + default: + t.Error("stopChan should be closed") + } +} + +func TestServer_EngineRestartManager_Start_ProcessRequests(t *testing.T) { + ctx, cancel := context.WithTimeout(test.Context(t), 10*time.Second) + defer cancel() + + instance, testRank := setupTestHarness(t, "1") + mgr := setupTestManager(t, &config.Server{ + EngineAutoRestartMinDelay: 10, + }) + mgr.start(ctx) + defer mgr.stop() + + startInstanceConsumer(ctx, instance) + + // Submit restart request + mgr.requestRestart(testRank, instance) + + // Wait for restart time to be recorded + if !waitForRestartRecorded(ctx, t, mgr, testRank) { + t.Error("expected restart time to be recorded after processing") + } else { + t.Log("restart time recorded successfully") + } +} + +func TestServer_EngineRestartManager_DeferredRestartExecutes(t *testing.T) { + ctx, cancel := context.WithTimeout(test.Context(t), 20*time.Second) + defer cancel() + + instance, testRank := setupTestHarness(t, "1") + mgr := setupTestManager(t, &config.Server{ + EngineAutoRestartMinDelay: 2, // seconds + }) + + startInstanceConsumer(ctx, instance) + + // Set recent restart time + mgr.lastRestart[testRank] = time.Now() + + req := engineRestartRequest{ + rank: testRank, + instance: instance, + } + + // Process request (should create deferred restart) + mgr.processRestartRequest(ctx, req) + + // Verify timer exists + mgr.mu.RLock() + timer, exists := mgr.pendingRestart[testRank] + mgr.mu.RUnlock() + + if !exists { + t.Fatal("expected pending restart timer to be created") + } + + // Wait for timer to fire (with buffer) + time.Sleep(5 * time.Second) + + // Verify timer was cleaned up + mgr.mu.RLock() + timer, exists = mgr.pendingRestart[testRank] + mgr.mu.RUnlock() + + // Cleanup + if timer != nil { + timer.Stop() + } + + if exists { + t.Error("expected pending restart to be cleared after execution") + } +} + +func TestServer_EngineRestartManager_MultipleRanks(t *testing.T) { + mgr := setupTestManager(t, &config.Server{ + EngineAutoRestartMinDelay: 10, + }) + + rank1 := ranklist.Rank(1) + rank2 := ranklist.Rank(2) + + // Record restarts for both ranks + mgr.recordRestartTime(rank1) + time.Sleep(10 * 
time.Millisecond) + mgr.recordRestartTime(rank2) + + // Verify both recorded + mgr.mu.RLock() + time1, exists1 := mgr.lastRestart[rank1] + time2, exists2 := mgr.lastRestart[rank2] + mgr.mu.RUnlock() + + if !exists1 || !exists2 { + t.Fatal("expected both ranks to have restart times recorded") + } + + if !time1.Before(time2) { + t.Error("expected rank1 restart time to be before rank2") + } + + // Verify independent rate limiting + canRestart1, _ := mgr.canRestartNow(rank1) + canRestart2, _ := mgr.canRestartNow(rank2) + + if canRestart1 || canRestart2 { + t.Error("expected both ranks to be rate limited") + } +} + +func TestServer_EngineRestartManager_CancelExistingTimer(t *testing.T) { + log, buf := setupTestLogger(t) + ctx := test.Context(t) + instance, testRank := setupTestHarness(t, "1", log) + mgr := setupTestManager(t, &config.Server{ + EngineAutoRestartMinDelay: 5, + }, log) + + // Set recent restart + mgr.lastRestart[testRank] = time.Now() + + // First deferred request + req1 := engineRestartRequest{ + rank: testRank, + instance: instance, + } + mgr.processRestartRequest(ctx, req1) + + mgr.mu.RLock() + timer1, exists1 := mgr.pendingRestart[testRank] + mgr.mu.RUnlock() + + if !exists1 { + t.Fatal("expected first pending restart to be set") + } + + // Second deferred request (should cancel first) + time.Sleep(100 * time.Millisecond) + req2 := engineRestartRequest{ + rank: testRank, + instance: instance, + } + mgr.processRestartRequest(ctx, req2) + + mgr.mu.RLock() + timer2, exists2 := mgr.pendingRestart[testRank] + mgr.mu.RUnlock() + + if !exists2 { + t.Fatal("expected second pending restart to be set") + } + + if timer1 == timer2 { + t.Error("expected timer to be replaced") + } + + // Verify log shows cancellation + logOutput := buf.String() + if !strings.Contains(logOutput, "cancelled existing pending restart") { + t.Error("expected cancellation message in log") + } + + // Cleanup + if timer2 != nil { + timer2.Stop() + } + mgr.clearPendingRestart(testRank) +} + +func TestServer_NewEngineRestartManager(t *testing.T) { + cfg := &config.Server{ + EngineAutoRestartMinDelay: 42, + } + ctx := test.MustLogContext(t) + log := logging.FromContext(ctx) + mgr := newEngineRestartManager(log, cfg) + + if mgr.log == nil { + t.Error("expected logger to be set") + } + + if mgr.cfg != cfg { + t.Error("expected config to be set") + } + + if mgr.requestChan == nil { + t.Error("expected requestChan to be initialized") + } + + if cap(mgr.requestChan) != engineRestartMaxQueueSz { + t.Errorf("expected channel capacity %d, got %d", + engineRestartMaxQueueSz, cap(mgr.requestChan)) + } + + if mgr.stopChan == nil { + t.Error("expected stopChan to be initialized") + } + + if mgr.lastRestart == nil { + t.Error("expected lastRestart map to be initialized") + } + + if mgr.pendingRestart == nil { + t.Error("expected pendingRestart map to be initialized") + } +} + +func TestServer_EngineRestartManager_ClearRankRestartHistory(t *testing.T) { + for name, tc := range map[string]struct { + setupRanks []ranklist.Rank + clearRanks []ranklist.Rank + expectLogMsgs []string + remainingRanks []ranklist.Rank + }{ + "nil manager": { + setupRanks: []ranklist.Rank{1, 2}, + clearRanks: []ranklist.Rank{1}, + }, + "empty ranks": { + setupRanks: []ranklist.Rank{1, 2}, + clearRanks: []ranklist.Rank{}, + }, + "clear single rank with history": { + setupRanks: []ranklist.Rank{1, 2, 3}, + clearRanks: []ranklist.Rank{2}, + expectLogMsgs: []string{"cleared restart history for rank 2"}, + remainingRanks: []ranklist.Rank{1, 3}, + }, + "clear 
multiple ranks with history": { + setupRanks: []ranklist.Rank{1, 2, 3, 4}, + clearRanks: []ranklist.Rank{1, 3}, + expectLogMsgs: []string{"cleared restart history for rank 1", "cleared restart history for rank 3"}, + remainingRanks: []ranklist.Rank{2, 4}, + }, + "clear all ranks": { + setupRanks: []ranklist.Rank{1, 2, 3}, + clearRanks: []ranklist.Rank{1, 2, 3}, + expectLogMsgs: []string{"cleared restart history for rank 1", "cleared restart history for rank 2", "cleared restart history for rank 3"}, + remainingRanks: []ranklist.Rank{}, + }, + "clear rank without history": { + setupRanks: []ranklist.Rank{1, 2}, + clearRanks: []ranklist.Rank{5}, + expectLogMsgs: []string{}, + remainingRanks: []ranklist.Rank{1, 2}, + }, + "clear rank with pending restart": { + setupRanks: []ranklist.Rank{1, 2}, + clearRanks: []ranklist.Rank{1}, + expectLogMsgs: []string{"cancelled pending restart for rank 1", "cleared restart history for rank 1"}, + remainingRanks: []ranklist.Rank{2}, + }, + } { + t.Run(name, func(t *testing.T) { + log, buf := setupTestLogger(t) + var mgr *engineRestartManager + + if name == "nil manager" { + // Test nil manager doesn't panic + var nilMgr *engineRestartManager + nilMgr.clearRankRestartHistory(tc.clearRanks) + return + } + + mgr = setupTestManager(t, nil, log) + + // Setup restart history for ranks + now := time.Now() + for i, rank := range tc.setupRanks { + mgr.lastRestart[rank] = now.Add(-time.Duration(i) * time.Minute) + } + + // Setup pending restart for rank 1 if testing that case + if name == "clear rank with pending restart" { + timer := time.NewTimer(10 * time.Second) + t.Cleanup(func() { timer.Stop() }) + mgr.pendingRestart[ranklist.Rank(1)] = timer + } + + mgr.clearRankRestartHistory(tc.clearRanks) + + // Verify expected log messages + for _, expectedMsg := range tc.expectLogMsgs { + if !strings.Contains(buf.String(), expectedMsg) { + t.Errorf("expected log message %q not found in: %s", + expectedMsg, buf.String()) + } + } + + // Verify remaining ranks still have history + for _, rank := range tc.remainingRanks { + if _, exists := mgr.lastRestart[rank]; !exists { + t.Errorf("expected rank %d to still have restart history", rank) + } + } + + // Verify cleared ranks don't have history + for _, rank := range tc.clearRanks { + if _, exists := mgr.lastRestart[rank]; exists { + found := false + for _, remaining := range tc.remainingRanks { + if remaining.Equals(rank) { + found = true + break + } + } + if !found { + t.Errorf("expected rank %d to have cleared restart history", rank) + } + } + } + + // Verify pending restart was cleared for rank 1 in specific test + if name == "clear rank with pending restart" { + if _, exists := mgr.pendingRestart[ranklist.Rank(1)]; exists { + t.Error("expected pending restart for rank 1 to be cleared") + } + } + }) + } +} diff --git a/src/control/server/mgmt_svc.go b/src/control/server/mgmt_svc.go index 947f97e97d6..1334e23dd63 100644 --- a/src/control/server/mgmt_svc.go +++ b/src/control/server/mgmt_svc.go @@ -1,6 +1,6 @@ // // (C) Copyright 2018-2024 Intel Corporation. 
-// (C) Copyright 2025 Hewlett Packard Enterprise Development LP +// (C) Copyright 2025-2026 Hewlett Packard Enterprise Development LP // // SPDX-License-Identifier: BSD-2-Clause-Patent // diff --git a/src/control/server/server.go b/src/control/server/server.go index a35b83be964..69fa0703a4d 100644 --- a/src/control/server/server.go +++ b/src/control/server/server.go @@ -163,6 +163,7 @@ type server struct { mgmtSvc *mgmtSvc grpcServer *grpc.Server controlClient *control.Client + restartMgr *engineRestartManager cbLock sync.Mutex onEnginesStarted []func(context.Context) error @@ -189,6 +190,7 @@ func newServer(log logging.Logger, cfg *config.Server, faultDomain *system.Fault runningUser: cu, faultDomain: faultDomain, harness: harness, + restartMgr: newEngineRestartManager(log, cfg), }, nil } @@ -256,6 +258,7 @@ func (srv *server) createServices(ctx context.Context) (err error) { srv.ctlSvc = NewControlService(srv.log, srv.harness, srv.cfg, srv.pubSub, network.DefaultFabricScanner(srv.log)) + srv.ctlSvc.restartMgr = srv.restartMgr srv.mgmtSvc = newMgmtSvc(srv.harness, srv.membership, srv.sysdb, rpcClient, srv.pubSub) if err := srv.mgmtSvc.systemProps.UpdateCompPropVal(daos.SystemPropertyDaosSystem, func() string { @@ -283,6 +286,9 @@ func (srv *server) OnShutdown(fns ...func()) { } func (srv *server) shutdown() { + // Stop the restart manager first + srv.restartMgr.stop() + srv.cbLock.Lock() onShutdownCbs := srv.onShutdown srv.cbLock.Unlock() @@ -406,6 +412,9 @@ func (srv *server) addEngines(ctx context.Context, smi *common.SysMemInfo) error allStarted.Wait() srv.log.Debug("engines have started") + // Start the restart manager + srv.restartMgr.start(ctx) + srv.cbLock.Lock() onEnginesStartedCbs := srv.onEnginesStarted srv.cbLock.Unlock() @@ -466,7 +475,7 @@ func (srv *server) setupGrpc() error { } func (srv *server) registerEvents() { - registerFollowerSubscriptions(srv) + registerSubscriptions(srv) srv.sysdb.OnLeadershipGained( func(ctx context.Context) error { @@ -507,7 +516,7 @@ func (srv *server) registerEvents() { ) srv.sysdb.OnLeadershipLost(func() error { srv.log.Infof("MS leader no longer running on %s", srv.hostname) - registerFollowerSubscriptions(srv) + registerSubscriptions(srv) return nil }) } diff --git a/src/control/server/server_utils.go b/src/control/server/server_utils.go index b9b57481f50..013944148e1 100644 --- a/src/control/server/server_utils.go +++ b/src/control/server/server_utils.go @@ -769,14 +769,76 @@ func registerTelemetryCallbacks(ctx context.Context, srv *server) { }) } -// registerFollowerSubscriptions stops handling received forwarded (in addition -// to local) events and starts forwarding events to the new MS leader. -// Log events on the host that they were raised (and first published) on. -// This is the initial behavior before leadership has been determined. -func registerFollowerSubscriptions(srv *server) { +// Handle local engine self termination and restart engine to rejoin system. 
+func handleEngineSelfTerminated(ctx context.Context, srv *server, evt *events.RASEvent) error { + srv.log.Tracef("handling engine self termination") + + if evt.IsForwarded() { + return errors.Errorf("unexpected forwarded engine_self_terminated event from %q", + evt.Hostname) + } + if srv.hostname != "" && evt.Hostname != "" && evt.Hostname != srv.hostname { + return errors.Errorf("unexpected non-local engine_self_terminated event from %q", + evt.Hostname) + } + + // Check if automatic restart is disabled + if srv.cfg.DisableEngineAutoRestart { + srv.log.Debugf("automatic engine restart disabled by configuration") + return nil + } + + ts, err := evt.GetTimestamp() + if err != nil { + return errors.Wrapf(err, "bad event timestamp %q", evt.Timestamp) + } + + // Find the engine instance by rank + instances, err := srv.harness.FilterInstancesByRankSet(fmt.Sprintf("%d", evt.Rank)) + if err != nil { + return errors.Wrapf(err, "failed to find instance for rank %d", evt.Rank) + } + + if len(instances) == 0 { + return errors.Errorf("no instance found for rank %d", evt.Rank) + } + if len(instances) > 1 { + return errors.Errorf("multiple instances found for rank %d", evt.Rank) + } + ei := instances[0] + + srv.log.Noticef("%s was notified @ %s of rank %d:%d (instance %d) self terminated", ts, + evt.Hostname, evt.Rank, evt.Incarnation, ei.Index()) + + rank := ranklist.Rank(evt.Rank) + + // Submit restart request to the restart manager + srv.restartMgr.requestRestart(rank, ei) + + return nil +} + +// subscribeEngineSelfTerminated creates a handler for engine self-termination events. +func subscribeEngineSelfTerminated(srv *server) events.Handler { + return events.HandlerFunc(func(ctx context.Context, evt *events.RASEvent) { + if evt.ID == events.RASEngineSelfTerminated { + if err := handleEngineSelfTerminated(ctx, srv, evt); err != nil { + srv.log.Errorf("handleEngineSelfTerminated: %s", err) + } + } + }) +} + +// registerSubscriptions doesn't handle received forwarded events but forwardable events are sent to +// the MS leader. Received events are logged on the host that they were raised (and first published) +// on. This is the initial behavior for all servers and only changes when leadership has been +// determined (when we change subscribers via registerLeaderSubscriptions). A handler is subscribed +// for local engine self-termination events. +func registerSubscriptions(srv *server) { srv.pubSub.Reset() srv.pubSub.Subscribe(events.RASTypeAny, srv.evtLogger) srv.pubSub.Subscribe(events.RASTypeStateChange, srv.evtForwarder) + srv.pubSub.Subscribe(events.RASTypeInfoOnly, subscribeEngineSelfTerminated(srv)) } func isSysSelfHealExcludeSet(svc *mgmtSvc) (bool, error) { @@ -840,8 +902,11 @@ func handleRankDead(ctx context.Context, srv *server, evt *events.RASEvent) { } } -// registerLeaderSubscriptions stops forwarding events to MS and instead starts -// handling received forwarded (and local) events. +// registerLeaderSubscriptions doesn't forward events to MS but instead handles received events by +// subscribing the management service membership and system-DB to StateChange event-type. Received +// events are logged on the host that they were raised (and first published) on. This behavior is +// triggered when a new leader steps-up. A handler is subscribed for local engine self-termination +// events and another for handling forwarded rank-dead events. 
func registerLeaderSubscriptions(srv *server) { srv.pubSub.Reset() srv.pubSub.Subscribe(events.RASTypeAny, srv.evtLogger) @@ -854,6 +919,7 @@ func registerLeaderSubscriptions(srv *server) { handleRankDead(ctx, srv, evt) } })) + srv.pubSub.Subscribe(events.RASTypeInfoOnly, subscribeEngineSelfTerminated(srv)) // Add a debounce to throttle multiple SWIM Rank Dead events for the same rank/incarnation. srv.pubSub.Debounce(events.RASSwimRankDead, 0, func(ev *events.RASEvent) string { diff --git a/src/control/server/server_utils_test.go b/src/control/server/server_utils_test.go index ce1d3117871..1dc57554699 100644 --- a/src/control/server/server_utils_test.go +++ b/src/control/server/server_utils_test.go @@ -8,12 +8,16 @@ package server import ( + "context" "fmt" "net" "os" "os/user" "strings" + "sync" + "sync/atomic" "testing" + "time" "github.com/dustin/go-humanize" "github.com/google/go-cmp/cmp" @@ -21,7 +25,10 @@ import ( "github.com/daos-stack/daos/src/control/common" "github.com/daos-stack/daos/src/control/common/test" + "github.com/daos-stack/daos/src/control/events" + "github.com/daos-stack/daos/src/control/lib/control" "github.com/daos-stack/daos/src/control/lib/hardware" + "github.com/daos-stack/daos/src/control/lib/ranklist" "github.com/daos-stack/daos/src/control/logging" sysprov "github.com/daos-stack/daos/src/control/provider/system" "github.com/daos-stack/daos/src/control/server/config" @@ -1981,3 +1988,750 @@ f0: 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 t.Fatalf("unexpected output format (-want, +got):\n%s\n", diff) } } + +const ( + testContextTimeout = 1 * time.Second + testHandlerTimeout = 1 * time.Second + testRestartRequestWait = 5 * time.Second + testSubscriptionDelay = 50 * time.Millisecond + testProcessingDelay = 100 * time.Millisecond +) + +func setupAddTestEngine(t *testing.T, log logging.Logger, h *EngineHarness, isRunning bool, ranks ...uint32) { + t.Helper() + + rank := uint32(1) + if len(ranks) != 0 { + rank = ranks[0] + } + + ei := newTestEngine(log, false, storage.MockProvider(log, 0, nil, nil, nil, nil, nil)) + setupTestEngine(t, ei, rank, !isRunning) + + if err := h.AddInstance(ei); err != nil { + t.Fatal(err) + } +} + +func TestServer_handleEngineSelfTerminated(t *testing.T) { + testRank := ranklist.Rank(1) + testIncarnation := uint64(42) + testHostname := "test-host-1" + validTimestamp := time.Now().Format(time.RFC3339) + + for name, tc := range map[string]struct { + evt *events.RASEvent + setupEngines func(*testing.T, logging.Logger, *EngineHarness) + disableEngineAutoRestart bool + engineAutoRestartDelay int + serverHostname string + expErr error + expRestartRequested bool + expLogContains []string + }{ + "forwarded event refused": { + evt: (&events.RASEvent{ + ID: events.RASEngineSelfTerminated, + Rank: uint32(testRank), + Incarnation: testIncarnation, + Hostname: testHostname, + Timestamp: validTimestamp, + }).WithForwarded(true), + setupEngines: func(t *testing.T, log logging.Logger, h *EngineHarness) { + setupAddTestEngine(t, log, h, false) + }, + serverHostname: testHostname, + expRestartRequested: false, + expErr: errors.New("forwarded engine_self_terminated event"), + }, + "non-local event refused": { + evt: &events.RASEvent{ + ID: events.RASEngineSelfTerminated, + Rank: uint32(testRank), + Incarnation: testIncarnation, + Hostname: "other-host", + Timestamp: validTimestamp, + }, + setupEngines: func(t *testing.T, log logging.Logger, h *EngineHarness) { + setupAddTestEngine(t, log, h, false) + }, + serverHostname: testHostname, + 
expRestartRequested: false, + expErr: errors.New("non-local engine_self_terminated event"), + }, + "auto restart disabled by config": { + evt: &events.RASEvent{ + ID: events.RASEngineSelfTerminated, + Rank: uint32(testRank), + Incarnation: testIncarnation, + Hostname: testHostname, + Timestamp: validTimestamp, + }, + setupEngines: func(t *testing.T, log logging.Logger, h *EngineHarness) { + setupAddTestEngine(t, log, h, false) + }, + disableEngineAutoRestart: true, + expRestartRequested: false, + expLogContains: []string{ + "automatic engine restart disabled", + }, + }, + "nil event timestamp": { + evt: &events.RASEvent{ + ID: events.RASEngineSelfTerminated, + Rank: uint32(testRank), + Incarnation: testIncarnation, + Hostname: testHostname, + Timestamp: "", + }, + expErr: errors.New("bad event timestamp"), + expRestartRequested: false, + }, + "invalid event timestamp": { + evt: &events.RASEvent{ + ID: events.RASEngineSelfTerminated, + Rank: uint32(testRank), + Incarnation: testIncarnation, + Hostname: testHostname, + Timestamp: "not-a-valid-timestamp", + }, + expErr: errors.New("bad event timestamp"), + expRestartRequested: false, + }, + "rank not found in harness": { + evt: &events.RASEvent{ + ID: events.RASEngineSelfTerminated, + Rank: 99, // Non-existent rank + Incarnation: testIncarnation, + Hostname: testHostname, + Timestamp: validTimestamp, + }, + setupEngines: func(t *testing.T, log logging.Logger, h *EngineHarness) { + e := newTestEngine(log, false, storage.MockProvider(log, 0, nil, nil, nil, nil, nil)) + e._superblock.Rank = ranklist.NewRankPtr(1) + if err := h.AddInstance(e); err != nil { + t.Fatal(err) + } + }, + expErr: errors.New("no instance found for rank 99"), + expRestartRequested: false, + }, + "filter instances error - nil superblock": { + evt: &events.RASEvent{ + ID: events.RASEngineSelfTerminated, + Rank: uint32(testRank), + Incarnation: testIncarnation, + Hostname: testHostname, + Timestamp: validTimestamp, + }, + setupEngines: func(t *testing.T, log logging.Logger, h *EngineHarness) { + e := newTestEngine(log, false, storage.MockProvider(log, 0, nil, nil, nil, nil, nil)) + e._superblock = nil + if err := h.AddInstance(e); err != nil { + t.Fatal(err) + } + }, + expErr: errors.New("no instance found for rank"), + expRestartRequested: false, + }, + "successful restart request - engine already stopped": { + evt: &events.RASEvent{ + ID: events.RASEngineSelfTerminated, + Rank: uint32(testRank), + Incarnation: testIncarnation, + Hostname: testHostname, + Timestamp: validTimestamp, + }, + setupEngines: func(t *testing.T, log logging.Logger, h *EngineHarness) { + setupAddTestEngine(t, log, h, false) + }, + expRestartRequested: true, + expLogContains: []string{ + fmt.Sprintf("rank %d:%d (instance 0) self terminated", testRank, testIncarnation), + testHostname, + }, + }, + "successful restart request - engine still running": { + evt: &events.RASEvent{ + ID: events.RASEngineSelfTerminated, + Rank: uint32(testRank), + Incarnation: testIncarnation, + Hostname: testHostname, + Timestamp: validTimestamp, + }, + setupEngines: func(t *testing.T, log logging.Logger, h *EngineHarness) { + setupAddTestEngine(t, log, h, true) + }, + expRestartRequested: false, + expLogContains: []string{ + fmt.Sprintf("rank %d:%d (instance 0) self terminated", testRank, testIncarnation), + }, + }, + "multiple engines - restart correct one": { + evt: &events.RASEvent{ + ID: events.RASEngineSelfTerminated, + Rank: 2, + Incarnation: testIncarnation, + Hostname: testHostname, + Timestamp: validTimestamp, + 
}, + setupEngines: func(t *testing.T, log logging.Logger, h *EngineHarness) { + for i := 0; i < 3; i++ { + setupAddTestEngine(t, log, h, false, uint32(i)) + } + }, + expRestartRequested: true, + expLogContains: []string{ + "rank 2:42 (instance 2) self terminated", + }, + }, + } { + t.Run(name, func(t *testing.T) { + log, buf := logging.NewTestLogger(t.Name()) + defer test.ShowBufferOnFailure(t, buf) + + ctx, cancel := context.WithTimeout(test.Context(t), testContextTimeout) + defer cancel() + + harness := NewEngineHarness(log) + + if tc.setupEngines != nil { + tc.setupEngines(t, log, harness) + } + + cfg := &config.Server{ + DisableEngineAutoRestart: tc.disableEngineAutoRestart, + EngineAutoRestartMinDelay: tc.engineAutoRestartDelay, + } + + restartMgr := newEngineRestartManager(log, cfg) + restartMgr.start(ctx) + defer restartMgr.stop() + + srv := &server{ + log: log, + hostname: tc.serverHostname, + harness: harness, + cfg: cfg, + restartMgr: restartMgr, + } + + var wg sync.WaitGroup + var restartRequested atomic.Bool + + // Run go-routines for each engine which consume from startRequested channel + // otherwise the requestStart() instance methods would block. + if len(harness.instances) > 0 { + targetRank := ranklist.Rank(tc.evt.Rank) + for _, inst := range harness.instances { + rank, err := inst.GetRank() + if err != nil || rank != targetRank { + continue + } + + ei, ok := inst.(*EngineInstance) + if !ok { + continue + } + + wg.Add(1) + go func(e *EngineInstance) { + defer wg.Done() + select { + case <-ctx.Done(): + case <-e.startRequested: + restartRequested.Store(true) + } + }(ei) + } + } + + err := handleEngineSelfTerminated(ctx, srv, tc.evt) + wg.Wait() + + if tc.expRestartRequested { + time.Sleep(testProcessingDelay) + } + + test.CmpErr(t, tc.expErr, err) + + if tc.expRestartRequested != restartRequested.Load() { + t.Errorf("expected restart requested=%v, got=%v", + tc.expRestartRequested, restartRequested.Load()) + } + + logOutput := buf.String() + for _, expStr := range tc.expLogContains { + if !strings.Contains(logOutput, expStr) { + t.Errorf("expected log to contain %q, but it didn't\nLog:\n%s", + expStr, logOutput) + } + } + }) + } +} + +func TestServer_handleEngineSelfTerminated_RateLimiting(t *testing.T) { + testRank := ranklist.Rank(1) + testIncarnation := uint64(42) + testHostname := "test-host-1" + validTimestamp := time.Now().Format(time.RFC3339) + + log, buf := logging.NewTestLogger(t.Name()) + defer test.ShowBufferOnFailure(t, buf) + + ctx, cancel := context.WithTimeout(test.Context(t), 10*time.Second) + defer cancel() + + harness := NewEngineHarness(log) + setupAddTestEngine(t, log, harness, false) + + cfg := &config.Server{ + DisableEngineAutoRestart: false, + EngineAutoRestartMinDelay: 2, // 2 seconds for testing + } + + restartMgr := newEngineRestartManager(log, cfg) + restartMgr.start(ctx) + defer restartMgr.stop() + + srv := &server{ + log: log, + harness: harness, + cfg: cfg, + restartMgr: restartMgr, + } + + // Get reference to the engine instance for monitoring startRequested + instances, err := harness.FilterInstancesByRankSet("1") + if err != nil || len(instances) == 0 { + t.Fatalf("failed to get engine instance: %v", err) + } + e, ok := instances[0].(*EngineInstance) + if !ok { + t.Fatal("failed to cast to EngineInstance") + } + + evt := &events.RASEvent{ + ID: events.RASEngineSelfTerminated, + Rank: uint32(testRank), + Incarnation: testIncarnation, + Hostname: testHostname, + Timestamp: validTimestamp, + } + + // Setup goroutine to consume startRequested 
channel + restartCount := atomic.Uint32{} + doneCh := make(chan struct{}) + go func() { + defer close(doneCh) + for { + select { + case <-ctx.Done(): + return + case <-e.startRequested: + restartCount.Add(1) + case <-time.After(testRestartRequestWait): + return + } + } + }() + + // First restart should succeed immediately + err = handleEngineSelfTerminated(ctx, srv, evt) + if err != nil { + t.Fatalf("first restart failed: %v", err) + } + + // Wait for restart to be requested + time.Sleep(testProcessingDelay) + if restartCount.Load() != 1 { + t.Fatalf("expected 1 restart request, got %d", restartCount.Load()) + } + + // Second restart immediately after should be deferred (not rejected) + evt2 := &events.RASEvent{ + ID: events.RASEngineSelfTerminated, + Rank: uint32(testRank), + Incarnation: testIncarnation + 1, + Hostname: testHostname, + Timestamp: time.Now().Format(time.RFC3339), + } + + err = handleEngineSelfTerminated(ctx, srv, evt2) + if err != nil { + t.Fatalf("second restart call failed: %v", err) + } + + // Verify no immediate restart (deferred) + time.Sleep(testProcessingDelay) + if restartCount.Load() != 1 { + t.Fatalf("expected restart to be deferred, got %d restarts", restartCount.Load()) + } + + // Verify rate limiting log message + logOutput := buf.String() + if !strings.Contains(logOutput, "rate limited") { + t.Errorf("expected log to contain 'rate limited', got: %s", logOutput) + } + + checkPending := func(t *testing.T, shouldExist bool) { + t.Helper() + restartMgr.mu.RLock() + defer restartMgr.mu.RUnlock() + _, exists := restartMgr.pendingRestart[testRank] + if exists && !shouldExist { + t.Fatal("expected pending restart timer to have been cleaned up") + } else if !exists && shouldExist { + t.Fatal("expected pending restart timer to exist") + } + } + + // Verify pending timer exists + checkPending(t, true) + + // Wait for the deferred restart to trigger + time.Sleep(time.Duration(srv.cfg.EngineAutoRestartMinDelay) * time.Second) + + // Verify deferred restart was executed + time.Sleep(testProcessingDelay) + if restartCount.Load() != 2 { + t.Fatalf("expected 2 total restarts after deferred delay, got %d", restartCount.Load()) + } + + // Verify pending timer was cleaned up + checkPending(t, false) + + // Third event immediately after deferred restart should again be deferred + evt3 := &events.RASEvent{ + ID: events.RASEngineSelfTerminated, + Rank: uint32(testRank), + Incarnation: testIncarnation + 2, + Hostname: testHostname, + Timestamp: time.Now().Format(time.RFC3339), + } + + err = handleEngineSelfTerminated(ctx, srv, evt3) + if err != nil { + t.Fatalf("third restart call failed: %v", err) + } + + // Should still be 2 restarts (third is deferred) + time.Sleep(testProcessingDelay) + if restartCount.Load() != 2 { + t.Fatalf("expected third restart to be deferred, got %d restarts", restartCount.Load()) + } + + <-doneCh +} + +func TestServer_handleEngineSelfTerminated_ErrorHandling(t *testing.T) { + log, buf := logging.NewTestLogger(t.Name()) + defer test.ShowBufferOnFailure(t, buf) + + ctx, cancel := context.WithTimeout(test.Context(t), 2*time.Second) + defer cancel() + + harness := NewEngineHarness(log) + pubSub := events.NewPubSub(ctx, log) + + cfg := &config.Server{ + DisableEngineAutoRestart: false, + } + + restartMgr := newEngineRestartManager(log, cfg) + restartMgr.start(ctx) + defer restartMgr.stop() + + // Channel to signal when handler completes + handlerDone := make(chan struct{}) + var once sync.Once + + srv := &server{ + log: log, + harness: harness, + pubSub: 
pubSub, + evtLogger: control.MockEventLogger(log), + cfg: cfg, + restartMgr: restartMgr, + } + + srv.pubSub.Subscribe(events.RASTypeInfoOnly, + events.HandlerFunc(func(ctx context.Context, evt *events.RASEvent) { + log.Debugf("ErrorHandling: handler called for event: ID=%v, Type=%v", + evt.ID, evt.Type) + switch evt.ID { + case events.RASEngineSelfTerminated: + log.Debugf("ErrorHandling: handling engine self termination event") + if err := handleEngineSelfTerminated(ctx, srv, evt); err != nil { + srv.log.Errorf("handleEngineSelfTerminated: %s", err) + } + once.Do(func() { close(handlerDone) }) + } + })) + + // Give the subscription time to register in the eventLoop + time.Sleep(testSubscriptionDelay) + + evt := &events.RASEvent{ + ID: events.RASEngineSelfTerminated, + Type: events.RASTypeInfoOnly, + Rank: 1, + Incarnation: 42, + Hostname: "test-host", + Timestamp: time.Now().Format(time.RFC3339), + } + + pubSub.Publish(evt) + + // Wait for handler to complete or timeout + select { + case <-handlerDone: + // Handler completed + case <-time.After(testHandlerTimeout): + t.Fatal("timeout waiting for handler to complete") + } + + t.Log(buf.String()) + if !strings.Contains(buf.String(), "handleEngineSelfTerminated") { + t.Error("expected error to be logged by handler") + } + if !strings.Contains(buf.String(), "no instance found") { + t.Errorf("expected 'no instance found' in log, got:\n%s", buf.String()) + } +} + +func TestServer_handleEngineSelfTerminated_EdgeCases(t *testing.T) { + validTimestamp := time.Now().Format(time.RFC3339) + + for name, tc := range map[string]struct { + evt *events.RASEvent + expErrContains string + }{ + "zero incarnation": { + evt: &events.RASEvent{ + ID: events.RASEngineSelfTerminated, + Rank: 1, + Incarnation: 0, + Hostname: "test-host", + Timestamp: validTimestamp, + }, + expErrContains: "no instance found", + }, + "very high rank number": { + evt: &events.RASEvent{ + ID: events.RASEngineSelfTerminated, + Rank: 999999, + Incarnation: 1, + Hostname: "test-host", + Timestamp: validTimestamp, + }, + expErrContains: "no instance found for rank", + }, + "max incarnation value": { + evt: &events.RASEvent{ + ID: events.RASEngineSelfTerminated, + Rank: 1, + Incarnation: ^uint64(0), + Hostname: "test-host", + Timestamp: validTimestamp, + }, + expErrContains: "no instance found", + }, + } { + t.Run(name, func(t *testing.T) { + log, buf := logging.NewTestLogger(t.Name()) + defer test.ShowBufferOnFailure(t, buf) + + ctx, cancel := context.WithTimeout(test.Context(t), testContextTimeout) + defer cancel() + + harness := NewEngineHarness(log) + + cfg := &config.Server{ + DisableEngineAutoRestart: false, + } + + restartMgr := newEngineRestartManager(log, cfg) + restartMgr.start(ctx) + defer restartMgr.stop() + + srv := &server{ + log: log, + harness: harness, + cfg: cfg, + restartMgr: restartMgr, + } + + err := handleEngineSelfTerminated(ctx, srv, tc.evt) + + if err == nil { + t.Fatalf("expected error, got nil") + } + + if !strings.Contains(err.Error(), tc.expErrContains) { + t.Errorf("expected error containing %q, got: %s", + tc.expErrContains, err) + } + }) + } +} + +func TestServer_registerSubscriptions_includesSelfTerminated(t *testing.T) { + log, buf := logging.NewTestLogger(t.Name()) + defer test.ShowBufferOnFailure(t, buf) + + ctx, cancel := context.WithTimeout(test.Context(t), 2*time.Second) + defer cancel() + + harness := NewEngineHarness(log) + pubSub := events.NewPubSub(ctx, log) + + cfg := &config.Server{ + DisableEngineAutoRestart: false, + } + + restartMgr := 
newEngineRestartManager(log, cfg) + restartMgr.start(ctx) + defer restartMgr.stop() + + // Channel to signal when ANY handler processes the event + eventProcessed := make(chan struct{}) + var once sync.Once + + srv := &server{ + log: log, + harness: harness, + pubSub: pubSub, + evtLogger: control.MockEventLogger(log), + cfg: cfg, + restartMgr: restartMgr, + } + + registerSubscriptions(srv) + + // Add a secondary subscriber to detect when event is processed + // This ensures the event has gone through the pubsub system + pubSub.Subscribe(events.RASTypeInfoOnly, + events.HandlerFunc(func(ctx context.Context, evt *events.RASEvent) { + if evt.ID == events.RASEngineSelfTerminated { + once.Do(func() { close(eventProcessed) }) + } + })) + + // Give the subscription time to register in the eventLoop + time.Sleep(testSubscriptionDelay) + + evt := &events.RASEvent{ + ID: events.RASEngineSelfTerminated, + Type: events.RASTypeInfoOnly, + Rank: 1, + Incarnation: 42, + Hostname: "test-host", + Timestamp: time.Now().Format(time.RFC3339), + } + + pubSub.Publish(evt) + + // Wait for event to be processed or timeout + select { + case <-eventProcessed: + // Event was processed + case <-time.After(testHandlerTimeout): + t.Fatal("timeout waiting for engine self terminated event to be processed") + } + + time.Sleep(testProcessingDelay) + + logOutput := buf.String() + hasHandler := strings.Contains(logOutput, "handleEngineSelfTerminated") || + strings.Contains(logOutput, "no instance found") || + strings.Contains(logOutput, "handling engine self termination") + + if !hasHandler { + t.Errorf("engine self termination handler does not appear to be registered\nLog:\n%s", logOutput) + } +} + +func TestServer_registerLeaderSubscriptions_includesSelfTerminated(t *testing.T) { + const testProcessingTimeout = 2 * time.Second + + log, buf := logging.NewTestLogger(t.Name()) + defer test.ShowBufferOnFailure(t, buf) + + ctx, cancel := context.WithTimeout(test.Context(t), testProcessingTimeout+time.Second) + defer cancel() + + harness := NewEngineHarness(log) + pubSub := events.NewPubSub(ctx, log) + + svc := newTestMgmtSvc(t, log) + + cfg := &config.Server{ + DisableEngineAutoRestart: false, + } + + restartMgr := newEngineRestartManager(log, cfg) + restartMgr.start(ctx) + defer restartMgr.stop() + + // Channel to signal when ANY handler processes the event + eventProcessed := make(chan struct{}) + var once sync.Once + + srv := &server{ + log: log, + harness: harness, + pubSub: pubSub, + evtLogger: control.MockEventLogger(log), + membership: svc.membership, + sysdb: svc.sysdb, + mgmtSvc: svc, + cfg: cfg, + restartMgr: restartMgr, + } + + registerLeaderSubscriptions(srv) + + // Add a secondary subscriber to detect when event is processed + // This ensures the event has gone through the pubsub system + pubSub.Subscribe(events.RASTypeInfoOnly, + events.HandlerFunc(func(ctx context.Context, evt *events.RASEvent) { + if evt.ID == events.RASEngineSelfTerminated { + once.Do(func() { close(eventProcessed) }) + } + })) + + // Give the subscription time to register in the eventLoop + time.Sleep(testSubscriptionDelay) + + evt := &events.RASEvent{ + ID: events.RASEngineSelfTerminated, + Type: events.RASTypeInfoOnly, + Rank: 1, + Incarnation: 42, + Hostname: "test-host", + Timestamp: time.Now().Format(time.RFC3339), + } + + pubSub.Publish(evt) + + // Wait for event to be processed or timeout + select { + case <-eventProcessed: + // Event was processed + case <-time.After(testProcessingTimeout): + t.Fatal("timeout waiting for engine self 
terminated event to be processed") + } + + time.Sleep(testProcessingDelay) + + logOutput := buf.String() + hasHandler := strings.Contains(logOutput, "handleEngineSelfTerminated") || + strings.Contains(logOutput, "no instance found") || + strings.Contains(logOutput, "handling engine self termination") + + if !hasHandler { + t.Errorf("engine self termination handler does not appear to be registered\nLog:\n%s", logOutput) + } +} diff --git a/src/engine/drpc_ras.c b/src/engine/drpc_ras.c index 8277e5001cc..26d0f5b6400 100644 --- a/src/engine/drpc_ras.c +++ b/src/engine/drpc_ras.c @@ -366,3 +366,14 @@ ds_notify_swim_rank_dead(d_rank_t rank, uint64_t incarnation) NULL /* pool */, NULL /* cont */, NULL /* objid */, NULL /* ctlop */, &evt, false /* wait_for_resp */); } + +int +ds_notify_rank_self_terminated(d_rank_t rank, uint64_t incarnation) +{ + Shared__RASEvent evt = SHARED__RASEVENT__INIT; + + return raise_ras(RAS_ENGINE_SELF_TERMINATED, "excluded rank self terminated detected", + RAS_TYPE_INFO, RAS_SEV_NOTICE, NULL /* hwid */, &rank /* rank */, + &incarnation /* inc */, NULL /* jobid */, NULL /* pool */, NULL /* cont */, + NULL /* objid */, NULL /* ctlop */, &evt, false /* wait_for_resp */); +} diff --git a/src/engine/init.c b/src/engine/init.c index b29fac7c55b..00731618bbc 100644 --- a/src/engine/init.c +++ b/src/engine/init.c @@ -597,17 +597,21 @@ dss_crt_event_cb(d_rank_t rank, uint64_t incarnation, enum crt_event_source src, if (rank == dss_self_rank()) { D_WARN("raising SIGKILL: exclusion of this engine (rank %u) detected\n", self_rank); - /* - * For now, we just raise a SIGKILL to ourselves; we could - * inform daos_server, who would initiate a termination and - * decide whether to restart us. + + /** + * Send RAS event to inform local server of intentional self termination + * before raising a SIGKILL to ourselves. Local daos_server can then decide + * whether to restart rank. */ + rc = ds_notify_rank_self_terminated(rank, incarnation); + if (rc) + D_ERROR("failed to handle %u/%u event: " DF_RC "\n", src, type, + DP_RC(rc)); + rc = kill(getpid(), SIGKILL); if (rc != 0) D_ERROR("failed to raise SIGKILL: %d\n", errno); - return; } - } } diff --git a/src/include/daos_srv/ras.h b/src/include/daos_srv/ras.h index c30bee513d4..7f47d540af6 100644 --- a/src/include/daos_srv/ras.h +++ b/src/include/daos_srv/ras.h @@ -42,6 +42,7 @@ X(RAS_ENGINE_DIED, "engine_died") \ X(RAS_ENGINE_ASSERTED, "engine_asserted") \ X(RAS_ENGINE_CLOCK_DRIFT, "engine_clock_drift") \ + X(RAS_ENGINE_SELF_TERMINATED, "engine_self_terminated") \ X(RAS_POOL_CORRUPTION_DETECTED, "pool_corruption_detected") \ X(RAS_POOL_REBUILD_START, "pool_rebuild_started") \ X(RAS_POOL_REBUILD_END, "pool_rebuild_finished") \ @@ -238,4 +239,15 @@ ds_notify_pool_svc_update(uuid_t *pool, d_rank_list_t *svcl, uint64_t version); int ds_notify_swim_rank_dead(d_rank_t rank, uint64_t incarnation); +/** + * Notify control plane that an excluded engine has self terminated. + * + * \param[in] rank Rank that self terminated. + * \param[in] incarnation Incarnation of rank that self terminated. + * + * \retval Zero on success, non-zero otherwise. 
+ */ +int +ds_notify_rank_self_terminated(d_rank_t rank, uint64_t incarnation); + #endif /* __DAOS_RAS_H_ */ diff --git a/src/tests/ftest/control/engine_auto_restart.py b/src/tests/ftest/control/engine_auto_restart.py new file mode 100644 index 00000000000..e2689ada126 --- /dev/null +++ b/src/tests/ftest/control/engine_auto_restart.py @@ -0,0 +1,179 @@ +""" + (C) Copyright 2026 Hewlett Packard Enterprise Development LP + + SPDX-License-Identifier: BSD-2-Clause-Patent +""" +import time + +from control_test_base import ControlTestBase + + +class EngineAutoRestartTest(ControlTestBase): + """Test automatic engine restart on self-termination. + + Test Class Description: + Verify automatic engine restart behavior when engines self-terminate + after being excluded from the system. + + :avocado: recursive + """ + + def setUp(self): + """Set up for engine_auto_restart tests""" + super().setUp() + + # Make sure we reset the restart state even if the test fails + self.register_cleanup(self.reset_engine_restart_state) + + def test_auto_restart_basic(self): + """Test basic automatic engine restart after self-termination. + + Test Description: + 1. Exclude a rank from the system + 2. Wait for rank to self-terminate + 3. Verify rank automatically restarts and rejoins the system + + :avocado: tags=all,pr,daily_regression + :avocado: tags=hw,medium + :avocado: tags=dmg,control,engine_auto_restart + :avocado: tags=EngineAutoRestartTest,test_auto_restart_basic + """ + all_ranks = self.get_all_ranks() + if len(all_ranks) < 2: + self.fail("Test requires at least 2 ranks") + + test_rank = self.random.choice(all_ranks) + + self.log_step(f"testing automatic restart of rank {test_rank}") + + # get initial incarnation number + initial_incarnation = self.get_rank_incarnation(test_rank) + + self.log.info("rank %s initial incarnation: %s", test_rank, initial_incarnation) + + restarted, final_state = self.exclude_rank_and_wait_restart(test_rank) + if not restarted: + self.fail(f"rank {test_rank} did not automatically restart. " + f"final state: {final_state}") + + # verify incarnation increased after restart + final_incarnation = self.get_rank_incarnation(test_rank) + + self.log.info("rank %s final incarnation: %s", test_rank, final_incarnation) + + if final_incarnation <= initial_incarnation: + self.fail(f"rank {test_rank} incarnation did not increase after restart. " + f"before: {initial_incarnation}, after: {final_incarnation}") + + self.log.info("SUCCESS: rank %s automatically restarted after self-termination " + "(incarnation %s -> %s)", + test_rank, initial_incarnation, final_incarnation) + + def test_auto_restart_multiple_ranks(self): + """Test automatic restart of multiple ranks. + + Test Description: + 1. Exclude multiple ranks simultaneously + 2. Wait for all to self-terminate + 3. 
Verify all automatically restart and rejoin + + :avocado: tags=all,full_regression + :avocado: tags=hw,medium + :avocado: tags=dmg,control,engine_auto_restart + :avocado: tags=EngineAutoRestartTest,test_auto_restart_multiple_ranks + """ + all_ranks = self.get_all_ranks() + if len(all_ranks) < 3: + self.fail("Test requires at least 3 ranks") + + # Exclude half the ranks + num_to_test = max(2, len(all_ranks) // 2) + test_ranks = self.random.sample(all_ranks, num_to_test) + + self.log_step(f"Exclude {num_to_test} ranks: {test_ranks}") + + incs = [] + for rank in test_ranks: + initial_incarnation = self.get_rank_incarnation(rank) + incs.append(initial_incarnation) + self.dmg.system_exclude(ranks=[rank], rank_hosts=None) + time.sleep(1) # small delay between exclusions + self.dmg.system_clear_exclude(ranks=[rank], rank_hosts=None) + + # Wait and verify all restart + wait_time = 35 + self.log_step(f"Waiting {wait_time}s to verify all automatically restart") + + errors = [] + end_incs = [] + for rank in test_ranks: + failed = self.server_managers[0].check_rank_state( + ranks=[rank], valid_states=["joined"], max_checks=wait_time) + if failed: + errors.append(f"Rank {rank} unexpectedly not restarted when auto-restart enabled") + end_incarnation = self.get_rank_incarnation(rank) + end_incs.append(end_incarnation) + + if errors: + self.fail("\n".join(errors)) + + # Show changes + for idx, (old, new) in enumerate(zip(incs, end_incs)): + actual_rank = test_ranks[idx] + if new > old: + self.log.debug("Rank %s: %s -> %s (restarted)", actual_rank, old, new) + else: + self.log.debug("Rank %s: %s -> %s (NOT restarted!)", actual_rank, old, new) + + # Verify all increased + all_increased = all(a > b for b, a in zip(incs, end_incs)) + if not all_increased: + self.fail("ERROR: Not all ranks restarted!") + + self.log.info("SUCCESS: All of %s automatically restarted", test_ranks) + + def test_auto_restart_with_pool(self): + """Test automatic restart works with active pools. + + Test Description: + 1. Create a pool + 2. Exclude a rank (not in pool service) + 3. Verify rank automatically restarts + 4. Verify pool remains accessible + + :avocado: tags=all,daily_regression + :avocado: tags=hw,medium + :avocado: tags=dmg,control,engine_auto_restart,pool + :avocado: tags=EngineAutoRestartTest,test_auto_restart_with_pool + """ + all_ranks = self.get_all_ranks() + if len(all_ranks) < 4: + self.fail("Test requires at least 4 ranks") + + # Create pool first + pool = self.get_pool(connect=False) + + test_rank = all_ranks[-1] + + self.log_step(f"Excluding non-service rank {test_rank} while pool is active") + + # Get initial incarnation + initial_incarnation = self.get_rank_incarnation(test_rank) + + restarted, final_state = self.exclude_rank_and_wait_restart(test_rank) + if not restarted: + self.fail(f"Rank {test_rank} did not restart. State: {final_state}") + + # Verify incarnation increased + final_incarnation = self.get_rank_incarnation(test_rank) + + if final_incarnation <= initial_incarnation: + self.fail(f"Rank {test_rank} incarnation did not increase. 
" + f"Before: {initial_incarnation}, After: {final_incarnation}") + + # Verify pool is still accessible + self.log_step("Verifying pool is still accessible after rank restart") + pool.query() + + self.log.info("SUCCESS: Rank %s restarted (incarnation %s -> %s) and pool remains " + "accessible", test_rank, initial_incarnation, final_incarnation) diff --git a/src/tests/ftest/control/engine_auto_restart.yaml b/src/tests/ftest/control/engine_auto_restart.yaml new file mode 100644 index 00000000000..24a512d1c2b --- /dev/null +++ b/src/tests/ftest/control/engine_auto_restart.yaml @@ -0,0 +1,19 @@ +hosts: + test_servers: 2 +timeout: 300 +server_config: + name: daos_server + engines_per_host: 2 + engines: + 0: + log_file: daos_server0.log + targets: 4 + nr_xs_helpers: 0 + storage: auto + 1: + log_file: daos_server1.log + targets: 4 + nr_xs_helpers: 0 + storage: auto +pool: + size: 2G diff --git a/src/tests/ftest/control/engine_auto_restart_advanced.py b/src/tests/ftest/control/engine_auto_restart_advanced.py new file mode 100644 index 00000000000..bc902be5e89 --- /dev/null +++ b/src/tests/ftest/control/engine_auto_restart_advanced.py @@ -0,0 +1,154 @@ +""" + (C) Copyright 2026 Hewlett Packard Enterprise Development LP + + SPDX-License-Identifier: BSD-2-Clause-Patent +""" +import time + +from control_test_base import ControlTestBase + + +class EngineAutoRestartAdvanced(ControlTestBase): + """Test advanced automatic engine restart scenarios. + + Test Class Description: + Verify automatic engine restart with custom configurations including + rate-limiting, deferred restarts, and disabled restart behavior. + + :avocado: recursive + """ + + def setUp(self): + """Set up for engine_auto_restart_advanced tests""" + super().setUp() + + # Make sure we reset the restart state even if the test fails + self.register_cleanup(self.reset_engine_restart_state) + + def wait_for_rank_state(self, rank, expected_state, timeout=30, check_interval=2): + """Wait for a rank to reach expected state. + + Args: + rank (int): Rank number + expected_state (str): Expected state + timeout (int): Maximum seconds to wait + check_interval (int): Seconds between state checks + + Returns: + bool: True if state reached, False if timeout + """ + start_time = time.time() + + while time.time() - start_time < timeout: + failed_ranks = self.server_managers[0].check_rank_state( + ranks=[rank], valid_states=[expected_state], max_checks=1) + + if not failed_ranks: + self.log.info("Rank %s reached state '%s' after %.1fs", + rank, expected_state, time.time() - start_time) + return True + + time.sleep(check_interval) + + current_state = self.get_rank_state(rank) + self.log.warning("Rank %s did not reach '%s' within %ss. Current state: %s", + rank, expected_state, timeout, current_state) + return False + + def test_deferred_restart(self): + """Test deferred restart when multiple self-terminations occur rapidly. Use custom delay. + + Test Description: + This test requires custom server configuration with a short + engine_auto_restart_min_delay (20 seconds) to avoid long test runtime. + + 1. Exclude rank and wait for automatic restart (first restart) + 2. Immediately exclude same rank again (second self-termination) + Confirm restart is deferred, not immediate + 3. Wait for deferred restart to execute after delay expires + Confirm deferred restart executes successfully and rank joined + 4. Measure time until deferred restart executed + 5. 
Verify delay matches configured value + + :avocado: tags=all,full_regression + :avocado: tags=hw,medium + :avocado: tags=dmg,control,engine_auto_restart + :avocado: tags=EngineAutoRestartAdvanced,test_deferred_restart + """ + # Get configured restart delay from test params + expected_delay = self.params.get("engine_auto_restart_min_delay", + "/run/server_config/*", 20) + + all_ranks = self.get_all_ranks() + if len(all_ranks) < 2: + self.fail("Test requires at least 2 ranks") + + test_rank = self.random.choice(all_ranks) + + self.log_step(f"Automatic restart of rank {test_rank}") + + # Get initial incarnation + initial_incarnation = self.get_rank_incarnation(test_rank) + + restarted, final_state = self.exclude_rank_and_wait_restart(test_rank) + if not restarted: + self.fail(f"Rank {test_rank} did not automatically restart. " + f"Final state: {final_state}") + + # Verify incarnation increased + first_restart_incarnation = self.get_rank_incarnation(test_rank) + + if first_restart_incarnation <= initial_incarnation: + self.fail(f"Rank {test_rank} incarnation did not increase after first restart. " + f"Before: {initial_incarnation}, After: {first_restart_incarnation}") + + first_restart_time = time.time() + self.log.info("First restart completed at T=%.1f (incarnation %s -> %s)", + first_restart_time, initial_incarnation, first_restart_incarnation) + + # Second exclusion - should be deferred due to rate-limiting + self.log_step(f"Second exclusion of rank {test_rank} (should be deferred)") + + restarted, final_state = self.exclude_rank_and_wait_restart(test_rank, + timeout=10) + + if restarted: + self.fail(f"Rank {test_rank} unexpectedly restarted. Final state: {final_state}") + + self.log.info("Confirmed: Restart is deferred (rank still in excluded state)") + + # Wait for deferred restart to execute (after delay expires), add buffer + wait_time = expected_delay + 5 + self.log_step(f"Waiting {wait_time}s for deferred restart to execute") + + if not self.wait_for_rank_state(test_rank, "joined", timeout=wait_time): + self.fail(f"Rank {test_rank} did not restart after rate-limit delay") + + # Verify incarnation increased again after deferred restart + deferred_restart_incarnation = self.get_rank_incarnation(test_rank) + + if deferred_restart_incarnation <= first_restart_incarnation: + self.fail(f"Rank {test_rank} incarnation did not increase after deferred restart. 
" + f"After first: {first_restart_incarnation}, " + f"After deferred: {deferred_restart_incarnation}") + + self.log_step("Measure time between initial and deferred restarts") + deferred_restart_time = time.time() + actual_delay = deferred_restart_time - first_restart_time + + self.log.info("Confirmed: Deferred restart executed after %.1fs (expected ~%ss), " + "incarnation %s -> %s", + actual_delay, expected_delay, + first_restart_incarnation, deferred_restart_incarnation) + + self.log_step("Verify delay was approximately correct (80%% to 200%% of expected)") + min_delay = expected_delay * 0.8 + max_delay = expected_delay * 2.0 + + if actual_delay < min_delay: + self.fail(f"Restart too early: {actual_delay:.1f}s < {min_delay:.1f}s") + elif actual_delay > max_delay: + self.fail(f"Restart too late: {actual_delay:.1f}s > {max_delay:.1f}s") + else: + self.log.info("SUCCESS: Restart delay within expected range [%.1fs, %.1fs]", + min_delay, max_delay) diff --git a/src/tests/ftest/control/engine_auto_restart_advanced.yaml b/src/tests/ftest/control/engine_auto_restart_advanced.yaml new file mode 100644 index 00000000000..0b0b3045d17 --- /dev/null +++ b/src/tests/ftest/control/engine_auto_restart_advanced.yaml @@ -0,0 +1,20 @@ +hosts: + test_servers: 1 +timeout: 400 +server_config: + name: daos_server + engines_per_host: 2 + engine_auto_restart_min_delay: 30 + engines: + 0: + log_file: daos_server0.log + targets: 4 + nr_xs_helpers: 0 + storage: auto + 1: + log_file: daos_server1.log + targets: 4 + nr_xs_helpers: 0 + storage: auto +pool: + size: 8G diff --git a/src/tests/ftest/control/engine_auto_restart_disabled.py b/src/tests/ftest/control/engine_auto_restart_disabled.py new file mode 100644 index 00000000000..f1a52803a4c --- /dev/null +++ b/src/tests/ftest/control/engine_auto_restart_disabled.py @@ -0,0 +1,152 @@ +""" + (C) Copyright 2026 Hewlett Packard Enterprise Development LP + + SPDX-License-Identifier: BSD-2-Clause-Patent +""" +import time + +from control_test_base import ControlTestBase +from general_utils import report_errors + + +class EngineAutoRestartDisabled(ControlTestBase): + """Test automatic engine restart disabled configuration. + + Test Class Description: + Verify that automatic engine restart can be disabled and that + excluded ranks stay excluded when auto-restart is disabled. + + :avocado: recursive + """ + + def setUp(self): + """Set up for engine_auto_restart_disabled tests""" + super().setUp() + + # Make sure we reset the restart state even if the test fails + self.register_cleanup(self.reset_engine_restart_state) + + def test_no_restart_when_disabled(self): + """Test that engines do not automatically restart when feature is disabled. + + Test Description: + Server is configured with disable_engine_auto_restart: true. + + 1. Exclude a rank from the system + 2. Wait for rank to self-terminate + 3. Wait additional time to verify NO automatic restart occurs + 4. Manually start the rank to verify it can still be started + 5. 
Verify manual start succeeds
+
+        :avocado: tags=all,daily_regression
+        :avocado: tags=hw,medium
+        :avocado: tags=dmg,control,engine_auto_restart
+        :avocado: tags=EngineAutoRestartDisabled,test_no_restart_when_disabled
+        """
+        all_ranks = self.get_all_ranks()
+        if len(all_ranks) < 2:
+            self.fail("Test requires at least 2 ranks")
+
+        test_rank = self.random.choice(all_ranks)
+
+        self.log_step(f"Excluding rank {test_rank} (auto-restart is DISABLED)")
+
+        restarted, _ = self.exclude_rank_and_wait_restart(test_rank, timeout=35)
+
+        if restarted:
+            self.fail(f"Rank {test_rank} unexpectedly restarted when auto-restart disabled!")
+
+        self.log.info("Confirmed: Rank %s did NOT automatically restart (as expected)", test_rank)
+
+        # Manually start the rank
+        self.log_step(f"Manually starting rank {test_rank}")
+        self.dmg.system_start(ranks=test_rank)
+
+        # Verify manual start succeeds
+        failed_ranks = self.server_managers[0].check_rank_state(
+            ranks=[test_rank], valid_states=["joined"], max_checks=15)
+        if failed_ranks:
+            self.fail(f"Manual start of rank {test_rank} failed")
+
+        self.log.info("SUCCESS: Rank %s stayed excluded when auto-restart disabled, and manual "
+                      "start succeeded", test_rank)
+
+    def test_multiple_ranks_no_restart(self):
+        """Test that multiple excluded ranks stay excluded when auto-restart disabled.
+
+        Test Description:
+            Server configured with disable_engine_auto_restart: true.
+
+            1. Exclude multiple ranks
+            2. Verify all self-terminate and reach AdminExcluded state
+            3. Wait to confirm none automatically restart
+            4. Manually restart all ranks
+            5. Verify all successfully rejoin
+
+        :avocado: tags=all,full_regression
+        :avocado: tags=hw,medium
+        :avocado: tags=dmg,control,engine_auto_restart
+        :avocado: tags=EngineAutoRestartDisabled,test_multiple_ranks_no_restart
+        """
+        all_ranks = self.get_all_ranks()
+        if len(all_ranks) < 3:
+            self.fail("Test requires at least 3 ranks")
+
+        # Exclude half the ranks
+        num_to_test = max(2, len(all_ranks) // 2)
+        test_ranks = self.random.sample(all_ranks, num_to_test)
+
+        self.log_step(f"Excluding {num_to_test} ranks: {test_ranks}")
+
+        for rank in test_ranks:
+            self.dmg.system_exclude(ranks=[rank], rank_hosts=None)
+            time.sleep(1)  # Small delay between exclusions
+
+        # Verify all reach adminexcluded state
+        self.log_step("Verifying all ranks get excluded from system")
+        time.sleep(10)
+
+        for rank in test_ranks:
+            failed = self.server_managers[0].check_rank_state(
+                ranks=[rank], valid_states=["adminexcluded"], max_checks=5)
+            if failed:
+                self.fail(f"Rank {rank} did not get excluded from system")
+            self.dmg.system_clear_exclude(ranks=[rank], rank_hosts=None)
+
+        # Wait and verify none restart
+        wait_time = 20
+        self.log_step(f"Waiting {wait_time}s to verify no automatic restarts")
+        time.sleep(wait_time)
+
+        errors = []
+        for rank in test_ranks:
+            failed = self.server_managers[0].check_rank_state(
+                ranks=[rank], valid_states=["excluded"], max_checks=1)
+            if failed:
+                errors.append(f"Rank {rank} unexpectedly restarted when auto-restart disabled")
+
+        if errors:
+            self.fail("\n".join(errors))
+
+        self.log.info("Confirmed: None of %s automatically restarted", test_ranks)
+
+        # Manually restart all
+        self.log_step("Manually restart ranks")
+
+        for rank in test_ranks:
+            self.dmg.system_start(ranks=rank)
+
+        # Verify all rejoin
+        self.log_step("Verifying all ranks successfully rejoin")
+        time.sleep(10)
+
+        for rank in test_ranks:
+            failed = self.server_managers[0].check_rank_state(
+                ranks=[rank], valid_states=["joined"], max_checks=10)
+            if
failed: + errors.append(f"Manual restart of rank {rank} failed") + + report_errors(test=self, errors=errors) + + self.log.info("SUCCESS: All %s ranks stayed excluded and manual restart succeeded", + num_to_test) diff --git a/src/tests/ftest/control/engine_auto_restart_disabled.yaml b/src/tests/ftest/control/engine_auto_restart_disabled.yaml new file mode 100644 index 00000000000..c9534640f58 --- /dev/null +++ b/src/tests/ftest/control/engine_auto_restart_disabled.yaml @@ -0,0 +1,20 @@ +hosts: + test_servers: 2 +timeout: 300 +server_config: + name: daos_server + engines_per_host: 2 + disable_engine_auto_restart: true + engines: + 0: + log_file: daos_server0.log + targets: 4 + nr_xs_helpers: 0 + storage: auto + 1: + log_file: daos_server1.log + targets: 4 + nr_xs_helpers: 0 + storage: auto +pool: + size: 8G diff --git a/src/tests/ftest/util/control_test_base.py b/src/tests/ftest/util/control_test_base.py index eff064f53f2..7d8e06e9d4d 100644 --- a/src/tests/ftest/util/control_test_base.py +++ b/src/tests/ftest/util/control_test_base.py @@ -1,11 +1,14 @@ """ (C) Copyright 2020-2022 Intel Corporation. + (C) Copyright 2026 Hewlett Packard Enterprise Development LP SPDX-License-Identifier: BSD-2-Clause-Patent """ +import time from apricot import TestWithServers from ClusterShell.NodeSet import NodeSet +from exception_utils import CommandFailure class ControlTestBase(TestWithServers): @@ -46,3 +49,165 @@ def verify_dmg_storage_scan(self, verify_method): if errors: self.fail("\n--- Errors found! ---\n{}".format("\n".join(errors))) + + def get_all_ranks(self): + """Get list of all ranks in the system. + + Returns: + list: List of all rank numbers + """ + return list(self.server_managers[0].ranks.keys()) + + def get_rank_state(self, rank): + """Get the state of a rank. + + Args: + rank (int): Rank number + + Returns: + str: Current state of the rank + """ + data = self.dmg.system_query(ranks=f"{rank}") + if data["status"] != 0: + raise CommandFailure("dmg system query failed") + if "response" in data and "members" in data["response"]: + if data["response"]["members"] is None: + raise CommandFailure("No members returned from dmg system query") + for member in data["response"]["members"]: + return member["state"].lower() + raise CommandFailure("No member state returned from dmg system query") + + def exclude_rank_and_wait_restart(self, rank, timeout=30): + """Exclude a rank and wait for it to self-terminate and potentially restart. + + Args: + rank (int): Rank to exclude + timeout (int): Maximum seconds to wait for restart + + Returns: + tuple: (restarted, final_state) - whether rank restarted and its final state + """ + self.log_step(f"Excluding rank {rank}") + self.dmg.system_exclude(ranks=[rank], rank_hosts=None) + + # Wait for rank to self-terminate (should go to AdminExcluded state) + self.log_step(f"Waiting for rank {rank} to self-terminate") + time.sleep(2) + + # Check if rank is adminexcluded + failed_ranks = self.server_managers[0].check_rank_state( + ranks=[rank], valid_states=["adminexcluded"], max_checks=10) + if failed_ranks: + self.fail(f"Rank {rank} did not reach AdminExcluded state after exclusion") + + # After triggering rank exclusion with dmg system exclude, clear + # AdminExcluded state so rank can join on auto-restart. This enables + # mimic of rank exclusion via SWIM inactivity detection. 
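+        # Illustrative note (hypothetical CLI equivalents, assuming the standard dmg verbs
+        # wrapped by system_exclude()/system_clear_exclude()): the sequence driven by this
+        # helper corresponds to
+        #   dmg system exclude --ranks=<rank>        -> rank self-terminates (AdminExcluded)
+        #   dmg system clear-exclude --ranks=<rank>  -> rank becomes Excluded
+        # after which the control plane auto-restart (when enabled) returns it to Joined.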
+ self.log_step(f"Clearing AdminExcluded state for rank {rank}") + self.dmg.system_clear_exclude(ranks=[rank], rank_hosts=None) + + # Check if rank is excluded + failed_ranks = self.server_managers[0].check_rank_state( + ranks=[rank], valid_states=["excluded"], max_checks=10) + if failed_ranks: + self.fail(f"Rank {rank} did not reach Excluded state after clear-excluded") + + # Wait for automatic restart (rank should go to Joined state) + self.log_step(f"Waiting for rank {rank} to automatically restart") + start_time = time.time() + restarted = False + + while time.time() - start_time < timeout: + time.sleep(2) + # Check if rank has rejoined + failed_ranks = self.server_managers[0].check_rank_state( + ranks=[rank], valid_states=["joined"], max_checks=1) + if not failed_ranks: + restarted = True + break + + if restarted: + self.log.info("Rank %s automatically restarted and rejoined within %ss", rank, timeout) + return (True, "joined") + + state = self.get_rank_state(rank) + self.log.info("Rank %s (%s) did not restart within %ss", rank, state, timeout) + return (False, state) + + def get_rank_incarnation(self, rank): # pylint: disable=too-many-return-statements + """Get the incarnation number of a rank. + + The incarnation number increments each time a rank restarts, allowing + verification that a rank has actually restarted rather than just + remaining in the same state. + + Args: + rank (int): Rank number + + Returns: + int: Current incarnation number of the rank + + Raises: + Logs error and raises exception on failure + """ + data = self.dmg.system_query(ranks=f"{rank}") + if data.get("status") != 0: + self.log.error("dmg system query failed for rank %s", rank) + raise CommandFailure("dmg system query failed") + + if "response" not in data or "members" not in data["response"]: + self.log.error("Invalid response from dmg system query for rank %s", rank) + raise CommandFailure("dmg system query invalid response") + + members = data["response"]["members"] + if not members: + self.log.error("No members returned from dmg system query for rank %s", rank) + raise CommandFailure("dmg system query no members") + + for member in members: + if member.get("rank") == rank: + incarnation = member.get("incarnation") + if incarnation is not None: + self.log.debug("Rank %s incarnation: %s", rank, incarnation) + return incarnation + self.log.error("No incarnation field for rank %s", rank) + raise CommandFailure("dmg system query no incarnation for member") + + self.log.error("Rank %s not found in system query response", rank) + raise CommandFailure("dmg system query no matching member") + + def reset_engine_restart_state(self): + """Reset engine auto-restart state between tests. + + The engine restart manager tracks last restart times for rate-limiting + automatic restarts. This state persists across test methods when servers + continue running, which can cause unexpected rate-limiting behavior in + sequential tests. + + This method resets the state by restarting all servers via: + 1. dmg system stop (automatically clears restart history for stopped ranks) + 2. dmg system start (automatically clears restart history for started ranks) + 3. Wait for all ranks to rejoin + + The automatic clearing is handled by SystemStop/SystemStart in mgmt_system.go, + which calls clearRankRestartHistory() for affected ranks. + + Usage: + Should be called in tearDown() of test classes that use engine restart + functionality. 
If this method fails, tearDown() should fail the test + to prevent subsequent tests from running with contaminated state. + + Note: + This operation adds ~5-10 seconds per test due to server restart overhead, + but is necessary to ensure test isolation and reliable results. + """ + self.log.info("Restarting servers to reset engine restart manager state") + self.server_managers[0].system_stop() + self.server_managers[0].system_start() + + # Wait for all ranks to join + all_ranks = self.get_all_ranks() + failed_ranks = self.server_managers[0].check_rank_state( + ranks=all_ranks, valid_states=["joined"], max_checks=30) + if failed_ranks: + self.log.warning("Some ranks failed to rejoin after restart: %s", failed_ranks) diff --git a/src/tests/ftest/util/dmg_utils.py b/src/tests/ftest/util/dmg_utils.py index 106f0e50426..e4b51e72c40 100644 --- a/src/tests/ftest/util/dmg_utils.py +++ b/src/tests/ftest/util/dmg_utils.py @@ -1212,7 +1212,11 @@ def system_query(self, ranks=None, verbose=True): # "uuid": "e7f2cb06-a111-4d55-a6a5-b494b70d62ab", # "fabric_uri": "ofi+sockets://192.168.100.11:31416", # "fabric_contexts": 17, - # "info": "" + # "secondary_fabric_uri": "", + # "secondary_fabric_contexts": 0, + # "info": "", + # "last_update": "", + # "incarnation": 10 # }, # { # "addr": "10.8.1.74:10001", @@ -1222,7 +1226,11 @@ def system_query(self, ranks=None, verbose=True): # "uuid": "db36ab28-fdb0-4822-97e6-89547393ed03", # "fabric_uri": "ofi+sockets://192.168.100.74:31416", # "fabric_contexts": 17, - # "info": "" + # "secondary_fabric_uri": "", + # "secondary_fabric_contexts": 0, + # "info": "", + # "last_update": "", + # "incarnation": 12 # } # ] # }, diff --git a/src/tests/ftest/util/server_utils_params.py b/src/tests/ftest/util/server_utils_params.py index 36c9f5eb946..273026d94e7 100644 --- a/src/tests/ftest/util/server_utils_params.py +++ b/src/tests/ftest/util/server_utils_params.py @@ -57,7 +57,7 @@ def _get_new(self): return DaosServerTransportCredentials(self._log_dir) -class DaosServerYamlParameters(YamlParameters): +class DaosServerYamlParameters(YamlParameters): # pylint: disable=too-many-instance-attributes """Defines the daos_server configuration yaml parameters.""" def __init__(self, filename, common_yaml, version=None): @@ -176,6 +176,10 @@ def __init__(self, filename, common_yaml, version=None): self.fault_path = BasicParameter(None) self.fault_cb = BasicParameter(None) + # Engine auto-restart parameters + self.disable_engine_auto_restart = BasicParameter(None) + self.engine_auto_restart_min_delay = BasicParameter(None) + def get_params(self, test): """Get values for all of the command params from the yaml file. @@ -443,7 +447,7 @@ def _get_new(self): return ControlMetadataParameters(self.namespace) -class EngineYamlParameters(YamlParameters): +class EngineYamlParameters(YamlParameters): # pylint: disable=too-many-instance-attributes """Defines the configuration yaml parameters for a single server engine.""" # Engine environment variables that are required by provider type. 
@@ -884,7 +888,7 @@ def _get_new(self): return StorageYamlParameters(self.namespace, self._max_tiers) -class StorageTierYamlParameters(YamlParameters): +class StorageTierYamlParameters(YamlParameters): # pylint: disable=too-many-instance-attributes """Defines the configuration yaml parameters for each storage tier for an engine.""" def __init__(self, base_namespace, tier): diff --git a/utils/config/daos_server.yml b/utils/config/daos_server.yml index 8a0737b65c0..a16693c3564 100644 --- a/utils/config/daos_server.yml +++ b/utils/config/daos_server.yml @@ -310,6 +310,28 @@ #telemetry_port: 9191 # # +## Disable automatic restart of engines that self-terminate. +# +## When an excluded engine self-terminates, the control plane automatically restarts it +## by default. Set this option to true to disable automatic restarts entirely. +# +## default: false +#disable_engine_auto_restart: true +# +# +## Minimum delay (in seconds) between automatic restarts of the same rank. +# +## When an excluded engine self-terminates, the control plane automatically restarts it +## after a configurable delay to prevent rapid restart loops. This setting specifies the +## minimum time that must elapse after restarting a rank before it can be automatically +## restarted again. If a self-termination event occurs before this delay expires, a +## deferred restart is scheduled that will automatically trigger when the delay period +## ends. Multiple events during the delay result in only one deferred restart. +# +## default: 300 (5 minutes) +#engine_auto_restart_min_delay: 120 +# +# ## If desired, a set of client-side environment variables may be ## defined here. Note that these are intended to be defaults and ## may be overridden by manually-set environment variables when