From c0a38f13be9fba742a0ef4d15c8a43cde121a6a8 Mon Sep 17 00:00:00 2001 From: Cyber-SiKu Date: Wed, 22 Feb 2023 13:59:17 +0800 Subject: [PATCH 1/3] [fix]playbook/memcached Signed-off-by: Cyber-SiKu --- playbook/memcached/hosts.yaml | 9 ++++++ playbook/memcached/scripts/clean.sh | 7 +++++ playbook/memcached/scripts/deploy.sh | 15 ++++++---- playbook/memcached/scripts/start.sh | 44 ++++++++++++++++++++++++++++ playbook/memcached/scripts/status.sh | 11 ++++++- 5 files changed, 80 insertions(+), 6 deletions(-) create mode 100644 playbook/memcached/scripts/start.sh diff --git a/playbook/memcached/hosts.yaml b/playbook/memcached/hosts.yaml index d79e2cccd..24fd8cc14 100644 --- a/playbook/memcached/hosts.yaml +++ b/playbook/memcached/hosts.yaml @@ -18,6 +18,9 @@ hosts: - EXT_PATH=/mnt/memcachefile/cachefile:1024G - EXT_WBUF_SIZE=8 # size in megabytes of page write buffers. - EXT_ITEM_AGE=1 # store items idle at least this long (seconds, default: no age limit) + - VERBOSE="v" # v: verbose (print errors/warnings while in event loop) + # vv: very verbose (also print client commands/responses) + # vvv: extremely verbose (internal state transitions) - host: server-host2 hostname: 10.0.1.2 labels: @@ -32,6 +35,9 @@ hosts: - EXT_PATH=/mnt/memcachefile/cachefile:1024G - EXT_WBUF_SIZE=8 # size in megabytes of page write buffers. - EXT_ITEM_AGE=1 # store items idle at least this long (seconds, default: no age limit) + - VERBOSE="v" # v: verbose (print errors/warnings while in event loop) + # vv: very verbose (also print client commands/responses) + # vvv: extremely verbose (internal state transitions) - host: server-host3 hostname: 10.0.1.3 labels: @@ -46,3 +52,6 @@ hosts: - EXT_PATH=/mnt/memcachefile/cachefile:1024G - EXT_WBUF_SIZE=8 # size in megabytes of page write buffers. - EXT_ITEM_AGE=1 # store items idle at least this long (seconds, default: no age limit) + - VERBOSE="v" # v: verbose (print errors/warnings while in event loop) + # vv: very verbose (also print client commands/responses) + # vvv: extremely verbose (internal state transitions) diff --git a/playbook/memcached/scripts/clean.sh b/playbook/memcached/scripts/clean.sh index 2de697677..84fbe15e9 100644 --- a/playbook/memcached/scripts/clean.sh +++ b/playbook/memcached/scripts/clean.sh @@ -2,6 +2,7 @@ g_container_name="memcached-"${PORT} g_docker_cmd="${SUDO_ALIAS} docker" +g_rm_cmd="${SUDO_ALIAS} rm -rf" function msg() { printf '%b' "$1" >&2 @@ -33,5 +34,11 @@ stop_container() { success "rm container[${g_container_name}]\n" } +rm_cachefile() { + cachefile_path=(${EXT_PATH//:/ }) + ${g_rm_cmd} ${cachefile_path} +} + precheck stop_container +rm_cachefile diff --git a/playbook/memcached/scripts/deploy.sh b/playbook/memcached/scripts/deploy.sh index 070dc2437..deda5f5f3 100644 --- a/playbook/memcached/scripts/deploy.sh +++ b/playbook/memcached/scripts/deploy.sh @@ -4,6 +4,9 @@ g_container_name="memcached-"${PORT} g_start_args="" g_docker_cmd="${SUDO_ALIAS} docker" g_lsof_cmd="${SUDO_ALIAS} lsof" +g_rm_cmd="${SUDO_ALIAS} rm -rf" +g_mkdir_cmd="${SUDO_ALIAS} mkdir -p" +g_touch_cmd="${SUDO_ALIAS} touch" g_volume_bind="" g_status="" g_user="" @@ -37,11 +40,10 @@ precheck() { # check ext path if [ "${EXT_PATH}" ]; then - volume_path=(${EXT_PATH//:/ }) - if [ -f ${volume_path} ]; then - die "no file[${volume_path}]" - exit 1 - fi + cachefile_path=(${EXT_PATH//:/ }) + ${g_rm_cmd} ${cachefile_path} + ${g_mkdir_cmd} $(dirname ${cachefile_path}) + ${g_touch_cmd} ${cachefile_path} fi } @@ -74,6 +76,9 @@ init() { if [ "${EXT_ITEM_AGE}" ]; then g_start_args="${g_start_args} --extended ext_item_age=${EXT_ITEM_AGE}" fi + if [ "${VERBOSE}" ];then + g_start_args="${g_start_args} -${VERBOSE}" + fi } create_container() { diff --git a/playbook/memcached/scripts/start.sh b/playbook/memcached/scripts/start.sh new file mode 100644 index 000000000..cf8c4f69c --- /dev/null +++ b/playbook/memcached/scripts/start.sh @@ -0,0 +1,44 @@ +#!/usr/bin/env bash + +g_container_name="memcached-"${PORT} +g_docker_cmd="${SUDO_ALIAS} docker" +g_rm_cmd="${SUDO_ALIAS} rm -rf" +g_mkdir_cmd="${SUDO_ALIAS} mkdir -p" +g_touch_cmd="${SUDO_ALIAS} touch" +g_status="running" + +function msg() { + printf '%b' "$1" >&2 +} + +function success() { + msg "\33[32m[✔]\33[0m ${1}${2}" +} + +function die() { + msg "\33[31m[✘]\33[0m ${1}${2}" + exit 1 +} + +precheck() { + # check ext path + get_status_container + if [ "${EXT_PATH}" ] && [ ${g_status} != "running" ]; then + cachefile_path=(${EXT_PATH//:/ }) + ${g_rm_cmd} ${cachefile_path} + ${g_mkdir_cmd} $(dirname ${cachefile_path}) + ${g_touch_cmd} ${cachefile_path} + fi +} + +start_container() { + ${g_docker_cmd} start ${g_container_name} >& /dev/null + success "start container[${g_container_name}]\n" +} + +get_status_container() { + g_status=`${g_docker_cmd} inspect --format='{{.State.Status}}' ${g_container_name}` +} + +precheck +start_container diff --git a/playbook/memcached/scripts/status.sh b/playbook/memcached/scripts/status.sh index f3bc5314b..eca3918e0 100644 --- a/playbook/memcached/scripts/status.sh +++ b/playbook/memcached/scripts/status.sh @@ -5,6 +5,7 @@ g_start_args="" g_docker_cmd="${SUDO_ALIAS} docker" g_volume_bind="" g_container_id="" +g_status="running" function msg() { printf '%b' "$1" >&2 @@ -35,6 +36,14 @@ show_ip_port() { printf "memcached addr:\t%s:%d\n" ${LISTEN} ${PORT} } +get_status_container() { + g_status=`${g_docker_cmd} inspect --format='{{.State.Status}}' ${g_container_name}` + if [ ${g_status} != "running" ]; then + exit 1 + fi +} + precheck +show_ip_port show_info_container -show_ip_port \ No newline at end of file +get_status_container From 10994d9e21836a0f3bdb9f909dcc1a97bafedfb8 Mon Sep 17 00:00:00 2001 From: Cyber-SiKu Date: Wed, 22 Feb 2023 13:59:17 +0800 Subject: [PATCH 2/3] [fix]playbook/memcached Signed-off-by: Cyber-SiKu --- playbook/memcached/hosts.yaml | 3 +++ 1 file changed, 3 insertions(+) diff --git a/playbook/memcached/hosts.yaml b/playbook/memcached/hosts.yaml index 24fd8cc14..bc6be31cf 100644 --- a/playbook/memcached/hosts.yaml +++ b/playbook/memcached/hosts.yaml @@ -9,6 +9,7 @@ hosts: labels: - memcached envs: + - SUDO_ALIAS=sudo - IMAGE=memcached:1.6.17 - LISTEN=10.0.1.1 - PORT=11211 @@ -26,6 +27,7 @@ hosts: labels: - memcached envs: + - SUDO_ALIAS=sudo - IMAGE=memcached:1.6.17 - LISTEN=10.0.1.2 - PORT=11211 @@ -43,6 +45,7 @@ hosts: labels: - memcached envs: + - SUDO_ALIAS=sudo - IMAGE=memcached:1.6.17 - LISTEN=10.0.1.3 - PORT=11211 From de36749cdc734b071db0f6b758082fadd5b9208e Mon Sep 17 00:00:00 2001 From: Lijin Xiong Date: Fri, 3 Mar 2023 11:13:03 +0800 Subject: [PATCH 3/3] Feature: support managing disk information in database With database disk records commited by `disks.yaml`, disk formatting cloud be performed without `format.yaml`. And it will get and wirte disk `size` and `URI` during disk formatting, thus record the ID of service(chunkserver) with associated disk when deploy curvebs cluster. Use `curveadm disks ls` to view all disk information. Usage: curveadm disks commit /path/to/disks.yaml curveadm disks ls curveadm format Signed-off-by: Lijin Xiong --- cli/cli/cli.go | 35 ++- cli/command/cmd.go | 2 + cli/command/disks/cmd.go | 45 ++++ cli/command/disks/commit.go | 220 ++++++++++++++++++ cli/command/disks/list.go | 73 ++++++ cli/command/disks/show.go | 57 +++++ cli/command/format.go | 35 ++- cli/command/precheck.go | 9 + configs/bs/cluster/client.yaml | 3 +- configs/bs/cluster/disks.yaml | 23 ++ configs/bs/stand-alone/client.yaml | 3 +- configs/fs/cluster/client.yaml | 1 + configs/fs/stand-alone/client.yaml | 1 + internal/build/debug.go | 3 +- internal/common/common.go | 17 ++ internal/configure/common/item_set.go | 32 ++- internal/configure/disks/dc_get.go | 71 ++++++ internal/configure/disks/dc_item.go | 77 ++++++ internal/configure/disks/disks.go | 216 +++++++++++++++++ internal/configure/format.go | 5 +- internal/configure/topology/dc_get.go | 8 +- internal/configure/topology/dc_item.go | 10 +- internal/errno/errno.go | 80 +++++-- internal/playbook/factory.go | 3 + internal/storage/sql.go | 70 +++++- internal/storage/storage.go | 173 ++++++++++++++ internal/task/task/bs/format.go | 63 +++++ internal/task/task/checker/service.go | 37 +++ internal/task/task/common/clean_service.go | 15 ++ internal/task/task/common/create_container.go | 11 + internal/tui/disks.go | 69 ++++++ internal/utils/common.go | 12 +- 32 files changed, 1426 insertions(+), 53 deletions(-) create mode 100644 cli/command/disks/cmd.go create mode 100644 cli/command/disks/commit.go create mode 100644 cli/command/disks/list.go create mode 100644 cli/command/disks/show.go create mode 100644 configs/bs/cluster/disks.yaml create mode 100644 internal/configure/disks/dc_get.go create mode 100644 internal/configure/disks/dc_item.go create mode 100644 internal/configure/disks/disks.go create mode 100644 internal/tui/disks.go diff --git a/cli/cli/cli.go b/cli/cli/cli.go index 703ca62e7..76c0b94fd 100644 --- a/cli/cli/cli.go +++ b/cli/cli/cli.go @@ -64,12 +64,14 @@ type CurveAdm struct { memStorage *utils.SafeMap // properties (hosts/cluster) - hosts string // hosts - clusterId int // current cluster id - clusterUUId string // current cluster uuid - clusterName string // current cluster name - clusterTopologyData string // cluster topology - clusterPoolData string // cluster pool + hosts string // hosts + disks string // disks + diskRecords []storage.Disk // disk list + clusterId int // current cluster id + clusterUUId string // current cluster uuid + clusterName string // current cluster name + clusterTopologyData string // cluster topology + clusterPoolData string // cluster pool } /* @@ -174,6 +176,23 @@ func (curveadm *CurveAdm) init() error { log.Field("ClusterName", cluster.Name)) } + // (8) Get Disks + var disks storage.Disks + diskses, err := s.GetDisks() + if err != nil { + log.Error("Get disks failed", log.Field("Error", err)) + return errno.ERR_GET_DISKS_FAILED.E(err) + } else if len(diskses) > 0 { + disks = diskses[0] + } + + // (9) Get Disk Records + diskRecords, err := s.GetDisk(comm.DISK_FILTER_ALL) + if err != nil { + log.Error("Get disk records failed", log.Field("Error", err)) + return errno.ERR_GET_DISK_RECORDS_FAILED.E(err) + } + curveadm.dbpath = dbpath curveadm.logpath = logpath curveadm.config = config @@ -183,6 +202,8 @@ func (curveadm *CurveAdm) init() error { curveadm.storage = s curveadm.memStorage = utils.NewSafeMap() curveadm.hosts = hosts.Data + curveadm.disks = disks.Data + curveadm.diskRecords = diskRecords curveadm.clusterId = cluster.Id curveadm.clusterUUId = cluster.UUId curveadm.clusterName = cluster.Name @@ -264,6 +285,8 @@ func (curveadm *CurveAdm) Err() io.Writer { return curveadm.e func (curveadm *CurveAdm) Storage() *storage.Storage { return curveadm.storage } func (curveadm *CurveAdm) MemStorage() *utils.SafeMap { return curveadm.memStorage } func (curveadm *CurveAdm) Hosts() string { return curveadm.hosts } +func (curveadm *CurveAdm) Disks() string { return curveadm.disks } +func (curveadm *CurveAdm) DiskRecords() []storage.Disk { return curveadm.diskRecords } func (curveadm *CurveAdm) ClusterId() int { return curveadm.clusterId } func (curveadm *CurveAdm) ClusterUUId() string { return curveadm.clusterUUId } func (curveadm *CurveAdm) ClusterName() string { return curveadm.clusterName } diff --git a/cli/command/cmd.go b/cli/command/cmd.go index 0c19d9548..58c4fd698 100644 --- a/cli/command/cmd.go +++ b/cli/command/cmd.go @@ -31,6 +31,7 @@ import ( "github.com/opencurve/curveadm/cli/command/client" "github.com/opencurve/curveadm/cli/command/cluster" "github.com/opencurve/curveadm/cli/command/config" + "github.com/opencurve/curveadm/cli/command/disks" "github.com/opencurve/curveadm/cli/command/hosts" "github.com/opencurve/curveadm/cli/command/pfs" "github.com/opencurve/curveadm/cli/command/playground" @@ -61,6 +62,7 @@ func addSubCommands(cmd *cobra.Command, curveadm *cli.CurveAdm) { cluster.NewClusterCommand(curveadm), // curveadm cluster ... config.NewConfigCommand(curveadm), // curveadm config ... hosts.NewHostsCommand(curveadm), // curveadm hosts ... + disks.NewDisksCommand(curveadm), // curveadm disks ... playground.NewPlaygroundCommand(curveadm), // curveadm playground ... target.NewTargetCommand(curveadm), // curveadm target ... pfs.NewPFSCommand(curveadm), // curveadm pfs ... diff --git a/cli/command/disks/cmd.go b/cli/command/disks/cmd.go new file mode 100644 index 000000000..68b095277 --- /dev/null +++ b/cli/command/disks/cmd.go @@ -0,0 +1,45 @@ +/* + * Copyright (c) 2023 NetEase Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* + * Project: CurveAdm + * Created Date: 2023-02-24 + * Author: Lijin Xiong (lijin.xiong@zstack.io) + */ + +package disks + +import ( + "github.com/opencurve/curveadm/cli/cli" + cliutil "github.com/opencurve/curveadm/internal/utils" + "github.com/spf13/cobra" +) + +func NewDisksCommand(curveadm *cli.CurveAdm) *cobra.Command { + cmd := &cobra.Command{ + Use: "disks", + Short: "Manage disks", + Args: cliutil.NoArgs, + RunE: cliutil.ShowHelp(curveadm.Err()), + } + + cmd.AddCommand( + NewCommitCommand(curveadm), + NewShowCommand(curveadm), + NewListCommand(curveadm), + ) + return cmd +} diff --git a/cli/command/disks/commit.go b/cli/command/disks/commit.go new file mode 100644 index 000000000..987d58d32 --- /dev/null +++ b/cli/command/disks/commit.go @@ -0,0 +1,220 @@ +/* + * Copyright (c) 2023 NetEase Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* + * Project: CurveAdm + * Created Date: 2023-02-24 + * Author: Lijin Xiong (lijin.xiong@zstack.io) + */ + +package disks + +import ( + "strings" + + "github.com/fatih/color" + "github.com/opencurve/curveadm/cli/cli" + "github.com/opencurve/curveadm/internal/common" + comm "github.com/opencurve/curveadm/internal/common" + "github.com/opencurve/curveadm/internal/configure/disks" + "github.com/opencurve/curveadm/internal/configure/topology" + "github.com/opencurve/curveadm/internal/errno" + "github.com/opencurve/curveadm/internal/storage" + "github.com/opencurve/curveadm/internal/tui" + tuicomm "github.com/opencurve/curveadm/internal/tui/common" + "github.com/opencurve/curveadm/internal/utils" + "github.com/spf13/cobra" +) + +const ( + COMMIT_EXAMPLE = `Examples: + $ curveadm disks commit /path/to/disks.yaml # Commit disks` +) + +type commitOptions struct { + filename string + slient bool +} + +func NewCommitCommand(curveadm *cli.CurveAdm) *cobra.Command { + var options commitOptions + cmd := &cobra.Command{ + Use: "commit DISKS [OPTIONS]", + Short: "Commit disks", + Args: utils.ExactArgs(1), + Example: COMMIT_EXAMPLE, + RunE: func(cmd *cobra.Command, args []string) error { + options.filename = args[0] + return runCommit(curveadm, options) + }, + DisableFlagsInUseLine: true, + } + + flags := cmd.Flags() + flags.BoolVarP(&options.slient, "slient", "s", false, "Slient output for disks commit") + + return cmd +} + +func readAndCheckDisks(curveadm *cli.CurveAdm, options commitOptions) (string, []*disks.DiskConfig, error) { + var dcs []*disks.DiskConfig + // 1) read disks from file + if !utils.PathExist(options.filename) { + return "", dcs, errno.ERR_DISKS_FILE_NOT_FOUND. + F("%s: no such file", utils.AbsPath(options.filename)) + } + data, err := utils.ReadFile(options.filename) + if err != nil { + return data, dcs, errno.ERR_READ_DISKS_FILE_FAILED.E(err) + } + + // 2) display disks difference + oldData := curveadm.Disks() + if !options.slient { + diff := utils.Diff(oldData, data) + curveadm.WriteOutln(diff) + } + + // 3) check disks data + dcs, err = disks.ParseDisks(data, curveadm) + return data, dcs, err +} + +func assambleNewDiskRecords(dcs []*disks.DiskConfig, + oldDiskRecords []storage.Disk) ([]storage.Disk, []storage.Disk) { + keySep := ":" + newDiskMap := make(map[string]bool) + + var newDiskRecords, diskRecordDeleteList []storage.Disk + for _, dc := range dcs { + for _, host := range dc.GetHost() { + key := strings.Join([]string{host, dc.GetDevice()}, keySep) + newDiskMap[key] = true + newDiskRecords = append( + newDiskRecords, storage.Disk{ + Host: host, + Device: dc.GetDevice(), + Size: comm.DISK_DEFAULT_NULL_SIZE, + URI: comm.DISK_DEFAULT_NULL_URI, + MountPoint: dc.GetMountPoint(), + FormatPercent: dc.GetFormatPercent(), + ChunkServerID: comm.DISK_DEFAULT_NULL_CHUNKSERVER_ID, + }) + } + } + + for _, dr := range oldDiskRecords { + key := strings.Join([]string{dr.Host, dr.Device}, keySep) + if _, ok := newDiskMap[key]; !ok { + diskRecordDeleteList = append(diskRecordDeleteList, dr) + } + } + + return newDiskRecords, diskRecordDeleteList +} + +func writeDiskRecord(dr storage.Disk, curveadm *cli.CurveAdm) error { + if diskRecords, err := curveadm.Storage().GetDisk( + common.DISK_FILTER_DEVICE, dr.Host, dr.Device); err != nil { + return err + } else if len(diskRecords) == 0 { + if err := curveadm.Storage().SetDisk( + dr.Host, + dr.Device, + dr.MountPoint, + dr.ContainerImage, + dr.FormatPercent); err != nil { + return err + } + } + return nil +} + +func syncDiskRecords(data string, dcs []*disks.DiskConfig, + curveadm *cli.CurveAdm, options commitOptions) error { + oldDiskRecords := curveadm.DiskRecords() + tui.SortDiskRecords(oldDiskRecords) + + newDiskRecords, diskRecordDeleteList := assambleNewDiskRecords(dcs, oldDiskRecords) + tui.SortDiskRecords(newDiskRecords) + oldDiskRecordsString := tui.FormatDisks(oldDiskRecords) + newDiskRecordsString := tui.FormatDisks(newDiskRecords) + + if !options.slient { + diff := utils.Diff(oldDiskRecordsString, newDiskRecordsString) + curveadm.WriteOutln(diff) + } + + pass := tuicomm.ConfirmYes("Disk changes are showing above. Do you want to continue?") + if !pass { + curveadm.WriteOut(tuicomm.PromptCancelOpetation("commit disk table")) + return errno.ERR_CANCEL_OPERATION + } + + // write new disk records + for _, dr := range newDiskRecords { + if err := writeDiskRecord(dr, curveadm); err != nil { + return err + } + } + + // delete obsolete disk records + for _, dr := range diskRecordDeleteList { + if dr.ChunkServerID != comm.DISK_DEFAULT_NULL_CHUNKSERVER_ID { + return errno.ERR_DELETE_SERVICE_BINDING_DISK. + F("The disk[%s:%s] is used by service[%s:%s]", + dr.Host, dr.Device, topology.ROLE_CHUNKSERVER, dr.ChunkServerID) + } + + if err := curveadm.Storage().DeleteDisk(dr.Host, dr.Device); err != nil { + return errno.ERR_UPDATE_DISK_FAILED.E(err) + } + } + + return nil +} + +func runCommit(curveadm *cli.CurveAdm, options commitOptions) error { + // 1) read and check disks + data, dcs, err := readAndCheckDisks(curveadm, options) + if err != nil { + return err + } + + // 2) confirm by user + pass := tuicomm.ConfirmYes("Do you want to continue?") + if !pass { + curveadm.WriteOut(tuicomm.PromptCancelOpetation("commit disks")) + return errno.ERR_CANCEL_OPERATION + } + + // 3) add disk records + err = syncDiskRecords(data, dcs, curveadm, options) + if err != nil { + return err + } + + // 4) add disks data + err = curveadm.Storage().SetDisks(data) + if err != nil { + return errno.ERR_UPDATE_DISKS_FAILED. + F("commit disks failed") + } + + // 5) print success prompt + curveadm.WriteOutln(color.GreenString("Disks updated")) + return nil +} diff --git a/cli/command/disks/list.go b/cli/command/disks/list.go new file mode 100644 index 000000000..f88f2f44d --- /dev/null +++ b/cli/command/disks/list.go @@ -0,0 +1,73 @@ +/* + * Copyright (c) 2023 NetEase Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* + * Project: CurveAdm + * Created Date: 2023-02-24 + * Author: Lijin Xiong (lijin.xiong@zstack.io) + */ + +package disks + +import ( + "github.com/opencurve/curveadm/cli/cli" + "github.com/opencurve/curveadm/internal/common" + "github.com/opencurve/curveadm/internal/storage" + "github.com/opencurve/curveadm/internal/tui" + cliutil "github.com/opencurve/curveadm/internal/utils" + "github.com/spf13/cobra" +) + +type listOptions struct { + host string +} + +func NewListCommand(curveadm *cli.CurveAdm) *cobra.Command { + var options listOptions + cmd := &cobra.Command{ + Use: "ls [OPTIONS]", + Aliases: []string{"list"}, + Short: "List disk information", + Args: cliutil.NoArgs, + RunE: func(cmd *cobra.Command, args []string) error { + return runList(curveadm, options) + }, + DisableFlagsInUseLine: true, + } + + flags := cmd.Flags() + flags.StringVar(&options.host, "host", "*", "List disk of host") + + return cmd +} + +func runList(curveadm *cli.CurveAdm, options listOptions) error { + var err error + var diskRecords []storage.Disk + + if options.host == "*" { + diskRecords = curveadm.DiskRecords() + } else { + if diskRecords, err = curveadm.Storage().GetDisk(common.DISK_FILTER_HOST, + options.host); err != nil { + return err + } + } + + output := tui.FormatDisks(diskRecords) + curveadm.WriteOut(output) + return nil +} diff --git a/cli/command/disks/show.go b/cli/command/disks/show.go new file mode 100644 index 000000000..317833ddc --- /dev/null +++ b/cli/command/disks/show.go @@ -0,0 +1,57 @@ +/* + * Copyright (c) 2023 NetEase Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* + * Project: CurveAdm + * Created Date: 2023-02-24 + * Author: Lijin Xiong (lijin.xiong@zstack.io) + */ + +package disks + +import ( + "github.com/opencurve/curveadm/cli/cli" + cliutil "github.com/opencurve/curveadm/internal/utils" + "github.com/spf13/cobra" +) + +type showOptions struct{} + +func NewShowCommand(curveadm *cli.CurveAdm) *cobra.Command { + var options showOptions + + cmd := &cobra.Command{ + Use: "show", + Short: "Show disks", + Args: cliutil.NoArgs, + RunE: func(cmd *cobra.Command, args []string) error { + return runShow(curveadm, options) + }, + DisableFlagsInUseLine: true, + } + + return cmd +} + +func runShow(curveadm *cli.CurveAdm, options showOptions) error { + disks := curveadm.Disks() + if len(disks) == 0 { + curveadm.WriteOutln("") + } else { + curveadm.WriteOut(disks) + } + return nil +} diff --git a/cli/command/format.go b/cli/command/format.go index d88c397ec..6c264f317 100644 --- a/cli/command/format.go +++ b/cli/command/format.go @@ -23,6 +23,8 @@ package command import ( + "fmt" + "github.com/opencurve/curveadm/cli/cli" comm "github.com/opencurve/curveadm/internal/common" "github.com/opencurve/curveadm/internal/configure" @@ -132,10 +134,35 @@ func displayFormatStatus(curveadm *cli.CurveAdm, fcs []*configure.FormatConfig, } func runFormat(curveadm *cli.CurveAdm, options formatOptions) error { - // 1) parse format config - fcs, err := configure.ParseFormat(options.filename) - if err != nil { - return err + var err error + var fcs []*configure.FormatConfig + diskRecords := curveadm.DiskRecords() + + // 1) parse format config from yaml file or database + if len(diskRecords) == 0 { + fcs, err = configure.ParseFormat(options.filename) + if err != nil { + return err + } + } else { + for _, dr := range diskRecords { + containerImage := configure.DEFAULT_CONTAINER_IMAGE + if len(dr.ContainerImage) > 0 { + containerImage = dr.ContainerImage + } + disk := fmt.Sprintf("%s:%s:%d", dr.Device, dr.MountPoint, dr.FormatPercent) + fc, err := configure.NewFormatConfig(containerImage, dr.Host, disk) + if err != nil { + return err + } + fc.UseDiskUri = true + chunkserverId := dr.ChunkServerID + if len(chunkserverId) > 1 { + // skip formatting the disk with nonempty chunkserver id + continue + } + fcs = append(fcs, fc) + } } // 2) generate start playbook diff --git a/cli/command/precheck.go b/cli/command/precheck.go index f77ce9a9f..7e2201ec0 100644 --- a/cli/command/precheck.go +++ b/cli/command/precheck.go @@ -52,6 +52,7 @@ const ( CHECK_ITEM_NERWORK = "network" CHECK_ITEM_DATE = "date" CHECK_ITEM_SERVICE = "service" + CHECK_ITEM_DISK = "disk" ) var ( @@ -67,6 +68,7 @@ var ( playbook.CHECK_NETWORK_FIREWALL, playbook.GET_HOST_DATE, // date playbook.CHECK_HOST_DATE, + playbook.CHECK_DISK_SIZE, // disk playbook.CHECK_CHUNKFILE_POOL, // service //playbook.CHECK_S3, } @@ -98,6 +100,7 @@ var ( playbook.CHECK_NETWORK_FIREWALL: CHECK_ITEM_NERWORK, playbook.GET_HOST_DATE: CHECK_ITEM_DATE, playbook.CHECK_HOST_DATE: CHECK_ITEM_DATE, + playbook.CHECK_DISK_SIZE: CHECK_ITEM_DISK, playbook.CHECK_CHUNKFILE_POOL: CHECK_ITEM_SERVICE, playbook.CHECK_S3: CHECK_ITEM_SERVICE, } @@ -190,6 +193,12 @@ func genPrecheckPlaybook(curveadm *cli.CurveAdm, configs = configs[:1] case playbook.CHECK_CHUNKFILE_POOL: configs = curveadm.FilterDeployConfigByRole(dcs, ROLE_CHUNKSERVER) + case playbook.CHECK_DISK_SIZE: + // skip disk size checking with empty records + if len(curveadm.DiskRecords()) == 0 { + continue + } + configs = curveadm.FilterDeployConfigByRole(dcs, ROLE_CHUNKSERVER) } pb.AddStep(&playbook.PlaybookStep{ diff --git a/configs/bs/cluster/client.yaml b/configs/bs/cluster/client.yaml index 6cd981ed9..a9942c5b0 100644 --- a/configs/bs/cluster/client.yaml +++ b/configs/bs/cluster/client.yaml @@ -1,7 +1,8 @@ +kind: curvebs user: curve host: 10.0.1.1 ssh_port: 22 private_key_file: /home/curve/.ssh/id_rsa container_image: opencurvedocker/curvebs:v1.2 mds.listen.addr: 10.0.1.1:6700,10.0.1.2:6700,10.0.1.3:6700 -log_dir: /home/curve/curvebs/logs/client \ No newline at end of file +log_dir: /home/curve/curvebs/logs/client diff --git a/configs/bs/cluster/disks.yaml b/configs/bs/cluster/disks.yaml new file mode 100644 index 000000000..6ee2214a0 --- /dev/null +++ b/configs/bs/cluster/disks.yaml @@ -0,0 +1,23 @@ +global: + format_percent: 95 + container_image: opencurvedocker/curvebs:v1.2 + host: + - curve-1 + - curve-2 + - curve-3 + +disk: + - device: /dev/sdb1 + mount: /data/chunkserver0 + - device: /dev/sdc1 + mount: /data/chunkserver1 + format_percent: 90 + - device: /dev/sdd1 + mount: /data/chunkserver2 + exclude: # for the use case that some hosts have not certain disk device + - curve-3 + - device: /dev/sde1 + mount: /data/chunkserver3 + host: + - curve-1 + - curve-2 diff --git a/configs/bs/stand-alone/client.yaml b/configs/bs/stand-alone/client.yaml index 10685ab7e..50315e9af 100644 --- a/configs/bs/stand-alone/client.yaml +++ b/configs/bs/stand-alone/client.yaml @@ -1,7 +1,8 @@ +kind: curvebs user: curve host: 10.0.1.1 ssh_port: 22 private_key_file: /home/curve/.ssh/id_rsa container_image: opencurvedocker/curvebs:v1.2 mds.listen.addr: 10.0.1.1:16700,10.0.1.1:26700,10.0.1.1:36700 -log_dir: /home/curve/curvebs/logs/client \ No newline at end of file +log_dir: /home/curve/curvebs/logs/client diff --git a/configs/fs/cluster/client.yaml b/configs/fs/cluster/client.yaml index 74c74e68b..cbe164c85 100644 --- a/configs/fs/cluster/client.yaml +++ b/configs/fs/cluster/client.yaml @@ -1,3 +1,4 @@ +kind: curvefs container_image: opencurvedocker/curvefs:latest mdsOpt.rpcRetryOpt.addrs: 10.0.1.1:6700,10.0.1.2:6700,10.0.1.3:6700 log_dir: /home/curve/curvefs/logs/client diff --git a/configs/fs/stand-alone/client.yaml b/configs/fs/stand-alone/client.yaml index 74c74e68b..cbe164c85 100644 --- a/configs/fs/stand-alone/client.yaml +++ b/configs/fs/stand-alone/client.yaml @@ -1,3 +1,4 @@ +kind: curvefs container_image: opencurvedocker/curvefs:latest mdsOpt.rpcRetryOpt.addrs: 10.0.1.1:6700,10.0.1.2:6700,10.0.1.3:6700 log_dir: /home/curve/curvefs/logs/client diff --git a/internal/build/debug.go b/internal/build/debug.go index 48559377e..8724ca5ad 100644 --- a/internal/build/debug.go +++ b/internal/build/debug.go @@ -33,12 +33,13 @@ import ( const ( DEBUG_CURVEADM_CONFIGURE = "DEBUG_CURVEADM_CONFIGURE" DEBUG_HOSTS = "DEBUG_HOSTS" + DEBUG_DISKS = "DEBUG_DISKS" DEBUG_SMART_CONFIGS = "DEBUG_SMART_CONFIGS" DEBUG_TOPOLOGY = "DEBUG_TOPOLOGY" DEBUG_TOOL = "DEBUG_TOOL" DEBUG_CLUSTER = "DEBUG_CLUSTER" DEBUG_CREATE_POOL = "DEBUG_CREATE_POOL" - DEBUG_CLIENT_CONFIGURE = "DEBUG_CLIENT_CONFIGURE" + DEBUG_CLIENT_CONFIGURE = "DEBUG_CLIENT_CONFIGURE" ) type Field struct { diff --git a/internal/common/common.go b/internal/common/common.go index c897f0cd7..a1bd10e72 100644 --- a/internal/common/common.go +++ b/internal/common/common.go @@ -46,6 +46,23 @@ const ( POOL_TYPE_LOGICAL = "logicalpool" POOL_TYPE_PHYSICAL = "physicalpool" + // disk + DISK_DEFAULT_NULL_SIZE = "-" + DISK_DEFAULT_NULL_URI = DISK_DEFAULT_NULL_SIZE + DISK_DEFAULT_NULL_CONTAINER_IMAGE = DISK_DEFAULT_NULL_SIZE + DISK_DEFAULT_NULL_CHUNKSERVER_ID = DISK_DEFAULT_NULL_SIZE + DISK_FILTER_ALL = "all" + DISK_FILTER_HOST = "host" + DISK_FILTER_DEVICE = "device" + DISK_FILTER_MOUNT = "mount" + DISK_FILTER_SERVICE = "service" + DISK_EXCLUDE_HOST = "exclude" + + DISK_FORMAT_PERCENT = "format_percent" + + DISK_FORMAT_MOUNT_POINT = "mount" + DISK_FORMAT_CONTAINER_IMAGE = "container_image" + // format KEY_ALL_FORMAT_STATUS = "ALL_FORMAT_STATUS" diff --git a/internal/configure/common/item_set.go b/internal/configure/common/item_set.go index 9e853c376..acfbc9484 100644 --- a/internal/configure/common/item_set.go +++ b/internal/configure/common/item_set.go @@ -35,6 +35,7 @@ const ( REQUIRE_BOOL REQUIRE_INT REQUIRE_POSITIVE_INTEGER + REQUIRE_SLICE ) type ( @@ -89,8 +90,10 @@ func (itemset *ItemSet) Build(key string, value interface{}) (interface{}, error v, ok := utils.All2Str(value) if !ok { - return nil, errno.ERR_UNSUPPORT_CONFIGURE_VALUE_TYPE. - F("%s: %v", key, value) + if !utils.IsAnySlice(value) { + return nil, errno.ERR_UNSUPPORT_CONFIGURE_VALUE_TYPE. + F("%s: %v", key, value) + } } switch item.require { @@ -132,9 +135,34 @@ func (itemset *ItemSet) Build(key string, value interface{}) (interface{}, error return v, nil } + case REQUIRE_SLICE: + anySlice := value.([]any) + if len(anySlice) > 0 { + switch anySlice[0].(type) { + case string: + return convertSlice[string](value), nil + case int: + return convertSlice[int](value), nil + case bool: + return convertSlice[bool](value), nil + default: + return []any{}, errno.ERR_UNSUPPORT_CONFIGURE_VALUE_TYPE. + F("%s: %v", key, value) + } + } + return []any{}, nil + default: // do nothing } return value, nil } + +func convertSlice[T int | string | any](value any) []T { + var slice []T + for _, str := range value.([]any) { + slice = append(slice, str.(T)) + } + return slice +} diff --git a/internal/configure/disks/dc_get.go b/internal/configure/disks/dc_get.go new file mode 100644 index 000000000..e4896837e --- /dev/null +++ b/internal/configure/disks/dc_get.go @@ -0,0 +1,71 @@ +/* + * Copyright (c) 2023 NetEase Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* + * Project: CurveAdm + * Created Date: 2023-02-24 + * Author: Lijin Xiong (lijin.xiong@zstack.io) + */ + +package disks + +import ( + comm "github.com/opencurve/curveadm/internal/configure/common" + "github.com/opencurve/curveadm/internal/utils" +) + +func (dc *DiskConfig) get(i *comm.Item) interface{} { + if v, ok := dc.config[i.Key()]; ok { + return v + } + + defaultValue := i.DefaultValue() + if defaultValue != nil && utils.IsFunc(defaultValue) { + return defaultValue.(func(*DiskConfig) interface{})(dc) + } + return defaultValue +} + +func (dc *DiskConfig) getString(i *comm.Item) string { + v := dc.get(i) + if v == nil { + return "" + } + return v.(string) +} + +func (dc *DiskConfig) getInt(i *comm.Item) int { + v := dc.get(i) + if v == nil { + return 0 + } + return v.(int) +} + +func (dc *DiskConfig) getStrSlice(i *comm.Item) []string { + v := dc.get(i) + if v == nil { + return []string{} + } + return v.([]string) +} + +func (dc *DiskConfig) GetContainerImage() string { return dc.getString(CONFIG_GLOBAL_CONTAINER_IMAGE) } +func (dc *DiskConfig) GetFormatPercent() int { return dc.getInt(CONFIG_GLOBAL_FORMAT_PERCENT) } +func (dc *DiskConfig) GetHost() []string { return dc.getStrSlice(CONFIG_GLOBAL_HOST) } +func (dc *DiskConfig) GetDevice() string { return dc.getString(CONFIG_DISK_DEVICE) } +func (dc *DiskConfig) GetMountPoint() string { return dc.getString(CONFIG_DISK_MOUNT_POINT) } +func (dc *DiskConfig) GetHostExclude() []string { return dc.getStrSlice(CONFIG_DISK_HOST_EXCLUDE) } diff --git a/internal/configure/disks/dc_item.go b/internal/configure/disks/dc_item.go new file mode 100644 index 000000000..d56f10d8a --- /dev/null +++ b/internal/configure/disks/dc_item.go @@ -0,0 +1,77 @@ +/* + * Copyright (c) 2023 NetEase Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* + * Project: CurveAdm + * Created Date: 2023-02-24 + * Author: Lijin Xiong (lijin.xiong@zstack.io) + */ + +package disks + +import ( + "github.com/opencurve/curveadm/internal/common" + comm "github.com/opencurve/curveadm/internal/configure/common" +) + +const ( + DEFAULT_FORMAT_PERCENT = 90 +) + +var ( + itemset = comm.NewItemSet() + + CONFIG_GLOBAL_CONTAINER_IMAGE = itemset.Insert( + common.DISK_FORMAT_CONTAINER_IMAGE, + comm.REQUIRE_STRING, + false, + nil, + ) + + CONFIG_GLOBAL_FORMAT_PERCENT = itemset.Insert( + common.DISK_FORMAT_PERCENT, + comm.REQUIRE_POSITIVE_INTEGER, + false, + DEFAULT_FORMAT_PERCENT, + ) + + CONFIG_GLOBAL_HOST = itemset.Insert( + common.DISK_FILTER_HOST, + comm.REQUIRE_SLICE, + false, + nil, + ) + CONFIG_DISK_DEVICE = itemset.Insert( + common.DISK_FILTER_DEVICE, + comm.REQUIRE_STRING, + false, + nil, + ) + + CONFIG_DISK_MOUNT_POINT = itemset.Insert( + common.DISK_FORMAT_MOUNT_POINT, + comm.REQUIRE_STRING, + false, + nil, + ) + + CONFIG_DISK_HOST_EXCLUDE = itemset.Insert( + common.DISK_EXCLUDE_HOST, + comm.REQUIRE_SLICE, + false, + nil, + ) +) diff --git a/internal/configure/disks/disks.go b/internal/configure/disks/disks.go new file mode 100644 index 000000000..8fa38c33a --- /dev/null +++ b/internal/configure/disks/disks.go @@ -0,0 +1,216 @@ +/* + * Copyright (c) 2023 NetEase Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* + * Project: CurveAdm + * Created Date: 2023-02-24 + * Author: Lijin Xiong (lijin.xiong@zstack.io) + */ + +package disks + +import ( + "bytes" + "strings" + + "github.com/opencurve/curveadm/cli/cli" + "github.com/opencurve/curveadm/internal/build" + "github.com/opencurve/curveadm/internal/common" + "github.com/opencurve/curveadm/internal/errno" + "github.com/opencurve/curveadm/internal/storage" + "github.com/opencurve/curveadm/internal/utils" + "github.com/spf13/viper" +) + +const ( + DISK_URI_SEP = "//" + DISK_URI_PROTO_FS_UUID = "fs:uuid" +) + +type ( + Disks struct { + Global map[string]interface{} `mapstructure:"global"` + Disk []map[string]interface{} `mapstructure:"disk"` + } + + DiskConfig struct { + sequence int + config map[string]interface{} + } +) + +func merge(parent, child map[string]interface{}) { + for k, v := range parent { + if child[k] == nil { + child[k] = v + } + } + +} + +func newIfNil(config map[string]interface{}) map[string]interface{} { + if config == nil { + return map[string]interface{}{} + } + return config +} + +func mergeFinalHost(dc *DiskConfig) { + hostExclude := dc.GetHostExclude() + if len(hostExclude) > 0 { + diskHost := []string{} + hostExcludeMap := utils.Slice2Map(hostExclude) + for _, h := range dc.GetHost() { + if _, ok := hostExcludeMap[h]; !ok { + diskHost = append(diskHost, h) + } + } + dc.config[common.DISK_FILTER_HOST] = diskHost + } +} + +func checkDupHost(dc *DiskConfig) error { + existHost := map[string]bool{} + for _, h := range dc.GetHost() { + if _, ok := existHost[h]; ok { + return errno.ERR_DUPLICATE_HOST. + F("duplicated host: %s", h) + } + existHost[h] = true + } + return nil +} + +func checkDiskRootConfig(disks *Disks) error { + if disks.Global == nil { + return errno.ERR_GLOBAL_FIELD_MISSING. + F("disks yaml has not 'global' field") + } + if disks.Disk == nil { + return errno.ERR_DISK_FIELD_MISSING. + F("disks yaml has not 'disk' field") + } + return nil +} + +func GenDiskURI(proto, uri string) string { + return strings.Join([]string{proto, uri}, DISK_URI_SEP) +} + +func GetDiskId(disk storage.Disk) (string, error) { + uriSlice := strings.Split(disk.URI, DISK_URI_SEP) + if len(uriSlice) == 0 { + return "", errno.ERR_INVALID_DISK_URI. + F("The disk[%s:%s] URI[%s] is invalid", disk.Host, disk.Device, disk.URI) + } + + if uriSlice[0] == DISK_URI_PROTO_FS_UUID { + return uriSlice[1], nil + } + return "", nil +} + +func (dc *DiskConfig) Build() error { + for key, value := range dc.config { + if itemset.Get(key) == nil { + return errno.ERR_UNSUPPORT_DISKS_CONFIGURE_ITEM. + F("disks[%d].%s = %v", dc.sequence, key, value) + } + v, err := itemset.Build(key, value) + if err != nil { + return err + } else { + dc.config[key] = v + } + } + + mergeFinalHost(dc) + + if len(dc.GetHost()) == 0 { + return errno.ERR_HOST_FIELD_MISSING. + F("disks[%d].host = []", dc.sequence) + } else if dc.GetDevice() == "" { + return errno.ERR_DEVICE_FIELD_MISSING. + F("disks[%d].device = nil", dc.sequence) + } else if dc.GetMountPoint() == "" { + return errno.ERR_MOUNT_POINT_FIELD_MISSING. + F("disks[%d].mount = nil", dc.sequence) + } else if dc.GetFormatPercent() == 0 { + return errno.ERR_FORMAT_PERCENT_FIELD_MISSING. + F("disks[%d].format_percent = nil", dc.sequence) + } else if dc.GetFormatPercent() > 100 { + return errno.ERR_DISK_FORMAT_PERCENT_EXCEED_100. + F("disks[%d].format_percent = %d", dc.sequence, dc.GetFormatPercent()) + } + + if err := checkDupHost(dc); err != nil { + return err + } + + return nil +} + +func NewDiskConfig(sequence int, config map[string]interface{}) *DiskConfig { + return &DiskConfig{ + sequence: sequence, + config: config, + } +} + +func ParseDisks(data string, curveadm *cli.CurveAdm) ([]*DiskConfig, error) { + parser := viper.NewWithOptions(viper.KeyDelimiter("::")) + parser.SetConfigType("yaml") + err := parser.ReadConfig(bytes.NewBuffer([]byte(data))) + if err != nil { + return nil, errno.ERR_PARSE_DISKS_FAILED.E(err) + } + + disks := &Disks{} + if err := parser.Unmarshal(disks); err != nil { + return nil, errno.ERR_PARSE_DISKS_FAILED.E(err) + } + + var dcs []*DiskConfig + exist := map[string]bool{} + if err := checkDiskRootConfig(disks); err != nil { + return dcs, err + } + for i, disk := range disks.Disk { + disk = newIfNil(disk) + merge(disks.Global, disk) + dc := NewDiskConfig(i, disk) + err = dc.Build() + if err != nil { + return nil, err + } + + if _, ok := exist[dc.GetDevice()]; ok { + return nil, errno.ERR_DUPLICATE_DISK. + F("duplicate disk: %s", dc.GetDevice()) + } + if _, ok := exist[dc.GetMountPoint()]; ok { + return nil, errno.ERR_DUPLICATE_DISK. + F("duplicate disk mount point: %s", dc.GetMountPoint()) + } + + dcs = append(dcs, dc) + exist[dc.GetDevice()] = true + exist[dc.GetMountPoint()] = true + } + + build.DEBUG(build.DEBUG_DISKS, disks) + return dcs, nil +} diff --git a/internal/configure/format.go b/internal/configure/format.go index fa9f0581f..571c113ae 100644 --- a/internal/configure/format.go +++ b/internal/configure/format.go @@ -51,6 +51,7 @@ type ( Device string MountPoint string FormtPercent int + UseDiskUri bool } Format struct { @@ -60,7 +61,7 @@ type ( } ) -func newFormatConfig(containerImage, host, disk string) (*FormatConfig, error) { +func NewFormatConfig(containerImage, host, disk string) (*FormatConfig, error) { items := strings.Split(disk, ":") if len(items) != 3 { return nil, errno.ERR_INVALID_DISK_FORMAT.S(disk) @@ -121,7 +122,7 @@ func ParseFormat(filename string) ([]*FormatConfig, error) { fcs := []*FormatConfig{} for _, host := range format.Hosts { for _, disk := range format.Disks { - fc, err := newFormatConfig(containerImage, host, disk) + fc, err := NewFormatConfig(containerImage, host, disk) if err != nil { return nil, err } diff --git a/internal/configure/topology/dc_get.go b/internal/configure/topology/dc_get.go index 85e9b6b3f..8e4cc44a1 100644 --- a/internal/configure/topology/dc_get.go +++ b/internal/configure/topology/dc_get.go @@ -121,9 +121,11 @@ func (dc *DeployConfig) GetServiceConfig() map[string]string { return dc.service func (dc *DeployConfig) GetVariables() *variable.Variables { return dc.variables } // (2): config item -func (dc *DeployConfig) GetPrefix() string { return dc.getString(CONFIG_PREFIX) } -func (dc *DeployConfig) GetReportUsage() bool { return dc.getBool(CONFIG_REPORT_USAGE) } -func (dc *DeployConfig) GetContainerImage() string { return dc.getString(CONFIG_CONTAINER_IMAGE) } +func (dc *DeployConfig) GetPrefix() string { return dc.getString(CONFIG_PREFIX) } +func (dc *DeployConfig) GetReportUsage() bool { return dc.getBool(CONFIG_REPORT_USAGE) } +func (dc *DeployConfig) GetContainerImage() string { + return dc.getString(CONFIG_GLOBAL_CONTAINER_IMAGE) +} func (dc *DeployConfig) GetLogDir() string { return dc.getString(CONFIG_LOG_DIR) } func (dc *DeployConfig) GetDataDir() string { return dc.getString(CONFIG_DATA_DIR) } func (dc *DeployConfig) GetCoreDir() string { return dc.getString(CONFIG_CORE_DIR) } diff --git a/internal/configure/topology/dc_item.go b/internal/configure/topology/dc_item.go index d24856d14..9c52abc39 100644 --- a/internal/configure/topology/dc_item.go +++ b/internal/configure/topology/dc_item.go @@ -32,6 +32,7 @@ const ( REQUIRE_STRING REQUIRE_BOOL REQUIRE_POSITIVE_INTEGER + REQUIRE_SLICE // default value DEFAULT_REPORT_USAGE = true @@ -68,9 +69,10 @@ type ( ) // you should add config item to itemset iff you want to: -// (1) check the configuration item value, like type, valid value OR -// (2) filter out the configuration item for service config OR -// (3) set the default value for configuration item +// +// (1) check the configuration item value, like type, valid value OR +// (2) filter out the configuration item for service config OR +// (3) set the default value for configuration item var ( itemset = &itemSet{ items: []*item{}, @@ -96,7 +98,7 @@ var ( DEFAULT_REPORT_USAGE, ) - CONFIG_CONTAINER_IMAGE = itemset.insert( + CONFIG_GLOBAL_CONTAINER_IMAGE = itemset.insert( "container_image", REQUIRE_STRING, true, diff --git a/internal/errno/errno.go b/internal/errno/errno.go index b6a281f71..78670e65e 100644 --- a/internal/errno/errno.go +++ b/internal/errno/errno.go @@ -193,35 +193,41 @@ var ( ERR_INIT_SQL_DATABASE_FAILED = EC(100000, "init SQLite database failed") // 110: database/SQL (execute SQL statement: hosts table) - ERR_GET_HOSTS_FAILED = EC(110000, "execute SQL failed which get hosts") - ERR_UPDATE_HOSTS_FAILED = EC(110001, "execute SQL failed which update hosts") + ERR_GET_HOSTS_FAILED = EC(110000, "execute SQL failed while get hosts") + ERR_UPDATE_HOSTS_FAILED = EC(110001, "execute SQL failed while update hosts") // 111: database/SQL (execute SQL statement: clusters table) - ERR_INSERT_CLUSTER_FAILED = EC(111000, "execute SQL failed which insert cluster") - ERR_GET_CURRENT_CLUSTER_FAILED = EC(111001, "execute SQL failed which get current cluster") - ERR_GET_CLUSTER_BY_NAME_FAILED = EC(111002, "execute SQL failed which get cluster by name") - ERR_GET_ALL_CLUSTERS_FAILED = EC(111003, "execute SQL failed which get all clusters") - ERR_CHECKOUT_CLUSTER_FAILED = EC(111004, "execute SQL failed which checkout cluster") - ERR_DELETE_CLUSTER_FAILED = EC(111005, "execute SQL failed which delete cluster") - ERR_UPDATE_CLUSTER_TOPOLOGY_FAILED = EC(111006, "execute SQL failed which update cluster topology") - ERR_UPDATE_CLUSTER_POOL_FAILED = EC(111007, "execute SQL failed which update cluster pool") + ERR_INSERT_CLUSTER_FAILED = EC(111000, "execute SQL failed while insert cluster") + ERR_GET_CURRENT_CLUSTER_FAILED = EC(111001, "execute SQL failed while get current cluster") + ERR_GET_CLUSTER_BY_NAME_FAILED = EC(111002, "execute SQL failed while get cluster by name") + ERR_GET_ALL_CLUSTERS_FAILED = EC(111003, "execute SQL failed while get all clusters") + ERR_CHECKOUT_CLUSTER_FAILED = EC(111004, "execute SQL failed while checkout cluster") + ERR_DELETE_CLUSTER_FAILED = EC(111005, "execute SQL failed while delete cluster") + ERR_UPDATE_CLUSTER_TOPOLOGY_FAILED = EC(111006, "execute SQL failed while update cluster topology") + ERR_UPDATE_CLUSTER_POOL_FAILED = EC(111007, "execute SQL failed while update cluster pool") // 112: database/SQL (execute SQL statement: containers table) - ERR_INSERT_SERVICE_CONTAINER_ID_FAILED = EC(112000, "execute SQL failed which insert service container id") - ERR_SET_SERVICE_CONTAINER_ID_FAILED = EC(112001, "execute SQL failed which set service container id") - ERR_GET_SERVICE_CONTAINER_ID_FAILED = EC(112002, "execute SQL failed which get service container id") - ERR_GET_ALL_SERVICES_CONTAINER_ID_FAILED = EC(112003, "execute SQL failed which get all services container id") + ERR_INSERT_SERVICE_CONTAINER_ID_FAILED = EC(112000, "execute SQL failed while insert service container id") + ERR_SET_SERVICE_CONTAINER_ID_FAILED = EC(112001, "execute SQL failed while set service container id") + ERR_GET_SERVICE_CONTAINER_ID_FAILED = EC(112002, "execute SQL failed while get service container id") + ERR_GET_ALL_SERVICES_CONTAINER_ID_FAILED = EC(112003, "execute SQL failed while get all services container id") // 113: database/SQL (execute SQL statement: clients table) - ERR_INSERT_CLIENT_FAILED = EC(113000, "execute SQL failed which insert client") - ERR_GET_CLIENT_CONTAINER_ID_FAILED = EC(113001, "execute SQL failed which get client container id") - ERR_GET_CLIENT_BY_ID_FAILED = EC(113002, "execute SQL failed which get client by id") - ERR_GET_ALL_CLIENTS_FAILED = EC(113003, "execute SQL failed which get all clients") - ERR_DELETE_CLIENT_FAILED = EC(113004, "execute SQL failed which delete client") + ERR_INSERT_CLIENT_FAILED = EC(113000, "execute SQL failed while insert client") + ERR_GET_CLIENT_CONTAINER_ID_FAILED = EC(113001, "execute SQL failed while get client container id") + ERR_GET_CLIENT_BY_ID_FAILED = EC(113002, "execute SQL failed while get client by id") + ERR_GET_ALL_CLIENTS_FAILED = EC(113003, "execute SQL failed while get all clients") + ERR_DELETE_CLIENT_FAILED = EC(113004, "execute SQL failed while delete client") // 114: database/SQL (execute SQL statement: playground table) - ERR_INSERT_PLAYGROUND_FAILED = EC(114000, "execute SQL failed which insert playground") - ERR_GET_ALL_PLAYGROUND_FAILED = EC(114001, "execute SQL failed which get all playgrounds") - ERR_GET_PLAYGROUND_BY_NAME_FAILED = EC(114002, "execute SQL failed which get playground by name") - ERR_DELETE_PLAYGROUND_FAILED = EC(114003, "execute SQL failed which delete playground") + ERR_INSERT_PLAYGROUND_FAILED = EC(114000, "execute SQL failed while insert playground") + ERR_GET_ALL_PLAYGROUND_FAILED = EC(114001, "execute SQL failed while get all playgrounds") + ERR_GET_PLAYGROUND_BY_NAME_FAILED = EC(114002, "execute SQL failed while get playground by name") + ERR_DELETE_PLAYGROUND_FAILED = EC(114003, "execute SQL failed while delete playground") // 115: database/SQL (execute SQL statement: audit table) - ERR_GET_AUDIT_LOGS_FAILE = EC(115000, "execute SQL failed which get audit logs") + ERR_GET_AUDIT_LOGS_FAILE = EC(115000, "execute SQL failed while get audit logs") + // 116: database/SQL (execute SQL statement: disk table) + ERR_UPDATE_DISK_FAILED = EC(116000, "execute SQL failed while updating disk") + ERR_GET_DISK_RECORDS_FAILED = EC(116001, "execute SQL failed while get disk records") + // 117: database/SQL (execute SQL statement: disks table) + ERR_GET_DISKS_FAILED = EC(117000, "execute SQL failed while get disks") + ERR_UPDATE_DISKS_FAILED = EC(117001, "execute SQL failed while updating disks") // 200: command options (hosts) @@ -274,7 +280,6 @@ var ( // 311: configure (curveadm.cfg: invalid configure value) ERR_UNSUPPORT_CURVEADM_LOG_LEVEL = EC(311000, "unsupport curveadm log level") ERR_UNSUPPORT_CURVEADM_CONFIGURE_ITEM = EC(311001, "unsupport curveadm configure item") - // 320: configure (hosts.yaml: parse failed) ERR_HOSTS_FILE_NOT_FOUND = EC(320000, "hosts file not found") ERR_READ_HOSTS_FILE_FAILED = EC(320001, "read hosts file failed") @@ -291,6 +296,24 @@ var ( ERR_DUPLICATE_HOST = EC(321007, "host is duplicate") ERR_HOSTNAME_REQUIRES_VALID_IP_ADDRESS = EC(321008, "hostname requires valid IP address") + // 322: configure (disks.yaml: parse failed) + ERR_DISKS_FILE_NOT_FOUND = EC(322000, "disks file not found") + ERR_READ_DISKS_FILE_FAILED = EC(322001, "read disks file failed") + ERR_PARSE_DISKS_FAILED = EC(322002, "parse disks failed") + + // 323: configure (disks.yaml: invalid configure value) + ERR_UNSUPPORT_DISKS_CONFIGURE_ITEM = EC(323000, "unsupport disks configure item") + ERR_GLOBAL_FIELD_MISSING = EC(323001, "global field missing") + ERR_DISK_FIELD_MISSING = EC(323002, "disk field missing") + ERR_DEVICE_FIELD_MISSING = EC(323003, "device field missing") + ERR_MOUNT_POINT_FIELD_MISSING = EC(323004, "mount field missing") + ERR_FORMAT_PERCENT_FIELD_MISSING = EC(323005, "format_percent field missing") + ERR_DUPLICATE_DISK = EC(323006, "disk is duplicated") + ERR_DUPLICATE_DISK_MOUNT_POINT = EC(323007, "disk mount point is duplicated") + ERR_DISK_FORMAT_PERCENT_EXCEED_100 = EC(323008, "disk format percent is greater than 100") + ERR_DELETE_SERVICE_BINDING_DISK = EC(323009, "cannot remove service binding disk") + ERR_INVALID_DISK_URI = EC(323010, "invalid disk uri") + // 330: configure (topology.yaml: parse failed) ERR_TOPOLOGY_FILE_NOT_FOUND = EC(330000, "topology file not found") ERR_READ_TOPOLOGY_FILE_FAILED = EC(330001, "read topology file failed") @@ -341,6 +364,10 @@ var ( ERR_REQUIRE_CURVEFS_KIND_CLIENT_CONFIGURE_FILE = EC(351003, "require curvefs kind client configure file") ERR_INVALID_CLUSTER_LISTEN_MDS_ADDRESS = EC(351004, "invalid cluster MDS listen address") + // 360: sqlite database + ERR_DATABASE_EMPTY_QUERY_RESULT = EC(360000, "empty query result") + ERR_DATABASE_EMPTY_DISK_UUID = EC(360001, "empty disk uuid") + // 400: common (hosts) ERR_HOST_NOT_FOUND = EC(400000, "host not found") @@ -515,6 +542,9 @@ var ( // 690: execuetr task (others) ERR_START_CRONTAB_IN_CONTAINER_FAILED = EC(690000, "start crontab in container failed") + // 800: deploy + ERR_DISK_DEVICE_NOT_FORMATTED = EC(800000, "disk device not formatted") + // 900: others ERR_CANCEL_OPERATION = EC(CODE_CANCEL_OPERATION, "cancel operation") // 999 diff --git a/internal/playbook/factory.go b/internal/playbook/factory.go index fbccaa87f..90524f51f 100644 --- a/internal/playbook/factory.go +++ b/internal/playbook/factory.go @@ -48,6 +48,7 @@ const ( CHECK_NETWORK_FIREWALL GET_HOST_DATE CHECK_HOST_DATE + CHECK_DISK_SIZE CHECK_CHUNKFILE_POOL CHECK_S3 CLEAN_PRECHECK_ENVIRONMENT @@ -175,6 +176,8 @@ func (p *Playbook) createTasks(step *PlaybookStep) (*tasks.Tasks, error) { t, err = checker.NewGetHostDate(curveadm, config.GetDC(i)) case CHECK_HOST_DATE: t, err = checker.NewCheckDate(curveadm, nil) + case CHECK_DISK_SIZE: + t, err = checker.NewCheckDiskSizeTask(curveadm, config.GetDC(i)) case CHECK_CHUNKFILE_POOL: t, err = checker.NewCheckChunkfilePoolTask(curveadm, config.GetDC(i)) case CHECK_S3: diff --git a/internal/storage/sql.go b/internal/storage/sql.go index 9a0c191fc..79ccef34a 100644 --- a/internal/storage/sql.go +++ b/internal/storage/sql.go @@ -38,7 +38,7 @@ package storage var ( - // tables (hosts/clusters/containers(service)/clients/playrgound/audit) + // tables (hosts/clusters/containers(service)/clients/playrgound/audit/disk/disks) CREATE_VERSION_TABLE = ` CREATE TABLE IF NOT EXISTS version ( id INTEGER PRIMARY KEY AUTOINCREMENT, @@ -54,6 +54,28 @@ var ( lastmodified_time DATE NOT NULL ) ` + CREATE_DISKS_TABLE = ` + CREATE TABLE IF NOT EXISTS disks ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + data TEXT NOT NULL, + lastmodified_time DATE NOT NULL + ) + ` + + CREATE_DISK_TABLE = ` + CREATE TABLE IF NOT EXISTS disk ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + host TEXT NOT NULL, + device TEXT NOT NULL, + size TEXT NOT NULL, + uri TEXT NOT NULL, + disk_format_mount_point TEXT NOT NULL, + format_percent TEXT NOT NULL, + container_image_location TEXT NOT NULL, + chunkserver_id TEXT NOT NULL, + lastmodified_time DATE NOT NULL + ) + ` CREATE_CLUSTERS_TABLE = ` CREATE TABLE IF NOT EXISTS clusters ( @@ -138,6 +160,52 @@ var ( SELECT_HOSTS = `SELECT * FROM hosts` + // disks + INSERT_DISKS = `INSERT INTO disks(data, lastmodified_time) VALUES(?, datetime('now','localtime'))` + + SET_DISKS = `UPDATE disks SET data = ?, lastmodified_time = datetime('now','localtime') WHERE id = ?` + + SELECT_DISKS = `SELECT * FROM disks` + + // disk + INSERT_DISK = `INSERT INTO disk( + host, + device, + size, + uri, + disk_format_mount_point, + format_percent, + container_image_location, + chunkserver_id, + lastmodified_time + ) VALUES(?, ?, ?, ?, ?, ?, ?, ?, datetime('now','localtime'))` + + SET_DISK = `UPDATE disk SET disk_format_mount_point = ?, format_percent = ?, + container_image_location = ?,lastmodified_time = datetime('now','localtime') WHERE id = ?` + + SET_DISK_URI = `UPDATE disk SET uri = ?, + lastmodified_time = datetime('now','localtime') WHERE host = ? AND device = ?` + + SET_DISK_SIZE = `UPDATE disk SET size = ?, + lastmodified_time = datetime('now','localtime') WHERE host = ? AND device = ?` + + SET_DISK_CHUNKSERVER_ID = `UPDATE disk SET chunkserver_id = ?, + lastmodified_time = datetime('now','localtime') WHERE host = ? AND disk_format_mount_point = ?` + + SELECT_DISK_ALL = `SELECT * FROM disk` + + SELECT_DISK_BY_HOST = `SELECT * FROM disk where host = ?` + + SELECT_DISK_BY_CHUNKSERVER_ID = `SELECT * FROM disk where chunkserver_id = ?` + + SELECT_DISK_BY_DEVICE_PATH = `SELECT * from disk WHERE host = ? AND device = ?` + + SELECT_DISK_BY_MOUNTPOINT = `SELECT * from disk WHERE host = ? AND disk_format_mount_point = ?` + + DELETE_DISK_HOST = `DELETE from disk WHERE host = ?` + + DELETE_DISK_HOST_DEVICE = `DELETE from disk WHERE host = ? AND device = ?` + // cluster INSERT_CLUSTER = `INSERT INTO clusters(uuid, name, description, topology, pool, create_time) VALUES(hex(randomblob(16)), ?, ?, ?, "", datetime('now','localtime'))` diff --git a/internal/storage/storage.go b/internal/storage/storage.go index 3d4a9c440..4a9951478 100644 --- a/internal/storage/storage.go +++ b/internal/storage/storage.go @@ -29,6 +29,8 @@ import ( "time" _ "github.com/mattn/go-sqlite3" + comm "github.com/opencurve/curveadm/internal/common" + "github.com/opencurve/curveadm/internal/errno" ) type Version struct { @@ -43,6 +45,25 @@ type Hosts struct { LastmodifiedTime time.Time } +type Disks struct { + Id int + Data string + LastmodifiedTime time.Time +} + +type Disk struct { + Id int + Host string + Device string + Size string + URI string + MountPoint string + FormatPercent int + ContainerImage string + ChunkServerID string + LastmodifiedTime time.Time +} + type Cluster struct { Id int UUId string @@ -121,6 +142,10 @@ func (s *Storage) init() error { return err } else if err := s.execSQL(CREATE_AUDIT_TABLE); err != nil { return err + } else if err := s.execSQL(CREATE_DISKS_TABLE); err != nil { + return err + } else if err := s.execSQL(CREATE_DISK_TABLE); err != nil { + return err } else if err := s.compatible(); err != nil { return err } @@ -246,6 +271,154 @@ func (s *Storage) GetHostses() ([]Hosts, error) { return hostses, err } +// disks +func (s *Storage) SetDisks(data string) error { + diskses, err := s.GetDisks() + if err != nil { + return err + } else if len(diskses) == 0 { + return s.execSQL(INSERT_DISKS, data) + } + return s.execSQL(SET_DISKS, data, diskses[0].Id) +} + +func (s *Storage) GetDisks() ([]Disks, error) { + s.mutex.Lock() + defer s.mutex.Unlock() + rows, err := s.db.Query(SELECT_DISKS) + if err != nil { + return nil, err + } + defer rows.Close() + var diskses []Disks + var disks Disks + for rows.Next() { + err = rows.Scan(&disks.Id, &disks.Data, &disks.LastmodifiedTime) + diskses = append(diskses, disks) + break + } + return diskses, err +} + +// disk +func (s *Storage) SetDisk(host, device, mount, containerImage string, formatPercent int) error { + diskRecords, err := s.GetDisk(SELECT_DISK_BY_DEVICE_PATH, host, device) + if err != nil { + return err + } else if len(diskRecords) == 0 { + return s.execSQL( + INSERT_DISK, + host, + device, + comm.DISK_DEFAULT_NULL_SIZE, + comm.DISK_DEFAULT_NULL_URI, + mount, + formatPercent, + containerImage, + comm.DISK_DEFAULT_NULL_CHUNKSERVER_ID, + ) + } + return s.execSQL(SET_DISK, mount, formatPercent, containerImage, diskRecords[0].Id) +} + +func (s *Storage) UpdateDiskURI(host, device, devUri string) error { + return s.execSQL(SET_DISK_URI, devUri, host, device) +} + +func (s *Storage) UpdateDiskSize(host, device, size string) error { + return s.execSQL(SET_DISK_SIZE, size, host, device) +} + +func (s *Storage) UpdateDiskChunkServerID(host, mountPoint, chunkserverId string) error { + _, err := s.GetDiskByMountPoint(host, mountPoint) + if err != nil { + return err + } + return s.execSQL(SET_DISK_CHUNKSERVER_ID, chunkserverId, host, mountPoint) +} + +func (s *Storage) DeleteDisk(host, device string) error { + if len(device) > 0 { + return s.execSQL(DELETE_DISK_HOST_DEVICE, host, device) + } else { + return s.execSQL(DELETE_DISK_HOST, host) + } +} + +func (s *Storage) GetDiskByMountPoint(host, mountPoint string) (Disk, error) { + var disk Disk + diskRecords, err := s.GetDisk(comm.DISK_FILTER_MOUNT, host, mountPoint) + if len(diskRecords) == 0 { + return disk, errno.ERR_DATABASE_EMPTY_QUERY_RESULT. + F("The disk[host=%s, disk_format_mount_point=%s] was not found in database.", + host, mountPoint) + } + disk = diskRecords[0] + return disk, err +} + +func (s *Storage) CleanDiskChunkServerId(serviceId string) error { + diskRecords, err := s.GetDisk(comm.DISK_FILTER_SERVICE, serviceId) + if err != nil { + return err + } + + for _, disk := range diskRecords { + if err := s.UpdateDiskChunkServerID( + disk.Host, + disk.MountPoint, + comm.DISK_DEFAULT_NULL_CHUNKSERVER_ID, + ); err != nil { + return err + } + } + return nil +} + +func (s *Storage) GetDisk(filter string, args ...interface{}) ([]Disk, error) { + var query string + switch filter { + case comm.DISK_FILTER_ALL: + query = SELECT_DISK_ALL + case comm.DISK_FILTER_HOST: + query = SELECT_DISK_BY_HOST + case comm.DISK_FILTER_DEVICE: + query = SELECT_DISK_BY_DEVICE_PATH + case comm.DISK_FILTER_MOUNT: + query = SELECT_DISK_BY_MOUNTPOINT + case comm.DISK_FILTER_SERVICE: + query = SELECT_DISK_BY_CHUNKSERVER_ID + default: + query = filter + } + + s.mutex.Lock() + defer s.mutex.Unlock() + rows, err := s.db.Query(query, args...) + if err != nil { + return nil, err + } + + defer rows.Close() + var diskRecords []Disk + var disk Disk + + for rows.Next() { + err = rows.Scan(&disk.Id, + &disk.Host, + &disk.Device, + &disk.Size, + &disk.URI, + &disk.MountPoint, + &disk.FormatPercent, + &disk.ContainerImage, + &disk.ChunkServerID, + &disk.LastmodifiedTime) + diskRecords = append(diskRecords, disk) + } + return diskRecords, err +} + // cluster func (s *Storage) InsertCluster(name, description, topology string) error { return s.execSQL(INSERT_CLUSTER, name, description, topology) diff --git a/internal/task/task/bs/format.go b/internal/task/task/bs/format.go index c630bd5bd..4b132048b 100644 --- a/internal/task/task/bs/format.go +++ b/internal/task/task/bs/format.go @@ -30,6 +30,7 @@ import ( "github.com/opencurve/curveadm/cli/cli" "github.com/opencurve/curveadm/internal/configure" + "github.com/opencurve/curveadm/internal/configure/disks" os "github.com/opencurve/curveadm/internal/configure/os" "github.com/opencurve/curveadm/internal/configure/topology" "github.com/opencurve/curveadm/internal/errno" @@ -62,6 +63,13 @@ type ( skipAdd bool curveadm *cli.CurveAdm } + step2UpdateDiskSizeUri struct { + host string + device string + size string + diskId string + curveadm *cli.CurveAdm + } ) func skipFormat(containerId *string) step.LambdaType { @@ -163,6 +171,54 @@ func (s *step2EditFSTab) Execute(ctx *context.Context) error { }) } +func (s *step2UpdateDiskSizeUri) Execute(ctx *context.Context) error { + curveadm := s.curveadm + steps := []task.Step{} + + var success bool + var diskUri string + + steps = append(steps, &step.ListBlockDevice{ // disk device size + Device: []string{s.device}, + Format: "SIZE -b", + NoHeadings: true, + Success: &success, + Out: &s.size, + ExecOptions: curveadm.ExecOptions(), + }) + steps = append(steps, &step.ListBlockDevice{ // disk device uuid + Device: []string{s.device}, + Format: "UUID", + NoHeadings: true, + Success: &success, + Out: &s.diskId, + ExecOptions: curveadm.ExecOptions(), + }) + steps = append(steps, &step.Lambda{ + Lambda: checkDeviceUUID(s.host, s.device, &success, &s.diskId), + }) + + for _, step := range steps { + err := step.Execute(ctx) + if err != nil { + return err + } + } + + if s.diskId != "" { + diskUri = disks.GenDiskURI(disks.DISK_URI_PROTO_FS_UUID, s.diskId) + } + + if err := curveadm.Storage().UpdateDiskSize(s.host, s.device, s.size); err != nil { + return err + } + + if err := curveadm.Storage().UpdateDiskURI(s.host, s.device, diskUri); err != nil { + return err + } + return nil +} + func device2ContainerName(device string) string { return fmt.Sprintf("curvebs-format-%s", utils.MD5Sum(device)) } @@ -238,6 +294,13 @@ func NewFormatChunkfilePoolTask(curveadm *cli.CurveAdm, fc *configure.FormatConf mountPoint: mountPoint, curveadm: curveadm, }) + if fc.UseDiskUri { + t.AddStep(&step2UpdateDiskSizeUri{ + host: host, + device: device, + curveadm: curveadm, + }) + } // 3: run container to format chunkfile pool t.AddStep(&step.PullImage{ Image: fc.GetContainerImage(), diff --git a/internal/task/task/checker/service.go b/internal/task/task/checker/service.go index f762f1942..a4c642765 100644 --- a/internal/task/task/checker/service.go +++ b/internal/task/task/checker/service.go @@ -53,6 +53,12 @@ type ( step2CheckClientS3Configure struct { config *configure.ClientConfig } + + step2CheckDiskSize struct { + dc *topology.DeployConfig + curveadm *cli.CurveAdm + execOptions module.ExecOptions + } ) func (s *step2CheckChunkfilePool) Execute(ctx *context.Context) error { @@ -131,6 +137,24 @@ func (s *step2CheckClientS3Configure) Execute(ctx *context.Context) error { return nil } +func (s *step2CheckDiskSize) Execute(ctx *context.Context) error { + dc := s.dc + curveadm := s.curveadm + host := dc.GetHost() + dataDir := dc.GetDataDir() + + disk, err := curveadm.Storage().GetDiskByMountPoint(host, dataDir) + if err != nil { + return err + } + if disk.Size == comm.DISK_DEFAULT_NULL_SIZE { + return errno.ERR_DISK_DEVICE_NOT_FORMATTED + } + + return nil + +} + func NewCheckChunkfilePoolTask(curveadm *cli.CurveAdm, dc *topology.DeployConfig) (*task.Task, error) { hc, err := curveadm.GetHost(dc.GetHost()) if err != nil { @@ -148,6 +172,19 @@ func NewCheckChunkfilePoolTask(curveadm *cli.CurveAdm, dc *topology.DeployConfig return t, nil } +func NewCheckDiskSizeTask(curveadm *cli.CurveAdm, dc *topology.DeployConfig) (*task.Task, error) { + subname := fmt.Sprintf("host=%s role=%s", dc.GetHost(), dc.GetRole()) + t := task.NewTask("Check Disk Size ", subname, nil) + + t.AddStep(&step2CheckDiskSize{ + dc: dc, + curveadm: curveadm, + execOptions: curveadm.ExecOptions(), + }) + + return t, nil +} + func NewCheckS3Task(curveadm *cli.CurveAdm, dc *topology.DeployConfig) (*task.Task, error) { subname := fmt.Sprintf("host=%s role=%s", dc.GetHost(), dc.GetRole()) t := task.NewTask("Check S3", subname, nil) diff --git a/internal/task/task/common/clean_service.go b/internal/task/task/common/clean_service.go index 47d031112..93bc8b122 100644 --- a/internal/task/task/common/clean_service.go +++ b/internal/task/task/common/clean_service.go @@ -68,6 +68,12 @@ type ( storage *storage.Storage execOptions module.ExecOptions } + + step2CleanDiskChunkServerId struct { + serviceId string + storage *storage.Storage + execOptions module.ExecOptions + } ) func (s *step2RecycleChunk) Execute(ctx *context.Context) error { @@ -112,6 +118,10 @@ func (s *step2CleanContainer) Execute(ctx *context.Context) error { return s.storage.SetContainId(s.serviceId, comm.CLEANED_CONTAINER_ID) } +func (s *step2CleanDiskChunkServerId) Execute(ctx *context.Context) error { + return s.storage.CleanDiskChunkServerId(s.serviceId) +} + func getCleanFiles(clean map[string]bool, dc *topology.DeployConfig, recycle bool) []string { files := []string{} for item := range clean { @@ -185,6 +195,11 @@ func NewCleanServiceTask(curveadm *cli.CurveAdm, dc *topology.DeployConfig) (*ta execOptions: curveadm.ExecOptions(), }) } + t.AddStep(&step2CleanDiskChunkServerId{ + serviceId: serviceId, + storage: curveadm.Storage(), + execOptions: curveadm.ExecOptions(), + }) return t, nil } diff --git a/internal/task/task/common/create_container.go b/internal/task/task/common/create_container.go index 0f7bf6f81..266bb7b3d 100644 --- a/internal/task/task/common/create_container.go +++ b/internal/task/task/common/create_container.go @@ -226,6 +226,17 @@ func NewCreateContainerTask(curveadm *cli.CurveAdm, dc *topology.DeployConfig) ( hostname := fmt.Sprintf("%s-%s-%s", kind, role, serviceId) options := curveadm.ExecOptions() options.ExecWithSudo = false + host := dc.GetHost() + dataDir := dc.GetDataDir() + diskRecords := curveadm.DiskRecords() + + useDiskRecords := role == topology.ROLE_CHUNKSERVER && len(diskRecords) > 0 + if useDiskRecords { + if err := curveadm.Storage().UpdateDiskChunkServerID( + host, dataDir, serviceId); err != nil { + return t, err + } + } t.AddStep(&step2GetService{ // if service exist, break task serviceId: serviceId, diff --git a/internal/tui/disks.go b/internal/tui/disks.go new file mode 100644 index 000000000..f8815ec93 --- /dev/null +++ b/internal/tui/disks.go @@ -0,0 +1,69 @@ +/* + * Copyright (c) 2022 NetEase Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* + * Project: CurveAdm + * Created Date: 2023-02-24 + * Author: Lijin Xiong (lijin.xiong@zstack.io) + */ + +package tui + +import ( + "sort" + + "github.com/opencurve/curveadm/internal/storage" + tuicommon "github.com/opencurve/curveadm/internal/tui/common" +) + +func SortDiskRecords(diskRecords []storage.Disk) { + sort.Slice(diskRecords, func(i, j int) bool { + d1, d2 := diskRecords[i], diskRecords[j] + if d1.Host == d2.Host { + return d1.Device < d2.Device + } + return d1.Host < d2.Host + }) +} + +func FormatDisks(diskRecords []storage.Disk) string { + lines := [][]interface{}{} + title := []string{ + "Host", + "Device Path", + "Device Size", + "Device URI", + "Disk Format Mount Point", + "Service ID", + } + first, second := tuicommon.FormatTitle(title) + lines = append(lines, first) + lines = append(lines, second) + + SortDiskRecords(diskRecords) + for _, dr := range diskRecords { + lines = append(lines, []interface{}{ + dr.Host, + dr.Device, + dr.Size, + dr.URI, + dr.MountPoint, + dr.ChunkServerID, + }) + } + + return tuicommon.FixedFormat(lines, 2) +} diff --git a/internal/utils/common.go b/internal/utils/common.go index 056a649a8..ec310a615 100644 --- a/internal/utils/common.go +++ b/internal/utils/common.go @@ -72,6 +72,8 @@ func Type(v interface{}) string { return "int64" case map[string]interface{}: return "string_interface_map" + case []interface{}: + return "any_slice" default: return "unknown" } @@ -97,6 +99,10 @@ func IsStringAnyMap(v interface{}) bool { return Type(v) == "string_interface_map" } +func IsAnySlice(v interface{}) bool { + return Type(v) == "any_slice" +} + func IsFunc(v interface{}) bool { return reflect.TypeOf(v).Kind() == reflect.Func } @@ -208,9 +214,9 @@ func NewCommand(format string, a ...interface{}) *exec.Cmd { return exec.Command(args[0], args[1:]...) } -func Slice2Map(s []string) map[string]bool { - m := map[string]bool{} - for _, item := range s { +func Slice2Map[T comparable](t []T) map[T]bool { + m := map[T]bool{} + for _, item := range t { m[item] = true } return m