Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 55 additions & 0 deletions module/modmanager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ func NewManager(
packagesDir: options.PackagesDir,
ftdc: options.FTDC,
modPeerConnTracker: options.ModPeerConnTracker,
failedModules: make(map[string]bool),
}
return ret, nil
}
Expand Down Expand Up @@ -151,6 +152,9 @@ type Manager struct {
// modPeerConnTracker must be updated as modules create/destroy any underlying WebRTC
// PeerConnections.
modPeerConnTracker *rdkgrpc.ModPeerConnTracker

failedModulesMu sync.RWMutex
failedModules map[string]bool
}

// Close terminates module connections and processes.
Expand Down Expand Up @@ -256,6 +260,7 @@ func (mgr *Manager) Add(ctx context.Context, confs ...config.Module) error {
// The config was already validated, but we must check again before attempting to add.
if err := conf.Validate(""); err != nil {
mgr.logger.CErrorw(ctx, "Module config validation error; skipping", "module", conf.Name, "error", err)
mgr.AddToFailedModules(conf.Name)
errs[i] = err
continue
}
Expand All @@ -270,9 +275,12 @@ func (mgr *Manager) Add(ctx context.Context, confs ...config.Module) error {
err := mgr.add(ctx, conf, moduleLogger)
if err != nil {
moduleLogger.CErrorw(ctx, "Error adding module", "module", conf.Name, "error", err)
mgr.AddToFailedModules(conf.Name)
errs[i] = err
return
}
// module started successfully, remove it from failedModules
mgr.deleteFromFailedModules(conf.Name)
}(i, conf)
}
wg.Wait()
Expand Down Expand Up @@ -428,10 +436,15 @@ func (mgr *Manager) Reconfigure(ctx context.Context, conf config.Module) ([]reso
mod.logger.CInfow(ctx, "Existing module process stopped. Starting new module process", "module", conf.Name)

if err := mgr.startModule(ctx, mod); err != nil {
// could not start module during reconfiguration, add it to failedModules
mgr.AddToFailedModules(conf.Name)
// If re-addition fails, assume all handled resources are orphaned.
return handledResourceNames, err
}

// reconfiguration successful, remove from failed modules
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If reconfiguration succeeds (the module itself, or whatever was breaking it, has been fixed), remove the module from failedModules.

mgr.deleteFromFailedModules(conf.Name)

mod.logger.CInfow(ctx, "New module process is running and responding to gRPC requests", "module",
mod.cfg.Name, "module address", mod.addr)

Expand Down Expand Up @@ -847,6 +860,9 @@ func (mgr *Manager) newOnUnexpectedExitHandler(ctx context.Context, mod *module)
"Module has unexpectedly exited.", "module", mod.cfg.Name, "exit_code", exitCode,
)

// Add to failedModules when crash is detected
mgr.AddToFailedModules(mod.cfg.Name)

// There are two relevant calls that may race with a crashing module:
// 1. mgr.Remove, which wants to stop the module and remove it entirely
// 2. mgr.Reconfigure, which wants to stop the module and replace it with
Expand Down Expand Up @@ -899,8 +915,12 @@ func (mgr *Manager) newOnUnexpectedExitHandler(ctx context.Context, mod *module)

err := mgr.attemptRestart(ctx, mod)
if err == nil {
// restart successful, remove module from failedModules
mgr.deleteFromFailedModules(mod.cfg.Name)
break
}
// could not restart crashed module, add it to failedModules
mgr.AddToFailedModules(mod.cfg.Name)
unlock()
utils.SelectContextOrWait(ctx, oueRestartInterval)
}
Expand Down Expand Up @@ -1120,3 +1140,38 @@ func getModuleDataParentDirectory(options modmanageroptions.Options) string {
}
return filepath.Join(options.ViamHomeDir, parentModuleDataFolderName, robotID)
}

// AddToFailedModules records moduleName in the failedModules map, marking the
// module as failing. The write is performed while holding failedModulesMu.
// Adding a name that is already present leaves the map unchanged.
func (mgr *Manager) AddToFailedModules(moduleName string) {
	mgr.failedModulesMu.Lock()
	defer mgr.failedModulesMu.Unlock()

	mgr.failedModules[moduleName] = true
}

// deleteFromFailedModules removes moduleName from the failedModules map while
// holding failedModulesMu. Removing a name that is not present is a no-op.
func (mgr *Manager) deleteFromFailedModules(moduleName string) {
	mgr.failedModulesMu.Lock()
	defer mgr.failedModulesMu.Unlock()

	delete(mgr.failedModules, moduleName)
}

// FailedModules returns the names of all modules currently recorded in the
// failedModules map. The map is read under failedModulesMu's read lock.
//
// The returned slice is a fresh copy owned by the caller. Its order follows
// map iteration and is therefore unspecified; callers that need a stable
// order must sort it. A nil slice is returned when no module is recorded.
func (mgr *Manager) FailedModules() []string {
	mgr.failedModulesMu.RLock()
	defer mgr.failedModulesMu.RUnlock()

	// Keep the nil result for the empty case rather than allocating.
	if len(mgr.failedModules) == 0 {
		return nil
	}

	names := make([]string, 0, len(mgr.failedModules))
	for name := range mgr.failedModules {
		names = append(names, name)
	}
	return names
}

