Skip to content

Commit 1274d54

Browse files
authored
🐛 use operator cache provider for deprecation updates to limit calls to GRPC server (#3490)
* add log and metric to instrument count of catalog source snapshots Signed-off-by: Joe Lanford <[email protected]> * use operator cache provider for deprecation updates to limit calls to GRPC server Signed-off-by: Joe Lanford <[email protected]> --------- Signed-off-by: Joe Lanford <[email protected]>
1 parent edb751b commit 1274d54

File tree

8 files changed

+57
-31
lines changed

8 files changed

+57
-31
lines changed

pkg/controller/operators/catalog/operator.go

+6-2
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,7 @@ type Operator struct {
131131
clientFactory clients.Factory
132132
muInstallPlan sync.Mutex
133133
resolverSourceProvider *resolver.RegistrySourceProvider
134+
operatorCacheProvider resolvercache.OperatorCacheProvider
134135
}
135136

136137
type CatalogSourceSyncFunc func(logger *logrus.Entry, in *v1alpha1.CatalogSource) (out *v1alpha1.CatalogSource, continueSync bool, syncError error)
@@ -217,8 +218,9 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
217218
}
218219
op.sources = grpc.NewSourceStore(logger, 10*time.Second, 10*time.Minute, op.syncSourceState)
219220
op.resolverSourceProvider = resolver.SourceProviderFromRegistryClientProvider(op.sources, lister.OperatorsV1alpha1().CatalogSourceLister(), logger)
221+
op.operatorCacheProvider = resolver.NewOperatorCacheProvider(lister, crClient, op.resolverSourceProvider, logger)
220222
op.reconciler = reconciler.NewRegistryReconcilerFactory(lister, opClient, configmapRegistryImage, op.now, ssaClient, workloadUserID, opmImage, utilImage)
221-
res := resolver.NewOperatorStepResolver(lister, crClient, operatorNamespace, op.resolverSourceProvider, logger)
223+
res := resolver.NewOperatorStepResolver(lister, crClient, operatorNamespace, op.operatorCacheProvider, logger)
222224
op.resolver = resolver.NewInstrumentedResolver(res, metrics.RegisterDependencyResolutionSuccess, metrics.RegisterDependencyResolutionFailure)
223225

