@@ -5,6 +5,10 @@ import (
55 "fmt"
66 "time"
77
8+ "k8s.io/apimachinery/pkg/util/uuid"
9+ "k8s.io/client-go/tools/leaderelection"
10+ "k8s.io/client-go/tools/leaderelection/resourcelock"
11+
812 appmeshv1beta1 "github.com/aws/aws-app-mesh-controller-for-k8s/pkg/apis/appmesh/v1beta1"
913 "github.com/aws/aws-app-mesh-controller-for-k8s/pkg/aws"
1014 meshclientset "github.com/aws/aws-app-mesh-controller-for-k8s/pkg/client/clientset/versioned"
@@ -29,7 +33,11 @@ import (
2933
3034const (
3135 //setting default threadiness to number of go-routines
32- DefaultThreadiness = 5
36+ DefaultThreadiness = 5
37+ DefaultElection = true
38+ DefaultElectionID = "app-mesh-controller-leader"
39+ DefaultElectionNamespace = ""
40+
3341 controllerAgentName = "app-mesh-controller"
3442 meshDeletionFinalizerName = "meshDeletion.finalizers.appmesh.k8s.aws"
3543 virtualNodeDeletionFinalizerName = "virtualNodeDeletion.finalizers.appmesh.k8s.aws"
@@ -73,6 +81,18 @@ type Controller struct {
7381
7482 // stats records mesh Prometheus metrics
7583 stats * metrics.Recorder
84+
85+ // LeaderElection determines whether or not to use leader election when
86+ // starting the manager.
87+ leaderElection bool
88+
89+ // LeaderElectionID determines the name of the configmap that leader election
90+ // will use for holding the leader lock.
91+ leaderElectionID string
92+
93+ // LeaderElectionNamespace determines the namespace in which the leader
94+ // election configmap will be created.
95+ leaderElectionNamespace string
7696}
7797
7898func NewController (
@@ -83,7 +103,10 @@ func NewController(
83103 meshInformer meshinformers.MeshInformer ,
84104 virtualNodeInformer meshinformers.VirtualNodeInformer ,
85105 virtualServiceInformer meshinformers.VirtualServiceInformer ,
86- stats * metrics.Recorder ) (* Controller , error ) {
106+ stats * metrics.Recorder ,
107+ leaderElection bool ,
108+ leaderElectionID string ,
109+ leaderElectionNamespace string ) (* Controller , error ) {
87110
88111 utilruntime .Must (meshscheme .AddToScheme (scheme .Scheme ))
89112 klog .V (4 ).Info ("Creating event broadcaster" )
@@ -93,24 +116,27 @@ func NewController(
93116 recorder := eventBroadcaster .NewRecorder (scheme .Scheme , corev1.EventSource {Component : controllerAgentName })
94117
95118 controller := & Controller {
96- name : controllerAgentName ,
97- cloud : cloud ,
98- kubeclientset : kubeclientset ,
99- meshclientset : meshclientset ,
100- podsLister : podInformer .Lister (),
101- podsSynced : podInformer .Informer ().HasSynced ,
102- meshLister : meshInformer .Lister (),
103- meshSynced : meshInformer .Informer ().HasSynced ,
104- virtualNodeLister : virtualNodeInformer .Lister (),
105- virtualNodeSynced : virtualNodeInformer .Informer ().HasSynced ,
106- virtualServiceLister : virtualServiceInformer .Lister (),
107- virtualServiceSynced : virtualServiceInformer .Informer ().HasSynced ,
108- mq : workqueue .NewRateLimitingQueue (workqueue .DefaultControllerRateLimiter ()),
109- nq : workqueue .NewRateLimitingQueue (workqueue .DefaultControllerRateLimiter ()),
110- sq : workqueue .NewRateLimitingQueue (workqueue .DefaultControllerRateLimiter ()),
111- pq : workqueue .NewRateLimitingQueue (workqueue .DefaultControllerRateLimiter ()),
112- recorder : recorder ,
113- stats : stats ,
119+ name : controllerAgentName ,
120+ cloud : cloud ,
121+ kubeclientset : kubeclientset ,
122+ meshclientset : meshclientset ,
123+ podsLister : podInformer .Lister (),
124+ podsSynced : podInformer .Informer ().HasSynced ,
125+ meshLister : meshInformer .Lister (),
126+ meshSynced : meshInformer .Informer ().HasSynced ,
127+ virtualNodeLister : virtualNodeInformer .Lister (),
128+ virtualNodeSynced : virtualNodeInformer .Informer ().HasSynced ,
129+ virtualServiceLister : virtualServiceInformer .Lister (),
130+ virtualServiceSynced : virtualServiceInformer .Informer ().HasSynced ,
131+ mq : workqueue .NewRateLimitingQueue (workqueue .DefaultControllerRateLimiter ()),
132+ nq : workqueue .NewRateLimitingQueue (workqueue .DefaultControllerRateLimiter ()),
133+ sq : workqueue .NewRateLimitingQueue (workqueue .DefaultControllerRateLimiter ()),
134+ pq : workqueue .NewRateLimitingQueue (workqueue .DefaultControllerRateLimiter ()),
135+ recorder : recorder ,
136+ stats : stats ,
137+ leaderElection : leaderElection ,
138+ leaderElectionID : leaderElectionID ,
139+ leaderElectionNamespace : leaderElectionNamespace ,
114140 }
115141
116142 podInformer .Informer ().AddEventHandler (cache.ResourceEventHandlerFuncs {
@@ -198,21 +224,76 @@ func (c *Controller) Run(threadiness int, stopCh chan struct{}) error {
198224 return fmt .Errorf ("failed to wait for caches to sync" )
199225 }
200226
227+ ctx , cancel := context .WithCancel (context .Background ())
228+ go func () {
229+ select {
230+ case <- stopCh :
231+ cancel ()
232+ case <- ctx .Done ():
233+ }
234+ }()
235+ if c .leaderElection {
236+ return c .runWorkersWithLeaderElection (ctx , threadiness )
237+ }
238+ c .runWorkers (ctx , threadiness )
239+ return nil
240+ }
241+
242+ func (c * Controller ) runWorkersWithLeaderElection (ctx context.Context , threadiness int ) error {
243+ leaderElectionNamespace := c .leaderElectionNamespace
244+ if leaderElectionNamespace == "" {
245+ var err error
246+ if leaderElectionNamespace , err = getInClusterNamespace (); err != nil {
247+ return err
248+ }
249+ }
250+
251+ leaderElectionIdentity := string (uuid .NewUUID ())
252+ leaderElectionLock , err := resourcelock .New (resourcelock .ConfigMapsResourceLock ,
253+ leaderElectionNamespace ,
254+ c .leaderElectionID ,
255+ c .kubeclientset .CoreV1 (),
256+ c .kubeclientset .CoordinationV1 (),
257+ resourcelock.ResourceLockConfig {
258+ Identity : leaderElectionIdentity ,
259+ })
260+ if err != nil {
261+ return err
262+ }
263+ elector , err := leaderelection .NewLeaderElector (leaderelection.LeaderElectionConfig {
264+ Lock : leaderElectionLock ,
265+ LeaseDuration : 60 * time .Second ,
266+ RenewDeadline : 15 * time .Second ,
267+ RetryPeriod : 5 * time .Second ,
268+ Callbacks : leaderelection.LeaderCallbacks {
269+ OnStartedLeading : func (ctx context.Context ) {
270+ c .runWorkers (ctx , threadiness )
271+ },
272+ OnStoppedLeading : func () {
273+ klog .Info ("losing leader" )
274+ },
275+ },
276+ })
277+ if err != nil {
278+ return err
279+ }
280+ elector .Run (ctx )
281+ return nil
282+ }
283+
284+ func (c * Controller ) runWorkers (ctx context.Context , threadiness int ) {
201285 klog .Info ("Starting workers" )
202286 // Launch workers to process Mesh resources
203287 for i := 0 ; i < threadiness ; i ++ {
204- go wait .Until (c .meshWorker , time .Second , stopCh )
205- go wait .Until (c .vNodeWorker , time .Second , stopCh )
206- go wait .Until (c .vServiceWorker , time .Second , stopCh )
207- go wait .Until (c .podWorker , time .Second , stopCh )
208- go wait .Until (c .cloudmapReconciler , 1 * time .Minute , stopCh )
288+ go wait .Until (c .meshWorker , time .Second , ctx . Done () )
289+ go wait .Until (c .vNodeWorker , time .Second , ctx . Done () )
290+ go wait .Until (c .vServiceWorker , time .Second , ctx . Done () )
291+ go wait .Until (c .podWorker , time .Second , ctx . Done () )
292+ go wait .Until (c .cloudmapReconciler , 1 * time .Minute , ctx . Done () )
209293 }
210-
211294 klog .Info ("Started workers" )
212- <- stopCh
295+ <- ctx . Done ()
213296 klog .Info ("Shutting down workers" )
214-
215- return nil
216297}
217298
218299// podAdded adds the pods endpoint to matching CloudMap Services.
0 commit comments