Skip to content

Commit

Permalink
Add support for all createVol params mentioned in the docs.
Browse files Browse the repository at this point in the history
Signed-off-by: Omar Pakker <[email protected]>
  • Loading branch information
Omar007 committed Oct 2, 2024
1 parent c1ef6e0 commit 10da91a
Show file tree
Hide file tree
Showing 3 changed files with 113 additions and 70 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/go-logr/logr v0.4.0 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/golang/protobuf v1.5.4 // indirect
github.com/google/go-cmp v0.5.9 // indirect
github.com/google/gofuzz v1.1.0 // indirect
github.com/googleapis/gnostic v0.5.5 // indirect
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -105,8 +105,8 @@ github.com/golang/protobuf v1.4.1/go.mod h1:U8fpvMrcmy5pZrNK1lt4xCsGvpyWQ/VVv6QD
github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI=
github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg=
github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek=
github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps=
github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
github.com/google/btree v1.0.1/go.mod h1:xXMiIv4Fb/0kKde4SpL7qlzvu5cMJDRkFDxJfI9uaxA=
Expand Down
177 changes: 110 additions & 67 deletions pkg/cubefs/cfs_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"fmt"
"io"
"net/http"
"net/url"
"os"
"strconv"
"strings"
Expand All @@ -35,19 +36,35 @@ import (
)

const (
KVolumeName = "volName"
KMasterAddr = "masterAddr"
KLogLevel = "logLevel"
KLogDir = "logDir"
KOwner = "owner"
KMountPoint = "mountPoint"
KExporterPort = "exporterPort"
KProfPort = "profPort"
KCrossZone = "crossZone"
KEnableToken = "enableToken"
KZoneName = "zoneName"
KConsulAddr = "consulAddr"
KVolType = "volType"
KVolumeName = "volName"
KMasterAddr = "masterAddr"
KLogLevel = "logLevel"
KLogDir = "logDir"
KOwner = "owner"
KMountPoint = "mountPoint"
KExporterPort = "exporterPort"
KProfPort = "profPort"
KCrossZone = "crossZone"
KEnableToken = "enableToken"
KZoneName = "zoneName"
KConsulAddr = "consulAddr"
KVolType = "volType"
KMpCount = "mpCount"
KDpCount = "dpCount"
KDpSize = "dpSize"
KReplicaNum = "replicaNum"
KEnablePosixAcl = "enablePosixAcl"
KFollowerRead = "followerRead"
KNormalZonesFirst = "normalZonesFirst"
KCacheRuleKey = "cacheRuleKey"
KEbsBlkSize = "ebsBlkSize"
KCacheCap = "cacheCap"
KCacheAction = "cacheAction"
KCacheThreshold = "cacheThreshold"
KCacheTTL = "cacheTTL"
KCacheHighWater = "cacheHighWater"
KCacheLowWater = "cacheLowWater"
KCacheLRUInterval = "cacheLRUInterval"
)

