diff --git a/CHANGELOG.md b/CHANGELOG.md index 66df5e684..960440441 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,6 +16,7 @@ - (Feature) UpgradeByReplace Flow - (Feature) (Platform) ArangoRoute Timeout option - (Feature) Delay Action +- (Feature) MigrateMember Action ## [1.2.44](https://github.com/arangodb/kube-arangodb/tree/1.2.44) (2025-02-03) - (Maintenance) Kubernetes 1.31.1 libraries diff --git a/README.md b/README.md index 5b202423e..b56080aab 100644 --- a/README.md +++ b/README.md @@ -173,6 +173,7 @@ Flags: --deployment.feature.local-volume-replacement-check Replace volume for local-storage if volume is unschedulable (ex. node is gone) - Required ArangoDB >= 3.8.0 --deployment.feature.random-pod-names Enables generating random pod names - Required ArangoDB >= 3.8.0 --deployment.feature.rebalancer-v2 Rebalancer V2 feature - Required ArangoDB >= 3.10.0 + --deployment.feature.replace-migration During member replacement shards are migrated directly to the new server - Required ArangoDB >= 3.8.0 (default true) --deployment.feature.restart-policy-always Allow to restart containers with always restart policy - Required ArangoDB >= 3.8.0 --deployment.feature.secured-containers Create server's containers with non root privileges. It enables 'ephemeral-volumes' feature implicitly - Required ArangoDB >= 3.8.0 --deployment.feature.sensitive-information-protection Hide sensitive information from metrics and logs - Required ArangoDB >= 3.8.0 diff --git a/docs/cli/arangodb_operator.md b/docs/cli/arangodb_operator.md index 42fefbb9c..24d73125d 100644 --- a/docs/cli/arangodb_operator.md +++ b/docs/cli/arangodb_operator.md @@ -57,6 +57,7 @@ Flags: --deployment.feature.local-volume-replacement-check Replace volume for local-storage if volume is unschedulable (ex. 
node is gone) - Required ArangoDB >= 3.8.0 --deployment.feature.random-pod-names Enables generating random pod names - Required ArangoDB >= 3.8.0 --deployment.feature.rebalancer-v2 Rebalancer V2 feature - Required ArangoDB >= 3.10.0 + --deployment.feature.replace-migration During member replacement shards are migrated directly to the new server - Required ArangoDB >= 3.8.0 (default true) --deployment.feature.restart-policy-always Allow to restart containers with always restart policy - Required ArangoDB >= 3.8.0 --deployment.feature.secured-containers Create server's containers with non root privileges. It enables 'ephemeral-volumes' feature implicitly - Required ArangoDB >= 3.8.0 --deployment.feature.sensitive-information-protection Hide sensitive information from metrics and logs - Required ArangoDB >= 3.8.0 diff --git a/docs/generated/actions.md b/docs/generated/actions.md index 2b0d3c386..610ca5a0a 100644 --- a/docs/generated/actions.md +++ b/docs/generated/actions.md @@ -51,6 +51,7 @@ nav_order: 11 | MemberPhaseUpdate | no | 10m0s | no | Community & Enterprise | Change member phase | | ~~MemberRIDUpdate~~ | no | 10m0s | no | Community & Enterprise | Update Run ID of member | | MemberStatusSync | no | 10m0s | no | Community & Enterprise | Sync ArangoMember Status with ArangoDeployment Status, to keep Member information up to date | +| MigrateMember | no | 48h0m0s | yes | Community & Enterprise | Run the data movement actions on the member (migration) | | PVCResize | no | 30m0s | no | Community & Enterprise | Start the resize procedure. 
Updates PVC Requests field | | PVCResized | no | 15m0s | no | Community & Enterprise | Waits for PVC resize to be completed | | PlaceHolder | no | 10m0s | no | Community & Enterprise | Empty placeholder action | @@ -148,6 +149,7 @@ spec: MemberPhaseUpdate: 10m0s MemberRIDUpdate: 10m0s MemberStatusSync: 10m0s + MigrateMember: 48h0m0s PVCResize: 30m0s PVCResized: 15m0s PlaceHolder: 10m0s diff --git a/internal/actions.yaml b/internal/actions.yaml index 5987c48ae..39d955ad3 100644 --- a/internal/actions.yaml +++ b/internal/actions.yaml @@ -27,6 +27,10 @@ actions: CleanOutMember: description: Run the CleanOut job on member timeout: 48h + MigrateMember: + description: Run the data movement actions on the member (migration) + timeout: 48h + optional: true ShutdownMember: description: Sends Shutdown requests and waits for container to be stopped timeout: 30m diff --git a/pkg/apis/deployment/v1/actions.generated.go b/pkg/apis/deployment/v1/actions.generated.go index 9657c144a..7a80dca1e 100644 --- a/pkg/apis/deployment/v1/actions.generated.go +++ b/pkg/apis/deployment/v1/actions.generated.go @@ -143,6 +143,9 @@ const ( // ActionMemberStatusSyncDefaultTimeout define default timeout for action ActionMemberStatusSync ActionMemberStatusSyncDefaultTimeout time.Duration = ActionsDefaultTimeout + // ActionMigrateMemberDefaultTimeout define default timeout for action ActionMigrateMember + ActionMigrateMemberDefaultTimeout time.Duration = 172800 * time.Second // 48h0m0s + // ActionPVCResizeDefaultTimeout define default timeout for action ActionPVCResize ActionPVCResizeDefaultTimeout time.Duration = 1800 * time.Second // 30m0s @@ -409,6 +412,9 @@ const ( // ActionTypeMemberStatusSync in scopes High. Sync ArangoMember Status with ArangoDeployment Status, to keep Member information up to date ActionTypeMemberStatusSync ActionType = "MemberStatusSync" + // ActionTypeMigrateMember in scopes Normal. 
Run the data movement actions on the member (migration) + ActionTypeMigrateMember ActionType = "MigrateMember" + // ActionTypePVCResize in scopes Normal. Start the resize procedure. Updates PVC Requests field ActionTypePVCResize ActionType = "PVCResize" @@ -635,6 +641,8 @@ func (a ActionType) DefaultTimeout() time.Duration { return ActionMemberRIDUpdateDefaultTimeout case ActionTypeMemberStatusSync: return ActionMemberStatusSyncDefaultTimeout + case ActionTypeMigrateMember: + return ActionMigrateMemberDefaultTimeout case ActionTypePVCResize: return ActionPVCResizeDefaultTimeout case ActionTypePVCResized: @@ -815,6 +823,8 @@ func (a ActionType) Priority() ActionPriority { return ActionPriorityHigh case ActionTypeMemberStatusSync: return ActionPriorityHigh + case ActionTypeMigrateMember: + return ActionPriorityNormal case ActionTypePVCResize: return ActionPriorityNormal case ActionTypePVCResized: @@ -1007,6 +1017,8 @@ func (a ActionType) Optional() bool { return false case ActionTypeMemberStatusSync: return false + case ActionTypeMigrateMember: + return true case ActionTypePVCResize: return false case ActionTypePVCResized: diff --git a/pkg/apis/deployment/v2alpha1/actions.generated.go b/pkg/apis/deployment/v2alpha1/actions.generated.go index d2e3928e7..3382fa708 100644 --- a/pkg/apis/deployment/v2alpha1/actions.generated.go +++ b/pkg/apis/deployment/v2alpha1/actions.generated.go @@ -143,6 +143,9 @@ const ( // ActionMemberStatusSyncDefaultTimeout define default timeout for action ActionMemberStatusSync ActionMemberStatusSyncDefaultTimeout time.Duration = ActionsDefaultTimeout + // ActionMigrateMemberDefaultTimeout define default timeout for action ActionMigrateMember + ActionMigrateMemberDefaultTimeout time.Duration = 172800 * time.Second // 48h0m0s + // ActionPVCResizeDefaultTimeout define default timeout for action ActionPVCResize ActionPVCResizeDefaultTimeout time.Duration = 1800 * time.Second // 30m0s @@ -409,6 +412,9 @@ const ( // ActionTypeMemberStatusSync in 
scopes High. Sync ArangoMember Status with ArangoDeployment Status, to keep Member information up to date ActionTypeMemberStatusSync ActionType = "MemberStatusSync" + // ActionTypeMigrateMember in scopes Normal. Run the data movement actions on the member (migration) + ActionTypeMigrateMember ActionType = "MigrateMember" + // ActionTypePVCResize in scopes Normal. Start the resize procedure. Updates PVC Requests field ActionTypePVCResize ActionType = "PVCResize" @@ -635,6 +641,8 @@ func (a ActionType) DefaultTimeout() time.Duration { return ActionMemberRIDUpdateDefaultTimeout case ActionTypeMemberStatusSync: return ActionMemberStatusSyncDefaultTimeout + case ActionTypeMigrateMember: + return ActionMigrateMemberDefaultTimeout case ActionTypePVCResize: return ActionPVCResizeDefaultTimeout case ActionTypePVCResized: @@ -815,6 +823,8 @@ func (a ActionType) Priority() ActionPriority { return ActionPriorityHigh case ActionTypeMemberStatusSync: return ActionPriorityHigh + case ActionTypeMigrateMember: + return ActionPriorityNormal case ActionTypePVCResize: return ActionPriorityNormal case ActionTypePVCResized: @@ -1007,6 +1017,8 @@ func (a ActionType) Optional() bool { return false case ActionTypeMemberStatusSync: return false + case ActionTypeMigrateMember: + return true case ActionTypePVCResize: return false case ActionTypePVCResized: diff --git a/pkg/deployment/agency/definitions.go b/pkg/deployment/agency/definitions.go index 42b9122c6..2c14f828f 100644 --- a/pkg/deployment/agency/definitions.go +++ b/pkg/deployment/agency/definitions.go @@ -47,6 +47,7 @@ const ( SupervisionKey = "Supervision" SupervisionMaintenanceKey = "Maintenance" + SupervisionHealthKey = "Health" TargetJobToDoKey = "ToDo" TargetJobPendingKey = "Pending" @@ -74,6 +75,7 @@ func GetAgencyReadRequest(elements ...[]string) ReadRequest { func GetAgencyReadRequestFields() ReadRequest { return GetAgencyReadRequest([]string{ GetAgencyKey(ArangoKey, SupervisionKey, SupervisionMaintenanceKey), + 
GetAgencyKey(ArangoKey, SupervisionKey, SupervisionHealthKey), GetAgencyKey(ArangoKey, PlanKey, PlanCollectionsKey), GetAgencyKey(ArangoKey, PlanKey, PlanDatabasesKey), GetAgencyKey(ArangoKey, PlanKey, PlanDBServersKey), diff --git a/pkg/deployment/agency/state/state.go b/pkg/deployment/agency/state/state.go index d064870d3..0e2e378b8 100644 --- a/pkg/deployment/agency/state/state.go +++ b/pkg/deployment/agency/state/state.go @@ -61,10 +61,6 @@ type Plan struct { Coordinators ServerMap[string] `json:"Coordinators,omitempty"` } -type Supervision struct { - Maintenance Timestamp `json:"Maintenance,omitempty"` -} - type ShardCountDetails struct { Leader, Follower int } diff --git a/pkg/deployment/agency/state/supervision.go b/pkg/deployment/agency/state/supervision.go new file mode 100644 index 000000000..965e28095 --- /dev/null +++ b/pkg/deployment/agency/state/supervision.go @@ -0,0 +1,48 @@ +// +// DISCLAIMER +// +// Copyright 2025 ArangoDB GmbH, Cologne, Germany +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+// +// Copyright holder is ArangoDB GmbH, Cologne, Germany +// + +package state + +type Supervision struct { + Maintenance Timestamp `json:"Maintenance,omitempty"` + + Health ServerMap[SupervisionHealthServer] `json:"Health,omitempty"` +} + +type SupervisionHealthServerStatus string + +const ( + SupervisionHealthServerStatusGood SupervisionHealthServerStatus = "GOOD" +) + +type SupervisionHealthServerSyncStatus string + +const ( + SupervisionHealthServerSyncStatusServing SupervisionHealthServerSyncStatus = "SERVING" +) + +type SupervisionHealthServer struct { + Status SupervisionHealthServerStatus `json:"Status,omitempty"` + SyncStatus SupervisionHealthServerSyncStatus `json:"SyncStatus,omitempty"` +} + +func (s SupervisionHealthServer) IsHealthy() bool { + return s.Status == SupervisionHealthServerStatusGood && s.SyncStatus == SupervisionHealthServerSyncStatusServing +} diff --git a/pkg/deployment/client/rebalance.execute.go b/pkg/deployment/client/rebalance.execute.go new file mode 100644 index 000000000..f512d71f2 --- /dev/null +++ b/pkg/deployment/client/rebalance.execute.go @@ -0,0 +1,83 @@ +// +// DISCLAIMER +// +// Copyright 2025 ArangoDB GmbH, Cologne, Germany +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+// +// Copyright holder is ArangoDB GmbH, Cologne, Germany +// + +package client + +import ( + "context" + "net/http" + + "github.com/arangodb/kube-arangodb/pkg/util" +) + +type RebalanceExecuteRequest struct { + Version int `json:"version"` + + Moves RebalanceExecuteRequestMoves `json:"moves,omitempty"` +} + +type RebalanceExecuteRequestMoves []RebalanceExecuteRequestMove + +type RebalanceExecuteRequestMove struct { + Database string `json:"database"` + Collection string `json:"collection"` + Shard string `json:"shard"` + + From string `json:"from"` + To string `json:"to"` + + IsLeader bool `json:"isLeader"` +} + +func (c *client) RebalanceExecuteMoves(ctx context.Context, moves ...RebalanceExecuteRequestMove) error { + return c.RebalanceExecute(ctx, &RebalanceExecuteRequest{ + Version: 1, + Moves: moves, + }) +} + +func (c *client) RebalanceExecute(ctx context.Context, request *RebalanceExecuteRequest) error { + req, err := c.c.NewRequest(http.MethodPost, "/_admin/cluster/rebalance/execute") + if err != nil { + return err + } + + request = util.InitType(request) + + // Always set to 1 + request.Version = 1 + + if r, err := req.SetBody(request); err != nil { + return err + } else { + req = r + } + + resp, err := c.c.Do(ctx, req) + if err != nil { + return err + } + + if err := resp.CheckStatus(http.StatusAccepted); err != nil { + return err + } + + return nil +} diff --git a/pkg/deployment/client/rebalance.get.go b/pkg/deployment/client/rebalance.get.go new file mode 100644 index 000000000..30e192a2d --- /dev/null +++ b/pkg/deployment/client/rebalance.get.go @@ -0,0 +1,59 @@ +// +// DISCLAIMER +// +// Copyright 2025 ArangoDB GmbH, Cologne, Germany +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Copyright holder is ArangoDB GmbH, Cologne, Germany +// + +package client + +import ( + "context" + "net/http" +) + +type RebalanceGetResponse struct { + Result RebalanceGetResponseResult `json:"result,omitempty"` +} + +type RebalanceGetResponseResult struct { + PendingMoveShards int `json:"pendingMoveShards"` + TodoMoveShards int `json:"todoMoveShards"` +} + +func (c *client) RebalanceGet(ctx context.Context) (RebalanceGetResponse, error) { + req, err := c.c.NewRequest(http.MethodGet, "/_admin/cluster/rebalance") + if err != nil { + return RebalanceGetResponse{}, err + } + + resp, err := c.c.Do(ctx, req) + if err != nil { + return RebalanceGetResponse{}, err + } + + if err := resp.CheckStatus(http.StatusOK); err != nil { + return RebalanceGetResponse{}, err + } + + var d RebalanceGetResponse + + if err := resp.ParseBody("", &d); err != nil { + return RebalanceGetResponse{}, err + } + + return d, nil +} diff --git a/pkg/deployment/client/rebalance.go b/pkg/deployment/client/rebalance.go index 48c4861f5..a289e45ea 100644 --- a/pkg/deployment/client/rebalance.go +++ b/pkg/deployment/client/rebalance.go @@ -1,7 +1,7 @@ // // DISCLAIMER // -// Copyright 2023 ArangoDB GmbH, Cologne, Germany +// Copyright 2023-2025 ArangoDB GmbH, Cologne, Germany // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. 
@@ -22,74 +22,12 @@ package client import ( "context" - "net/http" - - "k8s.io/apimachinery/pkg/util/intstr" - - "github.com/arangodb/kube-arangodb/pkg/util" ) type RebalanceClient interface { - GenerateRebalanceMoves(ctx context.Context, request *RebalancePlanRequest) (RebalancePlanResponse, error) -} - -type RebalancePlanRequest struct { - Version int `json:"version"` - MaximumNumberOfMoves *int `json:"maximumNumberOfMoves,omitempty"` - LeaderChanges *bool `json:"leaderChanges,omitempty"` - MoveLeaders *bool `json:"moveLeaders,omitempty"` - MoveFollowers *bool `json:"moveFollowers,omitempty"` -} - -type RebalancePlanResponse struct { - Result RebalancePlanResponseResult `json:"result"` -} - -type RebalancePlanResponseResult struct { - Moves RebalancePlanMoves `json:"moves"` -} - -type RebalancePlanMoves []RebalancePlanMove - -type RebalancePlanMove struct { - From string `json:"from"` - To string `json:"to"` - Shard string `json:"shard"` - - Collection intstr.IntOrString `json:"collection"` -} - -func (c *client) GenerateRebalanceMoves(ctx context.Context, request *RebalancePlanRequest) (RebalancePlanResponse, error) { - req, err := c.c.NewRequest(http.MethodPost, "/_admin/cluster/rebalance") - if err != nil { - return RebalancePlanResponse{}, err - } - - request = util.InitType(request) - - // Always set to 1 - request.Version = 1 - - if r, err := req.SetBody(request); err != nil { - return RebalancePlanResponse{}, err - } else { - req = r - } - - resp, err := c.c.Do(ctx, req) - if err != nil { - return RebalancePlanResponse{}, err - } - - if err := resp.CheckStatus(http.StatusOK); err != nil { - return RebalancePlanResponse{}, err - } - - var d RebalancePlanResponse - - if err := resp.ParseBody("", &d); err != nil { - return RebalancePlanResponse{}, err - } + RebalancePlan(ctx context.Context, request *RebalancePlanRequest) (RebalancePlanResponse, error) + RebalanceGet(ctx context.Context) (RebalanceGetResponse, error) - return d, nil + RebalanceExecute(ctx 
context.Context, request *RebalanceExecuteRequest) error + RebalanceExecuteMoves(ctx context.Context, moves ...RebalanceExecuteRequestMove) error } diff --git a/pkg/deployment/client/rebalance.plan.go b/pkg/deployment/client/rebalance.plan.go new file mode 100644 index 000000000..5967846c0 --- /dev/null +++ b/pkg/deployment/client/rebalance.plan.go @@ -0,0 +1,91 @@ +// +// DISCLAIMER +// +// Copyright 2025 ArangoDB GmbH, Cologne, Germany +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+// +// Copyright holder is ArangoDB GmbH, Cologne, Germany +// + +package client + +import ( + "context" + "net/http" + + "k8s.io/apimachinery/pkg/util/intstr" + + "github.com/arangodb/kube-arangodb/pkg/util" +) + +type RebalancePlanRequest struct { + Version int `json:"version"` + MaximumNumberOfMoves *int `json:"maximumNumberOfMoves,omitempty"` + LeaderChanges *bool `json:"leaderChanges,omitempty"` + MoveLeaders *bool `json:"moveLeaders,omitempty"` + MoveFollowers *bool `json:"moveFollowers,omitempty"` +} + +type RebalancePlanResponse struct { + Result RebalancePlanResponseResult `json:"result"` +} + +type RebalancePlanResponseResult struct { + Moves RebalancePlanMoves `json:"moves"` +} + +type RebalancePlanMoves []RebalancePlanMove + +type RebalancePlanMove struct { + From string `json:"from"` + To string `json:"to"` + Shard string `json:"shard"` + + Collection intstr.IntOrString `json:"collection"` +} + +func (c *client) RebalancePlan(ctx context.Context, request *RebalancePlanRequest) (RebalancePlanResponse, error) { + req, err := c.c.NewRequest(http.MethodPost, "/_admin/cluster/rebalance") + if err != nil { + return RebalancePlanResponse{}, err + } + + request = util.InitType(request) + + // Always set to 1 + request.Version = 1 + + if r, err := req.SetBody(request); err != nil { + return RebalancePlanResponse{}, err + } else { + req = r + } + + resp, err := c.c.Do(ctx, req) + if err != nil { + return RebalancePlanResponse{}, err + } + + if err := resp.CheckStatus(http.StatusOK); err != nil { + return RebalancePlanResponse{}, err + } + + var d RebalancePlanResponse + + if err := resp.ParseBody("", &d); err != nil { + return RebalancePlanResponse{}, err + } + + return d, nil +} diff --git a/pkg/deployment/features/resign_leadership.go b/pkg/deployment/features/resign_leadership.go index 8e44bfb04..b44d4b845 100644 --- a/pkg/deployment/features/resign_leadership.go +++ b/pkg/deployment/features/resign_leadership.go @@ -1,7 +1,7 @@ // // DISCLAIMER // -// 
Copyright 2023-2024 ArangoDB GmbH, Cologne, Germany +// Copyright 2023-2025 ArangoDB GmbH, Cologne, Germany // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -22,6 +22,7 @@ package features func init() { registerFeature(enforcedResignLeadership) + registerFeature(memberReplaceMigration) } var enforcedResignLeadership = &feature{ @@ -31,7 +32,19 @@ var enforcedResignLeadership = &feature{ enabledByDefault: true, } +var memberReplaceMigration = &feature{ + name: "replace-migration", + description: "During member replacement shards are migrated directly to the new server", + enterpriseRequired: false, + enabledByDefault: true, +} + // EnforcedResignLeadership returns enforced ResignLeadership. func EnforcedResignLeadership() Feature { return enforcedResignLeadership } + +// MemberReplaceMigration returns the "replace-migration" feature (direct shard migration to the new server during member replacement). +func MemberReplaceMigration() Feature { + return memberReplaceMigration +} diff --git a/pkg/deployment/reconcile/action.register.generated.go b/pkg/deployment/reconcile/action.register.generated.go index aa379fd94..8e4213231 100644 --- a/pkg/deployment/reconcile/action.register.generated.go +++ b/pkg/deployment/reconcile/action.register.generated.go @@ -135,6 +135,9 @@ var ( _ Action = &actionMemberStatusSync{} _ actionFactory = newMemberStatusSyncAction + _ Action = &actionMigrateMember{} + _ actionFactory = newMigrateMemberAction + _ Action = &actionPVCResize{} _ actionFactory = newPVCResizeAction @@ -817,6 +820,20 @@ func init() { registerAction(action, function) } + // MigrateMember + { + // Get Action type + action := api.ActionTypeMigrateMember + + // Get Action defition + function := newMigrateMemberAction + + // Wrap action main function + + // Register action + registerAction(action, function) + } + // PVCResize { // Get Action type diff --git a/pkg/deployment/reconcile/action.register.generated_test.go
b/pkg/deployment/reconcile/action.register.generated_test.go index 87fd6e315..96522e5ce 100644 --- a/pkg/deployment/reconcile/action.register.generated_test.go +++ b/pkg/deployment/reconcile/action.register.generated_test.go @@ -429,6 +429,16 @@ func Test_Actions(t *testing.T) { }) }) + t.Run("MigrateMember", func(t *testing.T) { + ActionsExistence(t, api.ActionTypeMigrateMember) + t.Run("Internal", func(t *testing.T) { + require.False(t, api.ActionTypeMigrateMember.Internal()) + }) + t.Run("Optional", func(t *testing.T) { + require.True(t, api.ActionTypeMigrateMember.Optional()) + }) + }) + t.Run("PVCResize", func(t *testing.T) { ActionsExistence(t, api.ActionTypePVCResize) t.Run("Internal", func(t *testing.T) { diff --git a/pkg/deployment/reconcile/action_add_member.go b/pkg/deployment/reconcile/action_add_member.go index a90a19fb8..3ba0d092c 100644 --- a/pkg/deployment/reconcile/action_add_member.go +++ b/pkg/deployment/reconcile/action_add_member.go @@ -1,7 +1,7 @@ // // DISCLAIMER // -// Copyright 2016-2023 ArangoDB GmbH, Cologne, Germany +// Copyright 2016-2025 ArangoDB GmbH, Cologne, Germany // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. 
@@ -68,5 +68,5 @@ func (a *actionAddMember) Start(ctx context.Context) (bool, error) { // ActionPlanAppender appends wait methods to the plan func (a *actionAddMember) ActionPlanAppender(current api.Plan) (api.Plan, bool) { - return withWaitForMember(current, a.action.Group, sharedReconcile.WithPredefinedMember(a.action.MemberID)), true + return withWaitForMember(current, a.action.Group, sharedReconcile.WithPredefinedMember(a.newMemberID)), true } diff --git a/pkg/deployment/reconcile/action_migrate_member.go b/pkg/deployment/reconcile/action_migrate_member.go new file mode 100644 index 000000000..a44776377 --- /dev/null +++ b/pkg/deployment/reconcile/action_migrate_member.go @@ -0,0 +1,209 @@ +// +// DISCLAIMER +// +// Copyright 2016-2025 ArangoDB GmbH, Cologne, Germany +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Copyright holder is ArangoDB GmbH, Cologne, Germany +// + +package reconcile + +import ( + "context" + "fmt" + + api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1" + "github.com/arangodb/kube-arangodb/pkg/deployment/agency/state" + "github.com/arangodb/kube-arangodb/pkg/deployment/client" + "github.com/arangodb/kube-arangodb/pkg/deployment/features" +) + +const ( + actionMigrateMemberSourceKey string = "source-member-id" + actionMigrateMemberBatchSize int = 64 +) + +// newMigrateMemberAction creates a new Action that implements the given +// planned MigrateMember action. 
+func newMigrateMemberAction(action api.Action, actionCtx ActionContext) Action { + a := &actionMigrateMember{} + + a.actionImpl = newActionImplDefRef(action, actionCtx) + + return a +} + +// actionMigrateMember implements a MigrateMember action. +type actionMigrateMember struct { + // actionImpl implements timeout and member id functions + actionImpl +} + +// Start performs the start of the action by delegating to CheckProgress. +// Returns true if the action is completely finished, false in case +// the start time needs to be recorded and a ready condition needs to be checked. +func (a *actionMigrateMember) Start(ctx context.Context) (bool, error) { + ready, _, err := a.CheckProgress(ctx) + return ready, err +} + +// CheckProgress checks the progress of the action: while no rebalance jobs are pending it schedules up to actionMigrateMemberBatchSize shard moves from the source member to this member. +// Returns: ready, abort, error. +func (a *actionMigrateMember) CheckProgress(ctx context.Context) (bool, bool, error) { + if !features.MemberReplaceMigration().Enabled() { + // Feature disabled + return true, false, nil + } + + m, ok := a.actionCtx.GetMemberStatusByID(a.action.MemberID) + if !ok { + a.log.Warn("Unable to find MemberID %s", a.action.MemberID) + return true, false, nil + } + + sourceMemberID, ok := a.action.GetParam(actionMigrateMemberSourceKey) + if !ok { + a.log.Warn("Unable to find action's param %s", actionMigrateMemberSourceKey) + return true, false, nil + } + + sourceMember, ok := a.actionCtx.GetMemberStatusByID(sourceMemberID) + if !ok { + a.log.Warn("Unable to find member %s", sourceMemberID) + return true, false, nil + } + + if a.action.Group != api.ServerGroupDBServers { + // Proceed only on DBServers + a.log.Warn("Member %s is not DBServer", a.action.MemberID) + return true, false, nil + } + + cache, ok := a.actionCtx.GetAgencyCache() + if !ok { + a.log.Debug("AgencyCache is not ready") + return false, false, nil + } + + if cache.Supervision.Maintenance.Exists() { + // We are done, action cannot be handled on maintenance mode + a.log.Warn("Maintenance is enabled, skipping action") + return true, false, 
nil + } + + if !cache.Plan.DBServers.Exists(state.Server(sourceMember.ID)) { + a.log.JSON("databases", cache.Plan.DBServers).Str("id", sourceMember.ID).Debug("Source DBServer not yet present") + return true, false, nil + } + + if !cache.Plan.DBServers.Exists(state.Server(m.ID)) { + a.log.JSON("databases", cache.Plan.DBServers).Str("id", m.ID).Debug("Destination DBServer not yet present") + return false, false, nil + } + + stats := cache.PlanServerUsage(state.Server(sourceMember.ID)) + + if stats.Count() == 0 { + a.log.Debug("DBServer not in use anymore") + // Server not in use anymore + return true, false, nil + } + + c, err := a.actionCtx.GetMembersState().State().GetDatabaseClient() + if err != nil { + a.log.Err(err).Error("Unable to get client") + return false, false, nil + } + + clusterClient := client.NewClient(c.Connection(), a.log) + + resp, err := clusterClient.RebalanceGet(ctx) + if err != nil { + a.log.Err(err).Error("Unable to get rebalance status") + return false, false, nil + } + + if resp.Result.PendingMoveShards != 0 || resp.Result.TodoMoveShards != 0 { + a.actionCtx.SetProgress(fmt.Sprintf("Currently rebalance in progress, Shards: %d. 
Jobs: %d", stats.Count(), resp.Result.PendingMoveShards+resp.Result.TodoMoveShards)) + return false, false, nil + } + + var moves = make(client.RebalanceExecuteRequestMoves, 0, actionMigrateMemberBatchSize) + + for db, collections := range cache.Plan.Collections { + if len(moves) >= actionMigrateMemberBatchSize { + break + } + for collection, details := range collections { + if len(moves) >= actionMigrateMemberBatchSize { + break + } + + if details.DistributeShardsLike != nil { + continue + } + + if details.ReplicationFactor.IsSatellite() { + continue + } + + for shard, servers := range details.Shards { + if len(moves) >= actionMigrateMemberBatchSize { + break + } + + if len(servers) == 0 { + continue + } + + if !servers.Contains(state.Server(sourceMember.ID)) { + continue + } + + if servers.Contains(state.Server(m.ID)) { + continue + } + + a.log.Str("db", db). + Str("collection", collection). + Str("shard", shard). + Str("from", sourceMember.ID). + Str("to", m.ID). + Bool("leader", string(servers[0]) == sourceMember.ID).Debug("Migrating shard") + + moves = append(moves, client.RebalanceExecuteRequestMove{ + Database: db, + Collection: collection, + Shard: shard, + From: sourceMember.ID, + To: m.ID, + IsLeader: string(servers[0]) == sourceMember.ID, + }) + } + } + } + + if len(moves) > 0 { + if err := clusterClient.RebalanceExecuteMoves(ctx, moves...); err != nil { + a.log.Err(err).Error("Unable to execute rebalance moves") + return false, false, nil + } + + return false, false, nil + } + + // No more moves to schedule, migration completed + return true, false, nil +} diff --git a/pkg/deployment/reconcile/action_rebalancer_generate_v2.go b/pkg/deployment/reconcile/action_rebalancer_generate_v2.go index 519ee7927..b6a162afc 100644 --- a/pkg/deployment/reconcile/action_rebalancer_generate_v2.go +++ b/pkg/deployment/reconcile/action_rebalancer_generate_v2.go @@ -1,7 +1,7 @@ // // DISCLAIMER // -// Copyright 2016-2023 ArangoDB GmbH, Cologne, Germany +// Copyright 2016-2025 ArangoDB GmbH, 
Cologne, Germany // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -76,7 +76,7 @@ func (r actionRebalancerGenerateV2) Start(ctx context.Context) (bool, error) { nctx, cancel := globals.GetGlobalTimeouts().ArangoD().WithTimeout(ctx) defer cancel() - resp, err := client.NewClient(c.Connection(), r.log).GenerateRebalanceMoves(nctx, &client.RebalancePlanRequest{ + resp, err := client.NewClient(c.Connection(), r.log).RebalancePlan(nctx, &client.RebalancePlanRequest{ MaximumNumberOfMoves: util.NewType(spec.Rebalancer.GetParallelMoves()), }) if err != nil { diff --git a/pkg/deployment/reconcile/action_wait_for_member_ready.go b/pkg/deployment/reconcile/action_wait_for_member_ready.go index f0f2db4e7..83a4da44a 100644 --- a/pkg/deployment/reconcile/action_wait_for_member_ready.go +++ b/pkg/deployment/reconcile/action_wait_for_member_ready.go @@ -80,11 +80,28 @@ func (a *actionWaitForMemberReady) CheckProgress(ctx context.Context) (bool, boo a.log.Debug("DBServer not yet present") return false, false, nil } + + if s, ok := cache.Supervision.Health[state.Server(member.ID)]; !ok { + a.log.Debug("DBServer not yet present on the health") + return false, false, nil + } else if !s.IsHealthy() { + a.log.Str("Status", string(s.Status)).Str("SyncStatus", string(s.SyncStatus)).Debug("DBServer not yet healthy") + return false, false, nil + } + case api.ServerGroupCoordinators: if !cache.Plan.Coordinators.Exists(state.Server(member.ID)) { a.log.Debug("Coordinator not yet present") return false, false, nil } + + if s, ok := cache.Supervision.Health[state.Server(member.ID)]; !ok { + a.log.Debug("Coordinator not yet present on the health") + return false, false, nil + } else if !s.IsHealthy() { + a.log.Str("Status", string(s.Status)).Str("SyncStatus", string(s.SyncStatus)).Debug("Coordinator not yet healthy") + return false, false, nil + } } return member.Conditions.IsTrue(api.ConditionTypeReady), 
false, nil diff --git a/pkg/deployment/reconcile/plan_builder_rotate_upgrade.go b/pkg/deployment/reconcile/plan_builder_rotate_upgrade.go index 4879fb0de..649c84750 100644 --- a/pkg/deployment/reconcile/plan_builder_rotate_upgrade.go +++ b/pkg/deployment/reconcile/plan_builder_rotate_upgrade.go @@ -617,7 +617,10 @@ func skipResignLeadership(mode api.DeploymentMode, v driver.Version) bool { } func withWaitForMember(plan api.Plan, group api.ServerGroup, member api.MemberStatus) api.Plan { - return append(plan, waitForMemberActions(group, member)...) + return plan.AfterFirst(func(a api.Action) bool { + return a.Type == api.ActionTypeAddMember + }, waitForMemberActions(group, member)...) + } func waitForMemberActions(group api.ServerGroup, member api.MemberStatus) api.Plan { diff --git a/pkg/deployment/reconcile/plan_builder_scale.go b/pkg/deployment/reconcile/plan_builder_scale.go index 2744263b1..0a454dc1c 100644 --- a/pkg/deployment/reconcile/plan_builder_scale.go +++ b/pkg/deployment/reconcile/plan_builder_scale.go @@ -1,7 +1,7 @@ // // DISCLAIMER // -// Copyright 2016-2024 ArangoDB GmbH, Cologne, Germany +// Copyright 2016-2025 ArangoDB GmbH, Cologne, Germany // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. 
@@ -22,6 +22,7 @@ package reconcile import ( "context" + "time" api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1" "github.com/arangodb/kube-arangodb/pkg/deployment/actions" @@ -147,7 +148,11 @@ func (r *Reconciler) createReplaceMemberPlan(ctx context.Context, apiObject k8su switch group { case api.ServerGroupDBServers: if len(status.Members.DBServers) <= spec.DBServers.GetCount() { - plan = append(plan, actions.NewAction(api.ActionTypeAddMember, group, sharedReconcile.WithPredefinedMember(""))) + plan = append(plan, + actions.NewAction(api.ActionTypeAddMember, group, sharedReconcile.WithPredefinedMember("")), + actions.NewAction(api.ActionTypeDelay, group, sharedReconcile.WithPredefinedMember(api.MemberIDPreviousAction)).AddParam(DelayActionDuration, (15*time.Second).String()), + actions.NewAction(api.ActionTypeMigrateMember, group, sharedReconcile.WithPredefinedMember(api.MemberIDPreviousAction)).AddParam(actionMigrateMemberSourceKey, member.ID), + ) r.planLogger. Str("role", group.AsRole()). Debug("Creating replacement plan") diff --git a/pkg/deployment/reconcile/plan_executor.go b/pkg/deployment/reconcile/plan_executor.go index 5d30ba6de..0c9b489f9 100644 --- a/pkg/deployment/reconcile/plan_executor.go +++ b/pkg/deployment/reconcile/plan_executor.go @@ -1,7 +1,7 @@ // // DISCLAIMER // -// Copyright 2016-2024 ArangoDB GmbH, Cologne, Germany +// Copyright 2016-2025 ArangoDB GmbH, Cologne, Germany // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -446,6 +446,8 @@ func (d *Reconciler) executeActionStart(ctx context.Context, action Action) (don func (d *Reconciler) createAction(action api.Action) (Action, ActionContext) { actionCtx := newActionContext(d.log, d.context, &d.metrics) + actionCtx.SetProgress(action.Progress) + f, ok := getActionFactory(action.Type) if !ok { panic(fmt.Sprintf("Unknown action type '%s'", action.Type))