// UpdateFailedModules removes every entry from the failedModules map while
// holding failedModulesMu. It is meant to be called at the start of
// reconfigure; modules are expected to be added back to failedModules as they
// fail during the reconfigure process.
//
// NOTE: newConfigModules is currently not consulted. All entries are removed
// regardless of which modules appear in the new config.
func (mgr *Manager) UpdateFailedModules(newConfigModules []config.Module) {
	mgr.failedModulesMu.Lock()
	defer mgr.failedModulesMu.Unlock()

	clear(mgr.failedModules)
}
16 changes: 15 additions & 1 deletion robot/impl/local_robot.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@ package robotimpl

import (
"context"
"fmt"
"slices"
"sort"
"sync"
"sync/atomic"
"time"
Expand Down Expand Up @@ -800,7 +802,14 @@ func (r *localRobot) newResource(
resName := conf.ResourceName()
resInfo, ok := resource.LookupRegistration(resName.API, conf.Model)
if !ok {
return nil, errors.Errorf("unknown resource type: API %v with model %v not registered", resName.API, conf.Model)
failedModules := r.manager.moduleManager.FailedModules()
var modules string
if len(failedModules) > 0 {
sort.Strings(failedModules)
modules = fmt.Sprintf("May be in failing module: %v; ", failedModules)
}
return nil, errors.Errorf("unknown resource type: API %v with model %v not registered; "+
"%sThere may be no module in config that provides this model", resName.API, conf.Model, modules)
}

deps, err := r.getDependencies(resName, gNode)
Expand Down Expand Up @@ -1506,6 +1515,11 @@ func (r *localRobot) reconfigure(ctx context.Context, newConfig *config.Config,
}
}()

// Reset the failing modules tracker: UpdateFailedModules clears every entry, and modules are re-added as they fail during this reconfigure.
if r.manager.moduleManager != nil {
r.manager.moduleManager.UpdateFailedModules(newConfig.Modules)
}

if diff.ResourcesEqual {
return
}
Expand Down
15 changes: 10 additions & 5 deletions robot/impl/local_robot_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1369,7 +1369,8 @@ func TestResourceStartsOnReconfigure(t *testing.T) {
test.ShouldBeError,
resource.NewNotAvailableError(
base.Named("fake0"),
errors.New(`resource build error: unknown resource type: API rdk:component:base with model rdk:builtin:random not registered`),
errors.New(`resource build error: unknown resource type: API rdk:component:base with model rdk:builtin:random not registered; `+
`There may be no module in config that provides this model`),
),
)
test.That(t, noBase, test.ShouldBeNil)
Expand Down Expand Up @@ -1957,15 +1958,17 @@ func TestOrphanedResources(t *testing.T) {
test.That(t, err, test.ShouldBeError,
resource.NewNotAvailableError(
gizmoapi.Named("g"),
errors.New(`resource build error: unknown resource type: API acme:component:gizmo with model acme:demo:mygizmo not registered`),
errors.New(`resource build error: unknown resource type: API acme:component:gizmo with model acme:demo:mygizmo not registered; `+
`There may be no module in config that provides this model`),
),
)
test.That(t, res, test.ShouldBeNil)
res, err = r.ResourceByName(summationapi.Named("s"))
test.That(t, err, test.ShouldBeError,
resource.NewNotAvailableError(
summationapi.Named("s"),
errors.New(`resource build error: unknown resource type: API acme:service:summation with model acme:demo:mysum not registered`),
errors.New(`resource build error: unknown resource type: API acme:service:summation with model acme:demo:mysum not registered; `+
`There may be no module in config that provides this model`),
),
)
test.That(t, res, test.ShouldBeNil)
Expand Down Expand Up @@ -2324,7 +2327,8 @@ func TestDependentAndOrphanedResources(t *testing.T) {
test.That(t, err, test.ShouldBeError,
resource.NewNotAvailableError(
gizmoapi.Named("g"),
errors.New(`resource build error: unknown resource type: API acme:component:gizmo with model acme:demo:mygizmo not registered`),
errors.New(`resource build error: unknown resource type: API acme:component:gizmo with model acme:demo:mygizmo not registered; `+
`There may be no module in config that provides this model`),
),
)
test.That(t, res, test.ShouldBeNil)
Expand Down Expand Up @@ -2539,7 +2543,8 @@ func TestCrashedModuleReconfigure(t *testing.T) {
test.That(t, err, test.ShouldBeError,
resource.NewNotAvailableError(
generic.Named("h"),
errors.New(`resource build error: unknown resource type: API rdk:component:generic with model rdk:test:helper not registered`),
errors.New(`resource build error: unknown resource type: API rdk:component:generic with model rdk:test:helper not registered; `+
`May be in failing module: [mod]; There may be no module in config that provides this model`),
),
)
})
Expand Down
Loading
Loading