Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

changes to support hyperdisk multi-writer mode #1901

Open
wants to merge 12 commits into
base: master
Choose a base branch
from
10 changes: 10 additions & 0 deletions pkg/common/parameters.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ import (
)

const (
// Disk Params
ParameterAccessMode = "access-mode"

// Parameters for StorageClass
ParameterKeyType = "type"
ParameterKeyReplicationType = "replication-type"
Expand Down Expand Up @@ -106,6 +109,9 @@ type DiskParameters struct {
// Values: {bool}
// Default: false
MultiZoneProvisioning bool
// Values: READ_WRITE_SINGLE, READ_ONLY_MANY, READ_WRITE_MANY
// Default: READ_WRITE_SINGLE
AccessMode string
}

// SnapshotParameters contains normalized and defaulted parameters for snapshots
Expand Down Expand Up @@ -253,6 +259,10 @@ func (pp *ParameterProcessor) ExtractAndDefaultParameters(parameters map[string]
if paramEnableMultiZoneProvisioning {
p.Labels[MultiZoneLabel] = "true"
}
case ParameterAccessMode:
if v != "" {
p.AccessMode = v
}
default:
return p, fmt.Errorf("parameters contains invalid option %q", k)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/common/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1409,7 +1409,7 @@ func TestIsUserMultiAttachError(t *testing.T) {
},
}
for _, test := range cases {
code, err := isUserMultiAttachError(fmt.Errorf(test.errorString))
code, err := isUserMultiAttachError(fmt.Errorf("%s", test.errorString))
if test.expectCode {
if err != nil || code != test.expectedCode {
t.Errorf("Failed with non-nil error %v or bad code %v: %s", err, code, test.errorString)
Expand Down
11 changes: 0 additions & 11 deletions pkg/gce-cloud-provider/compute/cloud-disk.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,17 +213,6 @@ func (d *CloudDisk) GetKMSKeyName() string {
return ""
}

func (d *CloudDisk) GetMultiWriter() bool {
switch {
case d.disk != nil:
return false
case d.betaDisk != nil:
return d.betaDisk.MultiWriter
default:
return false
}
}

func (d *CloudDisk) GetEnableConfidentialCompute() bool {
switch {
case d.disk != nil:
Expand Down
7 changes: 1 addition & 6 deletions pkg/gce-cloud-provider/compute/fake-gce.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ func (cloud *FakeCloudProvider) ListSnapshots(ctx context.Context, filter string
}

// Disk Methods
func (cloud *FakeCloudProvider) GetDisk(ctx context.Context, project string, volKey *meta.Key, api GCEAPIVersion) (*CloudDisk, error) {
func (cloud *FakeCloudProvider) GetDisk(ctx context.Context, project string, volKey *meta.Key) (*CloudDisk, error) {
disk, ok := cloud.disks[volKey.String()]
if !ok {
return nil, notFoundError()
Expand Down Expand Up @@ -212,11 +212,6 @@ func (cloud *FakeCloudProvider) ValidateExistingDisk(ctx context.Context, resp *
params.DiskType, respType[len(respType)-1])
}

// We are assuming here that a multiWriter disk could be used as non-multiWriter
if multiWriter && !resp.GetMultiWriter() {
return fmt.Errorf("disk already exists with incompatible capability. Need MultiWriter. Got non-MultiWriter")
}

klog.V(4).Infof("Compatible disk already exists")
return ValidateDiskParameters(resp, params)
}
Expand Down
117 changes: 34 additions & 83 deletions pkg/gce-cloud-provider/compute/gce-compute.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ type GCECompute interface {
GetDefaultProject() string
GetDefaultZone() string
// Disk Methods
GetDisk(ctx context.Context, project string, volumeKey *meta.Key, gceAPIVersion GCEAPIVersion) (*CloudDisk, error)
GetDisk(ctx context.Context, project string, volumeKey *meta.Key) (*CloudDisk, error)
RepairUnderspecifiedVolumeKey(ctx context.Context, project string, volumeKey *meta.Key) (string, *meta.Key, error)
ValidateExistingDisk(ctx context.Context, disk *CloudDisk, params common.DiskParameters, reqBytes, limBytes int64, multiWriter bool) error
InsertDisk(ctx context.Context, project string, volKey *meta.Key, params common.DiskParameters, capBytes int64, capacityRange *csi.CapacityRange, replicaZones []string, snapshotID string, volumeContentSourceVolumeID string, multiWriter bool, accessMode string) error
Expand Down Expand Up @@ -321,28 +321,16 @@ func (cloud *CloudProvider) ListSnapshots(ctx context.Context, filter string) ([
return items, "", nil
}

func (cloud *CloudProvider) GetDisk(ctx context.Context, project string, key *meta.Key, gceAPIVersion GCEAPIVersion) (*CloudDisk, error) {
func (cloud *CloudProvider) GetDisk(ctx context.Context, project string, key *meta.Key) (*CloudDisk, error) {
klog.V(5).Infof("Getting disk %v", key)

// Override GCEAPIVersion as hyperdisk is only available in beta and we cannot get the disk-type with get disk call.
gceAPIVersion = GCEAPIVersionBeta
switch key.Type() {
case meta.Zonal:
if gceAPIVersion == GCEAPIVersionBeta {
disk, err := cloud.getZonalBetaDiskOrError(ctx, project, key.Zone, key.Name)
return CloudDiskFromBeta(disk), err
} else {
disk, err := cloud.getZonalDiskOrError(ctx, project, key.Zone, key.Name)
return CloudDiskFromV1(disk), err
}
disk, err := cloud.getZonalBetaDiskOrError(ctx, project, key.Zone, key.Name)
return CloudDiskFromBeta(disk), err
case meta.Regional:
if gceAPIVersion == GCEAPIVersionBeta {
disk, err := cloud.getRegionalBetaDiskOrError(ctx, project, key.Region, key.Name)
return CloudDiskFromBeta(disk), err
} else {
disk, err := cloud.getRegionalDiskOrError(ctx, project, key.Region, key.Name)
return CloudDiskFromV1(disk), err
}
disk, err := cloud.getRegionalBetaDiskOrError(ctx, project, key.Region, key.Name)
return CloudDiskFromBeta(disk), err
default:
return nil, fmt.Errorf("key was neither zonal nor regional, got: %v", key.String())
}
Expand Down Expand Up @@ -407,11 +395,6 @@ func (cloud *CloudProvider) ValidateExistingDisk(ctx context.Context, resp *Clou
reqBytes, common.GbToBytes(resp.GetSizeGb()), limBytes)
}

// We are assuming here that a multiWriter disk could be used as non-multiWriter
if multiWriter && !resp.GetMultiWriter() {
return fmt.Errorf("disk already exists with incompatible capability. Need MultiWriter. Got non-MultiWriter")
}

return ValidateDiskParameters(resp, params)
}

Expand Down Expand Up @@ -553,9 +536,6 @@ func convertV1DiskToBetaDisk(v1Disk *computev1.Disk) *computebeta.Disk {
AccessMode: v1Disk.AccessMode,
}

// Hyperdisk doesn't currently support multiWriter (https://cloud.google.com/compute/docs/disks/hyperdisks#limitations),
// but if multiWriter + hyperdisk is supported in the future, we want the PDCSI driver to support this feature without
// any additional code change.
if v1Disk.ProvisionedIops > 0 {
betaDisk.ProvisionedIops = v1Disk.ProvisionedIops
}
Expand Down Expand Up @@ -619,9 +599,6 @@ func convertBetaDiskToV1Disk(betaDisk *computebeta.Disk) *computev1.Disk {
AccessMode: betaDisk.AccessMode,
}

// Hyperdisk doesn't currently support multiWriter (https://cloud.google.com/compute/docs/disks/hyperdisks#limitations),
// but if multiWriter + hyperdisk is supported in the future, we want the PDCSI driver to support this feature without
// any additional code change.
if betaDisk.ProvisionedIops > 0 {
v1Disk.ProvisionedIops = betaDisk.ProvisionedIops
}
Expand All @@ -646,16 +623,11 @@ func (cloud *CloudProvider) insertRegionalDisk(
description string,
multiWriter bool) error {
var (
err error
opName string
gceAPIVersion = GCEAPIVersionV1
err error
opName string
)

if multiWriter {
gceAPIVersion = GCEAPIVersionBeta
}

diskToCreate := &computev1.Disk{
diskToCreate := &computebeta.Disk{
Name: volKey.Name,
SizeGb: common.BytesToGbRoundUp(capBytes),
Description: description,
Expand Down Expand Up @@ -684,7 +656,7 @@ func (cloud *CloudProvider) insertRegionalDisk(
diskToCreate.ReplicaZones = replicaZones
}
if params.DiskEncryptionKMSKey != "" {
diskToCreate.DiskEncryptionKey = &computev1.CustomerEncryptionKey{
diskToCreate.DiskEncryptionKey = &computebeta.CustomerEncryptionKey{
KmsKeyName: params.DiskEncryptionKMSKey,
}
}
Expand All @@ -694,29 +666,21 @@ func (cloud *CloudProvider) insertRegionalDisk(
}

if len(resourceTags) > 0 {
diskToCreate.Params = &computev1.DiskParams{
diskToCreate.Params = &computebeta.DiskParams{
ResourceManagerTags: resourceTags,
}
}

if gceAPIVersion == GCEAPIVersionBeta {
var insertOp *computebeta.Operation
betaDiskToCreate := convertV1DiskToBetaDisk(diskToCreate)
betaDiskToCreate.MultiWriter = multiWriter
insertOp, err = cloud.betaService.RegionDisks.Insert(project, volKey.Region, betaDiskToCreate).Context(ctx).Do()
if insertOp != nil {
opName = insertOp.Name
}
} else {
var insertOp *computev1.Operation
insertOp, err = cloud.service.RegionDisks.Insert(project, volKey.Region, diskToCreate).Context(ctx).Do()
if insertOp != nil {
opName = insertOp.Name
}
var insertOp *computebeta.Operation
diskToCreate.MultiWriter = multiWriter
insertOp, err = cloud.betaService.RegionDisks.Insert(project, volKey.Region, diskToCreate).Context(ctx).Do()
if insertOp != nil {
opName = insertOp.Name
}

if err != nil {
if IsGCEError(err, "alreadyExists") {
disk, err := cloud.GetDisk(ctx, project, volKey, gceAPIVersion)
disk, err := cloud.GetDisk(ctx, project, volKey)
if err != nil {
// failed to GetDisk, however the Disk may already exist
// the error code should be non-Final
Expand All @@ -742,7 +706,7 @@ func (cloud *CloudProvider) insertRegionalDisk(
// the error code returned should be non-final
if err != nil {
if IsGCEError(err, "alreadyExists") {
disk, err := cloud.GetDisk(ctx, project, volKey, gceAPIVersion)
disk, err := cloud.GetDisk(ctx, project, volKey)
if err != nil {
return common.NewTemporaryError(codes.Unavailable, fmt.Errorf("error when getting disk: %w", err))
}
Expand Down Expand Up @@ -774,15 +738,11 @@ func (cloud *CloudProvider) insertZonalDisk(
multiWriter bool,
accessMode string) error {
var (
err error
opName string
gceAPIVersion = GCEAPIVersionV1
err error
opName string
)
if multiWriter {
gceAPIVersion = GCEAPIVersionBeta
}

diskToCreate := &computev1.Disk{
diskToCreate := &computebeta.Disk{
Name: volKey.Name,
SizeGb: common.BytesToGbRoundUp(capBytes),
Description: description,
Expand Down Expand Up @@ -824,7 +784,7 @@ func (cloud *CloudProvider) insertZonalDisk(
}

if params.DiskEncryptionKMSKey != "" {
diskToCreate.DiskEncryptionKey = &computev1.CustomerEncryptionKey{
diskToCreate.DiskEncryptionKey = &computebeta.CustomerEncryptionKey{
KmsKeyName: params.DiskEncryptionKMSKey,
}
}
Expand All @@ -836,31 +796,22 @@ func (cloud *CloudProvider) insertZonalDisk(
}

if len(resourceTags) > 0 {
diskToCreate.Params = &computev1.DiskParams{
diskToCreate.Params = &computebeta.DiskParams{
ResourceManagerTags: resourceTags,
}
}
diskToCreate.AccessMode = accessMode

if gceAPIVersion == GCEAPIVersionBeta {
var insertOp *computebeta.Operation
betaDiskToCreate := convertV1DiskToBetaDisk(diskToCreate)
betaDiskToCreate.MultiWriter = multiWriter
insertOp, err = cloud.betaService.Disks.Insert(project, volKey.Zone, betaDiskToCreate).Context(ctx).Do()
if insertOp != nil {
opName = insertOp.Name
}
} else {
var insertOp *computev1.Operation
insertOp, err = cloud.service.Disks.Insert(project, volKey.Zone, diskToCreate).Context(ctx).Do()
if insertOp != nil {
opName = insertOp.Name
}
diskToCreate.AccessMode = accessMode
var insertOp *computebeta.Operation
diskToCreate.MultiWriter = multiWriter
insertOp, err = cloud.betaService.Disks.Insert(project, volKey.Zone, diskToCreate).Context(ctx).Do()
if insertOp != nil {
opName = insertOp.Name
}

if err != nil {
if IsGCEError(err, "alreadyExists") {
disk, err := cloud.GetDisk(ctx, project, volKey, gceAPIVersion)
disk, err := cloud.GetDisk(ctx, project, volKey)
if err != nil {
// failed to GetDisk, however the Disk may already exist
// the error code should be non-Final
Expand All @@ -887,7 +838,7 @@ func (cloud *CloudProvider) insertZonalDisk(
// failed to wait for Op to finish, however, the Op possibly is still running as expected
// the error code returned should be non-final
if IsGCEError(err, "alreadyExists") {
disk, err := cloud.GetDisk(ctx, project, volKey, gceAPIVersion)
disk, err := cloud.GetDisk(ctx, project, volKey)
if err != nil {
return common.NewTemporaryError(codes.Unavailable, fmt.Errorf("error when getting disk: %w", err))
}
Expand Down Expand Up @@ -1186,7 +1137,7 @@ func (cloud *CloudProvider) waitForAttachOnDisk(ctx context.Context, project str
start := time.Now()
return wait.ExponentialBackoff(AttachDiskBackoff, func() (bool, error) {
klog.V(6).Infof("Polling disks.get for attach of disk %v to instance %v to complete for %v", volKey.Name, instanceName, time.Since(start))
disk, err := cloud.GetDisk(ctx, project, volKey, GCEAPIVersionV1)
disk, err := cloud.GetDisk(ctx, project, volKey)
if err != nil {
return false, fmt.Errorf("GetDisk failed to get disk: %w", err)
}
Expand Down Expand Up @@ -1436,7 +1387,7 @@ func (cloud *CloudProvider) DeleteImage(ctx context.Context, project, imageName
// k8s.io/apimachinery/quantity package for better size handling
func (cloud *CloudProvider) ResizeDisk(ctx context.Context, project string, volKey *meta.Key, requestBytes int64) (int64, error) {
klog.V(5).Infof("Resizing disk %v to size %v", volKey, requestBytes)
cloudDisk, err := cloud.GetDisk(ctx, project, volKey, GCEAPIVersionV1)
cloudDisk, err := cloud.GetDisk(ctx, project, volKey)
if err != nil {
return -1, fmt.Errorf("failed to get disk: %w", err)
}
Expand Down
Loading