Skip to content

Commit

Permalink
implement priority queue for handlers
Browse files Browse the repository at this point in the history
Signed-off-by: Troy Connor <[email protected]>
  • Loading branch information
troy0820 committed Feb 19, 2025
1 parent ba53477 commit eaf978d
Show file tree
Hide file tree
Showing 5 changed files with 178 additions and 9 deletions.
35 changes: 29 additions & 6 deletions pkg/handler/enqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/util/workqueue"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/priorityqueue"
"sigs.k8s.io/controller-runtime/pkg/event"
logf "sigs.k8s.io/controller-runtime/pkg/internal/log"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
Expand Down Expand Up @@ -52,25 +53,47 @@ func (e *TypedEnqueueRequestForObject[T]) Create(ctx context.Context, evt event.
enqueueLog.Error(nil, "CreateEvent received with no metadata", "event", evt)
return
}
q.Add(reconcile.Request{NamespacedName: types.NamespacedName{

item := reconcile.Request{NamespacedName: types.NamespacedName{
Name: evt.Object.GetName(),
Namespace: evt.Object.GetNamespace(),
}})
}}

priorityQueue, isPriorityQueue := q.(priorityqueue.PriorityQueue[reconcile.Request])
if !isPriorityQueue {
q.Add(item)
return
}
addToPriorityQueueCreate(priorityQueue, evt, item)
}

