diff --git a/cmd/directpv/node-server.go b/cmd/directpv/node-server.go
index aee61104..7635db06 100644
--- a/cmd/directpv/node-server.go
+++ b/cmd/directpv/node-server.go
@@ -20,6 +20,7 @@ import (
"context"
"errors"
"os"
+ "time"
"github.com/container-storage-interface/spec/lib/go/csi"
"github.com/minio/directpv/pkg/consts"
@@ -33,7 +34,11 @@ import (
"k8s.io/klog/v2"
)
-var metricsPort = consts.MetricsPort
+var (
+ metricsPort = consts.MetricsPort
+ volumeHealthMonitorInterval = 10 * time.Minute
+ enableVolumeHealthMonitor bool
+)
var nodeServerCmd = &cobra.Command{
Use: consts.NodeServerName,
@@ -56,6 +61,8 @@ var nodeServerCmd = &cobra.Command{
func init() {
nodeServerCmd.PersistentFlags().IntVar(&metricsPort, "metrics-port", metricsPort, "Metrics port at "+consts.AppPrettyName+" exports metrics data")
+ nodeServerCmd.PersistentFlags().BoolVar(&enableVolumeHealthMonitor, "enable-volume-health-monitor", enableVolumeHealthMonitor, "Enable volume health monitoring")
+ nodeServerCmd.PersistentFlags().DurationVar(&volumeHealthMonitorInterval, "volume-health-monitor-interval", volumeHealthMonitorInterval, "Interval for volume health monitoring in duration. Example: '20m','1h'")
}
func startNodeServer(ctx context.Context) error {
@@ -114,6 +121,15 @@ func startNodeServer(ctx context.Context) error {
}
}()
+ if enableVolumeHealthMonitor {
+ go func() {
+ if err := volume.RunHealthMonitor(ctx, nodeID, volumeHealthMonitorInterval); err != nil {
+ klog.ErrorS(err, "unable to run volume health monitor")
+ errCh <- err
+ }
+ }()
+ }
+
return <-errCh
}
diff --git a/cmd/kubectl-directpv/install.go b/cmd/kubectl-directpv/install.go
index b684505b..9546f40a 100644
--- a/cmd/kubectl-directpv/install.go
+++ b/cmd/kubectl-directpv/install.go
@@ -40,21 +40,22 @@ import (
)
var (
- image = consts.AppName + ":" + Version
- registry = "quay.io"
- org = "minio"
- nodeSelectorArgs = []string{}
- tolerationArgs = []string{}
- seccompProfile = ""
- apparmorProfile = ""
- imagePullSecrets = []string{}
- nodeSelector map[string]string
- tolerations []corev1.Toleration
- k8sVersion = "1.27.0"
- kubeVersion *version.Version
- legacyFlag bool
- declarativeFlag bool
- openshiftFlag bool
+ image = consts.AppName + ":" + Version
+ registry = "quay.io"
+ org = "minio"
+ nodeSelectorArgs = []string{}
+ tolerationArgs = []string{}
+ seccompProfile = ""
+ apparmorProfile = ""
+ imagePullSecrets = []string{}
+ nodeSelector map[string]string
+ tolerations []corev1.Toleration
+ k8sVersion = "1.27.0"
+ kubeVersion *version.Version
+ legacyFlag bool
+ declarativeFlag bool
+ openshiftFlag bool
+ enableVolumeHealthMonitor bool
)
var installCmd = &cobra.Command{
@@ -82,7 +83,10 @@ var installCmd = &cobra.Command{
$ kubectl {PLUGIN_NAME} install --apparmor-profile directpv
7. Install DirectPV with seccomp profile
- $ kubectl {PLUGIN_NAME} install --seccomp-profile profiles/seccomp.json`,
+ $ kubectl {PLUGIN_NAME} install --seccomp-profile profiles/seccomp.json
+
+8. Install DirectPV with volume health monitoring enabled
+ $ kubectl {PLUGIN_NAME} install --enable-volume-health-monitoring`,
`{PLUGIN_NAME}`,
consts.AppName,
),
@@ -123,6 +127,7 @@ func init() {
installCmd.PersistentFlags().BoolVar(&declarativeFlag, "declarative", declarativeFlag, "Output YAML for declarative installation")
installCmd.PersistentFlags().MarkHidden("declarative")
installCmd.PersistentFlags().BoolVar(&openshiftFlag, "openshift", openshiftFlag, "Use OpenShift specific installation")
+ installCmd.PersistentFlags().BoolVar(&enableVolumeHealthMonitor, "enable-volume-health-monitoring", enableVolumeHealthMonitor, "Enable volume health monitoring")
}
func validateNodeSelectorArgs() error {
@@ -305,8 +310,9 @@ func installMain(ctx context.Context) {
}
}
}
- args.Declarative = declarativeFlag
args.Openshift = openshiftFlag
+ args.Declarative = declarativeFlag
+ args.EnableVolumeHealthMonitor = enableVolumeHealthMonitor
var failed bool
var installedComponents []installer.Component
diff --git a/cmd/kubectl-directpv/list_volumes.go b/cmd/kubectl-directpv/list_volumes.go
index b86bd694..af3a98e9 100644
--- a/cmd/kubectl-directpv/list_volumes.go
+++ b/cmd/kubectl-directpv/list_volumes.go
@@ -241,6 +241,8 @@ func listVolumesMain(ctx context.Context) {
status = "Released"
case volume.IsDriveLost():
status = "Lost"
+ case volume.HasError():
+ status = "Error"
case volume.IsPublished():
status = "Bounded"
}
diff --git a/docs/monitoring.md b/docs/monitoring.md
index 113927b9..a2ffda5e 100644
--- a/docs/monitoring.md
+++ b/docs/monitoring.md
@@ -79,3 +79,21 @@ scrape_configs:
action: replace
target_label: kubernetes_name
```
+
+# Volume health monitoring
+
+Volume health monitoring is a [CSI feature](https://kubernetes.io/docs/concepts/storage/volume-health-monitoring/) that was introduced as an Alpha feature in Kubernetes v1.19 and received a second Alpha release in v1.21. It detects "abnormal" volume conditions and reports them as events on PVCs and Pods. A DirectPV volume is considered "abnormal" if its volume mounts are not present on the host.
+
+For node-side monitoring, the `CSIVolumeHealth` feature gate must be enabled. However, DirectPV also installs the external health monitor controller, which monitors volume health and reports events on PVCs.
+
+To enable volume health monitoring, install DirectPV with the `--enable-volume-health-monitoring` flag.
+
+```sh
+kubectl directpv install --enable-volume-health-monitoring
+```
+
+For private registries, note that the following image is required to enable volume health monitoring:
+
+```
+quay.io/minio/csi-external-health-monitor-controller:v0.10.0
+```
diff --git a/pkg/apis/directpv.min.io/types/types.go b/pkg/apis/directpv.min.io/types/types.go
index 5c1d06f1..c84f63fd 100644
--- a/pkg/apis/directpv.min.io/types/types.go
+++ b/pkg/apis/directpv.min.io/types/types.go
@@ -120,7 +120,8 @@ type VolumeConditionType string
// Enum value of VolumeConditionType type.
const (
- VolumeConditionTypeLost VolumeConditionType = "Lost"
+ VolumeConditionTypeLost VolumeConditionType = "Lost"
+ VolumeConditionTypeError VolumeConditionType = "Error"
)
// VolumeConditionReason denotes volume reason. Allows maximum upto 1024 chars.
@@ -128,7 +129,9 @@ type VolumeConditionReason string
// Enum values of VolumeConditionReason type.
const (
- VolumeConditionReasonDriveLost VolumeConditionReason = "DriveLost"
+ VolumeConditionReasonDriveLost VolumeConditionReason = "DriveLost"
+ VolumeConditionReasonNotMounted VolumeConditionReason = "NotMounted"
+ VolumeConditionReasonNoError VolumeConditionReason = "NoError"
)
// VolumeConditionMessage denotes drive message. Allows maximum upto 32768 chars.
@@ -136,7 +139,9 @@ type VolumeConditionMessage string
// Enum values of VolumeConditionMessage type.
const (
- VolumeConditionMessageDriveLost VolumeConditionMessage = "Associated drive was removed. Refer https://github.com/minio/directpv/blob/master/docs/troubleshooting.md"
+ VolumeConditionMessageDriveLost VolumeConditionMessage = "Associated drive was removed. Refer https://github.com/minio/directpv/blob/master/docs/troubleshooting.md"
+ VolumeConditionMessageStagingPathNotMounted VolumeConditionMessage = "Staging path is umounted from the host. Please restart the workload"
+ VolumeConditionMessageTargetPathNotMounted VolumeConditionMessage = "Target path is umounted from the host. Please restart the workload"
)
// DriveConditionType denotes drive condition. Allows maximum upto 316 chars.
diff --git a/pkg/apis/directpv.min.io/v1beta1/volume.go b/pkg/apis/directpv.min.io/v1beta1/volume.go
index 2ab19724..683f5f47 100644
--- a/pkg/apis/directpv.min.io/v1beta1/volume.go
+++ b/pkg/apis/directpv.min.io/v1beta1/volume.go
@@ -19,8 +19,10 @@ package v1beta1
import (
"strconv"
+ "github.com/container-storage-interface/spec/lib/go/csi"
"github.com/minio/directpv/pkg/apis/directpv.min.io/types"
"github.com/minio/directpv/pkg/consts"
+ "github.com/minio/directpv/pkg/k8s"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
@@ -123,6 +125,12 @@ func (volume DirectPVVolume) IsDriveLost() bool {
return false
}
+// HasError reports whether the volume carries a condition of type VolumeConditionTypeError whose status is ConditionTrue.
+func (volume DirectPVVolume) HasError() bool {
+ condition := k8s.GetConditionByType(volume.Status.Conditions, string(types.VolumeConditionTypeError))
+ return condition != nil && condition.Status == metav1.ConditionTrue // a missing Error condition counts as "no error"
+}
+
// SetDriveLost sets associated drive is lost.
func (volume *DirectPVVolume) SetDriveLost() {
c := metav1.Condition{
@@ -316,6 +324,39 @@ func (volume *DirectPVVolume) Resume() bool {
return volume.RemoveLabel(types.SuspendLabelKey)
}
+// ResetStageMountErrorCondition asks k8s.ResetConditionIfMatches to reset the volume's Error condition recorded for an unmounted staging path.
+func (volume *DirectPVVolume) ResetStageMountErrorCondition() {
+ k8s.ResetConditionIfMatches(volume.Status.Conditions, // NOTE(review): argument roles below are inferred from the constant names — confirm against the helper
+ string(types.VolumeConditionTypeError), // condition type
+ string(types.VolumeConditionReasonNotMounted), // reason expected on the condition
+ string(types.VolumeConditionMessageStagingPathNotMounted), // message expected on the condition (staging path variant)
+ string(types.VolumeConditionReasonNoError)) // presumably the reason stored after the reset
+}
+
+// ResetTargetMountErrorCondition asks k8s.ResetConditionIfMatches to reset the volume's Error condition recorded for an unmounted target path.
+func (volume *DirectPVVolume) ResetTargetMountErrorCondition() {
+ k8s.ResetConditionIfMatches(volume.Status.Conditions, // NOTE(review): argument roles below are inferred from the constant names — confirm against the helper
+ string(types.VolumeConditionTypeError), // condition type
+ string(types.VolumeConditionReasonNotMounted), // reason expected on the condition
+ string(types.VolumeConditionMessageTargetPathNotMounted), // message expected on the condition (target path variant)
+ string(types.VolumeConditionReasonNoError)) // presumably the reason stored after the reset
+}
+
+// GetCSIVolumeCondition returns the CSI volume condition: abnormal with the Error condition's message when that condition is ConditionTrue, otherwise normal with an empty message.
+func (volume *DirectPVVolume) GetCSIVolumeCondition() *csi.VolumeCondition {
+ var isAbnormal bool
+ var message string
+ errorCondition := k8s.GetConditionByType(volume.Status.Conditions, string(types.VolumeConditionTypeError))
+ if errorCondition != nil && errorCondition.Status == metav1.ConditionTrue {
+ isAbnormal = true
+ message = errorCondition.Message
+ }
+ return &csi.VolumeCondition{ // always non-nil, also for volumes without an Error condition
+ Abnormal: isAbnormal,
+ Message: message,
+ }
+}
+
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
// DirectPVVolumeList denotes list of volumes.
diff --git a/pkg/consts/consts.go b/pkg/consts/consts.go
index 9bee57e0..f8a474b3 100644
--- a/pkg/consts/consts.go
+++ b/pkg/consts/consts.go
@@ -97,4 +97,7 @@ const (
// TmpFS mount
TmpMountDir = AppRootDir + "/tmp"
+
+ // Volume Health Monitor
+ VolumeHealthMonitorIntervalInDuration = "10m"
)
diff --git a/pkg/consts/consts.go.in b/pkg/consts/consts.go.in
index bf50b159..a2f406de 100644
--- a/pkg/consts/consts.go.in
+++ b/pkg/consts/consts.go.in
@@ -95,4 +95,7 @@ const (
// TmpFS mount
TmpMountDir = AppRootDir + "/tmp"
+
+ // Volume Health Monitor
+ VolumeHealthMonitorIntervalInDuration = "10m"
)
diff --git a/pkg/csi/controller/server.go b/pkg/csi/controller/server.go
index 12bfd7bb..eb13199f 100644
--- a/pkg/csi/controller/server.go
+++ b/pkg/csi/controller/server.go
@@ -97,6 +97,21 @@ func (c *Server) ControllerGetCapabilities(_ context.Context, _ *csi.ControllerG
Rpc: &csi.ControllerServiceCapability_RPC{Type: csi.ControllerServiceCapability_RPC_EXPAND_VOLUME},
},
},
+ {
+ Type: &csi.ControllerServiceCapability_Rpc{
+ Rpc: &csi.ControllerServiceCapability_RPC{Type: csi.ControllerServiceCapability_RPC_LIST_VOLUMES},
+ },
+ },
+ {
+ Type: &csi.ControllerServiceCapability_Rpc{
+ Rpc: &csi.ControllerServiceCapability_RPC{Type: csi.ControllerServiceCapability_RPC_GET_VOLUME},
+ },
+ },
+ {
+ Type: &csi.ControllerServiceCapability_Rpc{
+ Rpc: &csi.ControllerServiceCapability_RPC{Type: csi.ControllerServiceCapability_RPC_VOLUME_CONDITION},
+ },
+ },
},
}, nil
}
@@ -359,8 +374,52 @@ func (c *Server) ControllerExpandVolume(ctx context.Context, req *csi.Controller
// ListVolumes implements ListVolumes controller RPC
// reference: https://github.com/container-storage-interface/spec/blob/master/spec.md#listvolumes
-func (c *Server) ListVolumes(_ context.Context, _ *csi.ListVolumesRequest) (*csi.ListVolumesResponse, error) {
- return nil, status.Error(codes.Unimplemented, "unimplemented")
+func (c *Server) ListVolumes(ctx context.Context, req *csi.ListVolumesRequest) (*csi.ListVolumesResponse, error) { // returns one page of volumes, each with its CSI volume condition
+ result, err := client.VolumeClient().List(ctx, metav1.ListOptions{
+ Limit: int64(req.GetMaxEntries()), // page size requested by the caller
+ Continue: req.GetStartingToken(), // token handed out as NextToken by an earlier call
+ })
+ if err != nil {
+ if req.GetStartingToken() != "" { // a failed listing that used a starting token is reported as Aborted
+ return nil, status.Errorf(codes.Aborted, "unable to list volumes: %v", err)
+ }
+ return nil, status.Errorf(codes.Internal, "unable to list volumes: %v", err)
+ }
+ var volumeEntries []*csi.ListVolumesResponse_Entry
+ for _, volume := range result.Items {
+ csiVolume, err := getCSIVolume(ctx, &volume) // fetches the volume's drive to fill in topology
+ if err != nil {
+ return nil, status.Error(codes.Internal, err.Error()) // a single failing volume fails the whole listing
+ }
+ volumeEntries = append(volumeEntries, &csi.ListVolumesResponse_Entry{
+ Volume: csiVolume,
+ Status: &csi.ListVolumesResponse_VolumeStatus{
+ VolumeCondition: volume.GetCSIVolumeCondition(),
+ },
+ })
+ }
+ return &csi.ListVolumesResponse{
+ Entries: volumeEntries,
+ NextToken: result.Continue, // forwarded so the caller can request the next page
+ }, nil
+}
+
+func getCSIVolume(ctx context.Context, volume *types.Volume) (*csi.Volume, error) { // builds the csi.Volume for a DirectPV volume, using its drive's topology
+ drive, err := client.DriveClient().Get(ctx, string(volume.GetDriveID()), metav1.GetOptions{
+ TypeMeta: types.NewDriveTypeMeta(),
+ })
+ if err != nil {
+ return nil, err // returned unwrapped; callers translate it into a gRPC status
+ }
+ return &csi.Volume{
+ CapacityBytes: volume.Status.TotalCapacity,
+ VolumeId: volume.Name,
+ AccessibleTopology: []*csi.Topology{
+ {
+ Segments: drive.Status.Topology, // single topology entry taken from the drive's status
+ },
+ },
+ }, nil
}
// ControllerPublishVolume - controller RPC to publish volumes
@@ -377,8 +436,23 @@ func (c *Server) ControllerUnpublishVolume(_ context.Context, _ *csi.ControllerU
// ControllerGetVolume - controller RPC for get volume
// reference: https://github.com/container-storage-interface/spec/blob/master/spec.md#controllergetvolume
-func (c *Server) ControllerGetVolume(_ context.Context, _ *csi.ControllerGetVolumeRequest) (*csi.ControllerGetVolumeResponse, error) {
- return nil, status.Error(codes.Unimplemented, "unimplemented")
+func (c *Server) ControllerGetVolume(ctx context.Context, req *csi.ControllerGetVolumeRequest) (*csi.ControllerGetVolumeResponse, error) { // returns the requested volume together with its CSI volume condition
+ volume, err := client.VolumeClient().Get(
+ ctx, req.GetVolumeId(), metav1.GetOptions{TypeMeta: types.NewVolumeTypeMeta()},
+ )
+ if err != nil {
+ return nil, status.Error(codes.NotFound, err.Error()) // NOTE(review): every Get failure is reported as NotFound — confirm other failures should not map to Internal
+ }
+ csiVolume, err := getCSIVolume(ctx, volume)
+ if err != nil {
+ return nil, status.Error(codes.Internal, err.Error()) // drive lookup failed
+ }
+ return &csi.ControllerGetVolumeResponse{
+ Volume: csiVolume,
+ Status: &csi.ControllerGetVolumeResponse_VolumeStatus{
+ VolumeCondition: volume.GetCSIVolumeCondition(),
+ },
+ }, nil
}
// ListSnapshots - controller RPC for listing snapshots
diff --git a/pkg/csi/controller/server_test.go b/pkg/csi/controller/server_test.go
index 72ea13f6..3100e4fb 100644
--- a/pkg/csi/controller/server_test.go
+++ b/pkg/csi/controller/server_test.go
@@ -275,6 +275,21 @@ func TestControllerGetCapabilities(t *testing.T) {
Rpc: &csi.ControllerServiceCapability_RPC{Type: csi.ControllerServiceCapability_RPC_EXPAND_VOLUME},
},
},
+ {
+ Type: &csi.ControllerServiceCapability_Rpc{
+ Rpc: &csi.ControllerServiceCapability_RPC{Type: csi.ControllerServiceCapability_RPC_LIST_VOLUMES},
+ },
+ },
+ {
+ Type: &csi.ControllerServiceCapability_Rpc{
+ Rpc: &csi.ControllerServiceCapability_RPC{Type: csi.ControllerServiceCapability_RPC_GET_VOLUME},
+ },
+ },
+ {
+ Type: &csi.ControllerServiceCapability_Rpc{
+ Rpc: &csi.ControllerServiceCapability_RPC{Type: csi.ControllerServiceCapability_RPC_VOLUME_CONDITION},
+ },
+ },
},
}
if !reflect.DeepEqual(result, expectedResult) {
@@ -331,8 +346,131 @@ func TestValidateVolumeCapabilities(t *testing.T) {
}
func TestListVolumes(t *testing.T) {
- if _, err := NewServer().ListVolumes(context.TODO(), nil); err == nil {
- t.Fatal("error expected")
+ testObjects := []runtime.Object{
+ &types.Drive{
+ TypeMeta: types.NewDriveTypeMeta(),
+ ObjectMeta: metav1.ObjectMeta{
+ Name: "test-drive",
+ },
+ Status: types.DriveStatus{
+ Topology: map[string]string{"node": "N1", "rack": "RK1", "zone": "Z1", "region": "R1"},
+ },
+ },
+ &types.Volume{
+ TypeMeta: types.NewVolumeTypeMeta(),
+ ObjectMeta: metav1.ObjectMeta{
+ Name: "test-abnormal-volume-1",
+ Labels: map[string]string{
+ string(directpvtypes.DriveLabelKey): "test-drive",
+ string(directpvtypes.NodeLabelKey): "N1",
+ string(directpvtypes.DriveNameLabelKey): "/dev/test-drive",
+ string(directpvtypes.CreatedByLabelKey): consts.ControllerName,
+ },
+ },
+ Status: types.VolumeStatus{
+ TotalCapacity: int64(100),
+ Conditions: []metav1.Condition{
+ {
+ Type: string(directpvtypes.VolumeConditionTypeError),
+ Status: metav1.ConditionTrue,
+ Message: string(directpvtypes.VolumeConditionMessageStagingPathNotMounted),
+ Reason: string(directpvtypes.VolumeConditionReasonNotMounted),
+ LastTransitionTime: metav1.Now(),
+ },
+ },
+ },
+ },
+ &types.Volume{
+ TypeMeta: types.NewVolumeTypeMeta(),
+ ObjectMeta: metav1.ObjectMeta{
+ Name: "test-abnormal-volume-2",
+ Labels: map[string]string{
+ string(directpvtypes.DriveLabelKey): "test-drive",
+ string(directpvtypes.NodeLabelKey): "N1",
+ string(directpvtypes.DriveNameLabelKey): "/dev/test-drive",
+ string(directpvtypes.CreatedByLabelKey): consts.ControllerName,
+ },
+ },
+ Status: types.VolumeStatus{
+ TotalCapacity: int64(100),
+ Conditions: []metav1.Condition{
+ {
+ Type: string(directpvtypes.VolumeConditionTypeError),
+ Status: metav1.ConditionTrue,
+ Message: string(directpvtypes.VolumeConditionMessageTargetPathNotMounted),
+ Reason: string(directpvtypes.VolumeConditionReasonNotMounted),
+ LastTransitionTime: metav1.Now(),
+ },
+ },
+ },
+ },
+ &types.Volume{
+ TypeMeta: types.NewVolumeTypeMeta(),
+ ObjectMeta: metav1.ObjectMeta{
+ Name: "test-normal-volume-1",
+ Labels: map[string]string{
+ string(directpvtypes.DriveLabelKey): "test-drive",
+ string(directpvtypes.NodeLabelKey): "N1",
+ string(directpvtypes.DriveNameLabelKey): "/dev/test-drive",
+ string(directpvtypes.CreatedByLabelKey): consts.ControllerName,
+ },
+ },
+ Status: types.VolumeStatus{
+ TotalCapacity: int64(100),
+ Conditions: []metav1.Condition{
+ {
+ Type: string(directpvtypes.VolumeConditionTypeError),
+ Status: metav1.ConditionFalse,
+ LastTransitionTime: metav1.Now(),
+ },
+ },
+ },
+ },
+ }
+
+ ctx := context.TODO()
+ cl := NewServer()
+ clientset := types.NewExtFakeClientset(clientsetfake.NewSimpleClientset(testObjects...))
+ client.SetDriveInterface(clientset.DirectpvLatest().DirectPVDrives())
+ client.SetVolumeInterface(clientset.DirectpvLatest().DirectPVVolumes())
+
+ getListVolumeResponseEntry := func(volumeId string, abnormal bool, message string) *csi.ListVolumesResponse_Entry {
+ return &csi.ListVolumesResponse_Entry{
+ Volume: &csi.Volume{
+ CapacityBytes: int64(100),
+ VolumeId: volumeId,
+ AccessibleTopology: []*csi.Topology{
+ {
+ Segments: map[string]string{"node": "N1", "rack": "RK1", "zone": "Z1", "region": "R1"},
+ },
+ },
+ },
+ Status: &csi.ListVolumesResponse_VolumeStatus{
+ VolumeCondition: &csi.VolumeCondition{
+ Abnormal: abnormal,
+ Message: message,
+ },
+ },
+ }
+ }
+
+ expectedListVolumeResponseEntries := []*csi.ListVolumesResponse_Entry{
+ getListVolumeResponseEntry("test-abnormal-volume-1", true, string(directpvtypes.VolumeConditionMessageStagingPathNotMounted)),
+ getListVolumeResponseEntry("test-abnormal-volume-2", true, string(directpvtypes.VolumeConditionMessageTargetPathNotMounted)),
+ getListVolumeResponseEntry("test-normal-volume-1", false, ""),
+ }
+
+ req := &csi.ListVolumesRequest{
+ MaxEntries: int32(3),
+ StartingToken: "",
+ }
+ listVolumesRes, err := cl.ListVolumes(ctx, req)
+ if err != nil {
+ t.Fatal(err)
+ }
+ listVolumeResponseEntries := listVolumesRes.GetEntries()
+ if !reflect.DeepEqual(listVolumeResponseEntries, expectedListVolumeResponseEntries) {
+ t.Fatalf("expected volume response entries: %#+v, got: %#+v\n", expectedListVolumeResponseEntries, listVolumeResponseEntries)
}
}
@@ -396,8 +534,146 @@ func TestControllerExpandVolume(t *testing.T) {
}
func TestControllerGetVolume(t *testing.T) {
- if _, err := NewServer().ControllerGetVolume(context.TODO(), nil); err == nil {
- t.Fatal("error expected")
+ testObjects := []runtime.Object{
+ &types.Drive{
+ TypeMeta: types.NewDriveTypeMeta(),
+ ObjectMeta: metav1.ObjectMeta{
+ Name: "test-drive",
+ },
+ Status: types.DriveStatus{
+ Topology: map[string]string{"node": "N1", "rack": "RK1", "zone": "Z1", "region": "R1"},
+ },
+ },
+ &types.Volume{
+ TypeMeta: types.NewVolumeTypeMeta(),
+ ObjectMeta: metav1.ObjectMeta{
+ Name: "test-abnormal-volume-1",
+ Labels: map[string]string{
+ string(directpvtypes.DriveLabelKey): "test-drive",
+ string(directpvtypes.NodeLabelKey): "N1",
+ string(directpvtypes.DriveNameLabelKey): "/dev/test-drive",
+ string(directpvtypes.CreatedByLabelKey): consts.ControllerName,
+ },
+ },
+ Status: types.VolumeStatus{
+ TotalCapacity: int64(100),
+ Conditions: []metav1.Condition{
+ {
+ Type: string(directpvtypes.VolumeConditionTypeError),
+ Status: metav1.ConditionTrue,
+ Message: string(directpvtypes.VolumeConditionMessageStagingPathNotMounted),
+ Reason: string(directpvtypes.VolumeConditionReasonNotMounted),
+ LastTransitionTime: metav1.Now(),
+ },
+ },
+ },
+ },
+ &types.Volume{
+ TypeMeta: types.NewVolumeTypeMeta(),
+ ObjectMeta: metav1.ObjectMeta{
+ Name: "test-abnormal-volume-2",
+ Labels: map[string]string{
+ string(directpvtypes.DriveLabelKey): "test-drive",
+ string(directpvtypes.NodeLabelKey): "N1",
+ string(directpvtypes.DriveNameLabelKey): "/dev/test-drive",
+ string(directpvtypes.CreatedByLabelKey): consts.ControllerName,
+ },
+ },
+ Status: types.VolumeStatus{
+ TotalCapacity: int64(100),
+ Conditions: []metav1.Condition{
+ {
+ Type: string(directpvtypes.VolumeConditionTypeError),
+ Status: metav1.ConditionTrue,
+ Message: string(directpvtypes.VolumeConditionMessageTargetPathNotMounted),
+ Reason: string(directpvtypes.VolumeConditionReasonNotMounted),
+ LastTransitionTime: metav1.Now(),
+ },
+ },
+ },
+ },
+ &types.Volume{
+ TypeMeta: types.NewVolumeTypeMeta(),
+ ObjectMeta: metav1.ObjectMeta{
+ Name: "test-normal-volume-1",
+ Labels: map[string]string{
+ string(directpvtypes.DriveLabelKey): "test-drive",
+ string(directpvtypes.NodeLabelKey): "N1",
+ string(directpvtypes.DriveNameLabelKey): "/dev/test-drive",
+ string(directpvtypes.CreatedByLabelKey): consts.ControllerName,
+ },
+ },
+ Status: types.VolumeStatus{
+ TotalCapacity: int64(100),
+ Conditions: []metav1.Condition{
+ {
+ Type: string(directpvtypes.VolumeConditionTypeError),
+ Status: metav1.ConditionFalse,
+ LastTransitionTime: metav1.Now(),
+ },
+ },
+ },
+ },
+ }
+
+ ctx := context.TODO()
+ cl := NewServer()
+ clientset := types.NewExtFakeClientset(clientsetfake.NewSimpleClientset(testObjects...))
+ client.SetDriveInterface(clientset.DirectpvLatest().DirectPVDrives())
+ client.SetVolumeInterface(clientset.DirectpvLatest().DirectPVVolumes())
+
+ getControllerGetVolumeResponse := func(volumeId string, abnormal bool, message string) *csi.ControllerGetVolumeResponse {
+ return &csi.ControllerGetVolumeResponse{
+ Volume: &csi.Volume{
+ CapacityBytes: int64(100),
+ VolumeId: volumeId,
+ AccessibleTopology: []*csi.Topology{
+ {
+ Segments: map[string]string{"node": "N1", "rack": "RK1", "zone": "Z1", "region": "R1"},
+ },
+ },
+ },
+ Status: &csi.ControllerGetVolumeResponse_VolumeStatus{
+ VolumeCondition: &csi.VolumeCondition{
+ Abnormal: abnormal,
+ Message: message,
+ },
+ },
+ }
+ }
+
+ testCases := []struct {
+ req *csi.ControllerGetVolumeRequest
+ expectedRes *csi.ControllerGetVolumeResponse
+ }{
+ {
+ req: &csi.ControllerGetVolumeRequest{
+ VolumeId: "test-abnormal-volume-1",
+ },
+ expectedRes: getControllerGetVolumeResponse("test-abnormal-volume-1", true, string(directpvtypes.VolumeConditionMessageStagingPathNotMounted)),
+ },
+ {
+ req: &csi.ControllerGetVolumeRequest{
+ VolumeId: "test-abnormal-volume-2",
+ },
+ expectedRes: getControllerGetVolumeResponse("test-abnormal-volume-2", true, string(directpvtypes.VolumeConditionMessageTargetPathNotMounted)),
+ },
+ {
+ req: &csi.ControllerGetVolumeRequest{
+ VolumeId: "test-normal-volume-1",
+ },
+ expectedRes: getControllerGetVolumeResponse("test-normal-volume-1", false, ""),
+ },
+ }
+
+ for i, testCase := range testCases {
+ result, err := cl.ControllerGetVolume(ctx, testCase.req)
+ if err != nil {
+ t.Fatalf("case %v: unexpected error %v", i+1, err)
+ }
+ if !reflect.DeepEqual(result, testCase.expectedRes) {
+ t.Fatalf("case %v: expected: %#+v, got: %#+v\n", i+1, testCase.expectedRes, result)
+ }
}
}
diff --git a/pkg/csi/node/fake.go b/pkg/csi/node/fake.go
index a8fae524..55dc0e25 100644
--- a/pkg/csi/node/fake.go
+++ b/pkg/csi/node/fake.go
@@ -25,15 +25,21 @@ import (
"github.com/minio/directpv/pkg/xfs"
)
-const testNodeName = "test-node"
+const (
+ testNodeName = "test-node"
+ testIdentityName = "test-identity"
+ testRackName = "test-rack"
+ testZoneName = "test-zone"
+ testRegionName = "test-region"
+)
func createFakeServer() *Server {
return &Server{
nodeID: testNodeName,
- identity: "test-identity",
- rack: "test-rack",
- zone: "test-zone",
- region: "test-region",
+ identity: testIdentityName,
+ rack: testRackName,
+ zone: testZoneName,
+ region: testRegionName,
getMounts: func() (map[string]utils.StringSet, map[string]utils.StringSet, error) {
return map[string]utils.StringSet{consts.MountRootDir: nil}, map[string]utils.StringSet{consts.MountRootDir: nil}, nil
},
diff --git a/pkg/csi/node/publish_unpublish.go b/pkg/csi/node/publish_unpublish.go
index 46b55dea..1a693fce 100644
--- a/pkg/csi/node/publish_unpublish.go
+++ b/pkg/csi/node/publish_unpublish.go
@@ -134,6 +134,7 @@ func (server *Server) NodePublishVolume(ctx context.Context, req *csi.NodePublis
}
}
+ volume.ResetTargetMountErrorCondition()
volume.Status.TargetPath = req.GetTargetPath()
_, err = client.VolumeClient().Update(ctx, volume, metav1.UpdateOptions{
TypeMeta: types.NewVolumeTypeMeta(),
@@ -212,6 +213,7 @@ func (server *Server) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpu
}
if volume.Status.TargetPath == targetPath {
+ volume.ResetTargetMountErrorCondition()
volume.Status.TargetPath = ""
if _, err := client.VolumeClient().Update(ctx, volume, metav1.UpdateOptions{
TypeMeta: types.NewVolumeTypeMeta(),
diff --git a/pkg/csi/node/server.go b/pkg/csi/node/server.go
index f3360085..0f27dc5c 100644
--- a/pkg/csi/node/server.go
+++ b/pkg/csi/node/server.go
@@ -123,6 +123,7 @@ func (server *Server) NodeGetCapabilities(_ context.Context, _ *csi.NodeGetCapab
nodeCap(csi.NodeServiceCapability_RPC_GET_VOLUME_STATS),
nodeCap(csi.NodeServiceCapability_RPC_STAGE_UNSTAGE_VOLUME),
nodeCap(csi.NodeServiceCapability_RPC_EXPAND_VOLUME),
+ nodeCap(csi.NodeServiceCapability_RPC_VOLUME_CONDITION),
},
}, nil
}
@@ -144,6 +145,15 @@ func (server *Server) NodeGetVolumeStats(ctx context.Context, req *csi.NodeGetVo
return nil, status.Error(codes.NotFound, err.Error())
}
+ if volumeCondition := volume.GetCSIVolumeCondition(); volumeCondition != nil && volumeCondition.GetAbnormal() {
+ res := &csi.NodeGetVolumeStatsResponse{}
+ res.Usage = []*csi.VolumeUsage{
+ {},
+ }
+ res.VolumeCondition = volumeCondition
+ return res, nil
+ }
+
device, err := server.getDeviceByFSUUID(volume.Status.FSUUID)
if err != nil {
klog.ErrorS(
diff --git a/pkg/csi/node/server_test.go b/pkg/csi/node/server_test.go
index 2ac51731..5f0019cf 100644
--- a/pkg/csi/node/server_test.go
+++ b/pkg/csi/node/server_test.go
@@ -18,15 +18,177 @@ package node
import (
"context"
+ "reflect"
"testing"
"github.com/container-storage-interface/spec/lib/go/csi"
+ directpvtypes "github.com/minio/directpv/pkg/apis/directpv.min.io/types"
"github.com/minio/directpv/pkg/client"
clientsetfake "github.com/minio/directpv/pkg/clientset/fake"
+ "github.com/minio/directpv/pkg/consts"
"github.com/minio/directpv/pkg/types"
"github.com/minio/directpv/pkg/xfs"
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+ "k8s.io/apimachinery/pkg/runtime"
)
+func TestNodeGetInfo(t *testing.T) { // checks that NodeGetInfo on the fake server returns its node ID and all five topology segments
+ result, err := createFakeServer().NodeGetInfo(context.TODO(), nil)
+ if err != nil {
+ t.Fatal(err)
+ }
+ expectedResult := &csi.NodeGetInfoResponse{
+ NodeId: testNodeName,
+ AccessibleTopology: &csi.Topology{
+ Segments: map[string]string{
+ string(directpvtypes.TopologyDriverIdentity): testIdentityName,
+ string(directpvtypes.TopologyDriverRack): testRackName,
+ string(directpvtypes.TopologyDriverZone): testZoneName,
+ string(directpvtypes.TopologyDriverRegion): testRegionName,
+ string(directpvtypes.TopologyDriverNode): testNodeName,
+ },
+ },
+ }
+ if !reflect.DeepEqual(result, expectedResult) {
+ t.Fatalf("expected: %#+v, got: %#+v\n", expectedResult, result)
+ }
+}
+
+func TestNodeGetCapabilities(t *testing.T) { // checks the exact, ordered capability list advertised by the fake node server
+ result, err := createFakeServer().NodeGetCapabilities(context.TODO(), nil)
+ if err != nil {
+ t.Fatal(err)
+ }
+ expectedResult := &csi.NodeGetCapabilitiesResponse{
+ Capabilities: []*csi.NodeServiceCapability{
+ {
+ Type: &csi.NodeServiceCapability_Rpc{
+ Rpc: &csi.NodeServiceCapability_RPC{
+ Type: csi.NodeServiceCapability_RPC_GET_VOLUME_STATS,
+ },
+ },
+ },
+ {
+ Type: &csi.NodeServiceCapability_Rpc{
+ Rpc: &csi.NodeServiceCapability_RPC{
+ Type: csi.NodeServiceCapability_RPC_STAGE_UNSTAGE_VOLUME,
+ },
+ },
+ },
+ {
+ Type: &csi.NodeServiceCapability_Rpc{
+ Rpc: &csi.NodeServiceCapability_RPC{
+ Type: csi.NodeServiceCapability_RPC_EXPAND_VOLUME,
+ },
+ },
+ },
+ {
+ Type: &csi.NodeServiceCapability_Rpc{
+ Rpc: &csi.NodeServiceCapability_RPC{
+ Type: csi.NodeServiceCapability_RPC_VOLUME_CONDITION, // the capability this change adds to the list
+ },
+ },
+ },
+ },
+ }
+ if !reflect.DeepEqual(result, expectedResult) { // DeepEqual on a slice, so order matters
+ t.Fatalf("expected: %#+v, got: %#+v\n", expectedResult, result)
+ }
+}
+
+func TestNodeGetVolumeStats(t *testing.T) { // checks NodeGetVolumeStats for one volume without conditions and one with a true Error condition
+ testObjects := []runtime.Object{
+ &types.Volume{
+ ObjectMeta: metav1.ObjectMeta{
+ Name: "volume-1",
+ Labels: map[string]string{
+ string(directpvtypes.NodeLabelKey): "test-node",
+ string(directpvtypes.CreatedByLabelKey): consts.ControllerName,
+ },
+ },
+ Status: types.VolumeStatus{
+ StagingTargetPath: "/stagingpath/volume-1",
+ TargetPath: "/targetpath/cvolume-1",
+ Conditions: []metav1.Condition{}, // no conditions at all
+ },
+ },
+ &types.Volume{
+ ObjectMeta: metav1.ObjectMeta{
+ Name: "volume-2",
+ Labels: map[string]string{
+ string(directpvtypes.NodeLabelKey): "test-node",
+ string(directpvtypes.CreatedByLabelKey): consts.ControllerName,
+ },
+ },
+ Status: types.VolumeStatus{
+ StagingTargetPath: "/stagingpath/volume-2",
+ TargetPath: "/containerpath/volume-2",
+ Conditions: []metav1.Condition{
+ {
+ Type: string(directpvtypes.VolumeConditionTypeError),
+ Status: metav1.ConditionTrue, // marks volume-2 as being in error
+ Reason: string(directpvtypes.VolumeConditionReasonNotMounted),
+ Message: string(directpvtypes.VolumeConditionMessageStagingPathNotMounted),
+ },
+ },
+ },
+ },
+ }
+
+ testCases := []struct {
+ request *csi.NodeGetVolumeStatsRequest
+ expectedResponse *csi.NodeGetVolumeStatsResponse
+ }{
+ {
+ request: &csi.NodeGetVolumeStatsRequest{
+ VolumeId: "volume-1",
+ VolumePath: "/stagingpath/volume-1",
+ },
+ expectedResponse: &csi.NodeGetVolumeStatsResponse{ // expects a byte-unit usage entry and a normal condition
+ Usage: []*csi.VolumeUsage{
+ {
+ Unit: csi.VolumeUsage_BYTES,
+ },
+ },
+ VolumeCondition: &csi.VolumeCondition{
+ Abnormal: false,
+ Message: "",
+ },
+ },
+ },
+ {
+ request: &csi.NodeGetVolumeStatsRequest{
+ VolumeId: "volume-2",
+ VolumePath: "/stagingpath/volume-2",
+ },
+ expectedResponse: &csi.NodeGetVolumeStatsResponse{ // expects an empty usage entry plus the abnormal condition and its message
+ Usage: []*csi.VolumeUsage{
+ {},
+ },
+ VolumeCondition: &csi.VolumeCondition{
+ Abnormal: true,
+ Message: string(directpvtypes.VolumeConditionMessageStagingPathNotMounted),
+ },
+ },
+ },
+ }
+ nodeServer := createFakeServer()
+ clientset := types.NewExtFakeClientset(clientsetfake.NewSimpleClientset(testObjects...))
+ client.SetDriveInterface(clientset.DirectpvLatest().DirectPVDrives()) // route the package-level clients to the fake clientset
+ client.SetVolumeInterface(clientset.DirectpvLatest().DirectPVVolumes())
+
+ ctx := context.TODO()
+ for i, testCase := range testCases {
+ response, err := nodeServer.NodeGetVolumeStats(ctx, testCase.request)
+ if err != nil {
+ t.Fatalf("case %v: unexpected error %v", i+1, err)
+ }
+ if !reflect.DeepEqual(response, testCase.expectedResponse) {
+ t.Fatalf("case %v: expected: %#+v, got: %#+v\n", i+1, testCase.expectedResponse, response)
+ }
+ }
+}
+
func TestNodeExpandVolume(t *testing.T) {
volumeID := "volume-id-1"
volume := types.NewVolume(volumeID, "fsuuid1", "node-1", "drive-1", "sda", 100*MiB)
diff --git a/pkg/csi/node/stage_unstage.go b/pkg/csi/node/stage_unstage.go
index 059945ac..7bc7160b 100644
--- a/pkg/csi/node/stage_unstage.go
+++ b/pkg/csi/node/stage_unstage.go
@@ -110,6 +110,7 @@ func (server *Server) NodeUnstageVolume(ctx context.Context, req *csi.NodeUnstag
}
if volume.Status.StagingTargetPath == stagingTargetPath {
+ volume.ResetStageMountErrorCondition()
volume.Status.StagingTargetPath = ""
if _, err := client.VolumeClient().Update(ctx, volume, metav1.UpdateOptions{
TypeMeta: types.NewVolumeTypeMeta(),
diff --git a/pkg/drive/event.go b/pkg/drive/event.go
index 2eefd41b..dc15c4dc 100644
--- a/pkg/drive/event.go
+++ b/pkg/drive/event.go
@@ -138,6 +138,7 @@ func StageVolume(
}
}
+ volume.ResetStageMountErrorCondition()
volume.Status.DataPath = volumeDir
volume.Status.StagingTargetPath = stagingTargetPath
volume.Status.Status = directpvtypes.VolumeStatusReady
diff --git a/pkg/installer/args.go b/pkg/installer/args.go
index fde3c5f8..e259b080 100644
--- a/pkg/installer/args.go
+++ b/pkg/installer/args.go
@@ -47,6 +47,9 @@ const (
// csiResizerImage = csi-resizer:v1.8.0
csiResizerImage = "csi-resizer@sha256:819f68a4daf75acec336302843f303cf360d4941249f9f5019ffbb690c8ac7c0"
openshiftCSIResizerImage = "registry.redhat.io/openshift4/ose-csi-external-resizer-rhel8@sha256:837b32a0c432123e2605ad6d029e7f3b4489d9c52a9d272c7a133d41ad10db87"
+
+ // csiHealthMonitorImage = csi-external-health-monitor-controller:v0.10.0
+ csiHealthMonitorImage = "registry.k8s.io/sig-storage/csi-external-health-monitor-controller:v0.10.0"
)
// Args represents DirectPV installation arguments.
@@ -54,23 +57,24 @@ type Args struct {
image string
// Optional arguments
- Registry string
- Org string
- ImagePullSecrets []string
- NodeSelector map[string]string
- Tolerations []corev1.Toleration
- SeccompProfile string
- AppArmorProfile string
- Quiet bool
- KubeVersion *version.Version
- Legacy bool
- ObjectWriter io.Writer
- DryRun bool
- Declarative bool
- Openshift bool
- ObjectMarshaler func(runtime.Object) ([]byte, error)
- ProgressCh chan<- Message
- ForceUninstall bool
+ Registry string
+ Org string
+ ImagePullSecrets []string
+ NodeSelector map[string]string
+ Tolerations []corev1.Toleration
+ SeccompProfile string
+ AppArmorProfile string
+ Quiet bool
+ KubeVersion *version.Version
+ Legacy bool
+ ObjectWriter io.Writer
+ DryRun bool
+ Declarative bool
+ Openshift bool
+ ObjectMarshaler func(runtime.Object) ([]byte, error)
+ ProgressCh chan<- Message
+ ForceUninstall bool
+ EnableVolumeHealthMonitor bool
podSecurityAdmission bool
csiProvisionerImage string
@@ -167,3 +171,7 @@ func (args *Args) getCSIResizerImage() string {
}
return path.Join(args.Registry, args.Org, args.csiResizerImage)
}
+
+// getCSIHealthMonitorImage returns the image reference used for the CSI external
+// health monitor controller container.
+//
+// NOTE(review): unlike getCSIResizerImage, the value is the fixed
+// csiHealthMonitorImage constant; args.Registry and args.Org are not applied
+// here. Confirm this is intended for installations using a private registry.
+func (args *Args) getCSIHealthMonitorImage() string {
+ return csiHealthMonitorImage
+}
diff --git a/pkg/installer/daemonset.go b/pkg/installer/daemonset.go
index 8e067711..b0478660 100644
--- a/pkg/installer/daemonset.go
+++ b/pkg/installer/daemonset.go
@@ -249,7 +249,7 @@ func doCreateDaemonset(ctx context.Context, args *Args) (err error) {
securityContext := newSecurityContext(args.SeccompProfile)
pluginSocketDir := newPluginsSocketDir(kubeletDirPath, consts.Identity)
volumes, volumeMounts := getVolumesAndMounts(pluginSocketDir)
- containerArgs := []string{
+ nodeServerArgs := []string{
consts.NodeServerName,
fmt.Sprintf("-v=%d", logLevel),
fmt.Sprintf("--identity=%s", consts.Identity),
@@ -258,6 +258,12 @@ func doCreateDaemonset(ctx context.Context, args *Args) (err error) {
fmt.Sprintf("--readiness-port=%d", consts.ReadinessPort),
fmt.Sprintf("--metrics-port=%d", consts.MetricsPort),
}
+	if args.EnableVolumeHealthMonitor {
+		// The enable flag carries no value, so a plain literal is used instead
+		// of fmt.Sprintf; only the interval needs formatting.
+		nodeServerArgs = append(nodeServerArgs,
+			"--enable-volume-health-monitor",
+			fmt.Sprintf("--volume-health-monitor-interval=%s", consts.VolumeHealthMonitorIntervalInDuration),
+		)
+	}
nodeControllerArgs := []string{
consts.NodeControllerName,
fmt.Sprintf("-v=%d", logLevel),
@@ -272,7 +278,7 @@ func doCreateDaemonset(ctx context.Context, args *Args) (err error) {
ImagePullSecrets: args.getImagePullSecrets(),
Containers: []corev1.Container{
nodeDriverRegistrarContainer(args.getNodeDriverRegistrarImage(), pluginSocketDir),
- nodeServerContainer(args.getContainerImage(), containerArgs, securityContext, volumeMounts),
+ nodeServerContainer(args.getContainerImage(), nodeServerArgs, securityContext, volumeMounts),
nodeControllerContainer(args.getContainerImage(), nodeControllerArgs, securityContext, volumeMounts),
livenessProbeContainer(args.getLivenessProbeImage()),
},
@@ -320,7 +326,7 @@ func doCreateLegacyDaemonset(ctx context.Context, args *Args) (err error) {
securityContext := newSecurityContext(args.SeccompProfile)
pluginSocketDir := newPluginsSocketDir(kubeletDirPath, legacyclient.Identity)
volumes, volumeMounts := getVolumesAndMounts(pluginSocketDir)
- containerArgs := []string{
+ nodeServerArgs := []string{
consts.LegacyNodeServerName,
fmt.Sprintf("-v=%d", logLevel),
fmt.Sprintf("--csi-endpoint=$(%s)", csiEndpointEnvVarName),
@@ -336,7 +342,7 @@ func doCreateLegacyDaemonset(ctx context.Context, args *Args) (err error) {
ImagePullSecrets: args.getImagePullSecrets(),
Containers: []corev1.Container{
nodeDriverRegistrarContainer(args.getNodeDriverRegistrarImage(), pluginSocketDir),
- nodeServerContainer(args.getContainerImage(), containerArgs, securityContext, volumeMounts),
+ nodeServerContainer(args.getContainerImage(), nodeServerArgs, securityContext, volumeMounts),
livenessProbeContainer(args.getLivenessProbeImage()),
},
NodeSelector: args.NodeSelector,
diff --git a/pkg/installer/deployment.go b/pkg/installer/deployment.go
index af1b8aea..919fe952 100644
--- a/pkg/installer/deployment.go
+++ b/pkg/installer/deployment.go
@@ -176,6 +176,28 @@ func doCreateDeployment(ctx context.Context, args *Args, legacy bool, step int)
},
}
+	if args.EnableVolumeHealthMonitor {
+		// Add the external health monitor controller container to the pod. It
+		// reaches the CSI driver through the shared socket directory.
+		podSpec.Containers = append(podSpec.Containers, corev1.Container{
+			Name:  "volume-health-monitor",
+			Image: args.getCSIHealthMonitorImage(),
+			Args: []string{
+				fmt.Sprintf("--v=%d", logLevel),
+				"--timeout=300s",
+				fmt.Sprintf("--csi-address=$(%s)", csiEndpointEnvVarName),
+				"--leader-election",
+			},
+			Env: []corev1.EnvVar{csiEndpointEnvVar},
+			VolumeMounts: []corev1.VolumeMount{
+				newVolumeMount(volumeNameSocketDir, socketDir, corev1.MountPropagationNone, false),
+			},
+			TerminationMessagePolicy: corev1.TerminationMessageFallbackToLogsOnError,
+			// Named after this container rather than the resizer it was copied from.
+			TerminationMessagePath: "/var/log/controller-csi-health-monitor-termination-log",
+			SecurityContext: &corev1.SecurityContext{
+				Privileged: &privileged,
+			},
+		})
+	}
+
var selectorValue string
if !args.DryRun {
deployment, err := k8s.KubeClient().AppsV1().Deployments(namespace).Get(
diff --git a/pkg/k8s/k8s.go b/pkg/k8s/k8s.go
index b33d360b..4ef91a76 100644
--- a/pkg/k8s/k8s.go
+++ b/pkg/k8s/k8s.go
@@ -122,16 +122,43 @@ func IsCondition(conditions []metav1.Condition, ctype string, status metav1.Cond
}
// UpdateCondition updates type/status/reason/message of conditions matched by condition type.
-func UpdateCondition(conditions []metav1.Condition, ctype string, status metav1.ConditionStatus, reason, message string) {
+func UpdateCondition(conditions []metav1.Condition, ctype string, status metav1.ConditionStatus, reason, message string) ([]metav1.Condition, bool) {
+ if condition := GetConditionByType(conditions, ctype); condition != nil {
+ var updated bool
+ if condition.Status != status {
+ condition.Status = status
+ updated = true
+ }
+ if condition.Reason != reason {
+ condition.Reason = reason
+ updated = true
+ }
+ if condition.Message != message {
+ condition.Message = message
+ updated = true
+ }
+ if updated {
+ condition.LastTransitionTime = metav1.Now()
+ }
+ return conditions, updated
+ }
+ return append(conditions, metav1.Condition{
+ Type: ctype,
+ Status: status,
+ Reason: reason,
+ Message: message,
+ LastTransitionTime: metav1.Now(),
+ }), true
+}
+
+// GetConditionByType returns a pointer to the first condition whose Type equals
+// ctype, or nil if there is none. The pointer refers to the element stored in
+// conditions, so writes through it modify the slice in place.
+func GetConditionByType(conditions []metav1.Condition, ctype string) *metav1.Condition {
for i := range conditions {
if conditions[i].Type == ctype {
- conditions[i].Status = status
- conditions[i].Reason = reason
- conditions[i].Message = message
- conditions[i].LastTransitionTime = metav1.Now()
- break
+ return &conditions[i]
}
}
+ return nil
}
// MatchTrueConditions matches whether types and status list are in a true conditions or not.
@@ -162,6 +189,17 @@ func BoolToConditionStatus(val bool) metav1.ConditionStatus {
return metav1.ConditionFalse
}
+// ResetConditionIfMatches resets every condition whose type, reason and message
+// all match the given values: its status becomes False, its reason becomes
+// newReason and its message is cleared. The last transition time is refreshed
+// as well, in line with UpdateCondition. Conditions that do not match are left
+// untouched. The slice is modified in place.
+func ResetConditionIfMatches(conditions []metav1.Condition, ctype string, reason, message, newReason string) {
+	for i := range conditions {
+		condition := &conditions[i]
+		if condition.Type != ctype || condition.Reason != reason || condition.Message != message {
+			continue
+		}
+		condition.Status = metav1.ConditionFalse
+		condition.Reason = newReason
+		condition.Message = ""
+		// The condition has just been rewritten, so record when that happened.
+		condition.LastTransitionTime = metav1.Now()
+	}
+}
+
// SanitizeResourceName - Sanitize given name to a valid kubernetes name format.
// RegEx for a kubernetes name is
//
diff --git a/pkg/volume/volume-health.go b/pkg/volume/volume-health.go
new file mode 100644
index 00000000..1422ff31
--- /dev/null
+++ b/pkg/volume/volume-health.go
@@ -0,0 +1,115 @@
+// This file is part of MinIO DirectPV
+// Copyright (c) 2023 MinIO, Inc.
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see https://www.gnu.org/licenses/.
+
+package volume
+
+import (
+ "context"
+ "time"
+
+ directpvtypes "github.com/minio/directpv/pkg/apis/directpv.min.io/types"
+ "github.com/minio/directpv/pkg/client"
+ "github.com/minio/directpv/pkg/k8s"
+ "github.com/minio/directpv/pkg/sys"
+ "github.com/minio/directpv/pkg/types"
+ "github.com/minio/directpv/pkg/utils"
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+ "k8s.io/klog/v2"
+)
+
+// RunHealthMonitor checks the health of the volumes of the given node once per
+// interval and records the outcome in each volume's error condition.
+//
+// It blocks until ctx is done, in which case ctx.Err() is returned, or until a
+// health check pass fails, in which case that error is returned. The interval
+// must be positive because time.NewTicker panics otherwise.
+func RunHealthMonitor(ctx context.Context, nodeID directpvtypes.NodeID, interval time.Duration) error {
+	ticker := time.NewTicker(interval)
+	defer ticker.Stop()
+
+	for {
+		// Wait for the next tick, or bail out on cancellation.
+		select {
+		case <-ctx.Done():
+			return ctx.Err()
+		case <-ticker.C:
+		}
+		if err := checkVolumesHealth(ctx, nodeID, getMountpointsByVolumeName); err != nil {
+			return err
+		}
+	}
+}
+
+// checkVolumesHealth fetches the volumes selected by nodeID through the volume
+// lister and runs checkVolumeHealth on every one that is staged or published.
+// Only a listing failure is returned; getVolumeMounts supplies the mount points
+// of a volume by name.
+func checkVolumesHealth(ctx context.Context, nodeID directpvtypes.NodeID, getVolumeMounts func(string) utils.StringSet) error {
+	nodeSelector := []directpvtypes.LabelValue{directpvtypes.ToLabelValue(string(nodeID))}
+	nodeVolumes, err := NewLister().NodeSelector(nodeSelector).Get(ctx)
+	if err != nil {
+		return err
+	}
+
+	for _, nodeVolume := range nodeVolumes {
+		// Volumes that are neither staged nor published have no mounts to verify.
+		if nodeVolume.IsStaged() || nodeVolume.IsPublished() {
+			checkVolumeHealth(ctx, nodeVolume.Name, getVolumeMounts)
+		}
+	}
+	return nil
+}
+
+// checkVolumeHealth fetches the named volume through the volume client and
+// verifies its mounts with checkVolumeMounts. Failures are logged at
+// verbosity 5 rather than returned.
+func checkVolumeHealth(ctx context.Context, volumeName string, getVolumeMounts func(string) utils.StringSet) {
+	getOptions := metav1.GetOptions{TypeMeta: types.NewVolumeTypeMeta()}
+	volume, err := client.VolumeClient().Get(ctx, volumeName, getOptions)
+	if err != nil {
+		klog.V(5).ErrorS(err, "unable to get the volume", "volume", volumeName)
+		return
+	}
+
+	// checkVolumeMounts stores the condition itself; only a failure needs reporting here.
+	if err = checkVolumeMounts(ctx, volume, getVolumeMounts); err != nil {
+		klog.V(5).ErrorS(err, "unable to check the volume mounts", "volume", volumeName)
+	}
+}
+
+// checkVolumeMounts verifies that the target path of a published volume and the
+// staging path of a staged volume are present among the mount points returned
+// by getVolumeMounts, and records the outcome in the volume's error condition.
+//
+// The volume is written back through the volume client only when the condition
+// actually changed; the error of that update, if any, is returned. When both
+// paths are missing, the staging path message is the one reported.
+func checkVolumeMounts(ctx context.Context, volume *types.Volume, getVolumeMounts func(string) utils.StringSet) (err error) {
+	var message string
+	mountExists := true
+	mountPoints := getVolumeMounts(volume.Name)
+	if volume.IsPublished() && !mountPoints.Exist(volume.Status.TargetPath) {
+		mountExists = false
+		message = string(directpvtypes.VolumeConditionMessageTargetPathNotMounted)
+	}
+	// Checked last so that a missing staging mount takes precedence in the message.
+	if volume.IsStaged() && !mountPoints.Exist(volume.Status.StagingTargetPath) {
+		mountExists = false
+		message = string(directpvtypes.VolumeConditionMessageStagingPathNotMounted)
+	}
+
+	reason := string(directpvtypes.VolumeConditionReasonNoError)
+	if !mountExists {
+		reason = string(directpvtypes.VolumeConditionReasonNotMounted)
+	}
+
+	updatedConditions, updated := k8s.UpdateCondition(
+		volume.Status.Conditions,
+		string(directpvtypes.VolumeConditionTypeError),
+		k8s.BoolToConditionStatus(!mountExists),
+		reason,
+		message,
+	)
+	if !updated {
+		// Condition already reflects the current state; avoid a needless write.
+		return nil
+	}
+
+	volume.Status.Conditions = updatedConditions
+	// A volume object is being updated, so the volume type meta is used here.
+	_, err = client.VolumeClient().Update(ctx, volume, metav1.UpdateOptions{TypeMeta: types.NewVolumeTypeMeta()})
+	return err
+}
+
+// getMountpointsByVolumeName returns the entry stored under "/<volumeName>" in
+// the root mount map reported by sys.GetMounts. When the mounts cannot be
+// read, the error is logged at verbosity 5 and nil is returned.
+func getMountpointsByVolumeName(volumeName string) utils.StringSet {
+	_, _, _, mountsByRoot, err := sys.GetMounts(false)
+	if err == nil {
+		return mountsByRoot["/"+volumeName]
+	}
+	klog.V(5).ErrorS(err, "unable to get mountpoints by volume name", "volume name", volumeName)
+	return nil
+}
diff --git a/pkg/volume/volume-health_test.go b/pkg/volume/volume-health_test.go
new file mode 100644
index 00000000..e5935c2d
--- /dev/null
+++ b/pkg/volume/volume-health_test.go
@@ -0,0 +1,138 @@
+// This file is part of MinIO DirectPV
+// Copyright (c) 2021, 2022, 2023 MinIO, Inc.
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see https://www.gnu.org/licenses/.
+
+package volume
+
+import (
+ "context"
+ "testing"
+
+ directpvtypes "github.com/minio/directpv/pkg/apis/directpv.min.io/types"
+ "github.com/minio/directpv/pkg/client"
+ clientsetfake "github.com/minio/directpv/pkg/clientset/fake"
+ "github.com/minio/directpv/pkg/consts"
+ "github.com/minio/directpv/pkg/k8s"
+ "github.com/minio/directpv/pkg/types"
+ "github.com/minio/directpv/pkg/utils"
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+ "k8s.io/apimachinery/pkg/runtime"
+)
+
+// newTestVolume builds a volume fixture labeled for node "test-node" with the
+// given staging and target paths. When errorCondition is non-nil it becomes the
+// only entry of the volume's conditions; otherwise the conditions are empty.
+func newTestVolume(name string, stagingPath, containerPath string, errorCondition *metav1.Condition) *types.Volume {
+	conditions := []metav1.Condition{}
+	if errorCondition != nil {
+		conditions = append(conditions, *errorCondition)
+	}
+
+	labels := map[string]string{
+		string(directpvtypes.NodeLabelKey):      "test-node",
+		string(directpvtypes.CreatedByLabelKey): consts.ControllerName,
+	}
+
+	return &types.Volume{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:   name,
+			Labels: labels,
+		},
+		Status: types.VolumeStatus{
+			StagingTargetPath: stagingPath,
+			TargetPath:        containerPath,
+			Conditions:        conditions,
+		},
+	}
+}
+
+// newErrorCondition builds an error-type condition carrying the given message.
+// With hasError set it is True with the not-mounted reason; otherwise it is
+// False with the no-error reason.
+func newErrorCondition(hasError bool, message string) *metav1.Condition {
+	status := metav1.ConditionFalse
+	reason := string(directpvtypes.VolumeConditionReasonNoError)
+	if hasError {
+		status = metav1.ConditionTrue
+		reason = string(directpvtypes.VolumeConditionReasonNotMounted)
+	}
+
+	return &metav1.Condition{
+		Type:    string(directpvtypes.VolumeConditionTypeError),
+		Status:  status,
+		Reason:  reason,
+		Message: message,
+	}
+}
+
+// TestCheckVolumesHealth seeds a fake clientset with five volumes that have
+// staging and target paths set, reports a different set of mount points for
+// each of them and verifies the error condition that checkVolumesHealth stores
+// on every volume.
+func TestCheckVolumesHealth(t *testing.T) {
+	objects := []runtime.Object{
+		newTestVolume("volume-1", "/stagingpath/volume-1", "/containerpath/volume-1", nil),
+		newTestVolume("volume-2", "/stagingpath/volume-2", "/containerpath/volume-2", newErrorCondition(false, "")),
+		newTestVolume("volume-3", "/stagingpath/volume-3", "/containerpath/volume-3", newErrorCondition(false, "")),
+		newTestVolume("volume-4", "/stagingpath/volume-4", "/containerpath/volume-4", newErrorCondition(false, "")),
+		newTestVolume("volume-5", "/stagingpath/volume-5", "/containerpath/volume-5", newErrorCondition(false, "")),
+	}
+
+	clientset := types.NewExtFakeClientset(clientsetfake.NewSimpleClientset(objects...))
+	client.SetVolumeInterface(clientset.DirectpvLatest().DirectPVVolumes())
+
+	// Mount points reported per volume: volume-3 lacks its staging path,
+	// volume-4 lacks its target path and volume-5 lacks both.
+	mountsByVolume := map[string][]string{
+		"volume-1": {"/stagingpath/volume-1", "/containerpath/volume-1"},
+		"volume-2": {"/stagingpath/volume-2", "/containerpath/volume-2"},
+		"volume-3": {"/containerpath/volume-3"},
+		"volume-4": {"/stagingpath/volume-4"},
+		"volume-5": {"/stagingpath/volume-x", "/containerpath/volume-x"},
+	}
+	getMountsFn := func(volumeName string) utils.StringSet {
+		mountPoints := make(utils.StringSet)
+		for _, mountPoint := range mountsByVolume[volumeName] {
+			mountPoints[mountPoint] = struct{}{}
+		}
+		return mountPoints
+	}
+
+	expectedErrorConditions := map[string]*metav1.Condition{
+		"volume-1": newErrorCondition(false, ""),
+		"volume-2": newErrorCondition(false, ""),
+		"volume-3": newErrorCondition(true, string(directpvtypes.VolumeConditionMessageStagingPathNotMounted)),
+		"volume-4": newErrorCondition(true, string(directpvtypes.VolumeConditionMessageTargetPathNotMounted)),
+		"volume-5": newErrorCondition(true, string(directpvtypes.VolumeConditionMessageStagingPathNotMounted)),
+	}
+
+	if err := checkVolumesHealth(context.TODO(), directpvtypes.NodeID("test-node"), getMountsFn); err != nil {
+		t.Fatalf("unable to check volumes health: %v", err)
+	}
+
+	for volumeName, expected := range expectedErrorConditions {
+		volume, err := client.VolumeClient().Get(context.TODO(), volumeName, metav1.GetOptions{TypeMeta: types.NewVolumeTypeMeta()})
+		if err != nil {
+			// volume may be nil when Get fails, so report the requested name instead.
+			t.Fatalf("Error while getting the volume %v: %+v", volumeName, err)
+		}
+		errorCondition := k8s.GetConditionByType(volume.Status.Conditions, string(directpvtypes.VolumeConditionTypeError))
+		if errorCondition == nil {
+			t.Fatalf("[volume: %s] Expected error condition but got nil", volumeName)
+		}
+		if errorCondition.Status != expected.Status {
+			t.Fatalf("[volume: %s] Expected condition status %v but got %v", volumeName, expected.Status, errorCondition.Status)
+		}
+		if errorCondition.Reason != expected.Reason {
+			t.Fatalf("[volume: %s] Expected condition reason %v but got %v", volumeName, expected.Reason, errorCondition.Reason)
+		}
+		if errorCondition.Message != expected.Message {
+			t.Fatalf("[volume: %s] Expected condition message %v but got %v", volumeName, expected.Message, errorCondition.Message)
+		}
+	}
+}