diff --git a/cloud/blockstore/tools/csi_driver/internal/driver/node.go b/cloud/blockstore/tools/csi_driver/internal/driver/node.go index 784a325bfc1..d6e966e1947 100644 --- a/cloud/blockstore/tools/csi_driver/internal/driver/node.go +++ b/cloud/blockstore/tools/csi_driver/internal/driver/node.go @@ -492,6 +492,11 @@ func (s *nodeService) nodePublishDiskAsVhostSocket( }) if err != nil { + if s.IsGrpcTimeoutError(err) { + s.nbsClient.StopEndpoint(ctx, &nbsapi.TStopEndpointRequest{ + UnixSocketPath: filepath.Join(endpointDir, nbsSocketName), + }) + } return fmt.Errorf("failed to start NBS endpoint: %w", err) } @@ -582,6 +587,11 @@ func (s *nodeService) nodeStageDiskAsVhostSocket( }) if err != nil { + if s.IsGrpcTimeoutError(err) { + s.nbsClient.StopEndpoint(ctx, &nbsapi.TStopEndpointRequest{ + UnixSocketPath: filepath.Join(endpointDir, nbsSocketName), + }) + } return fmt.Errorf("failed to start NBS endpoint: %w", err) } @@ -716,6 +726,19 @@ func (s *nodeService) IsMountConflictError(err error) bool { return false } +func (s *nodeService) IsGrpcTimeoutError(err error) bool { + if err != nil { + var clientErr *nbsclient.ClientError + if errors.As(err, &clientErr) { + if clientErr.Code == nbsclient.E_GRPC_DEADLINE_EXCEEDED { + return true + } + } + } + + return false +} + func (s *nodeService) hasLocalEndpoint( ctx context.Context, diskId string) (bool, error) { @@ -900,7 +923,7 @@ func (s *nodeService) startNbsEndpointForNBD( } hostType := nbsapi.EHostType_HOST_TYPE_DEFAULT - return s.nbsClient.StartEndpoint(ctx, &nbsapi.TStartEndpointRequest{ + resp, err := s.nbsClient.StartEndpoint(ctx, &nbsapi.TStartEndpointRequest{ UnixSocketPath: filepath.Join(endpointDir, nbsSocketName), DiskId: diskId, InstanceId: nbsInstanceId, @@ -918,6 +941,14 @@ func (s *nodeService) startNbsEndpointForNBD( HostType: &hostType, }, }) + + if s.IsGrpcTimeoutError(err) { + s.nbsClient.StopEndpoint(ctx, &nbsapi.TStopEndpointRequest{ + UnixSocketPath: filepath.Join(endpointDir, nbsSocketName), + }) + } + + return resp, err } func (s *nodeService) getNfsClient(fileSystemId string) nfsclient.EndpointClientIface { @@ -956,6 +987,12 @@ func (s *nodeService) nodePublishFileStoreAsVhostSocket( }, }) if err != nil { + if s.IsGrpcTimeoutError(err) { + s.nbsClient.StopEndpoint(ctx, &nbsapi.TStopEndpointRequest{ + UnixSocketPath: filepath.Join(endpointDir, nbsSocketName), + }) + } + return fmt.Errorf("failed to start NFS endpoint: %w", err) } @@ -994,6 +1031,12 @@ func (s *nodeService) nodeStageFileStoreAsVhostSocket( }, }) if err != nil { + if s.IsGrpcTimeoutError(err) { + s.nbsClient.StopEndpoint(ctx, &nbsapi.TStopEndpointRequest{ + UnixSocketPath: filepath.Join(endpointDir, nbsSocketName), + }) + } + return fmt.Errorf("failed to start NFS endpoint: %w", err) } diff --git a/cloud/blockstore/tools/csi_driver/internal/driver/node_test.go b/cloud/blockstore/tools/csi_driver/internal/driver/node_test.go index 6b52cf2877b..f57ae786c52 100644 --- a/cloud/blockstore/tools/csi_driver/internal/driver/node_test.go +++ b/cloud/blockstore/tools/csi_driver/internal/driver/node_test.go @@ -4,6 +4,7 @@ package driver import ( "context" + "fmt" "io/fs" "os" "os/exec" @@ -16,6 +17,7 @@ import ( "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" nbs "github.com/ydb-platform/nbs/cloud/blockstore/public/api/protos" + nbsclient "github.com/ydb-platform/nbs/cloud/blockstore/public/sdk/go/client" "github.com/ydb-platform/nbs/cloud/blockstore/tools/csi_driver/internal/driver/mocks" csimounter "github.com/ydb-platform/nbs/cloud/blockstore/tools/csi_driver/internal/mounter" nfs "github.com/ydb-platform/nbs/cloud/filestore/public/api/protos" @@ -937,3 +939,171 @@ func TestPublishDeviceWithReadWriteManyModeIsNotSupportedWithNBS(t *testing.T) { }) require.Error(t, err) } + +func TestGrpcTimeoutForIKubevirt(t *testing.T) { + tempDir := t.TempDir() + + nbsClient := mocks.NewNbsClientMock() + nfsClient := mocks.NewNfsEndpointClientMock() + nfsLocalClient := mocks.NewNfsEndpointClientMock() + mounter := csimounter.NewMock() + + ctx := context.Background() + nodeId := "testNodeId" + clientId := "testClientId" + instanceId := "testInstanceId" + actualClientId := "testClientId-" + instanceId + diskId := "test-disk-id-42" + deviceName := diskId + volumeId := diskId + "#" + instanceId + backend := "nbs" + + stagingTargetPath := filepath.Join(tempDir, "testStagingTargetPath") + socketsDir := filepath.Join(tempDir, "sockets") + sourcePath := filepath.Join(socketsDir, instanceId, diskId) + targetFsPathPattern := filepath.Join(tempDir, "pods/([a-z0-9-]+)/volumes/([a-z0-9-]+)/mount") + nbsSocketPath := filepath.Join(sourcePath, "nbs.sock") + + nodeService := newNodeService( + nodeId, + clientId, + true, + socketsDir, + targetFsPathPattern, + "", + make(LocalFilestoreOverrideMap), + nbsClient, + nfsClient, + nfsLocalClient, + mounter, + ) + + accessMode := csi.VolumeCapability_AccessMode_SINGLE_NODE_WRITER + + volumeCapability := csi.VolumeCapability{ + AccessType: &csi.VolumeCapability_Mount{ + Mount: &csi.VolumeCapability_MountVolume{}, + }, + AccessMode: &csi.VolumeCapability_AccessMode{ + Mode: accessMode, + }, + } + + volumeContext := map[string]string{ + backendVolumeContextKey: backend, + instanceIdKey: instanceId, + } + + hostType := nbs.EHostType_HOST_TYPE_DEFAULT + grpcError := nbsclient.ClientError{Code: nbsclient.E_GRPC_DEADLINE_EXCEEDED} + startEndpointError := fmt.Errorf("%w", grpcError) + nbsClient.On("StartEndpoint", ctx, &nbs.TStartEndpointRequest{ + UnixSocketPath: nbsSocketPath, + DiskId: diskId, + InstanceId: instanceId, + ClientId: actualClientId, + DeviceName: deviceName, + IpcType: nbs.EClientIpcType_IPC_VHOST, + VhostQueuesCount: 8, + VolumeAccessMode: nbs.EVolumeAccessMode_VOLUME_ACCESS_READ_WRITE, + VolumeMountMode: nbs.EVolumeMountMode_VOLUME_MOUNT_LOCAL, + Persistent: true, + NbdDevice: &nbs.TStartEndpointRequest_UseFreeNbdDeviceFile{ + false, + }, + ClientProfile: &nbs.TClientProfile{ + HostType: &hostType, + }, + }).Once().Return(&nbs.TStartEndpointResponse{}, startEndpointError) + + nbsClient.On("StopEndpoint", ctx, &nbs.TStopEndpointRequest{ + UnixSocketPath: nbsSocketPath, + }).Once().Return(&nbs.TStopEndpointResponse{}, nil) + + _, err := nodeService.NodeStageVolume(ctx, &csi.NodeStageVolumeRequest{ + VolumeId: volumeId, + StagingTargetPath: stagingTargetPath, + VolumeCapability: &volumeCapability, + VolumeContext: volumeContext, + }) + require.Error(t, err) +} + +func TestGrpcTimeoutForInfrakuber(t *testing.T) { + tempDir := t.TempDir() + + nbsClient := mocks.NewNbsClientMock() + mounter := csimounter.NewMock() + + ipcType := nbs.EClientIpcType_IPC_NBD + nbdDeviceFile := filepath.Join(tempDir, "dev", "nbd3") + err := os.MkdirAll(nbdDeviceFile, fs.FileMode(0755)) + require.NoError(t, err) + + ctx := context.Background() + nodeId := "testNodeId" + clientId := "testClientId" + diskId := "test-disk-id-42" + actualClientId := "testClientId-testNodeId" + targetFsPathPattern := filepath.Join(tempDir, "pods/([a-z0-9-]+)/volumes/([a-z0-9-]+)/mount") + stagingTargetPath := "testStagingTargetPath" + socketsDir := filepath.Join(tempDir, "sockets") + socketPath := filepath.Join(socketsDir, diskId, "nbs.sock") + + nodeService := newNodeService( + nodeId, + clientId, + false, + socketsDir, + targetFsPathPattern, + "", + make(LocalFilestoreOverrideMap), + nbsClient, + nil, + nil, + mounter, + ) + + volumeCapability := csi.VolumeCapability{ + AccessType: &csi.VolumeCapability_Mount{}, + AccessMode: &csi.VolumeCapability_AccessMode{ + Mode: csi.VolumeCapability_AccessMode_SINGLE_NODE_WRITER, + }, + } + + volumeContext := map[string]string{} + + hostType := nbs.EHostType_HOST_TYPE_DEFAULT + grpcError := nbsclient.ClientError{Code: nbsclient.E_GRPC_DEADLINE_EXCEEDED} + startEndpointError := fmt.Errorf("%w", grpcError) + nbsClient.On("StartEndpoint", ctx, &nbs.TStartEndpointRequest{ + UnixSocketPath: socketPath, + DiskId: diskId, + InstanceId: nodeId, + ClientId: actualClientId, + DeviceName: diskId, + IpcType: ipcType, + VhostQueuesCount: 8, + VolumeAccessMode: nbs.EVolumeAccessMode_VOLUME_ACCESS_READ_WRITE, + VolumeMountMode: nbs.EVolumeMountMode_VOLUME_MOUNT_LOCAL, + Persistent: true, + NbdDevice: &nbs.TStartEndpointRequest_UseFreeNbdDeviceFile{ + true, + }, + ClientProfile: &nbs.TClientProfile{ + HostType: &hostType, + }, + }).Return(&nbs.TStartEndpointResponse{}, startEndpointError) + + nbsClient.On("StopEndpoint", ctx, &nbs.TStopEndpointRequest{ + UnixSocketPath: socketPath, + }).Return(&nbs.TStopEndpointResponse{}, nil) + + _, err = nodeService.NodeStageVolume(ctx, &csi.NodeStageVolumeRequest{ + VolumeId: diskId, + StagingTargetPath: stagingTargetPath, + VolumeCapability: &volumeCapability, + VolumeContext: volumeContext, + }) + require.Error(t, err) +}