Skip to content

Commit

Permalink
feat(instance): move to task v3 (#6022)
Browse files Browse the repository at this point in the history
Signed-off-by: liubo02 <[email protected]>
  • Loading branch information
liubog2008 authored Jan 7, 2025
1 parent 35d5ac7 commit 1dbb5b1
Show file tree
Hide file tree
Showing 44 changed files with 1,207 additions and 2,267 deletions.
2 changes: 1 addition & 1 deletion cmd/operator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ func setupControllers(mgr ctrl.Manager, c client.Client, pdcm pdm.PDClientManage
if err := tidbgroup.Setup(mgr, c); err != nil {
return fmt.Errorf("unable to create controller TiDBGroup: %w", err)
}
if err := tidb.Setup(mgr, c, vm); err != nil {
if err := tidb.Setup(mgr, c, pdcm, vm); err != nil {
return fmt.Errorf("unable to create controller TiDB: %w", err)
}
if err := tikvgroup.Setup(mgr, c); err != nil {
Expand Down
12 changes: 12 additions & 0 deletions pkg/controllers/common/cond.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,3 +54,15 @@ func CondGroupHasBeenDeleted[RG runtime.GroupT[G], G runtime.GroupSet](state Gro
return state.Group() == nil
})
}

func CondInstanceIsDeleting[I runtime.Instance](state InstanceState[I]) task.Condition {
return task.CondFunc(func() bool {
return !state.Instance().GetDeletionTimestamp().IsZero()
})
}

func CondInstanceHasBeenDeleted[RI runtime.InstanceT[I], I runtime.InstanceSet](state InstanceState[RI]) task.Condition {
return task.CondFunc(func() bool {
return state.Instance() == nil
})
}
11 changes: 10 additions & 1 deletion pkg/controllers/common/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,15 @@ type GroupState[G runtime.Group] interface {
Group() G
}

type InstanceState[I runtime.Instance] interface {
Instance() I
}

type InstanceAndPodState[I runtime.Instance] interface {
InstanceState[I]
PodState
}

type InstanceSliceState[I runtime.Instance] interface {
Slice() []I
}
Expand Down Expand Up @@ -180,7 +189,7 @@ type (
TiFlashGroup() *v1alpha1.TiFlashGroup
}
TiFlashStateInitializer interface {
TiFlashInitializer() TiFlashGroupInitializer
TiFlashInitializer() TiFlashInitializer
}
TiFlashState interface {
TiFlash() *v1alpha1.TiFlash
Expand Down
15 changes: 15 additions & 0 deletions pkg/controllers/common/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,21 @@ func TaskContextPD(state PDStateInitializer, c client.Client) task.Task {
return taskContextResource("PD", w, c, false)
}

func TaskContextTiKV(state TiKVStateInitializer, c client.Client) task.Task {
w := state.TiKVInitializer()
return taskContextResource("TiKV", w, c, false)
}

func TaskContextTiDB(state TiDBStateInitializer, c client.Client) task.Task {
w := state.TiDBInitializer()
return taskContextResource("TiDB", w, c, false)
}

func TaskContextTiFlash(state TiFlashStateInitializer, c client.Client) task.Task {
w := state.TiFlashInitializer()
return taskContextResource("TiFlash", w, c, false)
}

func TaskContextCluster(state ClusterStateInitializer, c client.Client) task.Task {
w := state.ClusterInitializer()
return taskContextResource("Cluster", w, c, true)
Expand Down
14 changes: 14 additions & 0 deletions pkg/controllers/common/task_finalizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,20 @@ func TaskGroupFinalizerAdd[
})
}

func TaskInstanceFinalizerAdd[
IT runtime.InstanceTuple[OI, RI],
OI client.Object,
RI runtime.Instance,
](state InstanceState[RI], c client.Client) task.Task {
return task.NameTaskFunc("FinalizerAdd", func(ctx context.Context) task.Result {
var t IT
if err := k8s.EnsureFinalizer(ctx, c, t.To(state.Instance())); err != nil {
return task.Fail().With("failed to ensure finalizer has been added: %v", err)
}
return task.Complete().With("finalizer is added")
})
}

const defaultDelWaitTime = 10 * time.Second

func TaskGroupFinalizerDel[
Expand Down
52 changes: 52 additions & 0 deletions pkg/controllers/common/task_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,58 @@ import (
"github.com/pingcap/tidb-operator/pkg/utils/task/v3"
)

func TaskInstanceStatusSuspend[
IT runtime.InstanceTuple[OI, RI],
OI client.Object,
RI runtime.Instance,
](state InstanceAndPodState[RI], c client.Client) task.Task {
var it IT
return task.NameTaskFunc("StatusSuspend", func(ctx context.Context) task.Result {
instance := state.Instance()
var (
suspendStatus = metav1.ConditionFalse
suspendReason = v1alpha1.ReasonSuspending
suspendMessage = "instance is suspending"

// when suspending, the health status should be false
healthStatus = metav1.ConditionFalse
healthReason = "Suspend"
healthMessage = "instance is suspending or suspended, mark it as unhealthy"
)

if state.Pod() == nil {
suspendReason = v1alpha1.ReasonSuspended
suspendStatus = metav1.ConditionTrue
suspendMessage = "instance is suspended"
}

needUpdate := SetStatusObservedGeneration(instance)
needUpdate = SetStatusCondition(instance, &metav1.Condition{
Type: v1alpha1.CondSuspended,
Status: suspendStatus,
ObservedGeneration: instance.GetGeneration(),
Reason: suspendReason,
Message: suspendMessage,
}) || needUpdate

needUpdate = SetStatusCondition(instance, &metav1.Condition{
Type: v1alpha1.CondHealth,
Status: healthStatus,
ObservedGeneration: instance.GetGeneration(),
Reason: healthReason,
Message: healthMessage,
}) || needUpdate

if needUpdate {
if err := c.Status().Update(ctx, it.To(instance)); err != nil {
return task.Fail().With("cannot update status: %v", err)
}
}

return task.Complete().With("status is updated")
})
}

func TaskGroupStatusSuspend[
GT runtime.GroupTuple[OG, RG],
OG client.Object,
Expand Down
4 changes: 2 additions & 2 deletions pkg/controllers/pd/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package pd
import (
"github.com/pingcap/tidb-operator/pkg/controllers/common"
"github.com/pingcap/tidb-operator/pkg/controllers/pd/tasks"
"github.com/pingcap/tidb-operator/pkg/runtime"
"github.com/pingcap/tidb-operator/pkg/utils/task/v3"
)

Expand Down Expand Up @@ -45,8 +46,7 @@ func (r *Reconciler) NewRunner(state *tasks.ReconcileContext, reporter task.Task
task.IfBreak(
common.CondClusterIsSuspending(state),
common.TaskSuspendPod(state, r.Client),
// TODO: extract as a common task
tasks.TaskStatusSuspend(state, r.Client),
common.TaskInstanceStatusSuspend[runtime.PDTuple](state, r.Client),
),

common.TaskContextPDSlice(state, r.Client),
Expand Down
6 changes: 6 additions & 0 deletions pkg/controllers/pd/tasks/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (

"github.com/pingcap/tidb-operator/apis/core/v1alpha1"
"github.com/pingcap/tidb-operator/pkg/controllers/common"
"github.com/pingcap/tidb-operator/pkg/runtime"
)

type state struct {
Expand All @@ -42,6 +43,7 @@ type State interface {
common.PodState
common.PDSliceState

common.InstanceState[*runtime.PD]
SetPod(*corev1.Pod)
}

Expand All @@ -64,6 +66,10 @@ func (s *state) Pod() *corev1.Pod {
return s.pod
}

func (s *state) Instance() *runtime.PD {
return runtime.FromPD(s.pd)
}

func (s *state) SetPod(pod *corev1.Pod) {
s.pod = pod
}
Expand Down
42 changes: 0 additions & 42 deletions pkg/controllers/pd/tasks/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,48 +28,6 @@ import (
"github.com/pingcap/tidb-operator/third_party/kubernetes/pkg/controller/statefulset"
)

func TaskStatusSuspend(state *ReconcileContext, c client.Client) task.Task {
return task.NameTaskFunc("StatusSuspend", func(ctx context.Context) task.Result {
state.PD().Status.ObservedGeneration = state.PD().Generation
var (
suspendStatus = metav1.ConditionFalse
suspendMessage = "pd is suspending"

// when suspending, the health status should be false
healthStatus = metav1.ConditionFalse
healthMessage = "pd is not healthy"
)

if state.Pod() == nil {
suspendStatus = metav1.ConditionTrue
suspendMessage = "pd is suspended"
}
needUpdate := meta.SetStatusCondition(&state.PD().Status.Conditions, metav1.Condition{
Type: v1alpha1.PDCondSuspended,
Status: suspendStatus,
ObservedGeneration: state.PD().Generation,
// TODO: use different reason for suspending and suspended
Reason: v1alpha1.PDSuspendReason,
Message: suspendMessage,
})

needUpdate = meta.SetStatusCondition(&state.PD().Status.Conditions, metav1.Condition{
Type: v1alpha1.PDCondHealth,
Status: healthStatus,
ObservedGeneration: state.PD().Generation,
Reason: v1alpha1.PDHealthReason,
Message: healthMessage,
}) || needUpdate
if needUpdate {
if err := c.Status().Update(ctx, state.PD()); err != nil {
return task.Fail().With("cannot update status: %v", err)
}
}

return task.Complete().With("status of suspend pd is updated")
})
}

func TaskStatusUnknown() task.Task {
return task.NameTaskFunc("StatusUnknown", func(_ context.Context) task.Result {
return task.Wait().With("status of the pd is unknown")
Expand Down
45 changes: 22 additions & 23 deletions pkg/controllers/tidb/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,44 +15,43 @@
package tidb

import (
"github.com/pingcap/tidb-operator/pkg/controllers/common"
"github.com/pingcap/tidb-operator/pkg/controllers/tidb/tasks"
"github.com/pingcap/tidb-operator/pkg/utils/task/v2"
"github.com/pingcap/tidb-operator/pkg/runtime"
"github.com/pingcap/tidb-operator/pkg/utils/task/v3"
)

func (r *Reconciler) NewRunner(reporter task.TaskReporter) task.TaskRunner[tasks.ReconcileContext] {
func (r *Reconciler) NewRunner(state *tasks.ReconcileContext, reporter task.TaskReporter) task.TaskRunner {
runner := task.NewTaskRunner(reporter,
// get tidb
tasks.TaskContextTiDB(r.Client),
common.TaskContextTiDB(state, r.Client),
// if it's deleted just return
task.NewSwitchTask(tasks.CondTiDBHasBeenDeleted()),
task.IfBreak(common.CondInstanceHasBeenDeleted(state)),

// get cluster info, FinalizerDel will use it
tasks.TaskContextCluster(r.Client),
task.NewSwitchTask(tasks.CondPDIsNotInitialized()),
common.TaskContextCluster(state, r.Client),
// check whether it's paused
task.IfBreak(common.CondClusterIsPaused(state)),

task.NewSwitchTask(tasks.CondTiDBIsDeleting(),
tasks.TaskFinalizerDel(r.Client),
task.IfBreak(common.CondInstanceIsDeleting(state),
tasks.TaskFinalizerDel(state, r.Client),
),

// check whether it's paused
task.NewSwitchTask(tasks.CondClusterIsPaused()),
common.TaskInstanceFinalizerAdd[runtime.TiDBTuple](state, r.Client),

// get pod and check whether the cluster is suspending
tasks.TaskContextPod(r.Client),
task.NewSwitchTask(tasks.CondClusterIsSuspending(),
tasks.TaskFinalizerAdd(r.Client),
tasks.TaskPodSuspend(r.Client),
tasks.TaskStatusSuspend(r.Client),
common.TaskContextPod(state, r.Client),
task.IfBreak(common.CondClusterIsSuspending(state),
common.TaskSuspendPod(state, r.Client),
common.TaskInstanceStatusSuspend[runtime.TiDBTuple](state, r.Client),
),

// normal process
tasks.TaskContextInfoFromPDAndTiDB(r.Client),
tasks.TaskFinalizerAdd(r.Client),
tasks.NewTaskConfigMap(r.Logger, r.Client),
tasks.NewTaskPVC(r.Logger, r.Client, r.VolumeModifier),
tasks.NewTaskPod(r.Logger, r.Client),
tasks.NewTaskServerLabels(r.Logger, r.Client),
tasks.NewTaskStatus(r.Logger, r.Client),
tasks.TaskContextInfoFromPDAndTiDB(state, r.Client, r.PDClientManager),
tasks.TaskConfigMap(state, r.Client),
tasks.TaskPVC(state, r.Logger, r.Client, r.VolumeModifier),
tasks.TaskPod(state, r.Client),
tasks.TaskServerLabels(state, r.Client),
tasks.TaskStatus(state, r.Client),
)

return runner
Expand Down
27 changes: 14 additions & 13 deletions pkg/controllers/tidb/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,22 +27,25 @@ import (
"github.com/pingcap/tidb-operator/apis/core/v1alpha1"
"github.com/pingcap/tidb-operator/pkg/client"
"github.com/pingcap/tidb-operator/pkg/controllers/tidb/tasks"
pdm "github.com/pingcap/tidb-operator/pkg/timanager/pd"
"github.com/pingcap/tidb-operator/pkg/utils/k8s"
"github.com/pingcap/tidb-operator/pkg/utils/task/v2"
"github.com/pingcap/tidb-operator/pkg/utils/task/v3"
"github.com/pingcap/tidb-operator/pkg/volumes"
)

type Reconciler struct {
Logger logr.Logger
Client client.Client
VolumeModifier volumes.Modifier
Logger logr.Logger
Client client.Client
PDClientManager pdm.PDClientManager
VolumeModifier volumes.Modifier
}

func Setup(mgr manager.Manager, c client.Client, vm volumes.Modifier) error {
func Setup(mgr manager.Manager, c client.Client, pdcm pdm.PDClientManager, vm volumes.Modifier) error {
r := &Reconciler{
Logger: mgr.GetLogger().WithName("TiDB"),
Client: c,
VolumeModifier: vm,
Logger: mgr.GetLogger().WithName("TiDB"),
Client: c,
PDClientManager: pdcm,
VolumeModifier: vm,
}
return ctrl.NewControllerManagedBy(mgr).For(&v1alpha1.TiDB{}).
Owns(&corev1.Pod{}).
Expand All @@ -66,11 +69,9 @@ func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu
}()

rtx := &tasks.ReconcileContext{
// some fields will be set in the context task
Context: ctx,
Key: req.NamespacedName,
State: tasks.NewState(req.NamespacedName),
}

runner := r.NewRunner(reporter)
return runner.Run(rtx)
runner := r.NewRunner(rtx, reporter)
return runner.Run(ctx)
}
Loading

0 comments on commit 1dbb5b1

Please sign in to comment.