diff --git a/config/rbac/role.yaml b/config/rbac/role.yaml index 89d3b7aca..c2d511008 100644 --- a/config/rbac/role.yaml +++ b/config/rbac/role.yaml @@ -45,6 +45,8 @@ rules: - get - list - watch + - patch + - delete - apiGroups: - "" resources: diff --git a/config/webhook/manifests.v1beta1.yaml b/config/webhook/manifests.v1beta1.yaml index 0d8f161c0..88156e64a 100644 --- a/config/webhook/manifests.v1beta1.yaml +++ b/config/webhook/manifests.v1beta1.yaml @@ -48,6 +48,24 @@ metadata: creationTimestamp: null name: webhook webhooks: + - clientConfig: + caBundle: Cg== + service: + name: webhook-service + namespace: system + path: /validate-v1-pod + failurePolicy: Ignore + name: vpod.elbv2.k8s.aws + rules: + - apiGroups: + - "" + apiVersions: + - v1 + operations: + - DELETE + resources: + - pods + sideEffects: NoneOnDryRun - clientConfig: caBundle: Cg== service: diff --git a/main.go b/main.go index 47f13bbcf..04a1f3ce1 100644 --- a/main.go +++ b/main.go @@ -33,6 +33,7 @@ import ( "sigs.k8s.io/aws-load-balancer-controller/pkg/aws" "sigs.k8s.io/aws-load-balancer-controller/pkg/aws/throttle" "sigs.k8s.io/aws-load-balancer-controller/pkg/config" + "sigs.k8s.io/aws-load-balancer-controller/pkg/gracefuldrain" "sigs.k8s.io/aws-load-balancer-controller/pkg/inject" "sigs.k8s.io/aws-load-balancer-controller/pkg/k8s" "sigs.k8s.io/aws-load-balancer-controller/pkg/networking" @@ -143,6 +144,14 @@ func main() { elbv2webhook.NewTargetGroupBindingValidator(ctrl.Log).SetupWithManager(mgr) //+kubebuilder:scaffold:builder + podGracefulDrain := gracefuldrain.NewPodGracefulDrain(controllerCFG.PodGracefulDrainConfig, mgr.GetClient(), + ctrl.Log.WithName("pod-graceful-drain")) + if err := mgr.Add(podGracefulDrain); err != nil { + setupLog.Error(err, "Unable to add pod-graceful-drain") + os.Exit(1) + } + corewebhook.NewPodValidator(podGracefulDrain, ctrl.Log).SetupWithManager(mgr) + stopChan := ctrl.SetupSignalHandler() go func() { setupLog.Info("starting podInfo repo") diff --git 
a/pkg/config/controller_config.go b/pkg/config/controller_config.go index 82ea8055a..169dbaf00 100644 --- a/pkg/config/controller_config.go +++ b/pkg/config/controller_config.go @@ -4,6 +4,7 @@ import ( "github.com/pkg/errors" "github.com/spf13/pflag" "sigs.k8s.io/aws-load-balancer-controller/pkg/aws" + "sigs.k8s.io/aws-load-balancer-controller/pkg/gracefuldrain" "sigs.k8s.io/aws-load-balancer-controller/pkg/inject" ) @@ -33,6 +34,8 @@ type ControllerConfig struct { IngressConfig IngressConfig // Configurations for Addons feature AddonsConfig AddonsConfig + // Configurations for Pod graceful drain + PodGracefulDrainConfig gracefuldrain.Config // Default AWS Tags that will be applied to all AWS resources managed by this controller. DefaultTags map[string]string @@ -61,6 +64,7 @@ func (cfg *ControllerConfig) BindFlags(fs *pflag.FlagSet) { cfg.PodWebhookConfig.BindFlags(fs) cfg.IngressConfig.BindFlags(fs) cfg.AddonsConfig.BindFlags(fs) + cfg.PodGracefulDrainConfig.BindFlags(fs) } // Validate the controller configuration @@ -68,5 +72,8 @@ func (cfg *ControllerConfig) Validate() error { if len(cfg.ClusterName) == 0 { return errors.New("kubernetes cluster name must be specified") } + if cfg.PodGracefulDrainConfig.PodGracefulDrainDelay < 0 { + return errors.New("pod-graceful-drain-delay cannot be negative") + } return nil } diff --git a/pkg/gracefuldrain/config.go b/pkg/gracefuldrain/config.go new file mode 100644 index 000000000..cd14551df --- /dev/null +++ b/pkg/gracefuldrain/config.go @@ -0,0 +1,19 @@ +package gracefuldrain + +import ( + "github.com/spf13/pflag" + "time" +) + +const ( + flagPodGracefulDrainDelay = "pod-graceful-drain-delay" +) + +type Config struct { + PodGracefulDrainDelay time.Duration +} + +func (cfg *Config) BindFlags(fs *pflag.FlagSet) { + fs.DurationVar(&cfg.PodGracefulDrainDelay, flagPodGracefulDrainDelay, time.Duration(0), + `Deregistering pod's deletions are delayed while draining`) +} diff --git a/pkg/gracefuldrain/pod_graceful_drain.go
b/pkg/gracefuldrain/pod_graceful_drain.go new file mode 100644 index 000000000..2cf5d0386 --- /dev/null +++ b/pkg/gracefuldrain/pod_graceful_drain.go @@ -0,0 +1,407 @@ +package gracefuldrain + +import ( + "context" + "encoding/json" + "github.com/go-logr/logr" + "github.com/pkg/errors" + corev1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/util/retry" + elbv2api "sigs.k8s.io/aws-load-balancer-controller/apis/elbv2/v1beta1" + "sigs.k8s.io/aws-load-balancer-controller/pkg/targetgroupbinding" + "sigs.k8s.io/aws-load-balancer-controller/pkg/webhook" + "sigs.k8s.io/controller-runtime/pkg/client" + "strings" + "sync" + "time" +) + +// NewPodGracefulDrain constructs a new PodGracefulDrain +func NewPodGracefulDrain(config Config, k8sClient client.Client, logger logr.Logger) *PodGracefulDrain { + ctx, cancel := context.WithCancel(context.Background()) + + return &PodGracefulDrain{ + config: config, + k8sClient: k8sClient, + logger: logger, + + deleterStopper: make(chan struct{}, 1), + deleterContext: ctx, + deleterCancelFunc: cancel, + } +} + +// The controller is notified when a pod is deleted and it'll deregister the pod from an ELB. +// However, there is an inherent delay until the pod is fully deregistered. +// This is the cause of 5XX errors: the ELB sends traffic to already terminated pods during the delay. +// PodGracefulDrain will delay a pod deletion while triggering the deregistration by isolating the pod from Endpoints and ReplicaSets. 
+type PodGracefulDrain struct { + config Config + k8sClient client.Client + logger logr.Logger + + allowedReentry map[types.UID]bool + allowedReentryMutex sync.RWMutex + + deleterStopper chan struct{} + deleterWaitGroup sync.WaitGroup + deleterContext context.Context + deleterCancelFunc context.CancelFunc +} + +const ( + gracefulDrainPrefix = "graceful-drain.elbv2.k8s.aws" + // waitLabelKey labels the pod that is waiting for the draining. + waitLabelKey = gracefulDrainPrefix + "/wait" + deleteAtAnnotationKey = gracefulDrainPrefix + "/deleteAt" + originalLabelsAnnotationKey = gracefulDrainPrefix + "/originalLabels" + + defaultDeleterGracefulTerminationPeriod = 10 * time.Second +) + +// InterceptPodDeletion intercepts the pod deletion on validatingAdmissionWebhook, isolates the pod, and schedules the delayed deletion if needed. +// It won't bother if the pod is/was not bound to any TargetGroupBinding, or if any errors have occurred during this process. +func (d *PodGracefulDrain) InterceptPodDeletion(ctx context.Context, pod *corev1.Pod) error { + if d.config.PodGracefulDrainDelay == time.Duration(0) { + return nil + } + + logger := d.logger.WithValues("pod", types.NamespacedName{ + Namespace: pod.Namespace, + Name: pod.Name, + }) + + canDeleteNow, err := d.canDeleteNow(ctx, pod) + if err != nil { + logger.Error(err, "unable to determine pod deletion should be delayed") + return nil + } else if canDeleteNow { + return nil + } + + // At this point, it might be one of the following cases: + // 1) The target pod is going to be deleted + // => we should patch to isolate them, schedule the delayed deletion and deny the admission. + // 2) apiserver immediately retried the deletion when we patched the pod and denied the admission + // since it is indistinguishable from the collision. + // => isolatePod should be idempotent. Keep denying the admission until it forgives. + // 3) GC tries to delete the pod if there is lingering ownerReferences. 
+ // => isolatePod should patch properly so the GC doesn't kick in. + // 4) Users and controllers manually try to delete the pod before deleteAt. + // => User can see the admission report message. Controller should handle admission failures. + // 5) We disabled wait sentinel label and deleted the pod, but the patch hasn't been propagated fast enough + // so ValidatingAdmissionWebhook reads the wait label of the old version + // => deleteAfter will retry with back-offs, so we keep denying the admission. + + isolated, err := d.isolatePod(ctx, pod) + if err != nil { + logger.Error(err, "unable to isolate the pod") + return nil + } + + if isolated { + d.deleteAfter(pod, d.config.PodGracefulDrainDelay) + } + + return errors.New("pod-graceful-drain took over the pod's deletion. It will eventually be deleted") +} + +func (d *PodGracefulDrain) canDeleteNow(ctx context.Context, pod *corev1.Pod) (bool, error) { + req := webhook.ContextGetAdmissionRequest(ctx) + if req.DryRun != nil && *req.DryRun == true { + return true, nil + } + + waitLabelValue := pod.Labels[waitLabelKey] + deleteAt, err := getDeleteAtAnnotation(pod) + if len(waitLabelValue) > 0 { + now := time.Now() + if err != nil && now.Before(deleteAt) { + return false, nil + } + // deleteAt is missing or malformed, but it is okay to delete it later. + return false, nil + } else if err == nil { + // The wait label might be deleted by the user, or this controller. Allow its deletions. + return true, err + } + + tgbs, err := d.fetchTGBsForDelayedDeletion(ctx, pod) + if err != nil { + return true, err + } + + if len(tgbs) == 0 { + for _, item := range pod.Spec.ReadinessGates { + if strings.HasPrefix(string(item.ConditionType), targetgroupbinding.TargetHealthPodConditionTypePrefix) { + // The pod once had TargetGroupBindings, but it is somehow gone. + // We don't know whether its TargetType is IP, its target group, etc. + // It might be worth giving some time to ELB. 
+ return false, nil + } + } + return true, nil + } + return false, nil +} + +func (d *PodGracefulDrain) fetchTGBsForDelayedDeletion(ctx context.Context, pod *corev1.Pod) ([]elbv2api.TargetGroupBinding, error) { + tgbList := &elbv2api.TargetGroupBindingList{} + if err := d.k8sClient.List(ctx, tgbList, client.InNamespace(pod.Namespace)); err != nil { + d.logger.V(1).Info("unable to list TargetGroupBindings", "namespace", pod.Namespace) + return nil, err + } + var tgbs []elbv2api.TargetGroupBinding + for _, tgb := range tgbList.Items { + if tgb.Spec.TargetType == nil || (*tgb.Spec.TargetType) != elbv2api.TargetTypeIP { + continue + } + + svcKey := types.NamespacedName{Namespace: tgb.Namespace, Name: tgb.Spec.ServiceRef.Name} + svc := &corev1.Service{} + if err := d.k8sClient.Get(ctx, svcKey, svc); err != nil { + // If the service is not found, ignore + if apierrors.IsNotFound(err) { + d.logger.Info("unable to lookup service", "service", svcKey) + continue + } + return nil, err + } + var svcSelector labels.Selector + if len(svc.Spec.Selector) == 0 { + svcSelector = labels.Nothing() + } else { + svcSelector = labels.SelectorFromSet(svc.Spec.Selector) + } + if svcSelector.Matches(labels.Set(pod.Labels)) { + tgbs = append(tgbs, tgb) + } + } + return tgbs, nil +} + +func (d *PodGracefulDrain) isolatePod(ctx context.Context, pod *corev1.Pod) (bool, error) { + patchCond := func(pod *corev1.Pod) bool { + existingLabel := pod.Labels[waitLabelKey] + return len(existingLabel) > 0 + } + patchMutate := func(pod *corev1.Pod) error { + deleteAt := time.Now().Add(d.config.PodGracefulDrainDelay) + + oldLabels, err := json.Marshal(pod.Labels) + if err != nil { + return err + } + + pod.Labels = map[string]string{ + waitLabelKey: "true", + } + pod.Annotations[deleteAtAnnotationKey] = deleteAt.Format(time.RFC3339) + pod.Annotations[originalLabelsAnnotationKey] = string(oldLabels) + + var newOwnerReferences []metav1.OwnerReference + // To stop the GC kicking in, we cut the OwnerReferences. 
+ for _, item := range pod.OwnerReferences { + newItem := item.DeepCopy() + newItem.Controller = nil + newOwnerReferences = append(newOwnerReferences, *newItem) + } + pod.OwnerReferences = newOwnerReferences + + return nil + } + + return d.patchPod(ctx, pod, patchCond, patchMutate) +} + +func (d *PodGracefulDrain) deleteAfter(pod *corev1.Pod, dur time.Duration) { + logger := d.logger.WithValues("pod", types.NamespacedName{ + Namespace: pod.Namespace, + Name: pod.Name, + }) + + d.deleterWaitGroup.Add(1) + ctx, cancel := context.WithCancel(d.deleterContext) + go func(pod corev1.Pod) { + defer d.deleterWaitGroup.Done() + defer cancel() + + select { + case <-d.deleterStopper: + case <-time.After(dur): + } + + patched, err := d.disableWaitLabel(ctx, &pod) + if err != nil { + logger.Error(err, "unable to disable the wait label") + return + } + if !patched { + return // pod have been deleted + } + + err = wait.ExponentialBackoff(retry.DefaultBackoff, func() (bool, error) { + if err := d.k8sClient.Delete(ctx, &pod, client.Preconditions{UID: &pod.UID}); err != nil { + if apierrors.IsNotFound(err) || apierrors.IsConflict(err) { + // The pod is already deleted. Okay to ignore + return true, nil + } + // InterceptPodDeletion might deny the deletion as TooEarly until disableWaitLabel patch is propagated. + // TODO: error is actually admission denial + return false, nil + } + return true, nil + }) + + if err != nil { + logger.Error(err, "unable to delete the pod") + } else { + logger.V(1).Info("successfully deleted the delayed pod") + } + }(*pod.DeepCopy()) + + logger.V(1).Info("scheduled pod deletion", "deleteAt", time.Now().Add(dur)) +} + +func (d *PodGracefulDrain) disableWaitLabel(ctx context.Context, pod *corev1.Pod) (bool, error) { + patchCond := func(pod *corev1.Pod) bool { + existingLabel := pod.Labels[waitLabelKey] + return len(existingLabel) == 0 + } + patchMutate := func(pod *corev1.Pod) error { + // set empty rather than removing it. 
It helps to manually find delayed pods. + pod.Labels[waitLabelKey] = "" + return nil + } + return d.patchPod(ctx, pod, patchCond, patchMutate) +} + +// returns true when it successfully patched. +// returns false when the pod is deleted or the condition is already met. +func (d *PodGracefulDrain) patchPod(ctx context.Context, pod *corev1.Pod, condition func(*corev1.Pod) bool, mutate func(*corev1.Pod) error) (bool, error) { + podUID := pod.UID + podKey := types.NamespacedName{ + Namespace: pod.Namespace, + Name: pod.Name, + } + + for { + if condition(pod) { + return false, nil + } + + oldPod := pod.DeepCopy() + oldPod.UID = "" // only put the uid in the new object to ensure it appears in the patch as a precondition + + if err := mutate(pod); err != nil { + return false, nil + } + + podMergeOption := client.MergeFromWithOptions(oldPod, client.MergeFromWithOptimisticLock{}) + if err := d.k8sClient.Patch(ctx, pod, podMergeOption); err != nil { + if apierrors.IsNotFound(err) { + return false, nil + } + if apierrors.IsConflict(err) { + if err := d.k8sClient.Get(ctx, podKey, pod); err != nil { + return false, err + } + if pod.UID != podUID { + return false, nil // UID conflict -> pod is gone + } + continue + } + return false, err + } + + // see https://github.com/kubernetes-sigs/controller-runtime/issues/1257 + err := wait.ExponentialBackoff(retry.DefaultBackoff, func() (bool, error) { + if condition(pod) { + return true, nil + } + if err := d.k8sClient.Get(ctx, podKey, pod); err != nil { + return false, err + } + if pod.UID != podUID { + return true, nil // UID conflict -> pod is gone + } + return false, nil + }) + if err != nil { + return false, err + } + return true, err + } +} + +// cleanupPreviousRun finds pods that are not deleted properly in the previous run, and reschedules them. +func (d *PodGracefulDrain) cleanupPreviousRun(ctx context.Context) error { + podList := &corev1.PodList{} + // select all pods regardless of its value. 
The pod was about to be deleted when its value is empty. + if err := d.k8sClient.List(ctx, podList, client.HasLabels{waitLabelKey}); err != nil { + return err + } + + now := time.Now() + for _, pod := range podList.Items { + deleteAfter := d.config.PodGracefulDrainDelay + deleteAt, err := getDeleteAtAnnotation(&pod) + if err == nil { + deleteAfter = deleteAt.Sub(now) + } + + d.deleteAfter(&pod, deleteAfter) + } + return nil +} + +func getDeleteAtAnnotation(pod *corev1.Pod) (time.Time, error) { + value, ok := pod.Annotations[deleteAtAnnotationKey] + if !ok { + return time.Time{}, errors.New("unable to lookup deleteAt annotation") + } + deleteAt, err := time.Parse(time.RFC3339, value) + if err != nil { + return time.Time{}, err + } + return deleteAt, nil +} + +func (d *PodGracefulDrain) Start(stop <-chan struct{}) error { + if d.config.PodGracefulDrainDelay == time.Duration(0) { + return nil + } + + d.logger.Info("Starting pod-graceful-drain") + + if err := d.cleanupPreviousRun(context.Background()); err != nil { + d.logger.Error(err, "problem while cleaning up pods") + } + <-stop + + stopped := make(chan struct{}, 1) + go func() { + d.deleterWaitGroup.Wait() + stopped <- struct{}{} + }() + + select { + case <-stopped: + // pod drained all deleter goroutines in time. + case <-time.After(d.config.PodGracefulDrainDelay): + // I gave them enough time, but they haven't finished their job, so I signal them to hurry up + close(d.deleterStopper) + // and give them a little more time to cleanup. 
+ select { + case <-stopped: + case <-time.After(defaultDeleterGracefulTerminationPeriod): + } + } + d.deleterCancelFunc() + return nil +} diff --git a/webhooks/core/pod_validator.go b/webhooks/core/pod_validator.go new file mode 100644 index 000000000..01e31597b --- /dev/null +++ b/webhooks/core/pod_validator.go @@ -0,0 +1,62 @@ +package core + +import ( + "context" + "github.com/go-logr/logr" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/runtime" + "sigs.k8s.io/aws-load-balancer-controller/pkg/gracefuldrain" + "sigs.k8s.io/aws-load-balancer-controller/pkg/webhook" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/webhook/admission" +) + +const ( + apiPathValidatePod = "/validate-v1-pod" +) + +// NewPodValidator returns a mutator for Pod. +func NewPodValidator(podGracefulDrain *gracefuldrain.PodGracefulDrain, logger logr.Logger) *podValidator { + return &podValidator{ + podGracefulDrain, + logger, + } +} + +var _ webhook.Validator = &podValidator{} + +type podValidator struct { + podGracefulDrain *gracefuldrain.PodGracefulDrain + logger logr.Logger +} + +func (v *podValidator) Prototype(_ admission.Request) (runtime.Object, error) { + return &corev1.Pod{}, nil +} + +func (v *podValidator) ValidateCreate(ctx context.Context, obj runtime.Object) error { + return nil +} + +func (v *podValidator) ValidateUpdate(ctx context.Context, obj runtime.Object, oldObj runtime.Object) error { + return nil +} + +func (v *podValidator) ValidateDelete(ctx context.Context, obj runtime.Object) error { + pod := obj.(*corev1.Pod) + // It is somewhat awkward to modify the pod in the validating webhook. + // However, podGracefulDrain doesn't modify the request on the fly, it listens and schedule delayed deletion (with some side-effects) + // So the mutating webhook would be awkward too. + // Here's the rationale: It just adds delay on the pod deletion. The pod will be deleted 'eventually' anyway. 
+ if err := v.podGracefulDrain.InterceptPodDeletion(ctx, pod); err != nil { + return err + } + + return nil +} + +// +kubebuilder:webhook:path=/validate-v1-pod,mutating=false,failurePolicy=fail,groups="",resources=pods,verbs=delete,versions=v1,name=vpod.elbv2.k8s.aws,sideEffects=NoneOnDryRun,webhookVersions=v1beta1 + +func (v *podValidator) SetupWithManager(mgr ctrl.Manager) { + mgr.GetWebhookServer().Register(apiPathValidatePod, webhook.ValidatingWebhookForValidator(v)) +}