Skip to content

Commit edb751b

Browse files
joelanfordPer Goncalves da Silva
and
Per Goncalves da Silva
authored
🐛 Switch queue informer to use types.Namespace name; relocate deletion handler (#3483)
* switch queue informer to use types.NamespacedName; relocate deletion handler Signed-off-by: Joe Lanford <[email protected]> * Add error message to object count assertion failure Signed-off-by: Per Goncalves da Silva <[email protected]> --------- Signed-off-by: Joe Lanford <[email protected]> Signed-off-by: Per Goncalves da Silva <[email protected]> Co-authored-by: Per Goncalves da Silva <[email protected]>
1 parent 8b47a95 commit edb751b

File tree

20 files changed

+275
-540
lines changed

20 files changed

+275
-540
lines changed

Diff for: pkg/controller/operators/catalog/operator.go

+34-32
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ import (
3030
"k8s.io/apimachinery/pkg/runtime"
3131
"k8s.io/apimachinery/pkg/runtime/schema"
3232
"k8s.io/apimachinery/pkg/selection"
33+
"k8s.io/apimachinery/pkg/types"
3334
utilerrors "k8s.io/apimachinery/pkg/util/errors"
3435
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
3536
"k8s.io/apimachinery/pkg/util/sets"
@@ -114,7 +115,7 @@ type Operator struct {
114115
subQueueSet *queueinformer.ResourceQueueSet
115116
ipQueueSet *queueinformer.ResourceQueueSet
116117
ogQueueSet *queueinformer.ResourceQueueSet
117-
nsResolveQueue workqueue.TypedRateLimitingInterface[any]
118+
nsResolveQueue workqueue.TypedRateLimitingInterface[types.NamespacedName]
118119
namespace string
119120
recorder record.EventRecorder
120121
sources *grpc.SourceStore
@@ -268,8 +269,8 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
268269
// Wire InstallPlans
269270
ipInformer := crInformerFactory.Operators().V1alpha1().InstallPlans()
270271
op.lister.OperatorsV1alpha1().RegisterInstallPlanLister(metav1.NamespaceAll, ipInformer.Lister())
271-
ipQueue := workqueue.NewTypedRateLimitingQueueWithConfig[any](workqueue.DefaultTypedControllerRateLimiter[any](),
272-
workqueue.TypedRateLimitingQueueConfig[any]{
272+
ipQueue := workqueue.NewTypedRateLimitingQueueWithConfig[types.NamespacedName](workqueue.DefaultTypedControllerRateLimiter[types.NamespacedName](),
273+
workqueue.TypedRateLimitingQueueConfig[types.NamespacedName]{
273274
Name: "ips",
274275
})
275276
op.ipQueueSet.Set(metav1.NamespaceAll, ipQueue)
@@ -290,8 +291,8 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
290291

291292
operatorGroupInformer := crInformerFactory.Operators().V1().OperatorGroups()
292293
op.lister.OperatorsV1().RegisterOperatorGroupLister(metav1.NamespaceAll, operatorGroupInformer.Lister())
293-
ogQueue := workqueue.NewTypedRateLimitingQueueWithConfig[any](workqueue.DefaultTypedControllerRateLimiter[any](),
294-
workqueue.TypedRateLimitingQueueConfig[any]{
294+
ogQueue := workqueue.NewTypedRateLimitingQueueWithConfig[types.NamespacedName](workqueue.DefaultTypedControllerRateLimiter[types.NamespacedName](),
295+
workqueue.TypedRateLimitingQueueConfig[types.NamespacedName]{
295296
Name: "ogs",
296297
})
297298
op.ogQueueSet.Set(metav1.NamespaceAll, ogQueue)
@@ -312,8 +313,8 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
312313
// Wire CatalogSources
313314
catsrcInformer := crInformerFactory.Operators().V1alpha1().CatalogSources()
314315
op.lister.OperatorsV1alpha1().RegisterCatalogSourceLister(metav1.NamespaceAll, catsrcInformer.Lister())
315-
catsrcQueue := workqueue.NewTypedRateLimitingQueueWithConfig[any](workqueue.DefaultTypedControllerRateLimiter[any](),
316-
workqueue.TypedRateLimitingQueueConfig[any]{
316+
catsrcQueue := workqueue.NewTypedRateLimitingQueueWithConfig[types.NamespacedName](workqueue.DefaultTypedControllerRateLimiter[types.NamespacedName](),
317+
workqueue.TypedRateLimitingQueueConfig[types.NamespacedName]{
317318
Name: "catsrcs",
318319
})
319320
op.catsrcQueueSet.Set(metav1.NamespaceAll, catsrcQueue)
@@ -323,7 +324,8 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
323324
queueinformer.WithLogger(op.logger),
324325
queueinformer.WithQueue(catsrcQueue),
325326
queueinformer.WithInformer(catsrcInformer.Informer()),
326-
queueinformer.WithSyncer(queueinformer.LegacySyncHandler(op.syncCatalogSources).ToSyncerWithDelete(op.handleCatSrcDeletion)),
327+
queueinformer.WithSyncer(queueinformer.LegacySyncHandler(op.syncCatalogSources).ToSyncer()),
328+
queueinformer.WithDeletionHandler(op.handleCatSrcDeletion),
327329
)
328330
if err != nil {
329331
return nil, err
@@ -341,8 +343,8 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
341343
subIndexer := subInformer.Informer().GetIndexer()
342344
op.catalogSubscriberIndexer[metav1.NamespaceAll] = subIndexer
343345

344-
subQueue := workqueue.NewTypedRateLimitingQueueWithConfig[any](workqueue.DefaultTypedControllerRateLimiter[any](),
345-
workqueue.TypedRateLimitingQueueConfig[any]{
346+
subQueue := workqueue.NewTypedRateLimitingQueueWithConfig[types.NamespacedName](workqueue.DefaultTypedControllerRateLimiter[types.NamespacedName](),
347+
workqueue.TypedRateLimitingQueueConfig[types.NamespacedName]{
346348
Name: "subs",
347349
})
348350
op.subQueueSet.Set(metav1.NamespaceAll, subQueue)
@@ -355,7 +357,7 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
355357
subscription.WithCatalogInformer(catsrcInformer.Informer()),
356358
subscription.WithInstallPlanInformer(ipInformer.Informer()),
357359
subscription.WithSubscriptionQueue(subQueue),
358-
subscription.WithAppendedReconcilers(subscription.ReconcilerFromLegacySyncHandler(op.syncSubscriptions, nil)),
360+
subscription.WithAppendedReconcilers(subscription.ReconcilerFromLegacySyncHandler(op.syncSubscriptions)),
359361
subscription.WithRegistryReconcilerFactory(op.reconciler),
360362
subscription.WithGlobalCatalogNamespace(op.namespace),
361363
subscription.WithSourceProvider(op.resolverSourceProvider),
@@ -415,7 +417,7 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
415417
logger := op.logger.WithFields(logrus.Fields{"gvr": gvr.String(), "index": idx})
416418
logger.Info("registering labeller")
417419

418-
queue := workqueue.NewTypedRateLimitingQueueWithConfig[any](workqueue.DefaultTypedControllerRateLimiter[any](), workqueue.TypedRateLimitingQueueConfig[any]{
420+
queue := workqueue.NewTypedRateLimitingQueueWithConfig[types.NamespacedName](workqueue.DefaultTypedControllerRateLimiter[types.NamespacedName](), workqueue.TypedRateLimitingQueueConfig[types.NamespacedName]{
419421
Name: gvr.String(),
420422
})
421423
queueInformer, err := queueinformer.NewQueueInformer(
@@ -560,7 +562,7 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
560562
logger := op.logger.WithFields(logrus.Fields{"gvr": gvr.String()})
561563
logger.Info("registering owner reference fixer")
562564

563-
queue := workqueue.NewTypedRateLimitingQueueWithConfig[any](workqueue.DefaultTypedControllerRateLimiter[any](), workqueue.TypedRateLimitingQueueConfig[any]{
565+
queue := workqueue.NewTypedRateLimitingQueueWithConfig[types.NamespacedName](workqueue.DefaultTypedControllerRateLimiter[types.NamespacedName](), workqueue.TypedRateLimitingQueueConfig[types.NamespacedName]{
564566
Name: gvr.String(),
565567
})
566568
queueInformer, err := queueinformer.NewQueueInformer(
@@ -670,13 +672,14 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
670672
}
671673

672674
// Generate and register QueueInformers for k8s resources
673-
k8sSyncer := queueinformer.LegacySyncHandler(op.syncObject).ToSyncerWithDelete(op.handleDeletion)
675+
k8sSyncer := queueinformer.LegacySyncHandler(op.syncObject).ToSyncer()
674676
for _, informer := range sharedIndexInformers {
675677
queueInformer, err := queueinformer.NewQueueInformer(
676678
ctx,
677679
queueinformer.WithLogger(op.logger),
678680
queueinformer.WithInformer(informer),
679681
queueinformer.WithSyncer(k8sSyncer),
682+
queueinformer.WithDeletionHandler(op.handleDeletion),
680683
)
681684
if err != nil {
682685
return nil, err
@@ -724,7 +727,8 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
724727
ctx,
725728
queueinformer.WithLogger(op.logger),
726729
queueinformer.WithInformer(crdInformer),
727-
queueinformer.WithSyncer(queueinformer.LegacySyncHandler(op.syncObject).ToSyncerWithDelete(op.handleDeletion)),
730+
queueinformer.WithSyncer(queueinformer.LegacySyncHandler(op.syncObject).ToSyncer()),
731+
queueinformer.WithDeletionHandler(op.handleDeletion),
728732
)
729733
if err != nil {
730734
return nil, err
@@ -745,8 +749,8 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
745749
// Namespace sync for resolving subscriptions
746750
namespaceInformer := informers.NewSharedInformerFactory(op.opClient.KubernetesInterface(), resyncPeriod()).Core().V1().Namespaces()
747751
op.lister.CoreV1().RegisterNamespaceLister(namespaceInformer.Lister())
748-
op.nsResolveQueue = workqueue.NewTypedRateLimitingQueueWithConfig[any](workqueue.DefaultTypedControllerRateLimiter[any](),
749-
workqueue.TypedRateLimitingQueueConfig[any]{
752+
op.nsResolveQueue = workqueue.NewTypedRateLimitingQueueWithConfig[types.NamespacedName](workqueue.DefaultTypedControllerRateLimiter[types.NamespacedName](),
753+
workqueue.TypedRateLimitingQueueConfig[types.NamespacedName]{
750754
Name: "resolve",
751755
})
752756
namespaceQueueInformer, err := queueinformer.NewQueueInformer(
@@ -787,12 +791,12 @@ func (o *Operator) syncSourceState(state grpc.SourceState) {
787791

788792
if err == nil {
789793
for ns := range namespaces {
790-
o.nsResolveQueue.Add(ns)
794+
o.nsResolveQueue.Add(types.NamespacedName{Name: ns})
791795
}
792796
}
793797
}
794798

795-
o.nsResolveQueue.Add(state.Key.Namespace)
799+
o.nsResolveQueue.Add(types.NamespacedName{Name: state.Key.Namespace})
796800
}
797801
if err := o.catsrcQueueSet.Requeue(state.Key.Namespace, state.Key.Name); err != nil {
798802
o.logger.WithError(err).Info("couldn't requeue catalogsource from catalog status change")
@@ -873,18 +877,16 @@ func (o *Operator) handleDeletion(obj interface{}) {
873877
func (o *Operator) handleCatSrcDeletion(obj interface{}) {
874878
catsrc, ok := obj.(metav1.Object)
875879
if !ok {
880+
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
876881
if !ok {
877-
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
878-
if !ok {
879-
utilruntime.HandleError(fmt.Errorf("couldn't get object from tombstone %#v", obj))
880-
return
881-
}
882+
utilruntime.HandleError(fmt.Errorf("couldn't get object from tombstone %#v", obj))
883+
return
884+
}
882885

883-
catsrc, ok = tombstone.Obj.(metav1.Object)
884-
if !ok {
885-
utilruntime.HandleError(fmt.Errorf("tombstone contained object that is not a Namespace %#v", obj))
886-
return
887-
}
886+
catsrc, ok = tombstone.Obj.(metav1.Object)
887+
if !ok {
888+
utilruntime.HandleError(fmt.Errorf("tombstone contained object that is not a Namespace %#v", obj))
889+
return
888890
}
889891
}
890892
sourceKey := registry.CatalogKey{Name: catsrc.GetName(), Namespace: catsrc.GetNamespace()}
@@ -1411,7 +1413,7 @@ func (o *Operator) syncResolvingNamespace(obj interface{}) error {
14111413
}
14121414

14131415
logger.Info("unpacking is not complete yet, requeueing")
1414-
o.nsResolveQueue.AddAfter(namespace, 5*time.Second)
1416+
o.nsResolveQueue.AddAfter(types.NamespacedName{Name: namespace}, 5*time.Second)
14151417
return nil
14161418
}
14171419
}
@@ -1506,7 +1508,7 @@ func (o *Operator) syncSubscriptions(obj interface{}) error {
15061508
return fmt.Errorf("casting Subscription failed")
15071509
}
15081510

1509-
o.nsResolveQueue.Add(sub.GetNamespace())
1511+
o.nsResolveQueue.Add(types.NamespacedName{Name: sub.GetNamespace()})
15101512

15111513
return nil
15121514
}
@@ -1520,7 +1522,7 @@ func (o *Operator) syncOperatorGroups(obj interface{}) error {
15201522
return fmt.Errorf("casting OperatorGroup failed")
15211523
}
15221524

1523-
o.nsResolveQueue.Add(og.GetNamespace())
1525+
o.nsResolveQueue.Add(types.NamespacedName{Name: og.GetNamespace()})
15241526

15251527
return nil
15261528
}

Diff for: pkg/controller/operators/catalog/operator_test.go

+5-5
Original file line numberDiff line numberDiff line change
@@ -2156,13 +2156,13 @@ func NewFakeOperator(ctx context.Context, namespace string, namespaces []string,
21562156
client: clientFake,
21572157
lister: lister,
21582158
namespace: namespace,
2159-
nsResolveQueue: workqueue.NewTypedRateLimitingQueueWithConfig[any](
2160-
workqueue.NewTypedMaxOfRateLimiter[any](
2161-
workqueue.NewTypedItemExponentialFailureRateLimiter[any](1*time.Second, 1000*time.Second),
2159+
nsResolveQueue: workqueue.NewTypedRateLimitingQueueWithConfig[types.NamespacedName](
2160+
workqueue.NewTypedMaxOfRateLimiter[types.NamespacedName](
2161+
workqueue.NewTypedItemExponentialFailureRateLimiter[types.NamespacedName](1*time.Second, 1000*time.Second),
21622162
// 1 qps, 100 bucket size. This is only for retry speed and its only the overall factor (not per item)
2163-
&workqueue.TypedBucketRateLimiter[any]{Limiter: rate.NewLimiter(rate.Limit(1), 100)},
2163+
&workqueue.TypedBucketRateLimiter[types.NamespacedName]{Limiter: rate.NewLimiter(rate.Limit(1), 100)},
21642164
),
2165-
workqueue.TypedRateLimitingQueueConfig[any]{
2165+
workqueue.TypedRateLimitingQueueConfig[types.NamespacedName]{
21662166
Name: "resolver",
21672167
}),
21682168
resolver: config.resolver,

Diff for: pkg/controller/operators/catalog/subscription/config.go

+3-2
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"github.com/pkg/errors"
55
"github.com/sirupsen/logrus"
66
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
7+
"k8s.io/apimachinery/pkg/types"
78
"k8s.io/client-go/tools/cache"
89
"k8s.io/client-go/util/workqueue"
910
utilclock "k8s.io/utils/clock"
@@ -23,7 +24,7 @@ type syncerConfig struct {
2324
subscriptionInformer cache.SharedIndexInformer
2425
catalogInformer cache.SharedIndexInformer
2526
installPlanInformer cache.SharedIndexInformer
26-
subscriptionQueue workqueue.TypedRateLimitingInterface[any]
27+
subscriptionQueue workqueue.TypedRateLimitingInterface[types.NamespacedName]
2728
reconcilers kubestate.ReconcilerChain
2829
registryReconcilerFactory reconciler.RegistryReconcilerFactory
2930
globalCatalogNamespace string
@@ -97,7 +98,7 @@ func WithOperatorLister(lister operatorlister.OperatorLister) SyncerOption {
9798
}
9899

99100
// WithSubscriptionQueue sets a syncer's subscription queue.
100-
func WithSubscriptionQueue(subscriptionQueue workqueue.TypedRateLimitingInterface[any]) SyncerOption {
101+
func WithSubscriptionQueue(subscriptionQueue workqueue.TypedRateLimitingInterface[types.NamespacedName]) SyncerOption {
101102
return func(config *syncerConfig) {
102103
config.subscriptionQueue = subscriptionQueue
103104
}

Diff for: pkg/controller/operators/catalog/subscription/reconciler.go

+1-5
Original file line numberDiff line numberDiff line change
@@ -28,18 +28,14 @@ import (
2828

2929
// ReconcilerFromLegacySyncHandler returns a reconciler that invokes the given legacy sync handler and on delete funcs.
3030
// Since the reconciler does not return an updated kubestate, it MUST be the last reconciler in a given chain.
31-
func ReconcilerFromLegacySyncHandler(sync queueinformer.LegacySyncHandler, onDelete func(obj interface{})) kubestate.Reconciler {
31+
func ReconcilerFromLegacySyncHandler(sync queueinformer.LegacySyncHandler) kubestate.Reconciler {
3232
var rec kubestate.ReconcilerFunc = func(ctx context.Context, in kubestate.State) (out kubestate.State, err error) {
3333
out = in
3434
switch s := in.(type) {
3535
case SubscriptionExistsState:
3636
if sync != nil {
3737
err = sync(s.Subscription())
3838
}
39-
case SubscriptionDeletedState:
40-
if onDelete != nil {
41-
onDelete(s.Subscription())
42-
}
4339
case SubscriptionState:
4440
if sync != nil {
4541
err = sync(s.Subscription())

Diff for: pkg/controller/operators/catalog/subscription/state.go

-20
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@ type SubscriptionState interface {
2525
Subscription() *v1alpha1.Subscription
2626
Add() SubscriptionExistsState
2727
Update() SubscriptionExistsState
28-
Delete() SubscriptionDeletedState
2928
}
3029

3130
// SubscriptionExistsState describes subscription states in which the subscription exists on the cluster.
@@ -49,13 +48,6 @@ type SubscriptionUpdatedState interface {
4948
isSubscriptionUpdatedState()
5049
}
5150

52-
// SubscriptionDeletedState describes subscription states in which the subscription no longer exists and was deleted from the cluster.
53-
type SubscriptionDeletedState interface {
54-
SubscriptionState
55-
56-
isSubscriptionDeletedState()
57-
}
58-
5951
// CatalogHealthState describes subscription states that represent a subscription with respect to catalog health.
6052
type CatalogHealthState interface {
6153
SubscriptionExistsState
@@ -176,12 +168,6 @@ func (s *subscriptionState) Update() SubscriptionExistsState {
176168
}
177169
}
178170

179-
func (s *subscriptionState) Delete() SubscriptionDeletedState {
180-
return &subscriptionDeletedState{
181-
SubscriptionState: s,
182-
}
183-
}
184-
185171
func NewSubscriptionState(sub *v1alpha1.Subscription) SubscriptionState {
186172
return &subscriptionState{
187173
State: kubestate.NewState(),
@@ -207,12 +193,6 @@ type subscriptionUpdatedState struct {
207193

208194
func (c *subscriptionUpdatedState) isSubscriptionUpdatedState() {}
209195

210-
type subscriptionDeletedState struct {
211-
SubscriptionState
212-
}
213-
214-
func (c *subscriptionDeletedState) isSubscriptionDeletedState() {}
215-
216196
type catalogHealthState struct {
217197
SubscriptionExistsState
218198
}

0 commit comments

Comments
 (0)