const (
Expand All @@ -57,8 +74,6 @@ const (
defaultProfPort int = 10094
defaultLogLevel = "info"
jsonFileSuffix = ".json"
defaultConsulAddr = "http://consul-service.cubefs.svc.cluster.local:8500"
defaultVolType = "0"
)

const (
Expand All @@ -67,6 +82,30 @@ const (
ErrDuplicateVolMsg = "duplicate vol"
)

var createVolParams = [...]string{
KOwner,
KCrossZone,
KEnableToken,
KZoneName,
KVolType,
KMpCount,
KDpCount,
KDpSize,
KReplicaNum,
KEnablePosixAcl,
KFollowerRead,
KNormalZonesFirst,
KCacheRuleKey,
KEbsBlkSize,
KCacheCap,
KCacheAction,
KCacheThreshold,
KCacheTTL,
KCacheHighWater,
KCacheLowWater,
KCacheLRUInterval,
}

type cfsServer struct {
clientConfFile string
masterAddrs []string
Expand All @@ -81,35 +120,35 @@ type cfsServerResponse struct {
}

func newCfsServer(volName string, param map[string]string) (cs *cfsServer, err error) {
masterAddr := param[KMasterAddr]
if len(volName) == 0 || len(masterAddr) == 0 {
return nil, fmt.Errorf("invalid argument for initializing cfsServer")
if len(volName) == 0 {
return nil, fmt.Errorf("invalid argument value in volName.")
}
if !hasValue(param, KMasterAddr) {
return nil, fmt.Errorf("master address(es) must be configured")
}

newVolName := getValueWithDefault(param, KVolumeName, volName)
clientConfFile := defaultClientConfPath + newVolName + jsonFileSuffix
newOwner := csicommon.ShortenString(fmt.Sprintf("csi_%d", time.Now().UnixNano()), 20)
param[KMasterAddr] = masterAddr
param[KVolumeName] = newVolName
param[KOwner] = getValueWithDefault(param, KOwner, newOwner)
finalVolName := getValueWithDefault(param, KVolumeName, volName)
generatedOwner := csicommon.ShortenString(fmt.Sprintf("csi_%d", time.Now().UnixNano()), 20)
param[KVolumeName] = finalVolName
param[KOwner] = getValueWithDefault(param, KOwner, generatedOwner)
param[KLogLevel] = getValueWithDefault(param, KLogLevel, defaultLogLevel)
param[KLogDir] = defaultLogDir + newVolName
param[KConsulAddr] = getValueWithDefault(param, KConsulAddr, defaultConsulAddr)
param[KVolType] = getValueWithDefault(param, KVolType, defaultVolType)
param[KLogDir] = defaultLogDir + finalVolName
return &cfsServer{
clientConfFile: clientConfFile,
masterAddrs: strings.Split(masterAddr, ","),
clientConfFile: defaultClientConfPath + finalVolName + jsonFileSuffix,
masterAddrs: strings.Split(param[KMasterAddr], ","),
clientConf: param,
}, err
}

func hasValue(param map[string]string, key string) bool {
return len(param[key]) > 0
}

func getValueWithDefault(param map[string]string, key string, defaultValue string) string {
value := param[key]
if len(value) == 0 {
value = defaultValue
if hasValue(param, key) {
return param[key]
}

return value
return defaultValue
}

func (cs *cfsServer) persistClientConf(mountPoint string) error {
Expand All @@ -122,37 +161,36 @@ func (cs *cfsServer) persistClientConf(mountPoint string) error {
clientConfBytes, _ := json.Marshal(cs.clientConf)
err := os.WriteFile(cs.clientConfFile, clientConfBytes, 0444)
if err != nil {
return status.Errorf(codes.Internal, "create client config file fail. err: %v", err.Error())
return status.Errorf(codes.Internal, "create client config file fail. err(%s)", err.Error())
}

glog.V(0).Infof("create client config file success, volumeId:%v", cs.clientConf[KVolumeName])
glog.V(0).Infof("create client config file success. volumeName(%s)", cs.clientConf[KVolumeName])
return nil
}

func (cs *cfsServer) createVolume(capacityGB int64) (err error) {
valName := cs.clientConf[KVolumeName]
owner := cs.clientConf[KOwner]
crossZone := cs.clientConf[KCrossZone]
token := cs.clientConf[KEnableToken]
zone := cs.clientConf[KZoneName]
volType := cs.clientConf[KVolType]

return cs.forEachMasterAddr("CreateVolume", func(addr string) error {
url := fmt.Sprintf("http://%s/admin/createVol?name=%s&capacity=%v&owner=%v&crossZone=%v&enableToken=%v&zoneName=%v&volType=%v",
addr, valName, capacityGB, owner, crossZone, token, zone, volType)
glog.Infof("createVol url: %v", url)
resp, err := cs.executeRequest(url)
volName := cs.clientConf[KVolumeName]
params := url.Values{}
params.Add("name", volName)
params.Add("capacity", strconv.FormatInt(capacityGB, 10))
for _, param := range createVolParams {
if hasValue(cs.clientConf, param) {
params.Add(param, cs.clientConf[param])
}
}
resp, err := cs.executeRequest(addr, "admin/createVol", params)
if err != nil {
return err
}

if resp.Code != 0 {
if strings.Contains(resp.Msg, ErrDuplicateVolMsg) {
glog.Warningf("duplicate to create volume. url(%v) msg: %v", url, resp.Msg)
glog.Warningf("duplicate create volume[%s]. msg(%s)", volName, resp.Msg)
return nil
}

return fmt.Errorf("create volume failed: url(%v) code=(%v), msg: %v", url, resp.Code, resp.Msg)
return fmt.Errorf("create volume[%s] failed. code(%d), msg(%s)", volName, resp.Code, resp.Msg)
}

return nil
Expand All @@ -165,11 +203,11 @@ func (cs *cfsServer) forEachMasterAddr(stage string, f func(addr string) error)
break
}

glog.Warningf("try %s with master %q failed: %v", stage, addr, err)
glog.Warningf("try %s with master[%s] failed. err(%v)", stage, addr, err)
}

if err != nil {
glog.Errorf("%s failed with all masters: %v", stage, err)
glog.Errorf("%s failed with all masters. err(%v)", stage, err)
return err
}

Expand All @@ -182,43 +220,46 @@ func (cs *cfsServer) deleteVolume() (err error) {
return err
}

valName := cs.clientConf[KVolumeName]
volName := cs.clientConf[KVolumeName]
return cs.forEachMasterAddr("DeleteVolume", func(addr string) error {
url := fmt.Sprintf("http://%s/vol/delete?name=%s&authKey=%v", addr, valName, ownerMd5)
glog.Infof("deleteVol url: %v", url)
resp, err := cs.executeRequest(url)
params := url.Values{}
params.Add("name", volName)
params.Add("authKey", ownerMd5)
resp, err := cs.executeRequest(addr, "vol/delete", params)
if err != nil {
return err
}

if resp.Code != 0 {
if resp.Code == ErrCodeVolNotExists {
glog.Warningf("volume[%s] not exists, assuming the volume has already been deleted. code:%v, msg:%v",
valName, resp.Code, resp.Msg)
glog.Warningf("volume[%s] does not exist, assuming the volume has already been deleted. code(%d), msg(%s)",
volName, resp.Code, resp.Msg)
return nil
}
return fmt.Errorf("delete volume[%s] is failed. code:%v, msg:%v", valName, resp.Code, resp.Msg)
return fmt.Errorf("delete volume[%s] failed. code(%d), msg(%s)", volName, resp.Code, resp.Msg)
}

return nil
})
}

func (cs *cfsServer) executeRequest(url string) (*cfsServerResponse, error) {
func (cs *cfsServer) executeRequest(host string, path string, params url.Values) (*cfsServerResponse, error) {
url := fmt.Sprintf("http://%s/%s?%s", host, path, params.Encode())
glog.Infof("request url: %v", url)
httpResp, err := http.Get(url)
if err != nil {
return nil, status.Errorf(codes.Unavailable, "request url failed, url(%v) err(%v)", url, err)
return nil, status.Errorf(codes.Unavailable, "request to url[%s] failed. err(%v)", url, err)
}

defer httpResp.Body.Close()
body, err := io.ReadAll(httpResp.Body)
if err != nil {
return nil, status.Errorf(codes.Unavailable, "read http response body, url(%v) bodyLen(%v) err(%v)", url, len(body), err)
return nil, status.Errorf(codes.Unavailable, "reading http response body from url[%s] failed. bodyLen(%d), err(%v)", url, len(body), err)
}

resp := &cfsServerResponse{}
if err := json.Unmarshal(body, resp); err != nil {
return nil, status.Errorf(codes.Unavailable, "unmarshal http response body, url(%v) msg(%v) err(%v)", url, resp.Msg, err)
return nil, status.Errorf(codes.Unavailable, "unmarshalling http response body from url[%s] failed. msg(%s), err(%v)", url, resp.Msg, err)
}
return resp, nil
}
Expand All @@ -236,15 +277,17 @@ func (cs *cfsServer) expandVolume(capacityGB int64) (err error) {
volName := cs.clientConf[KVolumeName]

return cs.forEachMasterAddr("ExpandVolume", func(addr string) error {
url := fmt.Sprintf("http://%s/vol/expand?name=%s&authKey=%v&capacity=%v", addr, volName, ownerMd5, capacityGB)
glog.Infof("expandVolume url: %v", url)
resp, err := cs.executeRequest(url)
params := url.Values{}
params.Add("name", volName)
params.Add("authKey", ownerMd5)
params.Add("capacity", strconv.FormatInt(capacityGB, 10))
resp, err := cs.executeRequest(addr, "vol/expand", params)
if err != nil {
return err
}

if resp.Code != 0 {
return fmt.Errorf("expand volume[%v] failed, code:%v, msg:%v", volName, resp.Code, resp.Msg)
return fmt.Errorf("expand volume[%v] failed. code(%d), msg(%s)", volName, resp.Code, resp.Msg)
}

return nil
Expand All @@ -255,7 +298,7 @@ func (cs *cfsServer) getOwnerMd5() (string, error) {
owner := cs.clientConf[KOwner]
key := md5.New()
if _, err := key.Write([]byte(owner)); err != nil {
return "", status.Errorf(codes.Internal, "calc owner[%v] md5 fail. err(%v)", owner, err)
return "", status.Errorf(codes.Internal, "calculating owner[%s] md5 failed. err(%v)", owner, err)
}

return hex.EncodeToString(key.Sum(nil)), nil
Expand Down

0 comments on commit 10da91a

Please sign in to comment.