Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: Optimize event controller cache hydration for high-volume clusters #124

Open
wants to merge 21 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 26 additions & 17 deletions events/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,48 +8,57 @@ import (

pmetrics "github.com/awslabs/operatorpkg/metrics"
"github.com/awslabs/operatorpkg/object"
"github.com/awslabs/operatorpkg/singleton"
"github.com/samber/lo"
corev1 "k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/kubernetes"
"k8s.io/utils/clock"
controllerruntime "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
)

type Controller[T client.Object] struct {
gvk schema.GroupVersionKind
startTime time.Time
kubeClient client.Client
EventCount pmetrics.CounterMetric
gvk schema.GroupVersionKind
startTime time.Time
kubeClient client.Client
EventCount pmetrics.CounterMetric
EventWatchChannel <-chan watch.Event
}

func NewController[T client.Object](client client.Client, clock clock.Clock) *Controller[T] {
func NewController[T client.Object](ctx context.Context, client client.Client, clock clock.Clock, kubernetesInterface kubernetes.Interface) *Controller[T] {
gvk := object.GVK(object.New[T]())
return &Controller[T]{
gvk: gvk,
startTime: clock.Now(),
kubeClient: client,
EventCount: eventTotalMetric(strings.ToLower(gvk.Kind)),
EventWatchChannel: lo.Must(kubernetesInterface.CoreV1().Events("").Watch(ctx, metav1.ListOptions{
// Only reconcile on the object kind we care about
FieldSelector: fmt.Sprintf("involvedObject.kind=%s,involvedObject.apiVersion=%s", object.GVK(&corev1.Node{}).Kind, object.GVK(&corev1.Node{}).GroupVersion().String()),
})).ResultChan(),
}
}

func (c *Controller[T]) Register(_ context.Context, m manager.Manager) error {
return controllerruntime.NewControllerManagedBy(m).
For(&v1.Event{}, builder.WithPredicates(predicate.NewTypedPredicateFuncs(func(o client.Object) bool {
// Only reconcile on the object kind we care about
event := o.(*v1.Event)
return event.InvolvedObject.Kind == c.gvk.Kind && event.InvolvedObject.APIVersion == c.gvk.GroupVersion().String()
}))).
WithOptions(controller.Options{MaxConcurrentReconciles: 10}).
Named(fmt.Sprintf("operatorpkg.%s.events", strings.ToLower(c.gvk.Kind))).
Complete(reconcile.AsReconciler(m.GetClient(), c))
WatchesRawSource(singleton.Source()).
Complete(singleton.AsReconciler(c))
}

func (c *Controller[T]) Reconcile(ctx context.Context, event *v1.Event) (reconcile.Result, error) {
func (c *Controller[T]) Reconcile(ctx context.Context) (reconcile.Result, error) {
e := <-c.EventWatchChannel
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just took a look at the underlying code and the implementation of the ResultChan uses an unbuffered channel -- that means new writes are going to get blocked on reads here -- I'm concerned that we may block the watch stream if we don't either pull these events into a buffered channel OR use multiple goroutines to process these events.

I'm trying to think of a good way to handle this while keeping the same interface so that we keep the same logging behavior and metrics -- nothing comes to mind immediately, but I think it's something that we should explore. Minimally, we can kick off a goroutine in the register that pulls data off of the watch channel and puts it in a separate buffered channel -- another option is to keep a static cache of events as we see them, enqueue a reconcile.Request and then read from that local cache (this is basically exactly what controller-runtime does with a read cache without the deletions)

if e.Object == nil {
return reconcile.Result{RequeueAfter: singleton.RequeueImmediately}, nil
}
event := e.Object.(*v1.Event)

// We check if the event was created in the lifetime of this controller
// since we don't duplicate metrics on controller restart or lease handover
if c.startTime.Before(event.LastTimestamp.Time) {
Expand All @@ -59,5 +68,5 @@ func (c *Controller[T]) Reconcile(ctx context.Context, event *v1.Event) (reconci
})
}

return reconcile.Result{}, nil
return reconcile.Result{RequeueAfter: singleton.RequeueImmediately}, nil
}
42 changes: 31 additions & 11 deletions events/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/awslabs/operatorpkg/events"
pmetrics "github.com/awslabs/operatorpkg/metrics"
"github.com/awslabs/operatorpkg/object"
"github.com/awslabs/operatorpkg/singleton"
"github.com/awslabs/operatorpkg/test"
. "github.com/awslabs/operatorpkg/test/expectations"
"github.com/onsi/ginkgo/v2"
Expand All @@ -19,10 +20,12 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/kubernetes/fake"
"k8s.io/client-go/kubernetes/scheme"
clock "k8s.io/utils/clock/testing"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/fake"
crfake "sigs.k8s.io/controller-runtime/pkg/client/fake"
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
)
Expand All @@ -38,6 +41,7 @@ var ctx context.Context
var fakeClock *clock.FakeClock
var controller *events.Controller[*test.CustomObject]
var kubeClient client.Client
var eventChannel chan watch.Event

