Skip to content

Commit 4ad4ee3

Browse files
jsafranepospispa
authored andcommitted
Added PVC Protection Controller
This controller removes protection finalizer from PVCs that are being deleted and are not referenced by any pod.
1 parent a06901a commit 4ad4ee3

File tree

10 files changed

+967
-0
lines changed

10 files changed

+967
-0
lines changed

cmd/kube-controller-manager/app/BUILD

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,7 @@ go_library(
7979
"//pkg/controller/volume/attachdetach:go_default_library",
8080
"//pkg/controller/volume/expand:go_default_library",
8181
"//pkg/controller/volume/persistentvolume:go_default_library",
82+
"//pkg/controller/volume/pvcprotection:go_default_library",
8283
"//pkg/features:go_default_library",
8384
"//pkg/quota/generic:go_default_library",
8485
"//pkg/quota/install:go_default_library",

cmd/kube-controller-manager/app/controllermanager.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -359,6 +359,7 @@ func NewControllerInitializers() map[string]InitFunc {
359359
controllers["attachdetach"] = startAttachDetachController
360360
controllers["persistentvolume-expander"] = startVolumeExpandController
361361
controllers["clusterrole-aggregation"] = startClusterRoleAggregrationController
362+
controllers["pvc-protection"] = startPVCProtectionController
362363

363364
return controllers
364365
}

cmd/kube-controller-manager/app/core.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ import (
5353
"k8s.io/kubernetes/pkg/controller/volume/attachdetach"
5454
"k8s.io/kubernetes/pkg/controller/volume/expand"
5555
persistentvolumecontroller "k8s.io/kubernetes/pkg/controller/volume/persistentvolume"
56+
"k8s.io/kubernetes/pkg/controller/volume/pvcprotection"
5657
"k8s.io/kubernetes/pkg/features"
5758
"k8s.io/kubernetes/pkg/quota/generic"
5859
quotainstall "k8s.io/kubernetes/pkg/quota/install"
@@ -376,3 +377,15 @@ func startGarbageCollectorController(ctx ControllerContext) (bool, error) {
376377

377378
return true, nil
378379
}
380+
381+
func startPVCProtectionController(ctx ControllerContext) (bool, error) {
382+
if utilfeature.DefaultFeatureGate.Enabled(features.PVCProtection) {
383+
go pvcprotection.NewPVCProtectionController(
384+
ctx.InformerFactory.Core().V1().PersistentVolumeClaims(),
385+
ctx.InformerFactory.Core().V1().Pods(),
386+
ctx.ClientBuilder.ClientOrDie("pvc-protection-controller"),
387+
).Run(1, ctx.Stop)
388+
return true, nil
389+
}
390+
return false, nil
391+
}

pkg/controller/BUILD

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,7 @@ filegroup(
133133
"//pkg/controller/volume/events:all-srcs",
134134
"//pkg/controller/volume/expand:all-srcs",
135135
"//pkg/controller/volume/persistentvolume:all-srcs",
136+
"//pkg/controller/volume/pvcprotection:all-srcs",
136137
],
137138
tags = ["automanaged"],
138139
)
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
2+
3+
go_library(
4+
name = "go_default_library",
5+
srcs = ["pvc_protection_controller.go"],
6+
importpath = "k8s.io/kubernetes/pkg/controller/volume/pvcprotection",
7+
visibility = ["//visibility:public"],
8+
deps = [
9+
"//pkg/controller:go_default_library",
10+
"//pkg/util/metrics:go_default_library",
11+
"//pkg/volume/util:go_default_library",
12+
"//pkg/volume/util/volumehelper:go_default_library",
13+
"//vendor/github.com/golang/glog:go_default_library",
14+
"//vendor/k8s.io/api/core/v1:go_default_library",
15+
"//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library",
16+
"//vendor/k8s.io/apimachinery/pkg/labels:go_default_library",
17+
"//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
18+
"//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
19+
"//vendor/k8s.io/client-go/informers/core/v1:go_default_library",
20+
"//vendor/k8s.io/client-go/kubernetes:go_default_library",
21+
"//vendor/k8s.io/client-go/listers/core/v1:go_default_library",
22+
"//vendor/k8s.io/client-go/tools/cache:go_default_library",
23+
"//vendor/k8s.io/client-go/util/workqueue:go_default_library",
24+
],
25+
)
26+
27+
go_test(
28+
name = "go_default_test",
29+
srcs = ["pvc_protection_controller_test.go"],
30+
importpath = "k8s.io/kubernetes/pkg/controller/volume/pvcprotection",
31+
library = ":go_default_library",
32+
deps = [
33+
"//pkg/controller:go_default_library",
34+
"//pkg/volume/util:go_default_library",
35+
"//vendor/github.com/davecgh/go-spew/spew:go_default_library",
36+
"//vendor/github.com/golang/glog:go_default_library",
37+
"//vendor/k8s.io/api/core/v1:go_default_library",
38+
"//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library",
39+
"//vendor/k8s.io/apimachinery/pkg/api/meta:go_default_library",
40+
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
41+
"//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library",
42+
"//vendor/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
43+
"//vendor/k8s.io/client-go/informers:go_default_library",
44+
"//vendor/k8s.io/client-go/kubernetes/fake:go_default_library",
45+
"//vendor/k8s.io/client-go/testing:go_default_library",
46+
],
47+
)
48+
49+
filegroup(
50+
name = "package-srcs",
51+
srcs = glob(["**"]),
52+
tags = ["automanaged"],
53+
visibility = ["//visibility:private"],
54+
)
55+
56+
filegroup(
57+
name = "all-srcs",
58+
srcs = [":package-srcs"],
59+
tags = ["automanaged"],
60+
visibility = ["//visibility:public"],
61+
)
Lines changed: 284 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,284 @@
1+
/*
2+
Copyright 2017 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package pvcprotection
18+
19+
import (
20+
"fmt"
21+
"time"
22+
23+
"github.com/golang/glog"
24+
"k8s.io/api/core/v1"
25+
apierrs "k8s.io/apimachinery/pkg/api/errors"
26+
"k8s.io/apimachinery/pkg/labels"
27+
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
28+
"k8s.io/apimachinery/pkg/util/wait"
29+
coreinformers "k8s.io/client-go/informers/core/v1"
30+
clientset "k8s.io/client-go/kubernetes"
31+
corelisters "k8s.io/client-go/listers/core/v1"
32+
"k8s.io/client-go/tools/cache"
33+
"k8s.io/client-go/util/workqueue"
34+
"k8s.io/kubernetes/pkg/controller"
35+
"k8s.io/kubernetes/pkg/util/metrics"
36+
volumeutil "k8s.io/kubernetes/pkg/volume/util"
37+
"k8s.io/kubernetes/pkg/volume/util/volumehelper"
38+
)
39+
40+
// Controller is controller that removes PVCProtectionFinalizer
41+
// from PVCs that are used by no pods.
42+
type Controller struct {
43+
client clientset.Interface
44+
45+
pvcLister corelisters.PersistentVolumeClaimLister
46+
pvcListerSynced cache.InformerSynced
47+
48+
podLister corelisters.PodLister
49+
podListerSynced cache.InformerSynced
50+
51+
queue workqueue.RateLimitingInterface
52+
}
53+
54+
// NewPVCProtectionController returns a new *{VCProtectionController.
55+
func NewPVCProtectionController(pvcInformer coreinformers.PersistentVolumeClaimInformer, podInformer coreinformers.PodInformer, cl clientset.Interface) *Controller {
56+
e := &Controller{
57+
client: cl,
58+
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "pvcprotection"),
59+
}
60+
if cl != nil && cl.CoreV1().RESTClient().GetRateLimiter() != nil {
61+
metrics.RegisterMetricAndTrackRateLimiterUsage("persistentvolumeclaim_protection_controller", cl.CoreV1().RESTClient().GetRateLimiter())
62+
}
63+
64+
e.pvcLister = pvcInformer.Lister()
65+
e.pvcListerSynced = pvcInformer.Informer().HasSynced
66+
pvcInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
67+
AddFunc: e.pvcAddedUpdated,
68+
UpdateFunc: func(old, new interface{}) {
69+
e.pvcAddedUpdated(new)
70+
},
71+
})
72+
73+
e.podLister = podInformer.Lister()
74+
e.podListerSynced = podInformer.Informer().HasSynced
75+
podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
76+
AddFunc: func(obj interface{}) {
77+
e.podAddedDeletedUpdated(obj, false)
78+
},
79+
DeleteFunc: func(obj interface{}) {
80+
e.podAddedDeletedUpdated(obj, true)
81+
},
82+
UpdateFunc: func(old, new interface{}) {
83+
e.podAddedDeletedUpdated(new, false)
84+
},
85+
})
86+
87+
return e
88+
}
89+
90+
// Run runs the controller goroutines.
91+
func (c *Controller) Run(workers int, stopCh <-chan struct{}) {
92+
defer utilruntime.HandleCrash()
93+
defer c.queue.ShutDown()
94+
95+
glog.Infof("Starting PVC protection controller")
96+
defer glog.Infof("Shutting down PVC protection controller")
97+
98+
if !controller.WaitForCacheSync("PVC protection", stopCh, c.pvcListerSynced, c.podListerSynced) {
99+
return
100+
}
101+
102+
for i := 0; i < workers; i++ {
103+
go wait.Until(c.runWorker, time.Second, stopCh)
104+
}
105+
106+
<-stopCh
107+
}
108+
109+
func (c *Controller) runWorker() {
110+
for c.processNextWorkItem() {
111+
}
112+
}
113+
114+
// processNextWorkItem deals with one pvcKey off the queue. It returns false when it's time to quit.
115+
func (c *Controller) processNextWorkItem() bool {
116+
pvcKey, quit := c.queue.Get()
117+
if quit {
118+
return false
119+
}
120+
defer c.queue.Done(pvcKey)
121+
122+
pvcNamespace, pvcName, err := cache.SplitMetaNamespaceKey(pvcKey.(string))
123+
if err != nil {
124+
utilruntime.HandleError(fmt.Errorf("Error parsing PVC key %q: %v", pvcKey, err))
125+
return true
126+
}
127+
128+
err = c.processPVC(pvcNamespace, pvcName)
129+
if err == nil {
130+
c.queue.Forget(pvcKey)
131+
return true
132+
}
133+
134+
utilruntime.HandleError(fmt.Errorf("PVC %v failed with : %v", pvcKey, err))
135+
c.queue.AddRateLimited(pvcKey)
136+
137+
return true
138+
}
139+
140+
func (c *Controller) processPVC(pvcNamespace, pvcName string) error {
141+
glog.V(4).Infof("Processing PVC %s/%s", pvcNamespace, pvcName)
142+
startTime := time.Now()
143+
defer func() {
144+
glog.V(4).Infof("Finished processing PVC %s/%s (%v)", pvcNamespace, pvcName, time.Now().Sub(startTime))
145+
}()
146+
147+
pvc, err := c.pvcLister.PersistentVolumeClaims(pvcNamespace).Get(pvcName)
148+
if apierrs.IsNotFound(err) {
149+
glog.V(4).Infof("PVC %s/%s not found, ignoring", pvcNamespace, pvcName)
150+
return nil
151+
}
152+
if err != nil {
153+
return err
154+
}
155+
156+
if volumeutil.IsPVCBeingDeleted(pvc) && volumeutil.IsProtectionFinalizerPresent(pvc) {
157+
// PVC should be deleted. Check if it's used and remove finalizer if
158+
// it's not.
159+
isUsed, err := c.isBeingUsed(pvc)
160+
if err != nil {
161+
return err
162+
}
163+
if !isUsed {
164+
return c.removeFinalizer(pvc)
165+
}
166+
}
167+
168+
if !volumeutil.IsPVCBeingDeleted(pvc) && !volumeutil.IsProtectionFinalizerPresent(pvc) {
169+
// PVC is not being deleted -> it should have the finalizer. The
170+
// finalizer should be added by admission plugin, this is just to add
171+
// the finalizer to old PVCs that were created before the admission
172+
// plugin was enabled.
173+
return c.addFinalizer(pvc)
174+
}
175+
return nil
176+
}
177+
178+
func (c *Controller) addFinalizer(pvc *v1.PersistentVolumeClaim) error {
179+
claimClone := pvc.DeepCopy()
180+
volumeutil.AddProtectionFinalizer(claimClone)
181+
_, err := c.client.CoreV1().PersistentVolumeClaims(claimClone.Namespace).Update(claimClone)
182+
if err != nil {
183+
glog.V(3).Infof("Error adding protection finalizer to PVC %s/%s: %v", pvc.Namespace, pvc.Name)
184+
return err
185+
}
186+
glog.V(3).Infof("Added protection finalizer to PVC %s/%s", pvc.Namespace, pvc.Name)
187+
return nil
188+
}
189+
190+
func (c *Controller) removeFinalizer(pvc *v1.PersistentVolumeClaim) error {
191+
claimClone := pvc.DeepCopy()
192+
volumeutil.RemoveProtectionFinalizer(claimClone)
193+
_, err := c.client.CoreV1().PersistentVolumeClaims(claimClone.Namespace).Update(claimClone)
194+
if err != nil {
195+
glog.V(3).Infof("Error removing protection finalizer from PVC %s/%s: %v", pvc.Namespace, pvc.Name, err)
196+
return err
197+
}
198+
glog.V(3).Infof("Removed protection finalizer from PVC %s/%s", pvc.Namespace, pvc.Name)
199+
return nil
200+
}
201+
202+
func (c *Controller) isBeingUsed(pvc *v1.PersistentVolumeClaim) (bool, error) {
203+
pods, err := c.podLister.Pods(pvc.Namespace).List(labels.Everything())
204+
if err != nil {
205+
return false, err
206+
}
207+
for _, pod := range pods {
208+
if pod.Spec.NodeName == "" {
209+
// This pod is not scheduled. We have a predicated in scheduler that
210+
// prevents scheduling pods with deletion timestamp, so we can be
211+
// pretty sure it won't be scheduled in parallel to this check.
212+
// Therefore this pod does not block the PVC from deletion.
213+
glog.V(4).Infof("Skipping unscheduled pod %s when checking PVC %s/%s", pod.Name, pvc.Namespace, pvc.Name)
214+
continue
215+
}
216+
if volumehelper.IsPodTerminated(pod, pod.Status) {
217+
// This pod is being unmounted/detached or is already
218+
// unmounted/detached. It does not block the PVC from deletion.
219+
continue
220+
}
221+
for _, volume := range pod.Spec.Volumes {
222+
if volume.PersistentVolumeClaim == nil {
223+
continue
224+
}
225+
if volume.PersistentVolumeClaim.ClaimName == pvc.Name {
226+
glog.V(2).Infof("Keeping PVC %s/%s, it is used by pod %s/%s", pvc.Namespace, pvc.Name, pod.Namespace, pod.Name)
227+
return true, nil
228+
}
229+
}
230+
}
231+
232+
glog.V(3).Infof("PVC %s/%s is unused", pvc.Namespace, pvc.Name)
233+
return false, nil
234+
}
235+
236+
// pvcAddedUpdated reacts to pvc added/updated/deleted events
237+
func (c *Controller) pvcAddedUpdated(obj interface{}) {
238+
pvc, ok := obj.(*v1.PersistentVolumeClaim)
239+
if !ok {
240+
utilruntime.HandleError(fmt.Errorf("PVC informer returned non-PVC object: %#v", obj))
241+
return
242+
}
243+
key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(pvc)
244+
if err != nil {
245+
utilruntime.HandleError(fmt.Errorf("Couldn't get key for Persistent Volume Claim %#v: %v", pvc, err))
246+
return
247+
}
248+
glog.V(4).Infof("Got event on PVC %s", key)
249+
250+
if (!volumeutil.IsPVCBeingDeleted(pvc) && !volumeutil.IsProtectionFinalizerPresent(pvc)) || (volumeutil.IsPVCBeingDeleted(pvc) && volumeutil.IsProtectionFinalizerPresent(pvc)) {
251+
c.queue.Add(key)
252+
}
253+
}
254+
255+
// podAddedDeletedUpdated reacts to Pod events
256+
func (c *Controller) podAddedDeletedUpdated(obj interface{}, deleted bool) {
257+
pod, ok := obj.(*v1.Pod)
258+
if !ok {
259+
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
260+
if !ok {
261+
utilruntime.HandleError(fmt.Errorf("Couldn't get object from tombstone %#v", obj))
262+
return
263+
}
264+
pod, ok = tombstone.Obj.(*v1.Pod)
265+
if !ok {
266+
utilruntime.HandleError(fmt.Errorf("Tombstone contained object that is not a Pod %#v", obj))
267+
return
268+
}
269+
}
270+
271+
// Filter out pods that can't help us to remove a finalizer on PVC
272+
if !deleted && !volumehelper.IsPodTerminated(pod, pod.Status) && pod.Spec.NodeName != "" {
273+
return
274+
}
275+
276+
glog.V(4).Infof("Got event on pod %s/%s", pod.Namespace, pod.Name)
277+
278+
// Enqueue all PVCs that the pod uses
279+
for _, volume := range pod.Spec.Volumes {
280+
if volume.PersistentVolumeClaim != nil {
281+
c.queue.Add(pod.Namespace + "/" + volume.PersistentVolumeClaim.ClaimName)
282+
}
283+
}
284+
}

0 commit comments

Comments
 (0)