diff --git a/events/controller.go b/events/controller.go index 67a085f..493614c 100644 --- a/events/controller.go +++ b/events/controller.go @@ -8,48 +8,57 @@ import ( pmetrics "github.com/awslabs/operatorpkg/metrics" "github.com/awslabs/operatorpkg/object" + "github.com/awslabs/operatorpkg/singleton" + "github.com/samber/lo" + corev1 "k8s.io/api/core/v1" v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/watch" + "k8s.io/client-go/kubernetes" "k8s.io/utils/clock" controllerruntime "sigs.k8s.io/controller-runtime" - "sigs.k8s.io/controller-runtime/pkg/builder" "sigs.k8s.io/controller-runtime/pkg/client" - "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/manager" - "sigs.k8s.io/controller-runtime/pkg/predicate" "sigs.k8s.io/controller-runtime/pkg/reconcile" ) type Controller[T client.Object] struct { - gvk schema.GroupVersionKind - startTime time.Time - kubeClient client.Client - EventCount pmetrics.CounterMetric + gvk schema.GroupVersionKind + startTime time.Time + kubeClient client.Client + EventCount pmetrics.CounterMetric + EventWatchChannel <-chan watch.Event } -func NewController[T client.Object](client client.Client, clock clock.Clock) *Controller[T] { +func NewController[T client.Object](ctx context.Context, client client.Client, clock clock.Clock, kubernetesInterface kubernetes.Interface) *Controller[T] { gvk := object.GVK(object.New[T]()) return &Controller[T]{ gvk: gvk, startTime: clock.Now(), kubeClient: client, EventCount: eventTotalMetric(strings.ToLower(gvk.Kind)), + EventWatchChannel: lo.Must(kubernetesInterface.CoreV1().Events("").Watch(ctx, metav1.ListOptions{ + // Only reconcile on the object kind we care about + FieldSelector: fmt.Sprintf("involvedObject.kind=%s,involvedObject.apiVersion=%s", object.GVK(&corev1.Node{}).Kind, object.GVK(&corev1.Node{}).GroupVersion().String()), + })).ResultChan(), } } func (c *Controller[T]) Register(_ context.Context, m manager.Manager) error { return controllerruntime.NewControllerManagedBy(m). - For(&v1.Event{}, builder.WithPredicates(predicate.NewTypedPredicateFuncs(func(o client.Object) bool { - // Only reconcile on the object kind we care about - event := o.(*v1.Event) - return event.InvolvedObject.Kind == c.gvk.Kind && event.InvolvedObject.APIVersion == c.gvk.GroupVersion().String() - }))). - WithOptions(controller.Options{MaxConcurrentReconciles: 10}). Named(fmt.Sprintf("operatorpkg.%s.events", strings.ToLower(c.gvk.Kind))). - Complete(reconcile.AsReconciler(m.GetClient(), c)) + WatchesRawSource(singleton.Source()). + Complete(singleton.AsReconciler(c)) } -func (c *Controller[T]) Reconcile(ctx context.Context, event *v1.Event) (reconcile.Result, error) { +func (c *Controller[T]) Reconcile(ctx context.Context) (reconcile.Result, error) { + e := <-c.EventWatchChannel + if e.Object == nil { + return reconcile.Result{RequeueAfter: singleton.RequeueImmediately}, nil + } + event := e.Object.(*v1.Event) + // We check if the event was created in the lifetime of this controller // since we don't duplicate metrics on controller restart or lease handover if c.startTime.Before(event.LastTimestamp.Time) { @@ -59,5 +68,5 @@ func (c *Controller[T]) Reconcile(ctx context.Context, event *v1.Event) (reconci }) } - return reconcile.Result{}, nil + return reconcile.Result{RequeueAfter: singleton.RequeueImmediately}, nil } diff --git a/events/suite_test.go b/events/suite_test.go index ed4b161..0e1c272 100644 --- a/events/suite_test.go +++ b/events/suite_test.go @@ -9,6 +9,7 @@ import ( "github.com/awslabs/operatorpkg/events" pmetrics "github.com/awslabs/operatorpkg/metrics" "github.com/awslabs/operatorpkg/object" + "github.com/awslabs/operatorpkg/singleton" "github.com/awslabs/operatorpkg/test" . "github.com/awslabs/operatorpkg/test/expectations" "github.com/onsi/ginkgo/v2" @@ -19,10 +20,12 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/watch" + "k8s.io/client-go/kubernetes/fake" "k8s.io/client-go/kubernetes/scheme" clock "k8s.io/utils/clock/testing" "sigs.k8s.io/controller-runtime/pkg/client" - "sigs.k8s.io/controller-runtime/pkg/client/fake" + crfake "sigs.k8s.io/controller-runtime/pkg/client/fake" "sigs.k8s.io/controller-runtime/pkg/log" "sigs.k8s.io/controller-runtime/pkg/reconcile" ) @@ -38,6 +41,7 @@ var ctx context.Context var fakeClock *clock.FakeClock var controller *events.Controller[*test.CustomObject] var kubeClient client.Client +var eventChannel chan watch.Event func Test(t *testing.T) { lo.Must0(SchemeBuilder.AddToScheme(scheme.Scheme)) @@ -46,13 +50,18 @@ func Test(t *testing.T) { } var _ = BeforeSuite(func() { + ctx = log.IntoContext(context.Background(), ginkgo.GinkgoLogr) + fakeClock = clock.NewFakeClock(time.Now()) - kubeClient = fake.NewClientBuilder().WithScheme(scheme.Scheme).WithIndex(&corev1.Event{}, "involvedObject.kind", func(o client.Object) []string { + kubeClient = crfake.NewClientBuilder().WithScheme(scheme.Scheme).WithIndex(&corev1.Event{}, "involvedObject.kind", func(o client.Object) []string { evt := o.(*corev1.Event) return []string{evt.InvolvedObject.Kind} }).Build() - controller = events.NewController[*test.CustomObject](kubeClient, fakeClock) - ctx = log.IntoContext(context.Background(), ginkgo.GinkgoLogr) + + client := fake.NewSimpleClientset() + eventChannel = make(chan watch.Event, 1000) + controller = events.NewController[*test.CustomObject](ctx, kubeClient, fakeClock, client) + controller.EventWatchChannel = eventChannel }) var _ = Describe("Controller", func() { @@ -70,8 +79,11 @@ var _ = Describe("Controller", func() { // expect an metrics for custom object to be zero, waiting on controller reconcile Expect(GetMetric("operator_customobject_event_total", conditionLabels(fmt.Sprintf("Test-type-%d", i), fmt.Sprintf("Test-reason-%d", i)))).To(BeNil()) + eventChannel <- watch.Event{ + Object: events[i], + } // reconcile on the event - _, err := reconcile.AsReconciler(kubeClient, controller).Reconcile(ctx, reconcile.Request{NamespacedName: client.ObjectKeyFromObject(events[i])}) + _, err := singleton.AsReconciler(controller).Reconcile(ctx, reconcile.Request{}) Expect(err).ToNot(HaveOccurred()) // expect an emitted metric to for the event @@ -87,8 +99,11 @@ var _ = Describe("Controller", func() { // expect an metrics for custom object to be zero, waiting on controller reconcile Expect(GetMetric("operator_ustomobject_event_total", conditionLabels(corev1.EventTypeNormal, "reason"))).To(BeNil()) + eventChannel <- watch.Event{ + Object: event, + } // reconcile on the event - _, err := reconcile.AsReconciler(kubeClient, controller).Reconcile(ctx, reconcile.Request{NamespacedName: client.ObjectKeyFromObject(event)}) + _, err := singleton.AsReconciler(controller).Reconcile(ctx, reconcile.Request{NamespacedName: client.ObjectKeyFromObject(event)}) Expect(err).ToNot(HaveOccurred()) // expect not have an emitted metric to for the event @@ -98,8 +113,11 @@ var _ = Describe("Controller", func() { event.LastTimestamp.Time = time.Now().Add(-30 * time.Minute) ExpectApplied(ctx, kubeClient, event) + eventChannel <- watch.Event{ + Object: event, + } // reconcile on the event - _, err = reconcile.AsReconciler(kubeClient, controller).Reconcile(ctx, reconcile.Request{NamespacedName: client.ObjectKeyFromObject(event)}) + _, err = singleton.AsReconciler(controller).Reconcile(ctx, reconcile.Request{NamespacedName: client.ObjectKeyFromObject(event)}) Expect(err).ToNot(HaveOccurred()) // expect an emitted metric to for the event @@ -110,12 +128,14 @@ var _ = Describe("Controller", func() { func createEvent(name string, eventType string, reason string) *corev1.Event { return &corev1.Event{ ObjectMeta: metav1.ObjectMeta{ - Name: test.RandomName(), + Name: test.RandomName(), + Namespace: "default", }, InvolvedObject: corev1.ObjectReference{ - Namespace: "default", - Name: name, - Kind: object.GVK(&test.CustomObject{}).Kind, + Namespace: "default", + Name: name, + Kind: object.GVK(&test.CustomObject{}).Kind, + APIVersion: object.GVK(&test.CustomObject{}).GroupVersion().String(), }, LastTimestamp: metav1.Time{Time: time.Now().Add(30 * time.Minute)}, Type: eventType,