func Test(t *testing.T) {
lo.Must0(SchemeBuilder.AddToScheme(scheme.Scheme))
Expand All @@ -46,13 +50,18 @@ func Test(t *testing.T) {
}

var _ = BeforeSuite(func() {
ctx = log.IntoContext(context.Background(), ginkgo.GinkgoLogr)

fakeClock = clock.NewFakeClock(time.Now())
kubeClient = fake.NewClientBuilder().WithScheme(scheme.Scheme).WithIndex(&corev1.Event{}, "involvedObject.kind", func(o client.Object) []string {
kubeClient = crfake.NewClientBuilder().WithScheme(scheme.Scheme).WithIndex(&corev1.Event{}, "involvedObject.kind", func(o client.Object) []string {
evt := o.(*corev1.Event)
return []string{evt.InvolvedObject.Kind}
}).Build()
controller = events.NewController[*test.CustomObject](kubeClient, fakeClock)
ctx = log.IntoContext(context.Background(), ginkgo.GinkgoLogr)

client := fake.NewSimpleClientset()
eventChannel = make(chan watch.Event, 1000)
controller = events.NewController[*test.CustomObject](ctx, kubeClient, fakeClock, client)
controller.EventWatchChannel = eventChannel
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice!

})

var _ = Describe("Controller", func() {
Expand All @@ -70,8 +79,11 @@ var _ = Describe("Controller", func() {
// expect an metrics for custom object to be zero, waiting on controller reconcile
Expect(GetMetric("operator_customobject_event_total", conditionLabels(fmt.Sprintf("Test-type-%d", i), fmt.Sprintf("Test-reason-%d", i)))).To(BeNil())

eventChannel <- watch.Event{
Object: events[i],
}
// reconcile on the event
_, err := reconcile.AsReconciler(kubeClient, controller).Reconcile(ctx, reconcile.Request{NamespacedName: client.ObjectKeyFromObject(events[i])})
_, err := singleton.AsReconciler(controller).Reconcile(ctx, reconcile.Request{})
Expect(err).ToNot(HaveOccurred())

// expect an emitted metric to for the event
Expand All @@ -87,8 +99,11 @@ var _ = Describe("Controller", func() {
// expect an metrics for custom object to be zero, waiting on controller reconcile
Expect(GetMetric("operator_ustomobject_event_total", conditionLabels(corev1.EventTypeNormal, "reason"))).To(BeNil())

eventChannel <- watch.Event{
Object: event,
}
// reconcile on the event
_, err := reconcile.AsReconciler(kubeClient, controller).Reconcile(ctx, reconcile.Request{NamespacedName: client.ObjectKeyFromObject(event)})
_, err := singleton.AsReconciler(controller).Reconcile(ctx, reconcile.Request{NamespacedName: client.ObjectKeyFromObject(event)})
Expect(err).ToNot(HaveOccurred())

// expect not have an emitted metric to for the event
Expand All @@ -98,8 +113,11 @@ var _ = Describe("Controller", func() {
event.LastTimestamp.Time = time.Now().Add(-30 * time.Minute)
ExpectApplied(ctx, kubeClient, event)

eventChannel <- watch.Event{
Object: event,
}
// reconcile on the event
_, err = reconcile.AsReconciler(kubeClient, controller).Reconcile(ctx, reconcile.Request{NamespacedName: client.ObjectKeyFromObject(event)})
_, err = singleton.AsReconciler(controller).Reconcile(ctx, reconcile.Request{NamespacedName: client.ObjectKeyFromObject(event)})
Expect(err).ToNot(HaveOccurred())

// expect an emitted metric to for the event
Expand All @@ -110,12 +128,14 @@ var _ = Describe("Controller", func() {
func createEvent(name string, eventType string, reason string) *corev1.Event {
return &corev1.Event{
ObjectMeta: metav1.ObjectMeta{
Name: test.RandomName(),
Name: test.RandomName(),
Namespace: "default",
},
InvolvedObject: corev1.ObjectReference{
Namespace: "default",
Name: name,
Kind: object.GVK(&test.CustomObject{}).Kind,
Namespace: "default",
Name: name,
Kind: object.GVK(&test.CustomObject{}).Kind,
APIVersion: object.GVK(&test.CustomObject{}).GroupVersion().String(),
},
LastTimestamp: metav1.Time{Time: time.Now().Add(30 * time.Minute)},
Type: eventType,
Expand Down