// Update implements EventHandler.
func (e *TypedEnqueueRequestForObject[T]) Update(ctx context.Context, evt event.TypedUpdateEvent[T], q workqueue.TypedRateLimitingInterface[reconcile.Request]) {
switch {
case !isNil(evt.ObjectNew):
q.Add(reconcile.Request{NamespacedName: types.NamespacedName{
item := reconcile.Request{NamespacedName: types.NamespacedName{
Name: evt.ObjectNew.GetName(),
Namespace: evt.ObjectNew.GetNamespace(),
}})
}}

priorityQueue, isPriorityQueue := q.(priorityqueue.PriorityQueue[reconcile.Request])
if !isPriorityQueue {
q.Add(item)
return
}
addToPriorityQueueUpdate(priorityQueue, evt, item)
case !isNil(evt.ObjectOld):
q.Add(reconcile.Request{NamespacedName: types.NamespacedName{
item := reconcile.Request{NamespacedName: types.NamespacedName{
Name: evt.ObjectOld.GetName(),
Namespace: evt.ObjectOld.GetNamespace(),
}})
}}

priorityQueue, isPriorityQueue := q.(priorityqueue.PriorityQueue[reconcile.Request])
if !isPriorityQueue {
q.Add(item)
return
}
addToPriorityQueueUpdate(priorityQueue, evt, item)
default:
enqueueLog.Error(nil, "UpdateEvent received with no metadata", "event", evt)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/handler/enqueue_mapped.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ type TypedMapFunc[object any, request comparable] func(context.Context, object)
// For UpdateEvents which contain both a new and old object, the transformation function is run on both
// objects and both sets of Requests are enqueue.
func EnqueueRequestsFromMapFunc(fn MapFunc) EventHandler {
return TypedEnqueueRequestsFromMapFunc(fn)
return WithLowPriorityWhenUnchanged(TypedEnqueueRequestsFromMapFunc(fn))
}

// TypedEnqueueRequestsFromMapFunc enqueues Requests by running a transformation function that outputs a collection
Expand Down
2 changes: 1 addition & 1 deletion pkg/handler/enqueue_owner.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ type OwnerOption func(e enqueueRequestForOwnerInterface)
//
// - a handler.enqueueRequestForOwner EventHandler with an OwnerType of ReplicaSet and OnlyControllerOwner set to true.
func EnqueueRequestForOwner(scheme *runtime.Scheme, mapper meta.RESTMapper, ownerType client.Object, opts ...OwnerOption) EventHandler {
return TypedEnqueueRequestForOwner[client.Object](scheme, mapper, ownerType, opts...)
return WithLowPriorityWhenUnchanged(TypedEnqueueRequestForOwner[client.Object](scheme, mapper, ownerType, opts...))
}

// TypedEnqueueRequestForOwner enqueues Requests for the Owners of an object. E.g. the object that created
Expand Down
26 changes: 26 additions & 0 deletions pkg/handler/eventhandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,12 @@ var _ EventHandler = Funcs{}
// Funcs implements eventhandler.
type Funcs = TypedFuncs[client.Object, reconcile.Request]

// NewFuncsWithLowPriority creates a Funcs Handler wrapped in
// WithLowPriorityWhenUnchanged
func NewFuncsWithLowPriority(u Funcs) EventHandler {
return WithLowPriorityWhenUnchanged(u)
}

// TypedFuncs implements eventhandler.
//
// TypedFuncs is experimental and subject to future change.
Expand Down Expand Up @@ -199,3 +205,23 @@ func (w workqueueWithCustomAddFunc[request]) Add(item request) {
func isObjectUnchanged[object client.Object](e event.TypedCreateEvent[object]) bool {
return e.Object.GetCreationTimestamp().Time.Before(time.Now().Add(-time.Minute))
}

// addToPriorityQueueCreate adds the reconcile.Request to the priorityqueue in the handler
// for Create requests if and only if the workqueue being used is of type priorityqueue.PriorityQueue[reconcile.Request]
func addToPriorityQueueCreate[T client.Object](q priorityqueue.PriorityQueue[reconcile.Request], evt event.TypedCreateEvent[T], item reconcile.Request) {
var priority int
if isObjectUnchanged(evt) {
priority = LowPriority
}
q.AddWithOpts(priorityqueue.AddOpts{Priority: priority}, item)
}

// addToPriorityQueueUpdate adds the reconcile.Request to the priorityqueue in the handler
// for Update requests if and only if the workqueue being used is of type priorityqueue.PriorityQueue[reconcile.Request]
func addToPriorityQueueUpdate[T client.Object](q priorityqueue.PriorityQueue[reconcile.Request], evt event.TypedUpdateEvent[T], item reconcile.Request) {
var priority int
if evt.ObjectOld.GetResourceVersion() == evt.ObjectNew.GetResourceVersion() {
priority = LowPriority
}
q.AddWithOpts(priorityqueue.AddOpts{Priority: priority}, item)
}
122 changes: 121 additions & 1 deletion pkg/handler/eventhandler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -797,6 +797,27 @@ var _ = Describe("Eventhandler", func() {
Expect(actualRequests).To(Equal([]reconcile.Request{{NamespacedName: types.NamespacedName{Name: "my-pod"}}}))
})

It("should lower the priority of a create request for an object that was created more than one minute in the past without the WithLowPriorityWrapper", func() {
actualOpts := priorityqueue.AddOpts{}
var actualRequests []reconcile.Request
wq := &fakePriorityQueue{
addWithOpts: func(o priorityqueue.AddOpts, items ...reconcile.Request) {
actualOpts = o
actualRequests = items
},
}

h := &handler.EnqueueRequestForObject{}
h.Create(ctx, event.CreateEvent{
Object: &corev1.Pod{ObjectMeta: metav1.ObjectMeta{
Name: "my-pod",
}},
}, wq)

Expect(actualOpts).To(Equal(priorityqueue.AddOpts{Priority: handler.LowPriority}))
Expect(actualRequests).To(Equal([]reconcile.Request{{NamespacedName: types.NamespacedName{Name: "my-pod"}}}))
})

It("should not lower the priority of a create request for an object that was created less than one minute in the past", func() {
actualOpts := priorityqueue.AddOpts{}
var actualRequests []reconcile.Request
Expand All @@ -819,6 +840,28 @@ var _ = Describe("Eventhandler", func() {
Expect(actualRequests).To(Equal([]reconcile.Request{{NamespacedName: types.NamespacedName{Name: "my-pod"}}}))
})

It("should not lower the priority of a create request for an object that was created less than one minute in the past without the WithLowPriority wrapperr", func() {
actualOpts := priorityqueue.AddOpts{}
var actualRequests []reconcile.Request
wq := &fakePriorityQueue{
addWithOpts: func(o priorityqueue.AddOpts, items ...reconcile.Request) {
actualOpts = o
actualRequests = items
},
}

h := &handler.EnqueueRequestForObject{}
h.Create(ctx, event.CreateEvent{
Object: &corev1.Pod{ObjectMeta: metav1.ObjectMeta{
Name: "my-pod",
CreationTimestamp: metav1.Now(),
}},
}, wq)

Expect(actualOpts).To(Equal(priorityqueue.AddOpts{}))
Expect(actualRequests).To(Equal([]reconcile.Request{{NamespacedName: types.NamespacedName{Name: "my-pod"}}}))
})

It("should lower the priority of an update request with unchanged RV", func() {
actualOpts := priorityqueue.AddOpts{}
var actualRequests []reconcile.Request
Expand All @@ -843,6 +886,30 @@ var _ = Describe("Eventhandler", func() {
Expect(actualRequests).To(Equal([]reconcile.Request{{NamespacedName: types.NamespacedName{Name: "my-pod"}}}))
})

It("should lower the priority of an update request with unchanged RV without the WithLowPriority wrapper", func() {
actualOpts := priorityqueue.AddOpts{}
var actualRequests []reconcile.Request
wq := &fakePriorityQueue{
addWithOpts: func(o priorityqueue.AddOpts, items ...reconcile.Request) {
actualOpts = o
actualRequests = items
},
}

h := &handler.EnqueueRequestForObject{}
h.Update(ctx, event.UpdateEvent{
ObjectOld: &corev1.Pod{ObjectMeta: metav1.ObjectMeta{
Name: "my-pod",
}},
ObjectNew: &corev1.Pod{ObjectMeta: metav1.ObjectMeta{
Name: "my-pod",
}},
}, wq)

Expect(actualOpts).To(Equal(priorityqueue.AddOpts{Priority: handler.LowPriority}))
Expect(actualRequests).To(Equal([]reconcile.Request{{NamespacedName: types.NamespacedName{Name: "my-pod"}}}))
})

It("should not lower the priority of an update request with changed RV", func() {
actualOpts := priorityqueue.AddOpts{}
var actualRequests []reconcile.Request
Expand All @@ -868,6 +935,31 @@ var _ = Describe("Eventhandler", func() {
Expect(actualRequests).To(Equal([]reconcile.Request{{NamespacedName: types.NamespacedName{Name: "my-pod"}}}))
})

It("should not lower the priority of an update request with changed RV without the WithLowPriority wrapper", func() {
actualOpts := priorityqueue.AddOpts{}
var actualRequests []reconcile.Request
wq := &fakePriorityQueue{
addWithOpts: func(o priorityqueue.AddOpts, items ...reconcile.Request) {
actualOpts = o
actualRequests = items
},
}

h := &handler.EnqueueRequestForObject{}
h.Update(ctx, event.UpdateEvent{
ObjectOld: &corev1.Pod{ObjectMeta: metav1.ObjectMeta{
Name: "my-pod",
}},
ObjectNew: &corev1.Pod{ObjectMeta: metav1.ObjectMeta{
Name: "my-pod",
ResourceVersion: "1",
}},
}, wq)

Expect(actualOpts).To(Equal(priorityqueue.AddOpts{}))
Expect(actualRequests).To(Equal([]reconcile.Request{{NamespacedName: types.NamespacedName{Name: "my-pod"}}}))
})

It("should have no effect on create if the workqueue is not a priorityqueue", func() {
h := handler.WithLowPriorityWhenUnchanged(&handler.EnqueueRequestForObject{})
h.Create(ctx, event.CreateEvent{
Expand All @@ -881,6 +973,19 @@ var _ = Describe("Eventhandler", func() {
Expect(item).To(Equal(reconcile.Request{NamespacedName: types.NamespacedName{Name: "my-pod"}}))
})

It("should have no effect on create if the workqueue is not a priorityqueue without the WithLowPriority wrapper", func() {
h := &handler.EnqueueRequestForObject{}
h.Create(ctx, event.CreateEvent{
Object: &corev1.Pod{ObjectMeta: metav1.ObjectMeta{
Name: "my-pod",
}},
}, q)

Expect(q.Len()).To(Equal(1))
item, _ := q.Get()
Expect(item).To(Equal(reconcile.Request{NamespacedName: types.NamespacedName{Name: "my-pod"}}))
})

It("should have no effect on Update if the workqueue is not a priorityqueue", func() {
h := handler.WithLowPriorityWhenUnchanged(&handler.EnqueueRequestForObject{})
h.Update(ctx, event.UpdateEvent{
Expand All @@ -896,8 +1001,23 @@ var _ = Describe("Eventhandler", func() {
item, _ := q.Get()
Expect(item).To(Equal(reconcile.Request{NamespacedName: types.NamespacedName{Name: "my-pod"}}))
})
})

It("should have no effect on Update if the workqueue is not a priorityqueue without the WithLowPriority wrapper", func() {
h := &handler.EnqueueRequestForObject{}
h.Update(ctx, event.UpdateEvent{
ObjectOld: &corev1.Pod{ObjectMeta: metav1.ObjectMeta{
Name: "my-pod",
}},
ObjectNew: &corev1.Pod{ObjectMeta: metav1.ObjectMeta{
Name: "my-pod",
}},
}, q)

Expect(q.Len()).To(Equal(1))
item, _ := q.Get()
Expect(item).To(Equal(reconcile.Request{NamespacedName: types.NamespacedName{Name: "my-pod"}}))
})
})
})

type fakePriorityQueue struct {
Expand Down

0 comments on commit eaf978d

Please sign in to comment.