Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add support for additional createVol parameters #107

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/go-logr/logr v0.4.0 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/golang/protobuf v1.5.4 // indirect
github.com/google/go-cmp v0.5.9 // indirect
github.com/google/gofuzz v1.1.0 // indirect
github.com/googleapis/gnostic v0.5.5 // indirect
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -105,8 +105,8 @@ github.com/golang/protobuf v1.4.1/go.mod h1:U8fpvMrcmy5pZrNK1lt4xCsGvpyWQ/VVv6QD
github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI=
github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg=
github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek=
github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps=
github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
github.com/google/btree v1.0.1/go.mod h1:xXMiIv4Fb/0kKde4SpL7qlzvu5cMJDRkFDxJfI9uaxA=
Expand Down
183 changes: 113 additions & 70 deletions pkg/cubefs/cfs_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,9 @@ import (
"encoding/hex"
"encoding/json"
"fmt"
"io/ioutil"
"io"
"net/http"
"net/url"
"os"
"strconv"
"strings"
Expand All @@ -35,19 +36,35 @@ import (
)

const (
KVolumeName = "volName"
KMasterAddr = "masterAddr"
KLogLevel = "logLevel"
KLogDir = "logDir"
KOwner = "owner"
KMountPoint = "mountPoint"
KExporterPort = "exporterPort"
KProfPort = "profPort"
KCrossZone = "crossZone"
KEnableToken = "enableToken"
KZoneName = "zoneName"
KConsulAddr = "consulAddr"
KVolType = "volType"
KVolumeName = "volName"
KMasterAddr = "masterAddr"
KLogLevel = "logLevel"
KLogDir = "logDir"
KOwner = "owner"
KMountPoint = "mountPoint"
KExporterPort = "exporterPort"
KProfPort = "profPort"
KCrossZone = "crossZone"
KEnableToken = "enableToken"
KZoneName = "zoneName"
KConsulAddr = "consulAddr"
KVolType = "volType"
KMpCount = "mpCount"
KDpCount = "dpCount"
KDpSize = "dpSize"
KReplicaNum = "replicaNum"
KEnablePosixAcl = "enablePosixAcl"
KFollowerRead = "followerRead"
KNormalZonesFirst = "normalZonesFirst"
KCacheRuleKey = "cacheRuleKey"
KEbsBlkSize = "ebsBlkSize"
KCacheCap = "cacheCap"
KCacheAction = "cacheAction"
KCacheThreshold = "cacheThreshold"
KCacheTTL = "cacheTTL"
KCacheHighWater = "cacheHighWater"
KCacheLowWater = "cacheLowWater"
KCacheLRUInterval = "cacheLRUInterval"
)

const (
Expand All @@ -57,8 +74,6 @@ const (
defaultProfPort int = 10094
defaultLogLevel = "info"
jsonFileSuffix = ".json"
defaultConsulAddr = "http://consul-service.cubefs.svc.cluster.local:8500"
defaultVolType = "0"
)

const (
Expand All @@ -67,6 +82,30 @@ const (
ErrDuplicateVolMsg = "duplicate vol"
)

var createVolParams = [...]string{
KOwner,
KCrossZone,
KEnableToken,
KZoneName,
KVolType,
KMpCount,
KDpCount,
KDpSize,
KReplicaNum,
KEnablePosixAcl,
KFollowerRead,
KNormalZonesFirst,
KCacheRuleKey,
KEbsBlkSize,
KCacheCap,
KCacheAction,
KCacheThreshold,
KCacheTTL,
KCacheHighWater,
KCacheLowWater,
KCacheLRUInterval,
}

type cfsServer struct {
clientConfFile string
masterAddrs []string
Expand All @@ -81,35 +120,35 @@ type cfsServerResponse struct {
}

func newCfsServer(volName string, param map[string]string) (cs *cfsServer, err error) {
masterAddr := param[KMasterAddr]
if len(volName) == 0 || len(masterAddr) == 0 {
return nil, fmt.Errorf("invalid argument for initializing cfsServer")
if len(volName) == 0 {
return nil, fmt.Errorf("invalid argument value in volName.")
}
if !hasValue(param, KMasterAddr) {
return nil, fmt.Errorf("master address(es) must be configured")
}

newVolName := getValueWithDefault(param, KVolumeName, volName)
clientConfFile := defaultClientConfPath + newVolName + jsonFileSuffix
newOwner := csicommon.ShortenString(fmt.Sprintf("csi_%d", time.Now().UnixNano()), 20)
param[KMasterAddr] = masterAddr
param[KVolumeName] = newVolName
param[KOwner] = getValueWithDefault(param, KOwner, newOwner)
finalVolName := getValueWithDefault(param, KVolumeName, volName)
generatedOwner := csicommon.ShortenString(fmt.Sprintf("csi_%d", time.Now().UnixNano()), 20)
param[KVolumeName] = finalVolName
param[KOwner] = getValueWithDefault(param, KOwner, generatedOwner)
param[KLogLevel] = getValueWithDefault(param, KLogLevel, defaultLogLevel)
param[KLogDir] = defaultLogDir + newVolName
param[KConsulAddr] = getValueWithDefault(param, KConsulAddr, defaultConsulAddr)
param[KVolType] = getValueWithDefault(param, KVolType, defaultVolType)
param[KLogDir] = defaultLogDir + finalVolName
return &cfsServer{
clientConfFile: clientConfFile,
masterAddrs: strings.Split(masterAddr, ","),
clientConfFile: defaultClientConfPath + finalVolName + jsonFileSuffix,
masterAddrs: strings.Split(param[KMasterAddr], ","),
clientConf: param,
}, err
}

func hasValue(param map[string]string, key string) bool {
return len(param[key]) > 0
}

func getValueWithDefault(param map[string]string, key string, defaultValue string) string {
value := param[key]
if len(value) == 0 {
value = defaultValue
if hasValue(param, key) {
return param[key]
}

return value
return defaultValue
}

func (cs *cfsServer) persistClientConf(mountPoint string) error {
Expand All @@ -120,39 +159,38 @@ func (cs *cfsServer) persistClientConf(mountPoint string) error {
cs.clientConf[KProfPort] = strconv.Itoa(profPort)
_ = os.Mkdir(cs.clientConf[KLogDir], 0777)
clientConfBytes, _ := json.Marshal(cs.clientConf)
err := ioutil.WriteFile(cs.clientConfFile, clientConfBytes, 0444)
err := os.WriteFile(cs.clientConfFile, clientConfBytes, 0444)
if err != nil {
return status.Errorf(codes.Internal, "create client config file fail. err: %v", err.Error())
return status.Errorf(codes.Internal, "create client config file fail. err(%s)", err.Error())
}

glog.V(0).Infof("create client config file success, volumeId:%v", cs.clientConf[KVolumeName])
glog.V(0).Infof("create client config file success. volumeName(%s)", cs.clientConf[KVolumeName])
return nil
}

func (cs *cfsServer) createVolume(capacityGB int64) (err error) {
valName := cs.clientConf[KVolumeName]
owner := cs.clientConf[KOwner]
crossZone := cs.clientConf[KCrossZone]
token := cs.clientConf[KEnableToken]
zone := cs.clientConf[KZoneName]
volType := cs.clientConf[KVolType]

return cs.forEachMasterAddr("CreateVolume", func(addr string) error {
url := fmt.Sprintf("http://%s/admin/createVol?name=%s&capacity=%v&owner=%v&crossZone=%v&enableToken=%v&zoneName=%v&volType=%v",
addr, valName, capacityGB, owner, crossZone, token, zone, volType)
glog.Infof("createVol url: %v", url)
resp, err := cs.executeRequest(url)
volName := cs.clientConf[KVolumeName]
params := url.Values{}
params.Add("name", volName)
params.Add("capacity", strconv.FormatInt(capacityGB, 10))
for _, param := range createVolParams {
if hasValue(cs.clientConf, param) {
params.Add(param, cs.clientConf[param])
}
}
resp, err := cs.executeRequest(addr, "admin/createVol", params)
if err != nil {
return err
}

if resp.Code != 0 {
if strings.Contains(resp.Msg, ErrDuplicateVolMsg) {
glog.Warningf("duplicate to create volume. url(%v) msg: %v", url, resp.Msg)
glog.Warningf("duplicate create volume[%s]. msg(%s)", volName, resp.Msg)
return nil
}

return fmt.Errorf("create volume failed: url(%v) code=(%v), msg: %v", url, resp.Code, resp.Msg)
return fmt.Errorf("create volume[%s] failed. code(%d), msg(%s)", volName, resp.Code, resp.Msg)
}

return nil
Expand All @@ -165,11 +203,11 @@ func (cs *cfsServer) forEachMasterAddr(stage string, f func(addr string) error)
break
}

glog.Warningf("try %s with master %q failed: %v", stage, addr, err)
glog.Warningf("try %s with master[%s] failed. err(%v)", stage, addr, err)
}

if err != nil {
glog.Errorf("%s failed with all masters: %v", stage, err)
glog.Errorf("%s failed with all masters. err(%v)", stage, err)
return err
}

Expand All @@ -182,43 +220,46 @@ func (cs *cfsServer) deleteVolume() (err error) {
return err
}

valName := cs.clientConf[KVolumeName]
volName := cs.clientConf[KVolumeName]
return cs.forEachMasterAddr("DeleteVolume", func(addr string) error {
url := fmt.Sprintf("http://%s/vol/delete?name=%s&authKey=%v", addr, valName, ownerMd5)
glog.Infof("deleteVol url: %v", url)
resp, err := cs.executeRequest(url)
params := url.Values{}
params.Add("name", volName)
params.Add("authKey", ownerMd5)
resp, err := cs.executeRequest(addr, "vol/delete", params)
if err != nil {
return err
}

if resp.Code != 0 {
if resp.Code == ErrCodeVolNotExists {
glog.Warningf("volume[%s] not exists, assuming the volume has already been deleted. code:%v, msg:%v",
valName, resp.Code, resp.Msg)
glog.Warningf("volume[%s] does not exist, assuming the volume has already been deleted. code(%d), msg(%s)",
volName, resp.Code, resp.Msg)
return nil
}
return fmt.Errorf("delete volume[%s] is failed. code:%v, msg:%v", valName, resp.Code, resp.Msg)
return fmt.Errorf("delete volume[%s] failed. code(%d), msg(%s)", volName, resp.Code, resp.Msg)
}

return nil
})
}

func (cs *cfsServer) executeRequest(url string) (*cfsServerResponse, error) {
func (cs *cfsServer) executeRequest(host string, path string, params url.Values) (*cfsServerResponse, error) {
url := fmt.Sprintf("http://%s/%s?%s", host, path, params.Encode())
glog.Infof("request url: %v", url)
httpResp, err := http.Get(url)
if err != nil {
return nil, status.Errorf(codes.Unavailable, "request url failed, url(%v) err(%v)", url, err)
return nil, status.Errorf(codes.Unavailable, "request to url[%s] failed. err(%v)", url, err)
}

defer httpResp.Body.Close()
body, err := ioutil.ReadAll(httpResp.Body)
body, err := io.ReadAll(httpResp.Body)
if err != nil {
return nil, status.Errorf(codes.Unavailable, "read http response body, url(%v) bodyLen(%v) err(%v)", url, len(body), err)
return nil, status.Errorf(codes.Unavailable, "reading http response body from url[%s] failed. bodyLen(%d), err(%v)", url, len(body), err)
}

resp := &cfsServerResponse{}
if err := json.Unmarshal(body, resp); err != nil {
return nil, status.Errorf(codes.Unavailable, "unmarshal http response body, url(%v) msg(%v) err(%v)", url, resp.Msg, err)
return nil, status.Errorf(codes.Unavailable, "unmarshalling http response body from url[%s] failed. msg(%s), err(%v)", url, resp.Msg, err)
}
return resp, nil
}
Expand All @@ -236,15 +277,17 @@ func (cs *cfsServer) expandVolume(capacityGB int64) (err error) {
volName := cs.clientConf[KVolumeName]

return cs.forEachMasterAddr("ExpandVolume", func(addr string) error {
url := fmt.Sprintf("http://%s/vol/expand?name=%s&authKey=%v&capacity=%v", addr, volName, ownerMd5, capacityGB)
glog.Infof("expandVolume url: %v", url)
resp, err := cs.executeRequest(url)
params := url.Values{}
params.Add("name", volName)
params.Add("authKey", ownerMd5)
params.Add("capacity", strconv.FormatInt(capacityGB, 10))
resp, err := cs.executeRequest(addr, "vol/expand", params)
if err != nil {
return err
}

if resp.Code != 0 {
return fmt.Errorf("expand volume[%v] failed, code:%v, msg:%v", volName, resp.Code, resp.Msg)
return fmt.Errorf("expand volume[%v] failed. code(%d), msg(%s)", volName, resp.Code, resp.Msg)
}

return nil
Expand All @@ -255,7 +298,7 @@ func (cs *cfsServer) getOwnerMd5() (string, error) {
owner := cs.clientConf[KOwner]
key := md5.New()
if _, err := key.Write([]byte(owner)); err != nil {
return "", status.Errorf(codes.Internal, "calc owner[%v] md5 fail. err(%v)", owner, err)
return "", status.Errorf(codes.Internal, "calculating owner[%s] md5 failed. err(%v)", owner, err)
}

return hex.EncodeToString(key.Sum(nil)), nil
Expand Down