From 062432a7c88b33ce672fceea26ed2af8b26113c4 Mon Sep 17 00:00:00 2001 From: Amanuel Engeda Date: Thu, 23 Jan 2025 16:53:59 -0800 Subject: [PATCH 01/21] Avoid Event cache hydration by using clinet go watcher --- events/controller.go | 27 +++++++++++++++------------ events/suite_test.go | 11 +++++++++-- singleton/controller.go | 32 ++++++++++++++++++++++++++++++++ 3 files changed, 56 insertions(+), 14 deletions(-) diff --git a/events/controller.go b/events/controller.go index 67a085f..7031621 100644 --- a/events/controller.go +++ b/events/controller.go @@ -8,15 +8,17 @@ import ( pmetrics "github.com/awslabs/operatorpkg/metrics" "github.com/awslabs/operatorpkg/object" + "github.com/awslabs/operatorpkg/singleton" + "github.com/samber/lo" v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/watch" + "k8s.io/client-go/kubernetes" "k8s.io/utils/clock" controllerruntime "sigs.k8s.io/controller-runtime" - "sigs.k8s.io/controller-runtime/pkg/builder" "sigs.k8s.io/controller-runtime/pkg/client" - "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/manager" - "sigs.k8s.io/controller-runtime/pkg/predicate" "sigs.k8s.io/controller-runtime/pkg/reconcile" ) @@ -25,31 +27,32 @@ type Controller[T client.Object] struct { startTime time.Time kubeClient client.Client EventCount pmetrics.CounterMetric + eventWatch watch.Interface } -func NewController[T client.Object](client client.Client, clock clock.Clock) *Controller[T] { +func NewController[T client.Object](ctx context.Context, client client.Client, clock clock.Clock, kubernetesInterface kubernetes.Interface) *Controller[T] { gvk := object.GVK(object.New[T]()) return &Controller[T]{ gvk: gvk, startTime: clock.Now(), kubeClient: client, EventCount: eventTotalMetric(strings.ToLower(gvk.Kind)), + eventWatch: lo.Must(kubernetesInterface.CoreV1().Events("").Watch(ctx, metav1.ListOptions{})), } } -func (c *Controller[T]) Register(_ context.Context, m manager.Manager) error { +func (c *Controller[T]) Register(ctx context.Context, m manager.Manager) error { return controllerruntime.NewControllerManagedBy(m). - For(&v1.Event{}, builder.WithPredicates(predicate.NewTypedPredicateFuncs(func(o client.Object) bool { - // Only reconcile on the object kind we care about - event := o.(*v1.Event) - return event.InvolvedObject.Kind == c.gvk.Kind && event.InvolvedObject.APIVersion == c.gvk.GroupVersion().String() - }))). - WithOptions(controller.Options{MaxConcurrentReconciles: 10}). Named(fmt.Sprintf("operatorpkg.%s.events", strings.ToLower(c.gvk.Kind))). - Complete(reconcile.AsReconciler(m.GetClient(), c)) + WatchesRawSource(singleton.ChannelSource[*v1.Event](ctx, c.eventWatch.ResultChan())). + Complete(reconcile.AsReconciler(c.kubeClient, c)) } func (c *Controller[T]) Reconcile(ctx context.Context, event *v1.Event) (reconcile.Result, error) { + if event.InvolvedObject.Kind != c.gvk.Kind && event.InvolvedObject.APIVersion != c.gvk.GroupVersion().String() { + return reconcile.Result{}, nil + } + // We check if the event was created in the lifetime of this controller // since we don't duplicate metrics on controller restart or lease handover if c.startTime.Before(event.LastTimestamp.Time) { diff --git a/events/suite_test.go b/events/suite_test.go index ed4b161..6b41b74 100644 --- a/events/suite_test.go +++ b/events/suite_test.go @@ -19,10 +19,12 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes/scheme" clock "k8s.io/utils/clock/testing" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/client/fake" + "sigs.k8s.io/controller-runtime/pkg/envtest" "sigs.k8s.io/controller-runtime/pkg/log" "sigs.k8s.io/controller-runtime/pkg/reconcile" ) @@ -46,13 +48,18 @@ func Test(t *testing.T) { } var _ = BeforeSuite(func() { + ctx = context.Background() + ctx = log.IntoContext(context.Background(), ginkgo.GinkgoLogr) + fakeClock = clock.NewFakeClock(time.Now()) kubeClient = fake.NewClientBuilder().WithScheme(scheme.Scheme).WithIndex(&corev1.Event{}, "involvedObject.kind", func(o client.Object) []string { evt := o.(*corev1.Event) return []string{evt.InvolvedObject.Kind} }).Build() - controller = events.NewController[*test.CustomObject](kubeClient, fakeClock) - ctx = log.IntoContext(context.Background(), ginkgo.GinkgoLogr) + environment := envtest.Environment{Scheme: scheme.Scheme} + _ = lo.Must(environment.Start()) + + controller = events.NewController[*test.CustomObject](ctx, kubeClient, fakeClock, kubernetes.NewForConfigOrDie(environment.Config)) }) var _ = Describe("Controller", func() { diff --git a/singleton/controller.go b/singleton/controller.go index d1cb1dc..ecb28eb 100644 --- a/singleton/controller.go +++ b/singleton/controller.go @@ -4,7 +4,9 @@ import ( "context" "time" + "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/util/workqueue" + "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/handler" "sigs.k8s.io/controller-runtime/pkg/reconcile" @@ -36,3 +38,33 @@ func Source() source.Source { }, }) } + +func ChannelSource[T client.Object](ctx context.Context, objectChan <-chan watch.Event) source.Source { + eventSource := make(chan event.GenericEvent, 1) + + go func(ctx context.Context, objectChan <-chan watch.Event, eventSource chan event.GenericEvent) { + + for { + select { + case delta, ok := <-objectChan: + if !ok { + return + } + // Convert the delta to GenericEvent + eventSource <- event.GenericEvent{ + Object: delta.Object.(T), + } + case <-ctx.Done(): + return + } + } + }(ctx, objectChan, eventSource) + + return source.Channel(eventSource, handler.Funcs{ + GenericFunc: func(_ context.Context, event event.GenericEvent, queue workqueue.TypedRateLimitingInterface[reconcile.Request]) { + queue.Add(reconcile.Request{ + NamespacedName: client.ObjectKeyFromObject(event.Object), + }) + }, + }) +} From 1b94c3b3ae5293356113b252d6f8ec2b30069a47 Mon Sep 17 00:00:00 2001 From: Amanuel Engeda Date: Thu, 23 Jan 2025 16:55:57 -0800 Subject: [PATCH 02/21] Fix conditional --- events/controller.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/events/controller.go b/events/controller.go index 7031621..e12ef09 100644 --- a/events/controller.go +++ b/events/controller.go @@ -49,7 +49,7 @@ func (c *Controller[T]) Register(ctx context.Context, m manager.Manager) error { } func (c *Controller[T]) Reconcile(ctx context.Context, event *v1.Event) (reconcile.Result, error) { - if event.InvolvedObject.Kind != c.gvk.Kind && event.InvolvedObject.APIVersion != c.gvk.GroupVersion().String() { + if event.InvolvedObject.Kind != c.gvk.Kind || event.InvolvedObject.APIVersion != c.gvk.GroupVersion().String() { return reconcile.Result{}, nil } From fb096daf5f4da17ffb4b6747d33888b92fa6476b Mon Sep 17 00:00:00 2001 From: Amanuel Engeda Date: Thu, 23 Jan 2025 17:54:23 -0800 Subject: [PATCH 03/21] Update tests --- events/suite_test.go | 17 +++++++---------- singleton/controller.go | 1 - 2 files changed, 7 insertions(+), 11 deletions(-) diff --git a/events/suite_test.go b/events/suite_test.go index 6b41b74..71f4c33 100644 --- a/events/suite_test.go +++ b/events/suite_test.go @@ -23,7 +23,6 @@ import ( "k8s.io/client-go/kubernetes/scheme" clock "k8s.io/utils/clock/testing" "sigs.k8s.io/controller-runtime/pkg/client" - "sigs.k8s.io/controller-runtime/pkg/client/fake" "sigs.k8s.io/controller-runtime/pkg/envtest" "sigs.k8s.io/controller-runtime/pkg/log" "sigs.k8s.io/controller-runtime/pkg/reconcile" @@ -48,16 +47,12 @@ func Test(t *testing.T) { } var _ = BeforeSuite(func() { - ctx = context.Background() ctx = log.IntoContext(context.Background(), ginkgo.GinkgoLogr) fakeClock = clock.NewFakeClock(time.Now()) - kubeClient = fake.NewClientBuilder().WithScheme(scheme.Scheme).WithIndex(&corev1.Event{}, "involvedObject.kind", func(o client.Object) []string { - evt := o.(*corev1.Event) - return []string{evt.InvolvedObject.Kind} - }).Build() environment := envtest.Environment{Scheme: scheme.Scheme} _ = lo.Must(environment.Start()) + kubeClient = lo.Must(client.New(environment.Config, client.Options{Scheme: scheme.Scheme})) controller = events.NewController[*test.CustomObject](ctx, kubeClient, fakeClock, kubernetes.NewForConfigOrDie(environment.Config)) }) @@ -117,12 +112,14 @@ var _ = Describe("Controller", func() { func createEvent(name string, eventType string, reason string) *corev1.Event { return &corev1.Event{ ObjectMeta: metav1.ObjectMeta{ - Name: test.RandomName(), + Name: test.RandomName(), + Namespace: "default", }, InvolvedObject: corev1.ObjectReference{ - Namespace: "default", - Name: name, - Kind: object.GVK(&test.CustomObject{}).Kind, + Namespace: "default", + Name: name, + Kind: object.GVK(&test.CustomObject{}).Kind, + APIVersion: object.GVK(&test.CustomObject{}).GroupVersion().String(), }, LastTimestamp: metav1.Time{Time: time.Now().Add(30 * time.Minute)}, Type: eventType, diff --git a/singleton/controller.go b/singleton/controller.go index ecb28eb..e70a707 100644 --- a/singleton/controller.go +++ b/singleton/controller.go @@ -43,7 +43,6 @@ func ChannelSource[T client.Object](ctx context.Context, objectChan <-chan watch eventSource := make(chan event.GenericEvent, 1) go func(ctx context.Context, objectChan <-chan watch.Event, eventSource chan event.GenericEvent) { - for { select { case delta, ok := <-objectChan: From 6d06c8bf0c92d5c9fd423a740e4b454780d0944e Mon Sep 17 00:00:00 2001 From: Amanuel Engeda Date: Fri, 24 Jan 2025 06:49:59 -0800 Subject: [PATCH 04/21] Add kubebuilder installation for presubmit --- .github/actions/install-deps/actions.yaml | 30 +++++++++++++++++++++++ .github/toolchain.sh | 30 +++++++++++++++++++++++ .github/workflows/presubmit.yaml | 3 +++ Makefile | 6 ++++- singleton/controller.go | 2 +- 5 files changed, 69 insertions(+), 2 deletions(-) create mode 100644 .github/actions/install-deps/actions.yaml create mode 100755 .github/toolchain.sh diff --git a/.github/actions/install-deps/actions.yaml b/.github/actions/install-deps/actions.yaml new file mode 100644 index 0000000..0307801 --- /dev/null +++ b/.github/actions/install-deps/actions.yaml @@ -0,0 +1,30 @@ +name: InstallDependencies +description: 'Installs Go Downloads and installs Karpenter Dependencies' +inputs: + k8sVersion: + description: Kubernetes version to use when installing the toolchain + default: "1.24.x" +runs: + using: "composite" + steps: + - uses: actions/setup-go@3041bf56c941b39c61721a86cd11f3bb1338122a # v5.2.0 + id: setup-go + with: + go-version-file: go.mod + check-latest: true + cache-dependency-path: "**/go.sum" + # Root path permission workaround for caching https://github.com/actions/cache/issues/845#issuecomment-1252594999 + - run: sudo chown "$USER" /usr/local + shell: bash + - uses: actions/cache@1bd1e32a3bdc45362d1e726936510720a7c30a57 # v4.2.0 + id: cache-toolchain + with: + path: | + /usr/local/kubebuilder/bin + ~/go/bin + # Added go version to compensate for this issue with govulncheck: https://github.com/golang/go/issues/65590. Could re-evaluate if this is necessary once the + # upstream go issue is corrected and if this is causing too many cache misses. + key: ${{ runner.os }}-${{ inputs.k8sVersion }}-${{ steps.setup-go.outputs.go-version }}-toolchain-cache-${{ hashFiles('hack/toolchain.sh') }} + - if: ${{ steps.cache-toolchain.outputs.cache-hit != 'true' }} + shell: bash + run: K8S_VERSION=${{ inputs.k8sVersion }} make toolchain diff --git a/.github/toolchain.sh b/.github/toolchain.sh new file mode 100755 index 0000000..c4f0439 --- /dev/null +++ b/.github/toolchain.sh @@ -0,0 +1,30 @@ +#!/usr/bin/env bash +set -euo pipefail + +K8S_VERSION="${K8S_VERSION:="1.31.x"}" +KUBEBUILDER_ASSETS="/usr/local/kubebuilder/bin" + +main() { + kubebuilder +} + +kubebuilder() { + if ! mkdir -p ${KUBEBUILDER_ASSETS}; then + sudo mkdir -p ${KUBEBUILDER_ASSETS} + sudo chown $(whoami) ${KUBEBUILDER_ASSETS} + fi + arch=$(go env GOARCH) + ln -sf $(setup-envtest use -p path "${K8S_VERSION}" --arch="${arch}" --bin-dir="${KUBEBUILDER_ASSETS}")/* ${KUBEBUILDER_ASSETS} + find $KUBEBUILDER_ASSETS + + # Install latest binaries for 1.25.x (contains CEL fix) + if [[ "${K8S_VERSION}" = "1.25.x" ]] && [[ "$OSTYPE" == "linux"* ]]; then + for binary in 'kube-apiserver' 'kubectl'; do + rm $KUBEBUILDER_ASSETS/$binary + wget -P $KUBEBUILDER_ASSETS dl.k8s.io/v1.25.16/bin/linux/${arch}/${binary} + chmod +x $KUBEBUILDER_ASSETS/$binary + done + fi +} + +main "$@" diff --git a/.github/workflows/presubmit.yaml b/.github/workflows/presubmit.yaml index 90498b9..e6f50f8 100644 --- a/.github/workflows/presubmit.yaml +++ b/.github/workflows/presubmit.yaml @@ -9,4 +9,7 @@ jobs: runs-on: ubuntu-latest steps: - uses: actions/checkout@v4 + - uses: ./.github/actions/install-deps + with: + k8sVersion: 1.31 - run: make presubmit \ No newline at end of file diff --git a/Makefile b/Makefile index afb5087..8b88b75 100644 --- a/Makefile +++ b/Makefile @@ -14,4 +14,8 @@ verify: ## .PHONY: test test: ## - go test ./... \ No newline at end of file + go test ./... + +.PHONY: toolchain +toolchain: ## + ./.github/toolchain.sh diff --git a/singleton/controller.go b/singleton/controller.go index e70a707..67ad3fd 100644 --- a/singleton/controller.go +++ b/singleton/controller.go @@ -40,7 +40,7 @@ func Source() source.Source { } func ChannelSource[T client.Object](ctx context.Context, objectChan <-chan watch.Event) source.Source { - eventSource := make(chan event.GenericEvent, 1) + eventSource := make(chan event.GenericEvent, 1000) go func(ctx context.Context, objectChan <-chan watch.Event, eventSource chan event.GenericEvent) { for { From d39b60d132be70e3ff4a9a5bcfbd5c4a7cac7e26 Mon Sep 17 00:00:00 2001 From: Amanuel Engeda Date: Fri, 24 Jan 2025 07:45:24 -0800 Subject: [PATCH 05/21] Fix file name typo --- .github/actions/install-deps/{actions.yaml => action.yaml} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename .github/actions/install-deps/{actions.yaml => action.yaml} (100%) diff --git a/.github/actions/install-deps/actions.yaml b/.github/actions/install-deps/action.yaml similarity index 100% rename from .github/actions/install-deps/actions.yaml rename to .github/actions/install-deps/action.yaml From 0289cab5d71e6a5054f337ca83139967f1faff07 Mon Sep 17 00:00:00 2001 From: Amanuel Engeda Date: Fri, 24 Jan 2025 07:47:40 -0800 Subject: [PATCH 06/21] Add setup-envtest --- .github/toolchain.sh | 2 ++ 1 file changed, 2 insertions(+) diff --git a/.github/toolchain.sh b/.github/toolchain.sh index c4f0439..98f3667 100755 --- a/.github/toolchain.sh +++ b/.github/toolchain.sh @@ -5,6 +5,8 @@ K8S_VERSION="${K8S_VERSION:="1.31.x"}" KUBEBUILDER_ASSETS="/usr/local/kubebuilder/bin" main() { + go install sigs.k8s.io/controller-runtime/tools/setup-envtest@latest + kubebuilder } From 9ab9d62621b967762dfb9bc1f50e85d8bf36fadd Mon Sep 17 00:00:00 2001 From: Amanuel Engeda Date: Fri, 24 Jan 2025 07:58:25 -0800 Subject: [PATCH 07/21] Clean-up for presubmit --- .github/actions/install-deps/action.yaml | 10 ---------- .github/workflows/presubmit.yaml | 6 +++++- 2 files changed, 5 insertions(+), 11 deletions(-) diff --git a/.github/actions/install-deps/action.yaml b/.github/actions/install-deps/action.yaml index 0307801..629e06c 100644 --- a/.github/actions/install-deps/action.yaml +++ b/.github/actions/install-deps/action.yaml @@ -13,18 +13,8 @@ runs: go-version-file: go.mod check-latest: true cache-dependency-path: "**/go.sum" - # Root path permission workaround for caching https://github.com/actions/cache/issues/845#issuecomment-1252594999 - run: sudo chown "$USER" /usr/local shell: bash - - uses: actions/cache@1bd1e32a3bdc45362d1e726936510720a7c30a57 # v4.2.0 - id: cache-toolchain - with: - path: | - /usr/local/kubebuilder/bin - ~/go/bin - # Added go version to compensate for this issue with govulncheck: https://github.com/golang/go/issues/65590. Could re-evaluate if this is necessary once the - # upstream go issue is corrected and if this is causing too many cache misses. - key: ${{ runner.os }}-${{ inputs.k8sVersion }}-${{ steps.setup-go.outputs.go-version }}-toolchain-cache-${{ hashFiles('hack/toolchain.sh') }} - if: ${{ steps.cache-toolchain.outputs.cache-hit != 'true' }} shell: bash run: K8S_VERSION=${{ inputs.k8sVersion }} make toolchain diff --git a/.github/workflows/presubmit.yaml b/.github/workflows/presubmit.yaml index e6f50f8..0d6492b 100644 --- a/.github/workflows/presubmit.yaml +++ b/.github/workflows/presubmit.yaml @@ -7,9 +7,13 @@ on: jobs: presubmit: runs-on: ubuntu-latest + strategy: + fail-fast: false + matrix: + k8sVersion: ["1.25.x", "1.26.x", "1.27.x", "1.28.x", "1.29.x", "1.30.x", "1.31.x"] steps: - uses: actions/checkout@v4 - uses: ./.github/actions/install-deps with: - k8sVersion: 1.31 + k8sVersion: ${{ matrix.k8sVersion }} - run: make presubmit \ No newline at end of file From e7be7d08fd8ad767081694572082c60c3a6e17ce Mon Sep 17 00:00:00 2001 From: Amanuel Engeda Date: Fri, 24 Jan 2025 07:58:25 -0800 Subject: [PATCH 08/21] Clean-up for presubmit --- .github/workflows/presubmit.yaml | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/.github/workflows/presubmit.yaml b/.github/workflows/presubmit.yaml index 0d6492b..ac39dcd 100644 --- a/.github/workflows/presubmit.yaml +++ b/.github/workflows/presubmit.yaml @@ -4,8 +4,12 @@ on: branches: [main] pull_request: workflow_dispatch: +permissions: + contents: read jobs: presubmit: + permissions: + issues: write runs-on: ubuntu-latest strategy: fail-fast: false From cc8fc7d57127251de4c61c9755ca37967b944f9b Mon Sep 17 00:00:00 2001 From: Amanuel Engeda Date: Fri, 24 Jan 2025 07:58:25 -0800 Subject: [PATCH 09/21] Adding --- events/controller.go | 30 ++++++++++++++++-------------- events/suite_test.go | 7 ++++--- singleton/controller.go | 31 ------------------------------- 3 files changed, 20 insertions(+), 48 deletions(-) diff --git a/events/controller.go b/events/controller.go index e12ef09..7d724c4 100644 --- a/events/controller.go +++ b/events/controller.go @@ -37,29 +37,31 @@ func NewController[T client.Object](ctx context.Context, client client.Client, c startTime: clock.Now(), kubeClient: client, EventCount: eventTotalMetric(strings.ToLower(gvk.Kind)), - eventWatch: lo.Must(kubernetesInterface.CoreV1().Events("").Watch(ctx, metav1.ListOptions{})), + eventWatch: lo.Must(kubernetesInterface.CoreV1().Events("").Watch(ctx, metav1.ListOptions{ + FieldSelector: fmt.Sprintf("involvedObject.kind=%s,involvedObject.apiVersion=%s", gvk.Kind, gvk.GroupVersion().String()), + })), } } func (c *Controller[T]) Register(ctx context.Context, m manager.Manager) error { return controllerruntime.NewControllerManagedBy(m). Named(fmt.Sprintf("operatorpkg.%s.events", strings.ToLower(c.gvk.Kind))). - WatchesRawSource(singleton.ChannelSource[*v1.Event](ctx, c.eventWatch.ResultChan())). - Complete(reconcile.AsReconciler(c.kubeClient, c)) + WatchesRawSource(singleton.Source()). + Complete(singleton.AsReconciler(c)) } -func (c *Controller[T]) Reconcile(ctx context.Context, event *v1.Event) (reconcile.Result, error) { - if event.InvolvedObject.Kind != c.gvk.Kind || event.InvolvedObject.APIVersion != c.gvk.GroupVersion().String() { - return reconcile.Result{}, nil - } +func (c *Controller[T]) Reconcile(ctx context.Context) (reconcile.Result, error) { + for e := range c.eventWatch.ResultChan() { + event := e.Object.(*v1.Event) - // We check if the event was created in the lifetime of this controller - // since we don't duplicate metrics on controller restart or lease handover - if c.startTime.Before(event.LastTimestamp.Time) { - c.EventCount.Inc(map[string]string{ - pmetrics.LabelType: event.Type, - pmetrics.LabelReason: event.Reason, - }) + // We check if the event was created in the lifetime of this controller + // since we don't duplicate metrics on controller restart or lease handover + if c.startTime.Before(event.LastTimestamp.Time) { + c.EventCount.Inc(map[string]string{ + pmetrics.LabelType: event.Type, + pmetrics.LabelReason: event.Reason, + }) + } } return reconcile.Result{}, nil diff --git a/events/suite_test.go b/events/suite_test.go index 71f4c33..9e8264c 100644 --- a/events/suite_test.go +++ b/events/suite_test.go @@ -9,6 +9,7 @@ import ( "github.com/awslabs/operatorpkg/events" pmetrics "github.com/awslabs/operatorpkg/metrics" "github.com/awslabs/operatorpkg/object" + "github.com/awslabs/operatorpkg/singleton" "github.com/awslabs/operatorpkg/test" . "github.com/awslabs/operatorpkg/test/expectations" "github.com/onsi/ginkgo/v2" @@ -73,7 +74,7 @@ var _ = Describe("Controller", func() { Expect(GetMetric("operator_customobject_event_total", conditionLabels(fmt.Sprintf("Test-type-%d", i), fmt.Sprintf("Test-reason-%d", i)))).To(BeNil()) // reconcile on the event - _, err := reconcile.AsReconciler(kubeClient, controller).Reconcile(ctx, reconcile.Request{NamespacedName: client.ObjectKeyFromObject(events[i])}) + _, err := singleton.AsReconciler(controller).Reconcile(ctx, reconcile.Request{NamespacedName: client.ObjectKeyFromObject(events[i])}) Expect(err).ToNot(HaveOccurred()) // expect an emitted metric to for the event @@ -90,7 +91,7 @@ var _ = Describe("Controller", func() { Expect(GetMetric("operator_ustomobject_event_total", conditionLabels(corev1.EventTypeNormal, "reason"))).To(BeNil()) // reconcile on the event - _, err := reconcile.AsReconciler(kubeClient, controller).Reconcile(ctx, reconcile.Request{NamespacedName: client.ObjectKeyFromObject(event)}) + _, err := singleton.AsReconciler(controller).Reconcile(ctx, reconcile.Request{NamespacedName: client.ObjectKeyFromObject(event)}) Expect(err).ToNot(HaveOccurred()) // expect not have an emitted metric to for the event @@ -101,7 +102,7 @@ var _ = Describe("Controller", func() { ExpectApplied(ctx, kubeClient, event) // reconcile on the event - _, err = reconcile.AsReconciler(kubeClient, controller).Reconcile(ctx, reconcile.Request{NamespacedName: client.ObjectKeyFromObject(event)}) + _, err = singleton.AsReconciler(controller).Reconcile(ctx, reconcile.Request{NamespacedName: client.ObjectKeyFromObject(event)}) Expect(err).ToNot(HaveOccurred()) // expect an emitted metric to for the event diff --git a/singleton/controller.go b/singleton/controller.go index 67ad3fd..d1cb1dc 100644 --- a/singleton/controller.go +++ b/singleton/controller.go @@ -4,9 +4,7 @@ import ( "context" "time" - "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/util/workqueue" - "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/handler" "sigs.k8s.io/controller-runtime/pkg/reconcile" @@ -38,32 +36,3 @@ func Source() source.Source { }, }) } - -func ChannelSource[T client.Object](ctx context.Context, objectChan <-chan watch.Event) source.Source { - eventSource := make(chan event.GenericEvent, 1000) - - go func(ctx context.Context, objectChan <-chan watch.Event, eventSource chan event.GenericEvent) { - for { - select { - case delta, ok := <-objectChan: - if !ok { - return - } - // Convert the delta to GenericEvent - eventSource <- event.GenericEvent{ - Object: delta.Object.(T), - } - case <-ctx.Done(): - return - } - } - }(ctx, objectChan, eventSource) - - return source.Channel(eventSource, handler.Funcs{ - GenericFunc: func(_ context.Context, event event.GenericEvent, queue workqueue.TypedRateLimitingInterface[reconcile.Request]) { - queue.Add(reconcile.Request{ - NamespacedName: client.ObjectKeyFromObject(event.Object), - }) - }, - }) -} From cfb097fbe2b3e6f9173c35b13e3701e3bcb5b0ba Mon Sep 17 00:00:00 2001 From: Amanuel Engeda Date: Fri, 24 Jan 2025 11:29:37 -0800 Subject: [PATCH 10/21] Removed the source queue and pulled object from the channel --- events/controller.go | 26 +++++++++++--------------- singleton/controller.go | 36 ++++++++++++++++++++++++++++++++++++ 2 files changed, 47 insertions(+), 15 deletions(-) diff --git a/events/controller.go b/events/controller.go index 7d724c4..1fdd1da 100644 --- a/events/controller.go +++ b/events/controller.go @@ -27,7 +27,7 @@ type Controller[T client.Object] struct { startTime time.Time kubeClient client.Client EventCount pmetrics.CounterMetric - eventWatch watch.Interface + EventWatch watch.Interface } func NewController[T client.Object](ctx context.Context, client client.Client, clock clock.Clock, kubernetesInterface kubernetes.Interface) *Controller[T] { @@ -37,7 +37,7 @@ func NewController[T client.Object](ctx context.Context, client client.Client, c startTime: clock.Now(), kubeClient: client, EventCount: eventTotalMetric(strings.ToLower(gvk.Kind)), - eventWatch: lo.Must(kubernetesInterface.CoreV1().Events("").Watch(ctx, metav1.ListOptions{ + EventWatch: lo.Must(kubernetesInterface.CoreV1().Events("").Watch(ctx, metav1.ListOptions{ FieldSelector: fmt.Sprintf("involvedObject.kind=%s,involvedObject.apiVersion=%s", gvk.Kind, gvk.GroupVersion().String()), })), } @@ -47,21 +47,17 @@ func (c *Controller[T]) Register(ctx context.Context, m manager.Manager) error { return controllerruntime.NewControllerManagedBy(m). Named(fmt.Sprintf("operatorpkg.%s.events", strings.ToLower(c.gvk.Kind))). WatchesRawSource(singleton.Source()). - Complete(singleton.AsReconciler(c)) + Complete(singleton.AsChannelObjectReconciler(c.EventWatch.ResultChan(), c)) } -func (c *Controller[T]) Reconcile(ctx context.Context) (reconcile.Result, error) { - for e := range c.eventWatch.ResultChan() { - event := e.Object.(*v1.Event) - - // We check if the event was created in the lifetime of this controller - // since we don't duplicate metrics on controller restart or lease handover - if c.startTime.Before(event.LastTimestamp.Time) { - c.EventCount.Inc(map[string]string{ - pmetrics.LabelType: event.Type, - pmetrics.LabelReason: event.Reason, - }) - } +func (c *Controller[T]) Reconcile(ctx context.Context, event *v1.Event) (reconcile.Result, error) { + // We check if the event was created in the lifetime of this controller + // since we don't duplicate metrics on controller restart or lease handover + if c.startTime.Before(event.LastTimestamp.Time) { + c.EventCount.Inc(map[string]string{ + pmetrics.LabelType: event.Type, + pmetrics.LabelReason: event.Reason, + }) } return reconcile.Result{}, nil diff --git a/singleton/controller.go b/singleton/controller.go index d1cb1dc..3da941d 100644 --- a/singleton/controller.go +++ b/singleton/controller.go @@ -2,9 +2,13 @@ package singleton import ( "context" + "math" "time" + "go.uber.org/multierr" + "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/util/workqueue" + "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/handler" "sigs.k8s.io/controller-runtime/pkg/reconcile" @@ -27,6 +31,38 @@ func AsReconciler(reconciler Reconciler) reconcile.Reconciler { }) } +type ChannalObjectReconciler[T client.Object] interface { + Reconcile(ctx context.Context, object T) (reconcile.Result, error) +} + +func AsChannelObjectReconciler[T client.Object](watchEvents <-chan watch.Event, reconciler ChannalObjectReconciler[T]) reconcile.Reconciler { + return reconcile.Func(func(ctx context.Context, r reconcile.Request) (reconcile.Result, error) { + var errs error + var results []reconcile.Result + for event := range watchEvents { + res, err := reconciler.Reconcile(ctx, event.Object.(T)) + errs = multierr.Append(errs, err) + results = append(results, res) + + } + + var result reconcile.Result + min := time.Duration(math.MaxInt64) + for _, r := range results { + if r.IsZero() { + continue + } + if r.RequeueAfter < min { + min = r.RequeueAfter + result.RequeueAfter = min + result.Requeue = true + } + } + + return result, errs + }) +} + func Source() source.Source { eventSource := make(chan event.GenericEvent, 1) eventSource <- event.GenericEvent{} From 3e10d2476d0dbc62cda4b5efa4092895feb33e49 Mon Sep 17 00:00:00 2001 From: Amanuel Engeda Date: Fri, 24 Jan 2025 11:29:37 -0800 Subject: [PATCH 11/21] Removed the source queue and pulled object from the channel --- events/suite_test.go | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/events/suite_test.go b/events/suite_test.go index 9e8264c..1dba63b 100644 --- a/events/suite_test.go +++ b/events/suite_test.go @@ -39,6 +39,7 @@ var ( var ctx context.Context var fakeClock *clock.FakeClock var controller *events.Controller[*test.CustomObject] +var environment envtest.Environment var kubeClient client.Client func Test(t *testing.T) { @@ -51,13 +52,17 @@ var _ = BeforeSuite(func() { ctx = log.IntoContext(context.Background(), ginkgo.GinkgoLogr) fakeClock = clock.NewFakeClock(time.Now()) - environment := envtest.Environment{Scheme: scheme.Scheme} + environment = envtest.Environment{Scheme: scheme.Scheme} _ = lo.Must(environment.Start()) kubeClient = lo.Must(client.New(environment.Config, client.Options{Scheme: scheme.Scheme})) controller = events.NewController[*test.CustomObject](ctx, kubeClient, fakeClock, kubernetes.NewForConfigOrDie(environment.Config)) }) +var _ = AfterSuite(func() { + environment.Stop() +}) + var _ = Describe("Controller", func() { BeforeEach(func() { controller.EventCount.Reset() @@ -74,7 +79,7 @@ var _ = Describe("Controller", func() { Expect(GetMetric("operator_customobject_event_total", conditionLabels(fmt.Sprintf("Test-type-%d", i), fmt.Sprintf("Test-reason-%d", i)))).To(BeNil()) // reconcile on the event - _, err := singleton.AsReconciler(controller).Reconcile(ctx, reconcile.Request{NamespacedName: client.ObjectKeyFromObject(events[i])}) + _, err := singleton.AsChannelObjectReconciler(controller.EventWatch.ResultChan(), controller).Reconcile(ctx, reconcile.Request{}) Expect(err).ToNot(HaveOccurred()) // expect an emitted metric to for the event @@ -91,7 +96,7 @@ var _ = Describe("Controller", func() { Expect(GetMetric("operator_ustomobject_event_total", conditionLabels(corev1.EventTypeNormal, "reason"))).To(BeNil()) // reconcile on the event - _, err := singleton.AsReconciler(controller).Reconcile(ctx, reconcile.Request{NamespacedName: client.ObjectKeyFromObject(event)}) + _, err := singleton.AsChannelObjectReconciler(controller.EventWatch.ResultChan(), controller).Reconcile(ctx, reconcile.Request{NamespacedName: client.ObjectKeyFromObject(event)}) Expect(err).ToNot(HaveOccurred()) // expect not have an emitted metric to for the event @@ -102,7 +107,7 @@ var _ = Describe("Controller", func() { ExpectApplied(ctx, kubeClient, event) // reconcile on the event - _, err = singleton.AsReconciler(controller).Reconcile(ctx, reconcile.Request{NamespacedName: client.ObjectKeyFromObject(event)}) + _, err = singleton.AsChannelObjectReconciler(controller.EventWatch.ResultChan(), controller).Reconcile(ctx, reconcile.Request{NamespacedName: client.ObjectKeyFromObject(event)}) Expect(err).ToNot(HaveOccurred()) // expect an emitted metric to for the event From f8ec3e9a1777886c4be961199cce1a3316332d86 Mon Sep 17 00:00:00 2001 From: Amanuel Engeda Date: Fri, 24 Jan 2025 11:29:37 -0800 Subject: [PATCH 12/21] Removed the source queue and pulled object from the channel --- singleton/controller.go | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/singleton/controller.go b/singleton/controller.go index 3da941d..6c05da1 100644 --- a/singleton/controller.go +++ b/singleton/controller.go @@ -39,12 +39,10 @@ func AsChannelObjectReconciler[T client.Object](watchEvents <-chan watch.Event, return reconcile.Func(func(ctx context.Context, r reconcile.Request) (reconcile.Result, error) { var errs error var results []reconcile.Result - for event := range watchEvents { - res, err := reconciler.Reconcile(ctx, event.Object.(T)) - errs = multierr.Append(errs, err) - results = append(results, res) - - } + e := <-watchEvents + res, err := reconciler.Reconcile(ctx, e.Object.(T)) + errs = multierr.Append(errs, err) + results = append(results, res) var result reconcile.Result min := time.Duration(math.MaxInt64) From 589d83c01226de5d8fde3897f4e9c2dded48b8d1 Mon Sep 17 00:00:00 2001 From: Amanuel Engeda Date: Fri, 24 Jan 2025 11:29:37 -0800 Subject: [PATCH 13/21] Removed the source queue and pulled object from the channel --- events/controller.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/events/controller.go b/events/controller.go index 1fdd1da..d69a5fb 100644 --- a/events/controller.go +++ b/events/controller.go @@ -60,5 +60,5 @@ func (c *Controller[T]) Reconcile(ctx context.Context, event *v1.Event) (reconci }) } - return reconcile.Result{}, nil + return reconcile.Result{RequeueAfter: singleton.RequeueImmediately}, nil } From df22a8c0beef25b009fe9af02ba0eb5317c91f4c Mon Sep 17 00:00:00 2001 From: Amanuel Engeda Date: Fri, 24 Jan 2025 11:29:37 -0800 Subject: [PATCH 14/21] Removed the source queue and pulled object from the channel --- events/controller.go | 1 + 1 file changed, 1 insertion(+) diff --git a/events/controller.go b/events/controller.go index d69a5fb..7459b55 100644 --- a/events/controller.go +++ b/events/controller.go @@ -38,6 +38,7 @@ func NewController[T client.Object](ctx context.Context, client client.Client, c kubeClient: client, EventCount: eventTotalMetric(strings.ToLower(gvk.Kind)), EventWatch: lo.Must(kubernetesInterface.CoreV1().Events("").Watch(ctx, metav1.ListOptions{ + // Only reconcile on the object kind we care about FieldSelector: fmt.Sprintf("involvedObject.kind=%s,involvedObject.apiVersion=%s", gvk.Kind, gvk.GroupVersion().String()), })), } From 8025df27a47b97340f5c3b82752c37a89aed2707 Mon Sep 17 00:00:00 2001 From: Amanuel Engeda Date: Fri, 24 Jan 2025 11:29:37 -0800 Subject: [PATCH 15/21] Removed the source queue and pulled object from the channel --- events/controller.go | 30 ++++++++++++------------------ events/suite_test.go | 35 +++++++++++++++++++++-------------- singleton/controller.go | 3 +++ 3 files changed, 36 insertions(+), 32 deletions(-) diff --git a/events/controller.go b/events/controller.go index 7459b55..19ec61c 100644 --- a/events/controller.go +++ b/events/controller.go @@ -9,12 +9,9 @@ import ( pmetrics "github.com/awslabs/operatorpkg/metrics" "github.com/awslabs/operatorpkg/object" "github.com/awslabs/operatorpkg/singleton" - "github.com/samber/lo" v1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/watch" - "k8s.io/client-go/kubernetes" "k8s.io/utils/clock" controllerruntime "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" @@ -23,24 +20,21 @@ import ( ) type Controller[T client.Object] struct { - gvk schema.GroupVersionKind - startTime time.Time - kubeClient client.Client - EventCount pmetrics.CounterMetric - EventWatch watch.Interface + gvk schema.GroupVersionKind + startTime time.Time + kubeClient client.Client + EventCount pmetrics.CounterMetric + EventWatchChannel <-chan watch.Event } -func NewController[T client.Object](ctx context.Context, client client.Client, clock clock.Clock, kubernetesInterface kubernetes.Interface) *Controller[T] { +func NewController[T client.Object](ctx context.Context, client client.Client, clock clock.Clock, channel <-chan watch.Event) *Controller[T] { gvk := object.GVK(object.New[T]()) return &Controller[T]{ - gvk: gvk, - startTime: clock.Now(), - kubeClient: client, - EventCount: eventTotalMetric(strings.ToLower(gvk.Kind)), - EventWatch: lo.Must(kubernetesInterface.CoreV1().Events("").Watch(ctx, metav1.ListOptions{ - // Only reconcile on the object kind we care about - FieldSelector: fmt.Sprintf("involvedObject.kind=%s,involvedObject.apiVersion=%s", gvk.Kind, gvk.GroupVersion().String()), - })), + gvk: gvk, + startTime: clock.Now(), + kubeClient: client, + EventCount: eventTotalMetric(strings.ToLower(gvk.Kind)), + EventWatchChannel: channel, } } @@ -48,7 +42,7 @@ func (c *Controller[T]) Register(ctx context.Context, m manager.Manager) error { return controllerruntime.NewControllerManagedBy(m). Named(fmt.Sprintf("operatorpkg.%s.events", strings.ToLower(c.gvk.Kind))). WatchesRawSource(singleton.Source()). - Complete(singleton.AsChannelObjectReconciler(c.EventWatch.ResultChan(), c)) + Complete(singleton.AsChannelObjectReconciler(c.EventWatchChannel, c)) } func (c *Controller[T]) Reconcile(ctx context.Context, event *v1.Event) (reconcile.Result, error) { diff --git a/events/suite_test.go b/events/suite_test.go index 1dba63b..8d49b90 100644 --- a/events/suite_test.go +++ b/events/suite_test.go @@ -20,11 +20,11 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" - "k8s.io/client-go/kubernetes" + "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/kubernetes/scheme" clock "k8s.io/utils/clock/testing" "sigs.k8s.io/controller-runtime/pkg/client" - "sigs.k8s.io/controller-runtime/pkg/envtest" + ctrlfake "sigs.k8s.io/controller-runtime/pkg/client/fake" "sigs.k8s.io/controller-runtime/pkg/log" "sigs.k8s.io/controller-runtime/pkg/reconcile" ) @@ -39,8 +39,8 @@ var ( var ctx context.Context var fakeClock *clock.FakeClock var controller *events.Controller[*test.CustomObject] -var environment envtest.Environment var kubeClient client.Client +var eventChannel chan watch.Event func Test(t *testing.T) { lo.Must0(SchemeBuilder.AddToScheme(scheme.Scheme)) @@ -52,15 +52,13 @@ var _ = BeforeSuite(func() { ctx = log.IntoContext(context.Background(), ginkgo.GinkgoLogr) fakeClock = clock.NewFakeClock(time.Now()) - environment = envtest.Environment{Scheme: scheme.Scheme} - _ = lo.Must(environment.Start()) - kubeClient = lo.Must(client.New(environment.Config, client.Options{Scheme: scheme.Scheme})) + kubeClient = ctrlfake.NewClientBuilder().WithScheme(scheme.Scheme).WithIndex(&corev1.Event{}, "involvedObject.kind", func(o client.Object) []string { + evt := o.(*corev1.Event) + return []string{evt.InvolvedObject.Kind} + }).Build() - controller = events.NewController[*test.CustomObject](ctx, kubeClient, fakeClock, kubernetes.NewForConfigOrDie(environment.Config)) -}) - -var _ = AfterSuite(func() { - environment.Stop() + eventChannel = make(chan watch.Event, 1000) + controller = events.NewController[*test.CustomObject](ctx, kubeClient, fakeClock, eventChannel) }) var _ = Describe("Controller", func() { @@ -78,8 +76,11 @@ var _ = Describe("Controller", func() { // expect an metrics for custom object to be zero, waiting on controller reconcile Expect(GetMetric("operator_customobject_event_total", conditionLabels(fmt.Sprintf("Test-type-%d", i), fmt.Sprintf("Test-reason-%d", i)))).To(BeNil()) + eventChannel <- watch.Event{ + Object: events[i], + } // reconcile on the event - _, err := singleton.AsChannelObjectReconciler(controller.EventWatch.ResultChan(), controller).Reconcile(ctx, reconcile.Request{}) + _, err := singleton.AsChannelObjectReconciler(eventChannel, controller).Reconcile(ctx, reconcile.Request{}) Expect(err).ToNot(HaveOccurred()) // expect an emitted metric to for the event @@ -95,8 +96,11 @@ var _ = Describe("Controller", func() { // expect an metrics for custom object to be zero, waiting on controller reconcile Expect(GetMetric("operator_ustomobject_event_total", conditionLabels(corev1.EventTypeNormal, "reason"))).To(BeNil()) + eventChannel <- watch.Event{ + Object: event, + } // reconcile on the event - _, err := singleton.AsChannelObjectReconciler(controller.EventWatch.ResultChan(), controller).Reconcile(ctx, reconcile.Request{NamespacedName: client.ObjectKeyFromObject(event)}) + _, err := singleton.AsChannelObjectReconciler(eventChannel, controller).Reconcile(ctx, reconcile.Request{NamespacedName: client.ObjectKeyFromObject(event)}) Expect(err).ToNot(HaveOccurred()) // expect not have an emitted metric to for the event @@ -106,8 +110,11 @@ var _ = Describe("Controller", func() { event.LastTimestamp.Time = time.Now().Add(-30 * time.Minute) ExpectApplied(ctx, kubeClient, event) + eventChannel <- watch.Event{ + Object: event, + } // reconcile on the event - _, err = singleton.AsChannelObjectReconciler(controller.EventWatch.ResultChan(), controller).Reconcile(ctx, reconcile.Request{NamespacedName: client.ObjectKeyFromObject(event)}) + _, err = singleton.AsChannelObjectReconciler(eventChannel, controller).Reconcile(ctx, reconcile.Request{NamespacedName: client.ObjectKeyFromObject(event)}) Expect(err).ToNot(HaveOccurred()) // expect an emitted metric to for the event diff --git a/singleton/controller.go b/singleton/controller.go index 6c05da1..3feea85 100644 --- a/singleton/controller.go +++ b/singleton/controller.go @@ -40,6 +40,9 @@ func AsChannelObjectReconciler[T client.Object](watchEvents <-chan watch.Event, var errs error var results []reconcile.Result e := <-watchEvents + if e.Object == nil { + return reconcile.Result{RequeueAfter: RequeueImmediately}, nil + } res, err := reconciler.Reconcile(ctx, e.Object.(T)) errs = multierr.Append(errs, err) results = append(results, res) From 13bc5434fd8f095443181feb4023d5c66f65b025 Mon Sep 17 00:00:00 2001 From: Amanuel Engeda Date: Fri, 24 Jan 2025 15:45:46 -0800 Subject: [PATCH 16/21] Drop controller-runtime deps --- .github/actions/install-deps/action.yaml | 20 --------------- .github/toolchain.sh | 32 ------------------------ .github/workflows/presubmit.yaml | 7 ------ Makefile | 4 --- 4 files changed, 63 deletions(-) delete mode 100644 .github/actions/install-deps/action.yaml delete mode 100755 .github/toolchain.sh diff --git a/.github/actions/install-deps/action.yaml b/.github/actions/install-deps/action.yaml deleted file mode 100644 index 629e06c..0000000 --- a/.github/actions/install-deps/action.yaml +++ /dev/null @@ -1,20 +0,0 @@ -name: InstallDependencies -description: 'Installs Go Downloads and installs Karpenter Dependencies' -inputs: - k8sVersion: - description: Kubernetes version to use when installing the toolchain - default: "1.24.x" -runs: - using: "composite" - steps: - - uses: actions/setup-go@3041bf56c941b39c61721a86cd11f3bb1338122a # v5.2.0 - id: setup-go - with: - go-version-file: go.mod - check-latest: true - cache-dependency-path: "**/go.sum" - - run: sudo chown "$USER" /usr/local - shell: bash - - if: ${{ steps.cache-toolchain.outputs.cache-hit != 'true' }} - shell: bash - run: K8S_VERSION=${{ inputs.k8sVersion }} make toolchain diff --git a/.github/toolchain.sh b/.github/toolchain.sh deleted file mode 100755 index 98f3667..0000000 --- a/.github/toolchain.sh +++ /dev/null @@ -1,32 +0,0 @@ -#!/usr/bin/env bash -set -euo pipefail - -K8S_VERSION="${K8S_VERSION:="1.31.x"}" -KUBEBUILDER_ASSETS="/usr/local/kubebuilder/bin" - -main() { - go install sigs.k8s.io/controller-runtime/tools/setup-envtest@latest - - kubebuilder -} - -kubebuilder() { - if ! mkdir -p ${KUBEBUILDER_ASSETS}; then - sudo mkdir -p ${KUBEBUILDER_ASSETS} - sudo chown $(whoami) ${KUBEBUILDER_ASSETS} - fi - arch=$(go env GOARCH) - ln -sf $(setup-envtest use -p path "${K8S_VERSION}" --arch="${arch}" --bin-dir="${KUBEBUILDER_ASSETS}")/* ${KUBEBUILDER_ASSETS} - find $KUBEBUILDER_ASSETS - - # Install latest binaries for 1.25.x (contains CEL fix) - if [[ "${K8S_VERSION}" = "1.25.x" ]] && [[ "$OSTYPE" == "linux"* ]]; then - for binary in 'kube-apiserver' 'kubectl'; do - rm $KUBEBUILDER_ASSETS/$binary - wget -P $KUBEBUILDER_ASSETS dl.k8s.io/v1.25.16/bin/linux/${arch}/${binary} - chmod +x $KUBEBUILDER_ASSETS/$binary - done - fi -} - -main "$@" diff --git a/.github/workflows/presubmit.yaml b/.github/workflows/presubmit.yaml index ac39dcd..135720a 100644 --- a/.github/workflows/presubmit.yaml +++ b/.github/workflows/presubmit.yaml @@ -11,13 +11,6 @@ jobs: permissions: issues: write runs-on: ubuntu-latest - strategy: - fail-fast: false - matrix: - k8sVersion: ["1.25.x", "1.26.x", "1.27.x", "1.28.x", "1.29.x", "1.30.x", "1.31.x"] steps: - uses: actions/checkout@v4 - - uses: ./.github/actions/install-deps - with: - k8sVersion: ${{ matrix.k8sVersion }} - run: make presubmit \ No newline at end of file diff --git a/Makefile b/Makefile index 8b88b75..83dc1a5 100644 --- a/Makefile +++ b/Makefile @@ -15,7 +15,3 @@ verify: ## .PHONY: test test: ## go test ./... - -.PHONY: toolchain -toolchain: ## - ./.github/toolchain.sh From a4cf242a20e443c54438eea18058569cd67465b6 Mon Sep 17 00:00:00 2001 From: Amanuel Engeda Date: Fri, 24 Jan 2025 15:45:46 -0800 Subject: [PATCH 17/21] Drop controller-runtime deps --- .github/workflows/presubmit.yaml | 4 ---- 1 file changed, 4 deletions(-) diff --git a/.github/workflows/presubmit.yaml b/.github/workflows/presubmit.yaml index 135720a..90498b9 100644 --- a/.github/workflows/presubmit.yaml +++ b/.github/workflows/presubmit.yaml @@ -4,12 +4,8 @@ on: branches: [main] pull_request: workflow_dispatch: -permissions: - contents: read jobs: presubmit: - permissions: - issues: write runs-on: ubuntu-latest steps: - uses: actions/checkout@v4 From daf6a22e54f9cc5ae5de15d54e92479d9eb3f921 Mon Sep 17 00:00:00 2001 From: Amanuel Engeda Date: Fri, 24 Jan 2025 15:45:46 -0800 Subject: [PATCH 18/21] Drop controller-runtime deps --- Makefile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Makefile b/Makefile index 83dc1a5..afb5087 100644 --- a/Makefile +++ b/Makefile @@ -14,4 +14,4 @@ verify: ## .PHONY: test test: ## - go test ./... + go test ./... \ No newline at end of file From 3a1c475cb1745dfa660f86fc84e2a26bb335206d Mon Sep 17 00:00:00 2001 From: Amanuel Engeda Date: Fri, 24 Jan 2025 15:45:46 -0800 Subject: [PATCH 19/21] Drop controller-runtime deps --- events/controller.go | 10 ++++++++-- events/suite_test.go | 6 +++--- singleton/controller.go | 37 ------------------------------------- 3 files changed, 11 insertions(+), 42 deletions(-) diff --git a/events/controller.go b/events/controller.go index 19ec61c..c5e844d 100644 --- a/events/controller.go +++ b/events/controller.go @@ -42,10 +42,16 @@ func (c *Controller[T]) Register(ctx context.Context, m manager.Manager) error { return controllerruntime.NewControllerManagedBy(m). Named(fmt.Sprintf("operatorpkg.%s.events", strings.ToLower(c.gvk.Kind))). WatchesRawSource(singleton.Source()). - Complete(singleton.AsChannelObjectReconciler(c.EventWatchChannel, c)) + Complete(singleton.AsReconciler(c)) } -func (c *Controller[T]) Reconcile(ctx context.Context, event *v1.Event) (reconcile.Result, error) { +func (c *Controller[T]) Reconcile(ctx context.Context) (reconcile.Result, error) { + e := <-c.EventWatchChannel + if e.Object == nil { + return reconcile.Result{RequeueAfter: singleton.RequeueImmediately}, nil + } + event := e.Object.(*v1.Event) + // We check if the event was created in the lifetime of this controller // since we don't duplicate metrics on controller restart or lease handover if c.startTime.Before(event.LastTimestamp.Time) { diff --git a/events/suite_test.go b/events/suite_test.go index 8d49b90..535a9f7 100644 --- a/events/suite_test.go +++ b/events/suite_test.go @@ -80,7 +80,7 @@ var _ = Describe("Controller", func() { Object: events[i], } // reconcile on the event - _, err := singleton.AsChannelObjectReconciler(eventChannel, controller).Reconcile(ctx, reconcile.Request{}) + _, err := singleton.AsReconciler(controller).Reconcile(ctx, reconcile.Request{}) Expect(err).ToNot(HaveOccurred()) // expect an emitted metric to for the event @@ -100,7 +100,7 @@ var _ = Describe("Controller", func() { Object: event, } // reconcile on the event - _, err := singleton.AsChannelObjectReconciler(eventChannel, controller).Reconcile(ctx, reconcile.Request{NamespacedName: client.ObjectKeyFromObject(event)}) + _, err := singleton.AsReconciler(controller).Reconcile(ctx, reconcile.Request{NamespacedName: client.ObjectKeyFromObject(event)}) Expect(err).ToNot(HaveOccurred()) // expect not have an emitted metric to for the event @@ -114,7 +114,7 @@ var _ = Describe("Controller", func() { Object: event, } // reconcile on the event - _, err = singleton.AsChannelObjectReconciler(eventChannel, controller).Reconcile(ctx, reconcile.Request{NamespacedName: client.ObjectKeyFromObject(event)}) + _, err = singleton.AsReconciler(controller).Reconcile(ctx, reconcile.Request{NamespacedName: client.ObjectKeyFromObject(event)}) Expect(err).ToNot(HaveOccurred()) // expect an emitted metric to for the event diff --git a/singleton/controller.go b/singleton/controller.go index 3feea85..d1cb1dc 100644 --- a/singleton/controller.go +++ b/singleton/controller.go @@ -2,13 +2,9 @@ package singleton import ( "context" - "math" "time" - "go.uber.org/multierr" - "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/util/workqueue" - "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/handler" "sigs.k8s.io/controller-runtime/pkg/reconcile" @@ -31,39 +27,6 @@ func AsReconciler(reconciler Reconciler) reconcile.Reconciler { }) } -type ChannalObjectReconciler[T client.Object] interface { - Reconcile(ctx context.Context, object T) (reconcile.Result, error) -} - -func AsChannelObjectReconciler[T client.Object](watchEvents <-chan watch.Event, reconciler ChannalObjectReconciler[T]) reconcile.Reconciler { - return reconcile.Func(func(ctx context.Context, r reconcile.Request) (reconcile.Result, error) { - var errs error - var results []reconcile.Result - e := <-watchEvents - if e.Object == nil { - return reconcile.Result{RequeueAfter: RequeueImmediately}, nil - } - res, err := reconciler.Reconcile(ctx, e.Object.(T)) - errs = multierr.Append(errs, err) - results = append(results, res) - - var result reconcile.Result - min := time.Duration(math.MaxInt64) - for _, r := range results { - if r.IsZero() { - continue - } - if r.RequeueAfter < min { - min = r.RequeueAfter - result.RequeueAfter = min - result.Requeue = true - } - } - - return result, errs - }) -} - func Source() source.Source { eventSource := make(chan event.GenericEvent, 1) eventSource <- event.GenericEvent{} From 8a1bc5e12757fc4ee00e21a408963f6ec32f6f23 Mon Sep 17 00:00:00 2001 From: Amanuel Engeda Date: Fri, 24 Jan 2025 15:45:46 -0800 Subject: [PATCH 20/21] Drop controller-runtime deps --- events/controller.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/events/controller.go b/events/controller.go index c5e844d..e50376d 100644 --- a/events/controller.go +++ b/events/controller.go @@ -27,7 +27,7 @@ type Controller[T client.Object] struct { EventWatchChannel <-chan watch.Event } -func NewController[T client.Object](ctx context.Context, client client.Client, clock clock.Clock, channel <-chan watch.Event) *Controller[T] { +func NewController[T client.Object](client client.Client, clock clock.Clock, channel <-chan watch.Event) *Controller[T] { gvk := object.GVK(object.New[T]()) return &Controller[T]{ gvk: gvk, @@ -38,7 +38,7 @@ func NewController[T client.Object](ctx context.Context, client client.Client, c } } -func (c *Controller[T]) Register(ctx context.Context, m manager.Manager) error { +func (c *Controller[T]) Register(_ context.Context, m manager.Manager) error { return controllerruntime.NewControllerManagedBy(m). Named(fmt.Sprintf("operatorpkg.%s.events", strings.ToLower(c.gvk.Kind))). WatchesRawSource(singleton.Source()). From 56c5c3966366e31b6d705dd55443f7b912e80f56 Mon Sep 17 00:00:00 2001 From: Amanuel Engeda Date: Fri, 24 Jan 2025 15:45:46 -0800 Subject: [PATCH 21/21] Drop controller-runtime deps --- events/controller.go | 19 +++++++++++++------ events/suite_test.go | 9 ++++++--- 2 files changed, 19 insertions(+), 9 deletions(-) diff --git a/events/controller.go b/events/controller.go index e50376d..493614c 100644 --- a/events/controller.go +++ b/events/controller.go @@ -9,9 +9,13 @@ import ( pmetrics "github.com/awslabs/operatorpkg/metrics" "github.com/awslabs/operatorpkg/object" "github.com/awslabs/operatorpkg/singleton" + "github.com/samber/lo" + corev1 "k8s.io/api/core/v1" v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/watch" + "k8s.io/client-go/kubernetes" "k8s.io/utils/clock" controllerruntime "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" @@ -27,14 +31,17 @@ type Controller[T client.Object] struct { EventWatchChannel <-chan watch.Event } -func NewController[T client.Object](client client.Client, clock clock.Clock, channel <-chan watch.Event) *Controller[T] { +func NewController[T client.Object](ctx context.Context, client client.Client, clock clock.Clock, kubernetesInterface kubernetes.Interface) *Controller[T] { gvk := object.GVK(object.New[T]()) return &Controller[T]{ - gvk: gvk, - startTime: clock.Now(), - kubeClient: client, - EventCount: eventTotalMetric(strings.ToLower(gvk.Kind)), - EventWatchChannel: channel, + gvk: gvk, + startTime: clock.Now(), + kubeClient: client, + EventCount: eventTotalMetric(strings.ToLower(gvk.Kind)), + EventWatchChannel: lo.Must(kubernetesInterface.CoreV1().Events("").Watch(ctx, metav1.ListOptions{ + // Only reconcile on the object kind we care about + FieldSelector: fmt.Sprintf("involvedObject.kind=%s,involvedObject.apiVersion=%s", object.GVK(&corev1.Node{}).Kind, object.GVK(&corev1.Node{}).GroupVersion().String()), + })).ResultChan(), } } diff --git a/events/suite_test.go b/events/suite_test.go index 535a9f7..0e1c272 100644 --- a/events/suite_test.go +++ b/events/suite_test.go @@ -21,10 +21,11 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/watch" + "k8s.io/client-go/kubernetes/fake" "k8s.io/client-go/kubernetes/scheme" clock "k8s.io/utils/clock/testing" "sigs.k8s.io/controller-runtime/pkg/client" - ctrlfake "sigs.k8s.io/controller-runtime/pkg/client/fake" + crfake "sigs.k8s.io/controller-runtime/pkg/client/fake" "sigs.k8s.io/controller-runtime/pkg/log" "sigs.k8s.io/controller-runtime/pkg/reconcile" ) @@ -52,13 +53,15 @@ var _ = BeforeSuite(func() { ctx = log.IntoContext(context.Background(), ginkgo.GinkgoLogr) fakeClock = clock.NewFakeClock(time.Now()) - kubeClient = ctrlfake.NewClientBuilder().WithScheme(scheme.Scheme).WithIndex(&corev1.Event{}, "involvedObject.kind", func(o client.Object) []string { + kubeClient = crfake.NewClientBuilder().WithScheme(scheme.Scheme).WithIndex(&corev1.Event{}, "involvedObject.kind", func(o client.Object) []string { evt := o.(*corev1.Event) return []string{evt.InvolvedObject.Kind} }).Build() + client := fake.NewSimpleClientset() eventChannel = make(chan watch.Event, 1000) - controller = events.NewController[*test.CustomObject](ctx, kubeClient, fakeClock, eventChannel) + controller = events.NewController[*test.CustomObject](ctx, kubeClient, fakeClock, client) + controller.EventWatchChannel = eventChannel }) var _ = Describe("Controller", func() {