Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(instance): move to task v3 #6022

Merged
merged 2 commits into from
Jan 7, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/operator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ func setupControllers(mgr ctrl.Manager, c client.Client, pdcm pdm.PDClientManage
if err := tidbgroup.Setup(mgr, c); err != nil {
return fmt.Errorf("unable to create controller TiDBGroup: %w", err)
}
if err := tidb.Setup(mgr, c, vm); err != nil {
if err := tidb.Setup(mgr, c, pdcm, vm); err != nil {
return fmt.Errorf("unable to create controller TiDB: %w", err)
}
if err := tikvgroup.Setup(mgr, c); err != nil {
Expand Down
12 changes: 12 additions & 0 deletions pkg/controllers/common/cond.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,3 +54,15 @@ func CondGroupHasBeenDeleted[RG runtime.GroupT[G], G runtime.GroupSet](state Gro
return state.Group() == nil
})
}

func CondInstanceIsDeleting[I runtime.Instance](state InstanceState[I]) task.Condition {
return task.CondFunc(func() bool {
return !state.Instance().GetDeletionTimestamp().IsZero()
})
}

func CondInstanceHasBeenDeleted[RI runtime.InstanceT[I], I runtime.InstanceSet](state InstanceState[RI]) task.Condition {
return task.CondFunc(func() bool {
return state.Instance() == nil
})
}
11 changes: 10 additions & 1 deletion pkg/controllers/common/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,15 @@ type GroupState[G runtime.Group] interface {
Group() G
}

type InstanceState[I runtime.Instance] interface {
Instance() I
}

type InstanceAndPodState[I runtime.Instance] interface {
InstanceState[I]
PodState
}

type InstanceSliceState[I runtime.Instance] interface {
Slice() []I
}
Expand Down Expand Up @@ -180,7 +189,7 @@ type (
TiFlashGroup() *v1alpha1.TiFlashGroup
}
TiFlashStateInitializer interface {
TiFlashInitializer() TiFlashGroupInitializer
TiFlashInitializer() TiFlashInitializer
}
TiFlashState interface {
TiFlash() *v1alpha1.TiFlash
Expand Down
15 changes: 15 additions & 0 deletions pkg/controllers/common/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,21 @@ func TaskContextPD(state PDStateInitializer, c client.Client) task.Task {
return taskContextResource("PD", w, c, false)
}

func TaskContextTiKV(state TiKVStateInitializer, c client.Client) task.Task {
w := state.TiKVInitializer()
return taskContextResource("TiKV", w, c, false)
}

func TaskContextTiDB(state TiDBStateInitializer, c client.Client) task.Task {
w := state.TiDBInitializer()
return taskContextResource("TiDB", w, c, false)
}

func TaskContextTiFlash(state TiFlashStateInitializer, c client.Client) task.Task {
w := state.TiFlashInitializer()
return taskContextResource("TiFlash", w, c, false)
}

func TaskContextCluster(state ClusterStateInitializer, c client.Client) task.Task {
w := state.ClusterInitializer()
return taskContextResource("Cluster", w, c, true)
Expand Down
14 changes: 14 additions & 0 deletions pkg/controllers/common/task_finalizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,20 @@ func TaskGroupFinalizerAdd[
})
}

func TaskInstanceFinalizerAdd[
IT runtime.InstanceTuple[OI, RI],
OI client.Object,
RI runtime.Instance,
](state InstanceState[RI], c client.Client) task.Task {
return task.NameTaskFunc("FinalizerAdd", func(ctx context.Context) task.Result {
var t IT
if err := k8s.EnsureFinalizer(ctx, c, t.To(state.Instance())); err != nil {
return task.Fail().With("failed to ensure finalizer has been added: %v", err)
}
return task.Complete().With("finalizer is added")
})
}

const defaultDelWaitTime = 10 * time.Second

func TaskGroupFinalizerDel[
Expand Down
52 changes: 52 additions & 0 deletions pkg/controllers/common/task_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,58 @@ import (
"github.com/pingcap/tidb-operator/pkg/utils/task/v3"
)

func TaskInstanceStatusSuspend[
IT runtime.InstanceTuple[OI, RI],
OI client.Object,
RI runtime.Instance,
](state InstanceAndPodState[RI], c client.Client) task.Task {
var it IT
return task.NameTaskFunc("StatusSuspend", func(ctx context.Context) task.Result {
instance := state.Instance()
var (
suspendStatus = metav1.ConditionFalse
suspendReason = v1alpha1.ReasonSuspending
suspendMessage = "instance is suspending"

// when suspending, the health status should be false
healthStatus = metav1.ConditionFalse
healthReason = "Suspend"
healthMessage = "instance is suspending or suspended, mark it as unhealthy"
)

if state.Pod() == nil {
suspendReason = v1alpha1.ReasonSuspended
suspendStatus = metav1.ConditionTrue
suspendMessage = "instance is suspended"
}

needUpdate := SetStatusObservedGeneration(instance)
needUpdate = SetStatusCondition(instance, &metav1.Condition{
Type: v1alpha1.CondSuspended,
Status: suspendStatus,
ObservedGeneration: instance.GetGeneration(),
Reason: suspendReason,
Message: suspendMessage,
}) || needUpdate

needUpdate = SetStatusCondition(instance, &metav1.Condition{
Type: v1alpha1.CondHealth,
Status: healthStatus,
ObservedGeneration: instance.GetGeneration(),
Reason: healthReason,
Message: healthMessage,
}) || needUpdate

if needUpdate {
if err := c.Status().Update(ctx, it.To(instance)); err != nil {
return task.Fail().With("cannot update status: %v", err)
}
}

return task.Complete().With("status is updated")
})
}

func TaskGroupStatusSuspend[
GT runtime.GroupTuple[OG, RG],
OG client.Object,
Expand Down
4 changes: 2 additions & 2 deletions pkg/controllers/pd/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package pd
import (
"github.com/pingcap/tidb-operator/pkg/controllers/common"
"github.com/pingcap/tidb-operator/pkg/controllers/pd/tasks"
"github.com/pingcap/tidb-operator/pkg/runtime"
"github.com/pingcap/tidb-operator/pkg/utils/task/v3"
)

Expand Down Expand Up @@ -45,8 +46,7 @@ func (r *Reconciler) NewRunner(state *tasks.ReconcileContext, reporter task.Task
task.IfBreak(
common.CondClusterIsSuspending(state),
common.TaskSuspendPod(state, r.Client),
// TODO: extract as a common task
tasks.TaskStatusSuspend(state, r.Client),
common.TaskInstanceStatusSuspend[runtime.PDTuple](state, r.Client),
),

common.TaskContextPDSlice(state, r.Client),
Expand Down
6 changes: 6 additions & 0 deletions pkg/controllers/pd/tasks/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (

"github.com/pingcap/tidb-operator/apis/core/v1alpha1"
"github.com/pingcap/tidb-operator/pkg/controllers/common"
"github.com/pingcap/tidb-operator/pkg/runtime"
)

type state struct {
Expand All @@ -42,6 +43,7 @@ type State interface {
common.PodState
common.PDSliceState

common.InstanceState[*runtime.PD]
SetPod(*corev1.Pod)
}

Expand All @@ -64,6 +66,10 @@ func (s *state) Pod() *corev1.Pod {
return s.pod
}

func (s *state) Instance() *runtime.PD {
return runtime.FromPD(s.pd)
}

func (s *state) SetPod(pod *corev1.Pod) {
s.pod = pod
}
Expand Down
42 changes: 0 additions & 42 deletions pkg/controllers/pd/tasks/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,48 +28,6 @@ import (
"github.com/pingcap/tidb-operator/third_party/kubernetes/pkg/controller/statefulset"
)

func TaskStatusSuspend(state *ReconcileContext, c client.Client) task.Task {
return task.NameTaskFunc("StatusSuspend", func(ctx context.Context) task.Result {
state.PD().Status.ObservedGeneration = state.PD().Generation
var (
suspendStatus = metav1.ConditionFalse
suspendMessage = "pd is suspending"

// when suspending, the health status should be false
healthStatus = metav1.ConditionFalse
healthMessage = "pd is not healthy"
)

if state.Pod() == nil {
suspendStatus = metav1.ConditionTrue
suspendMessage = "pd is suspended"
}
needUpdate := meta.SetStatusCondition(&state.PD().Status.Conditions, metav1.Condition{
Type: v1alpha1.PDCondSuspended,
Status: suspendStatus,
ObservedGeneration: state.PD().Generation,
// TODO: use different reason for suspending and suspended
Reason: v1alpha1.PDSuspendReason,
Message: suspendMessage,
})

needUpdate = meta.SetStatusCondition(&state.PD().Status.Conditions, metav1.Condition{
Type: v1alpha1.PDCondHealth,
Status: healthStatus,
ObservedGeneration: state.PD().Generation,
Reason: v1alpha1.PDHealthReason,
Message: healthMessage,
}) || needUpdate
if needUpdate {
if err := c.Status().Update(ctx, state.PD()); err != nil {
return task.Fail().With("cannot update status: %v", err)
}
}

return task.Complete().With("status of suspend pd is updated")
})
}

func TaskStatusUnknown() task.Task {
return task.NameTaskFunc("StatusUnknown", func(_ context.Context) task.Result {
return task.Wait().With("status of the pd is unknown")
Expand Down
45 changes: 22 additions & 23 deletions pkg/controllers/tidb/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,44 +15,43 @@
package tidb

import (
"github.com/pingcap/tidb-operator/pkg/controllers/common"
"github.com/pingcap/tidb-operator/pkg/controllers/tidb/tasks"
"github.com/pingcap/tidb-operator/pkg/utils/task/v2"
"github.com/pingcap/tidb-operator/pkg/runtime"
"github.com/pingcap/tidb-operator/pkg/utils/task/v3"
)

func (r *Reconciler) NewRunner(reporter task.TaskReporter) task.TaskRunner[tasks.ReconcileContext] {
func (r *Reconciler) NewRunner(state *tasks.ReconcileContext, reporter task.TaskReporter) task.TaskRunner {
runner := task.NewTaskRunner(reporter,
// get tidb
tasks.TaskContextTiDB(r.Client),
common.TaskContextTiDB(state, r.Client),
// if it's deleted just return
task.NewSwitchTask(tasks.CondTiDBHasBeenDeleted()),
task.IfBreak(common.CondInstanceHasBeenDeleted(state)),

// get cluster info, FinalizerDel will use it
tasks.TaskContextCluster(r.Client),
task.NewSwitchTask(tasks.CondPDIsNotInitialized()),
common.TaskContextCluster(state, r.Client),
// check whether it's paused
task.IfBreak(common.CondClusterIsPaused(state)),

task.NewSwitchTask(tasks.CondTiDBIsDeleting(),
tasks.TaskFinalizerDel(r.Client),
task.IfBreak(common.CondInstanceIsDeleting(state),
tasks.TaskFinalizerDel(state, r.Client),
),

// check whether it's paused
task.NewSwitchTask(tasks.CondClusterIsPaused()),
common.TaskInstanceFinalizerAdd[runtime.TiDBTuple](state, r.Client),

// get pod and check whether the cluster is suspending
tasks.TaskContextPod(r.Client),
task.NewSwitchTask(tasks.CondClusterIsSuspending(),
tasks.TaskFinalizerAdd(r.Client),
tasks.TaskPodSuspend(r.Client),
tasks.TaskStatusSuspend(r.Client),
common.TaskContextPod(state, r.Client),
task.IfBreak(common.CondClusterIsSuspending(state),
common.TaskSuspendPod(state, r.Client),
common.TaskInstanceStatusSuspend[runtime.TiDBTuple](state, r.Client),
),

// normal process
tasks.TaskContextInfoFromPDAndTiDB(r.Client),
tasks.TaskFinalizerAdd(r.Client),
tasks.NewTaskConfigMap(r.Logger, r.Client),
tasks.NewTaskPVC(r.Logger, r.Client, r.VolumeModifier),
tasks.NewTaskPod(r.Logger, r.Client),
tasks.NewTaskServerLabels(r.Logger, r.Client),
tasks.NewTaskStatus(r.Logger, r.Client),
tasks.TaskContextInfoFromPDAndTiDB(state, r.Client, r.PDClientManager),
tasks.TaskConfigMap(state, r.Client),
tasks.TaskPVC(state, r.Logger, r.Client, r.VolumeModifier),
tasks.TaskPod(state, r.Client),
tasks.TaskServerLabels(state, r.Client),
tasks.TaskStatus(state, r.Client),
)

return runner
Expand Down
27 changes: 14 additions & 13 deletions pkg/controllers/tidb/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,22 +27,25 @@ import (
"github.com/pingcap/tidb-operator/apis/core/v1alpha1"
"github.com/pingcap/tidb-operator/pkg/client"
"github.com/pingcap/tidb-operator/pkg/controllers/tidb/tasks"
pdm "github.com/pingcap/tidb-operator/pkg/timanager/pd"
"github.com/pingcap/tidb-operator/pkg/utils/k8s"
"github.com/pingcap/tidb-operator/pkg/utils/task/v2"
"github.com/pingcap/tidb-operator/pkg/utils/task/v3"
"github.com/pingcap/tidb-operator/pkg/volumes"
)

type Reconciler struct {
Logger logr.Logger
Client client.Client
VolumeModifier volumes.Modifier
Logger logr.Logger
Client client.Client
PDClientManager pdm.PDClientManager
VolumeModifier volumes.Modifier
}

func Setup(mgr manager.Manager, c client.Client, vm volumes.Modifier) error {
func Setup(mgr manager.Manager, c client.Client, pdcm pdm.PDClientManager, vm volumes.Modifier) error {
r := &Reconciler{
Logger: mgr.GetLogger().WithName("TiDB"),
Client: c,
VolumeModifier: vm,
Logger: mgr.GetLogger().WithName("TiDB"),
Client: c,
PDClientManager: pdcm,
VolumeModifier: vm,
}
return ctrl.NewControllerManagedBy(mgr).For(&v1alpha1.TiDB{}).
Owns(&corev1.Pod{}).
Expand All @@ -66,11 +69,9 @@ func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu
}()

rtx := &tasks.ReconcileContext{
// some fields will be set in the context task
Context: ctx,
Key: req.NamespacedName,
State: tasks.NewState(req.NamespacedName),
}

runner := r.NewRunner(reporter)
return runner.Run(rtx)
runner := r.NewRunner(rtx, reporter)
return runner.Run(ctx)
}
Loading
Loading