224226
// Wire OLM CR sharedIndexInformers
@@ -360,7 +362,7 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
360362
subscription.WithAppendedReconcilers(subscription.ReconcilerFromLegacySyncHandler(op.syncSubscriptions)),
361363
subscription.WithRegistryReconcilerFactory(op.reconciler),
362364
subscription.WithGlobalCatalogNamespace(op.namespace),
363-
subscription.WithSourceProvider(op.resolverSourceProvider),
365+
subscription.WithOperatorCacheProvider(op.operatorCacheProvider),
364366
)
365367
if err != nil {
366368
return nil, err
@@ -781,6 +783,7 @@ func (o *Operator) syncSourceState(state grpc.SourceState) {
781783

782784
o.logger.Infof("state.Key.Namespace=%s state.Key.Name=%s state.State=%s", state.Key.Namespace, state.Key.Name, state.State.String())
783785
metrics.RegisterCatalogSourceState(state.Key.Name, state.Key.Namespace, state.State)
786+
metrics.RegisterCatalogSourceSnapshotsTotal(state.Key.Name, state.Key.Namespace)
784787

785788
switch state.State {
786789
case connectivity.Ready:
@@ -896,6 +899,7 @@ func (o *Operator) handleCatSrcDeletion(obj interface{}) {
896899
o.logger.WithField("source", sourceKey).Info("removed client for deleted catalogsource")
897900

898901
metrics.DeleteCatalogSourceStateMetric(catsrc.GetName(), catsrc.GetNamespace())
902+
metrics.DeleteCatalogSourceSnapshotsTotal(catsrc.GetName(), catsrc.GetNamespace())
899903
}
900904

901905
func validateSourceType(logger *logrus.Entry, in *v1alpha1.CatalogSource) (out *v1alpha1.CatalogSource, continueSync bool, _ error) {

pkg/controller/operators/catalog/subscription/config.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ type syncerConfig struct {
2828
reconcilers kubestate.ReconcilerChain
2929
registryReconcilerFactory reconciler.RegistryReconcilerFactory
3030
globalCatalogNamespace string
31-
sourceProvider resolverCache.SourceProvider
31+
operatorCacheProvider resolverCache.OperatorCacheProvider
3232
}
3333

3434
// SyncerOption is a configuration option for a subscription syncer.
@@ -131,9 +131,9 @@ func WithGlobalCatalogNamespace(namespace string) SyncerOption {
131131
}
132132
}
133133

134-
func WithSourceProvider(provider resolverCache.SourceProvider) SyncerOption {
134+
func WithOperatorCacheProvider(provider resolverCache.OperatorCacheProvider) SyncerOption {
135135
return func(config *syncerConfig) {
136-
config.sourceProvider = provider
136+
config.operatorCacheProvider = provider
137137
}
138138
}
139139

pkg/controller/operators/catalog/subscription/reconciler.go

+10-17
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,8 @@ type catalogHealthReconciler struct {
5757
catalogLister listers.CatalogSourceLister
5858
registryReconcilerFactory reconciler.RegistryReconcilerFactory
5959
globalCatalogNamespace string
60-
sourceProvider cache.SourceProvider
60+
operatorCacheProvider cache.OperatorCacheProvider
61+
logger logrus.StdLogger
6162
}
6263

6364
// Reconcile reconciles subscription catalog health conditions.
@@ -126,21 +127,16 @@ func (c *catalogHealthReconciler) Reconcile(ctx context.Context, in kubestate.St
126127
// updateDeprecatedStatus adds deprecation status conditions to the subscription when present in the cache entry then
127128
// returns a bool value of true if any changes to the existing subscription have occurred.
128129
func (c *catalogHealthReconciler) updateDeprecatedStatus(ctx context.Context, sub *v1alpha1.Subscription) (bool, error) {
129-
if c.sourceProvider == nil {
130+
if c.operatorCacheProvider == nil {
130131
return false, nil
131132
}
132-
source, ok := c.sourceProvider.Sources(sub.Spec.CatalogSourceNamespace)[cache.SourceKey{
133+
134+
entries := c.operatorCacheProvider.Namespaced(sub.Spec.CatalogSourceNamespace).Catalog(cache.SourceKey{
133135
Name: sub.Spec.CatalogSource,
134136
Namespace: sub.Spec.CatalogSourceNamespace,
135-
}]
136-
if !ok {
137-
return false, nil
138-
}
139-
snapshot, err := source.Snapshot(ctx)
140-
if err != nil {
141-
return false, err
142-
}
143-
if len(snapshot.Entries) == 0 {
137+
}).Find(cache.PkgPredicate(sub.Spec.Package), cache.ChannelPredicate(sub.Spec.Channel))
138+
139+
if len(entries) == 0 {
144140
return false, nil
145141
}
146142

@@ -149,12 +145,9 @@ func (c *catalogHealthReconciler) updateDeprecatedStatus(ctx context.Context, su
149145
var deprecations *cache.Deprecations
150146

151147
found := false
152-
for _, entry := range snapshot.Entries {
148+
for _, entry := range entries {
153149
// Find the cache entry that matches this subscription
154-
if entry.SourceInfo == nil || entry.Package() != sub.Spec.Package {
155-
continue
156-
}
157-
if sub.Spec.Channel != "" && entry.Channel() != sub.Spec.Channel {
150+
if entry.SourceInfo == nil {
158151
continue
159152
}
160153
if sub.Status.InstalledCSV != entry.Name {

pkg/controller/operators/catalog/subscription/syncer.go

+2-4
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@ import (
1616
"github.com/operator-framework/api/pkg/operators/install"
1717
"github.com/operator-framework/api/pkg/operators/v1alpha1"
1818
listers "github.com/operator-framework/operator-lifecycle-manager/pkg/api/client/listers/operators/v1alpha1"
19-
resolverCache "github.com/operator-framework/operator-lifecycle-manager/pkg/controller/registry/resolver/cache"
2019
"github.com/operator-framework/operator-lifecycle-manager/pkg/lib/kubestate"
2120
"github.com/operator-framework/operator-lifecycle-manager/pkg/lib/ownerutil"
2221
"github.com/operator-framework/operator-lifecycle-manager/pkg/metrics"
@@ -38,7 +37,6 @@ type subscriptionSyncer struct {
3837
installPlanLister listers.InstallPlanLister
3938
globalCatalogNamespace string
4039
notify kubestate.NotifyFunc
41-
sourceProvider resolverCache.SourceProvider
4240
}
4341

4442
// now returns the Syncer's current time.
@@ -218,7 +216,6 @@ func newSyncerWithConfig(ctx context.Context, config *syncerConfig) (kubestate.S
218216
reconcilers: config.reconcilers,
219217
subscriptionCache: config.subscriptionInformer.GetIndexer(),
220218
installPlanLister: config.lister.OperatorsV1alpha1().InstallPlanLister(),
221-
sourceProvider: config.sourceProvider,
222219
notify: func(event types.NamespacedName) {
223220
// Notify Subscriptions by enqueuing to the Subscription queue.
224221
config.subscriptionQueue.Add(event)
@@ -256,7 +253,8 @@ func newSyncerWithConfig(ctx context.Context, config *syncerConfig) (kubestate.S
256253
catalogLister: config.lister.OperatorsV1alpha1().CatalogSourceLister(),
257254
registryReconcilerFactory: config.registryReconcilerFactory,
258255
globalCatalogNamespace: config.globalCatalogNamespace,
259-
sourceProvider: config.sourceProvider,
256+
operatorCacheProvider: config.operatorCacheProvider,
257+
logger: config.logger,
260258
},
261259
}
262260
s.reconcilers = append(defaultReconcilers, s.reconcilers...)

pkg/controller/registry/resolver/resolver.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -29,15 +29,15 @@ type constraintProvider interface {
2929
}
3030

3131
type Resolver struct {
32-
cache *cache.Cache
32+
cache cache.OperatorCacheProvider
3333
log logrus.FieldLogger
3434
pc *predicateConverter
3535
systemConstraintsProvider constraintProvider
3636
}
3737

38-
func NewDefaultResolver(rcp cache.SourceProvider, sourcePriorityProvider cache.SourcePriorityProvider, logger logrus.FieldLogger) *Resolver {
38+
func NewDefaultResolver(cacheProvider cache.OperatorCacheProvider, logger logrus.FieldLogger) *Resolver {
3939
return &Resolver{
40-
cache: cache.New(rcp, cache.WithLogger(logger), cache.WithSourcePriorityProvider(sourcePriorityProvider)),
40+
cache: cacheProvider,
4141
log: logger,
4242
pc: &predicateConverter{
4343
celEnv: constraints.NewCelEnvironment(),

pkg/controller/registry/resolver/source_registry.go

+4
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212
v1alpha1listers "github.com/operator-framework/operator-lifecycle-manager/pkg/api/client/listers/operators/v1alpha1"
1313
"github.com/operator-framework/operator-lifecycle-manager/pkg/controller/registry"
1414
"github.com/operator-framework/operator-lifecycle-manager/pkg/controller/registry/resolver/cache"
15+
"github.com/operator-framework/operator-lifecycle-manager/pkg/metrics"
1516
"github.com/operator-framework/operator-registry/pkg/api"
1617
"github.com/operator-framework/operator-registry/pkg/client"
1718
opregistry "github.com/operator-framework/operator-registry/pkg/registry"
@@ -143,6 +144,9 @@ type registrySource struct {
143144
}
144145

145146
func (s *registrySource) Snapshot(ctx context.Context) (*cache.Snapshot, error) {
147+
s.logger.Printf("requesting snapshot for catalog source %s/%s", s.key.Namespace, s.key.Name)
148+
metrics.IncrementCatalogSourceSnapshotsTotal(s.key.Name, s.key.Namespace)
149+
146150
// Fetching default channels this way makes many round trips
147151
// -- may need to either add a new API to fetch all at once,
148152
// or embed the information into Bundle.

pkg/controller/registry/resolver/step_resolver.go

+8-2
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ func (pp catsrcPriorityProvider) Priority(key cache.SourceKey) int {
5656
return catsrc.Spec.Priority
5757
}
5858

59-
func NewOperatorStepResolver(lister operatorlister.OperatorLister, client versioned.Interface, globalCatalogNamespace string, sourceProvider cache.SourceProvider, log logrus.FieldLogger) *OperatorStepResolver {
59+
func NewOperatorCacheProvider(lister operatorlister.OperatorLister, client versioned.Interface, sourceProvider cache.SourceProvider, log logrus.FieldLogger) cache.OperatorCacheProvider {
6060
cacheSourceProvider := &mergedSourceProvider{
6161
sps: []cache.SourceProvider{
6262
sourceProvider,
@@ -70,13 +70,19 @@ func NewOperatorStepResolver(lister operatorlister.OperatorLister, client versio
7070
},
7171
},
7272
}
73+
catSrcPriorityProvider := &catsrcPriorityProvider{lister: lister.OperatorsV1alpha1().CatalogSourceLister()}
74+
75+
return cache.New(cacheSourceProvider, cache.WithLogger(log), cache.WithSourcePriorityProvider(catSrcPriorityProvider))
76+
}
77+
78+
func NewOperatorStepResolver(lister operatorlister.OperatorLister, client versioned.Interface, globalCatalogNamespace string, opCacheProvider cache.OperatorCacheProvider, log logrus.FieldLogger) *OperatorStepResolver {
7379
stepResolver := &OperatorStepResolver{
7480
subLister: lister.OperatorsV1alpha1().SubscriptionLister(),
7581
csvLister: lister.OperatorsV1alpha1().ClusterServiceVersionLister(),
7682
ogLister: lister.OperatorsV1().OperatorGroupLister(),
7783
client: client,
7884
globalCatalogNamespace: globalCatalogNamespace,
79-
resolver: NewDefaultResolver(cacheSourceProvider, catsrcPriorityProvider{lister: lister.OperatorsV1alpha1().CatalogSourceLister()}, log),
85+
resolver: NewDefaultResolver(opCacheProvider, log),
8086
log: log,
8187
}
8288

pkg/metrics/metrics.go

+21
Original file line numberDiff line numberDiff line change
@@ -152,6 +152,14 @@ var (
152152
[]string{NamespaceLabel, NameLabel},
153153
)
154154

155+
catalogSourceSnapshotsTotal = prometheus.NewCounterVec(
156+
prometheus.CounterOpts{
157+
Name: "catalog_source_snapshots_total",
158+
Help: "The number of times the catalog operator has requested a snapshot of data from a catalog source",
159+
},
160+
[]string{NamespaceLabel, NameLabel},
161+
)
162+
155163
// exported since it's not handled by HandleMetrics
156164
CSVUpgradeCount = prometheus.NewCounter(
157165
prometheus.CounterOpts{
@@ -250,6 +258,7 @@ func RegisterCatalog() {
250258
prometheus.MustRegister(subscriptionCount)
251259
prometheus.MustRegister(catalogSourceCount)
252260
prometheus.MustRegister(catalogSourceReady)
261+
prometheus.MustRegister(catalogSourceSnapshotsTotal)
253262
prometheus.MustRegister(SubscriptionSyncCount)
254263
prometheus.MustRegister(dependencyResolutionSummary)
255264
prometheus.MustRegister(installPlanWarningCount)
@@ -272,6 +281,18 @@ func DeleteCatalogSourceStateMetric(name, namespace string) {
272281
catalogSourceReady.DeleteLabelValues(namespace, name)
273282
}
274283

284+
func RegisterCatalogSourceSnapshotsTotal(name, namespace string) {
285+
catalogSourceSnapshotsTotal.WithLabelValues(namespace, name).Add(0)
286+
}
287+
288+
func IncrementCatalogSourceSnapshotsTotal(name, namespace string) {
289+
catalogSourceSnapshotsTotal.WithLabelValues(namespace, name).Inc()
290+
}
291+
292+
func DeleteCatalogSourceSnapshotsTotal(name, namespace string) {
293+
catalogSourceSnapshotsTotal.DeleteLabelValues(namespace, name)
294+
}
295+
275296
func DeleteCSVMetric(oldCSV *operatorsv1alpha1.ClusterServiceVersion) {
276297
// Delete the old CSV metrics
277298
csvAbnormal.DeleteLabelValues(oldCSV.Namespace, oldCSV.Name, oldCSV.Spec.Version.String(), string(oldCSV.Status.Phase), string(oldCSV.Status.Reason))

0 commit comments

Comments
 (0)