Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 21 additions & 12 deletions internal/plugin/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,10 +57,11 @@ type nvidiaDevicePlugin struct {
cdiHandler cdi.Interface
cdiAnnotationPrefix string

socket string
server *grpc.Server
health chan *rm.Device
stop chan interface{}
socket string
server *grpc.Server
healthy chan *rm.Device
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of introducing a new channel, is there any benefit in having a single channel that accepts a device and the desired status?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could even mark the device as healthy or unhealthy at the point where we send the device on the channel and then keep the health channel as is.

unHealthy chan *rm.Device
stop chan interface{}

imexChannels imex.Channels

Expand Down Expand Up @@ -89,9 +90,10 @@ func (o *options) devicePluginForResource(resourceManager rm.ResourceManager) (I
socket: getPluginSocketPath(resourceManager.Resource()),
// These will be reinitialized every
// time the plugin server is restarted.
server: nil,
health: nil,
stop: nil,
server: nil,
healthy: nil,
unHealthy: nil,
stop: nil,
}
return &plugin, nil
}
Expand All @@ -105,14 +107,16 @@ func getPluginSocketPath(resource spec.ResourceName) string {

func (plugin *nvidiaDevicePlugin) initialize() {
plugin.server = grpc.NewServer([]grpc.ServerOption{}...)
plugin.health = make(chan *rm.Device)
plugin.healthy = make(chan *rm.Device)
plugin.unHealthy = make(chan *rm.Device)
plugin.stop = make(chan interface{})
}

func (plugin *nvidiaDevicePlugin) cleanup() {
close(plugin.stop)
plugin.server = nil
plugin.health = nil
plugin.healthy = nil
plugin.unHealthy = nil
plugin.stop = nil
}

Expand Down Expand Up @@ -147,7 +151,7 @@ func (plugin *nvidiaDevicePlugin) Start(kubeletSocket string) error {

go func() {
// TODO: add MPS health check
err := plugin.rm.CheckHealth(plugin.stop, plugin.health)
err := plugin.rm.CheckHealth(plugin.stop, plugin.healthy, plugin.unHealthy)
if err != nil {
klog.Errorf("Failed to start health check: %v; continuing with health checks disabled", err)
}
Expand Down Expand Up @@ -270,13 +274,18 @@ func (plugin *nvidiaDevicePlugin) ListAndWatch(e *pluginapi.Empty, s pluginapi.D
select {
case <-plugin.stop:
return nil
case d := <-plugin.health:
// FIXME: there is no way to recover from the Unhealthy state.
case d := <-plugin.unHealthy:
d.Health = pluginapi.Unhealthy
klog.Infof("'%s' device marked unhealthy: %s", plugin.rm.Resource(), d.ID)
if err := s.Send(&pluginapi.ListAndWatchResponse{Devices: plugin.apiDevices()}); err != nil {
return nil
}
case d := <-plugin.healthy:
d.Health = pluginapi.Healthy
klog.Infof("'%s' device marked healthy: %s", plugin.rm.Resource(), d.ID)
if err := s.Send(&pluginapi.ListAndWatchResponse{Devices: plugin.apiDevices()}); err != nil {
return nil
}
}
}
}
Expand Down
24 changes: 13 additions & 11 deletions internal/rm/health.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ const (
)

// CheckHealth performs health checks on a set of devices, writing to the 'unhealthy' channel with any unhealthy devices
func (r *nvmlResourceManager) checkHealth(stop <-chan interface{}, devices Devices, unhealthy chan<- *Device) error {
func (r *nvmlResourceManager) checkHealth(stop <-chan interface{}, devices Devices, healthy chan<- *Device, unhealthy chan<- *Device) error {
disableHealthChecks := strings.ToLower(os.Getenv(envDisableHealthChecks))
if disableHealthChecks == "all" {
disableHealthChecks = allHealthChecks
Expand Down Expand Up @@ -147,16 +147,6 @@ func (r *nvmlResourceManager) checkHealth(stop <-chan interface{}, devices Devic
continue
}

if e.EventType != nvml.EventTypeXidCriticalError {
klog.Infof("Skipping non-nvmlEventTypeXidCriticalError event: %+v", e)
continue
}

if skippedXids[e.EventData] {
klog.Infof("Skipping event %+v", e)
continue
}

klog.Infof("Processing event %+v", e)
eventUUID, ret := e.Device.GetUUID()
if ret != nvml.SUCCESS {
Expand All @@ -174,6 +164,18 @@ func (r *nvmlResourceManager) checkHealth(stop <-chan interface{}, devices Devic
continue
}

if e.EventType != nvml.EventTypeXidCriticalError {
klog.Infof("Skipping non-nvmlEventTypeXidCriticalError event: %+v", e)
healthy <- d
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are xid events mutually exclusive? Why are we treating non-critical (or skipped events) as indicators of device health?

continue
}

if skippedXids[e.EventData] {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Assuming that the device is set as unhealthy due to event A, event B is detected at this time (i.e. satisfying the condition of (e.EventType! =Nvml. EventTypeXidCriticalError)) caused the device to be restored to health. Will this situation lead to the device being erroneously restored to health? I think a reasonable approach should be to restore the device from the same error state before setting it to healthy?

klog.Infof("Skipping event %+v", e)
healthy <- d
continue
}

if d.IsMigDevice() && e.GpuInstanceId != 0xFFFFFFFF && e.ComputeInstanceId != 0xFFFFFFFF {
gi := deviceIDToGiMap[d.ID]
ci := deviceIDToCiMap[d.ID]
Expand Down
4 changes: 2 additions & 2 deletions internal/rm/nvml_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,8 @@ func (r *nvmlResourceManager) GetDevicePaths(ids []string) []string {
}

// CheckHealth performs health checks on a set of devices, writing to the 'unhealthy' channel with any unhealthy devices
func (r *nvmlResourceManager) CheckHealth(stop <-chan interface{}, unhealthy chan<- *Device) error {
return r.checkHealth(stop, r.devices, unhealthy)
func (r *nvmlResourceManager) CheckHealth(stop <-chan interface{}, healthy chan<- *Device, unhealthy chan<- *Device) error {
return r.checkHealth(stop, r.devices, healthy, unhealthy)
}

// getPreferredAllocation runs an allocation algorithm over the inputs.
Expand Down
2 changes: 1 addition & 1 deletion internal/rm/rm.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ type ResourceManager interface {
Devices() Devices
GetDevicePaths([]string) []string
GetPreferredAllocation(available, required []string, size int) ([]string, error)
CheckHealth(stop <-chan interface{}, unhealthy chan<- *Device) error
CheckHealth(stop <-chan interface{}, healthy chan<- *Device, unhealthy chan<- *Device) error
ValidateRequest(AnnotatedIDs) error
}

Expand Down
2 changes: 1 addition & 1 deletion internal/rm/tegra_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,6 @@ func (r *tegraResourceManager) GetDevicePaths(ids []string) []string {
}

// CheckHealth is disabled for the tegraResourceManager
func (r *tegraResourceManager) CheckHealth(stop <-chan interface{}, unhealthy chan<- *Device) error {
func (r *tegraResourceManager) CheckHealth(stop <-chan interface{}, healthy chan<- *Device, unhealthy chan<- *Device) error {
return nil
}