Commit e295a39
Initial minimal ClusterMonitoring CRD Controller
Signed-off-by: Daniel Mellado <[email protected]>
1 parent 231e593

5 files changed (+337, -8 lines)

.gitignore
Lines changed: 3 additions & 0 deletions

@@ -4,6 +4,9 @@
 jsonnet/vendor/
 /tmp
 
+# Test artifacts - temporary files generated during test runs
+test/**/e2e-test-monitor-*.json
+
 # These are empty target files, created on every docker build. Their sole
 # purpose is to track the last target execution time to evaluate, whether the
 # container needs to be rebuilt
Lines changed: 160 additions & 0 deletions

@@ -0,0 +1,160 @@
// Copyright 2025 The Cluster Monitoring Operator Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package alert

import (
	"context"
	"fmt"

	configv1alpha1 "github.com/openshift/api/config/v1alpha1"
	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/util/workqueue"
	"k8s.io/klog/v2"

	"github.com/openshift/cluster-monitoring-operator/pkg/client"
)

const (
	controllerName = "cluster-monitoring"
)

// ClusterMonitoringController is a minimal controller for ClusterMonitoring resources.
type ClusterMonitoringController struct {
	client   *client.Client
	queue    workqueue.TypedRateLimitingInterface[string]
	informer cache.SharedIndexInformer
}

// NewClusterMonitoringController returns a new minimal ClusterMonitoringController.
func NewClusterMonitoringController(ctx context.Context, client *client.Client, version string) (*ClusterMonitoringController, error) {
	informer := cache.NewSharedIndexInformer(
		client.ClusterMonitoringListWatch(),
		&configv1alpha1.ClusterMonitoring{},
		resyncPeriod,
		cache.Indexers{},
	)

	queue := workqueue.NewTypedRateLimitingQueueWithConfig[string](
		workqueue.NewTypedItemExponentialFailureRateLimiter[string](queueBaseDelay, queueMaxDelay),
		workqueue.TypedRateLimitingQueueConfig[string]{Name: controllerName},
	)

	controller := &ClusterMonitoringController{
		client:   client,
		queue:    queue,
		informer: informer,
	}

	_, err := informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc:    controller.handleAdd,
		UpdateFunc: controller.handleUpdate,
		DeleteFunc: controller.handleDelete,
	})
	if err != nil {
		return nil, err
	}

	return controller, nil
}

// Run starts the controller.
func (c *ClusterMonitoringController) Run(ctx context.Context, workers int) {
	klog.Info("Starting ClusterMonitoring controller")
	defer c.queue.ShutDown()

	go c.informer.Run(ctx.Done())

	if !cache.WaitForNamedCacheSync("ClusterMonitoring controller", ctx.Done(), c.informer.HasSynced) {
		klog.Error("Failed to sync ClusterMonitoring controller cache")
		return
	}

	for i := 0; i < workers; i++ {
		go c.worker(ctx)
	}

	klog.Info("ClusterMonitoring controller started")
	<-ctx.Done()
	klog.Info("ClusterMonitoring controller stopped")
}

func (c *ClusterMonitoringController) worker(ctx context.Context) {
	for c.processNextWorkItem(ctx) {
	}
}

func (c *ClusterMonitoringController) processNextWorkItem(ctx context.Context) bool {
	key, quit := c.queue.Get()
	if quit {
		return false
	}
	defer c.queue.Done(key)

	if err := c.sync(ctx, key); err != nil {
		utilruntime.HandleError(fmt.Errorf("error syncing ClusterMonitoring (%s): %w", key, err))
		c.queue.AddRateLimited(key)
		return true
	}

	klog.V(4).Infof("ClusterMonitoring successfully synced: %s", key)
	c.queue.Forget(key)
	return true
}

func (c *ClusterMonitoringController) sync(ctx context.Context, key string) error {
	klog.Infof("🎉 ClusterMonitoring controller processing: %s", key)

	// For now, just log that we saw the resource.
	// Later we'll add the actual reconciliation logic.

	return nil
}

func (c *ClusterMonitoringController) handleAdd(obj interface{}) {
	key, ok := c.keyFunc(obj)
	if !ok {
		return
	}
	klog.Infof("ClusterMonitoring added: %s", key)
	c.queue.Add(key)
}

func (c *ClusterMonitoringController) handleUpdate(oldObj, newObj interface{}) {
	key, ok := c.keyFunc(newObj)
	if !ok {
		return
	}
	klog.Infof("ClusterMonitoring updated: %s", key)
	c.queue.Add(key)
}

func (c *ClusterMonitoringController) handleDelete(obj interface{}) {
	key, ok := c.keyFunc(obj)
	if !ok {
		return
	}
	klog.Infof("ClusterMonitoring deleted: %s", key)
	c.queue.Add(key)
}

func (c *ClusterMonitoringController) keyFunc(obj interface{}) (string, bool) {
	key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
	if err != nil {
		klog.Errorf("Creating key for ClusterMonitoring object failed: %v", err)
		return key, false
	}
	return key, true
}
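Note that this file compiles only because resyncPeriod, queueBaseDelay, and queueMaxDelay already exist as package-level constants in pkg/alert, presumably shared with the existing Rule and RelabelConfig controllers; they are not defined in this commit. A minimal sketch of what the assumed declarations might look like (the names come from this diff, the values are illustrative only, borrowed from the operator's own queue settings further down):

	// Assumed declarations, not part of this diff; values are illustrative.
	const (
		resyncPeriod   = 15 * time.Minute
		queueBaseDelay = 50 * time.Millisecond
		queueMaxDelay  = 3 * time.Minute
	)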

pkg/client/client.go
Lines changed: 9 additions & 0 deletions

@@ -417,6 +417,15 @@ func (c *Client) ClusterOperatorListWatch(ctx context.Context, name string) *cac
 	}
 }
 
+func (c *Client) ClusterMonitoringListWatch() *cache.ListWatch {
+	return cache.NewListWatchFromClient(
+		c.oscclient.ConfigV1alpha1().RESTClient(),
+		"clustermonitorings",
+		"",
+		fields.Everything(),
+	)
+}
+
 func (c *Client) HasRouteCapability(ctx context.Context) (bool, error) {
 	_, err := c.oscclient.ConfigV1().ClusterOperators().Get(ctx, "ingress", metav1.GetOptions{})
 	if apierrors.IsNotFound(err) {
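Here cache.NewListWatchFromClient builds the list/watch pair from the raw REST client: "clustermonitorings" is the resource plural, the empty namespace argument reflects that the resource is cluster-scoped, and fields.Everything() applies no field selector. Roughly the same ListWatch could be expressed with the typed client that the e2e test below uses; a sketch for illustration only (assumes imports for metav1, runtime, and watch; not code from this commit):

	lw := &cache.ListWatch{
		ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) {
			return c.oscclient.ConfigV1alpha1().ClusterMonitorings().List(context.TODO(), opts)
		},
		WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) {
			return c.oscclient.ConfigV1alpha1().ClusterMonitorings().Watch(context.TODO(), opts)
		},
	}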

pkg/operator/operator.go
Lines changed: 16 additions & 8 deletions

@@ -202,8 +202,9 @@ type Operator struct {
 
 	assets *manifests.Assets
 
-	ruleController    *alert.RuleController
-	relabelController *alert.RelabelConfigController
+	ruleController              *alert.RuleController
+	relabelController           *alert.RelabelConfigController
+	clusterMonitoringController *alert.ClusterMonitoringController
 }
 
 func New(

@@ -252,6 +253,11 @@
 		return nil, fmt.Errorf("failed to create alert relabel config controller: %w", err)
 	}
 
+	clusterMonitoringController, err := alert.NewClusterMonitoringController(ctx, c, version)
+	if err != nil {
+		return nil, fmt.Errorf("failed to create cluster monitoring controller: %w", err)
+	}
+
 	o := &Operator{
 		images:           images,
 		telemetryMatches: telemetryMatches,

@@ -265,12 +271,13 @@
 			workqueue.NewTypedItemExponentialFailureRateLimiter[string](50*time.Millisecond, 3*time.Minute),
 			workqueue.TypedRateLimitingQueueConfig[string]{Name: "cluster-monitoring"},
 		),
-		informers:            make([]cache.SharedIndexInformer, 0),
-		assets:               a,
-		informerFactories:    make([]informers.SharedInformerFactory, 0),
-		controllersToRunFunc: make([]func(context.Context, int), 0),
-		ruleController:       ruleController,
-		relabelController:    relabelController,
+		informers:                   make([]cache.SharedIndexInformer, 0),
+		assets:                      a,
+		informerFactories:           make([]informers.SharedInformerFactory, 0),
+		controllersToRunFunc:        make([]func(context.Context, int), 0),
+		ruleController:              ruleController,
+		relabelController:           relabelController,
+		clusterMonitoringController: clusterMonitoringController,
 	}
 
 	informer := cache.NewSharedIndexInformer(

@@ -532,6 +539,7 @@
 		csrMetricsServerController.Run,
 		o.ruleController.Run,
 		o.relabelController.Run,
+		o.clusterMonitoringController.Run,
 	)
 
 	return o, nil
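Registering o.clusterMonitoringController.Run alongside the other controllers works because every entry collected in controllersToRunFunc shares the func(ctx context.Context, workers int) signature, so the operator can start them uniformly. Presumably the operator's run path does something along these lines (a sketch, not code from this commit):

	for _, run := range o.controllersToRunFunc {
		go run(ctx, workers) // each Run blocks until ctx is cancelled
	}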
Lines changed: 149 additions & 0 deletions

@@ -0,0 +1,149 @@
// Copyright 2025 The Cluster Monitoring Operator Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package e2e

import (
	"context"
	"testing"
	"time"

	configv1alpha1 "github.com/openshift/api/config/v1alpha1"
	"github.com/openshift/cluster-monitoring-operator/test/e2e/framework"
	apierrors "k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/util/wait"
)

const (
	clusterMonitoringName = "cluster"
)

func TestClusterMonitoring(t *testing.T) {
	ctx := context.Background()
	clusterMonitorings := f.OpenShiftConfigClient.ConfigV1alpha1().ClusterMonitorings()

	// Check if the ClusterMonitoring CRD is available (feature gate enabled).
	_, err := clusterMonitorings.List(ctx, metav1.ListOptions{Limit: 1})
	if err != nil {
		if apierrors.IsNotFound(err) {
			t.Skip("ClusterMonitoring CRD not available - ClusterMonitoringConfig feature gate may not be enabled")
			return
		}
		t.Fatalf("unexpected error checking ClusterMonitoring CRD availability: %v", err)
	}

	// Clean up any existing test resource first.
	_ = clusterMonitorings.Delete(ctx, clusterMonitoringName, metav1.DeleteOptions{})

	time.Sleep(2 * time.Second)

	cm := &configv1alpha1.ClusterMonitoring{
		ObjectMeta: metav1.ObjectMeta{
			Name: clusterMonitoringName,
			Labels: map[string]string{
				framework.E2eTestLabelName: framework.E2eTestLabelValue,
			},
		},
		Spec: configv1alpha1.ClusterMonitoringSpec{
			AlertmanagerConfig: configv1alpha1.AlertmanagerConfig{
				DeploymentMode: configv1alpha1.AlertManagerDeployModeDefaultConfig,
			},
		},
	}

	t.Log("Creating ClusterMonitoring resource...")
	createdCM, err := clusterMonitorings.Create(ctx, cm, metav1.CreateOptions{})
	if err != nil {
		t.Fatalf("Failed to create ClusterMonitoring: %v", err)
	}

	defer func() {
		t.Log("Cleaning up ClusterMonitoring resource...")
		err := clusterMonitorings.Delete(ctx, clusterMonitoringName, metav1.DeleteOptions{})
		if err != nil && !apierrors.IsNotFound(err) {
			t.Errorf("Failed to delete ClusterMonitoring: %v", err)
		}
	}()

	t.Logf("✅ Successfully created ClusterMonitoring resource: %s", createdCM.Name)

	err = wait.PollImmediate(5*time.Second, 30*time.Second, func() (bool, error) {
		retrievedCM, err := clusterMonitorings.Get(ctx, clusterMonitoringName, metav1.GetOptions{})
		if err != nil {
			t.Logf("Waiting for ClusterMonitoring to be available: %v", err)
			return false, nil
		}

		if retrievedCM.Spec.AlertmanagerConfig.DeploymentMode != configv1alpha1.AlertManagerDeployModeDefaultConfig {
			t.Logf("Waiting for correct AlertmanagerConfig.DeploymentMode, got: %s", retrievedCM.Spec.AlertmanagerConfig.DeploymentMode)
			return false, nil
		}

		t.Logf("✅ ClusterMonitoring resource retrieved successfully with correct spec")
		return true, nil
	})

	if err != nil {
		t.Fatalf("ClusterMonitoring resource was not properly created or retrieved: %v", err)
	}

	t.Log("Testing ClusterMonitoring resource update...")
	err = wait.PollImmediate(2*time.Second, 30*time.Second, func() (bool, error) {
		currentCM, err := clusterMonitorings.Get(ctx, clusterMonitoringName, metav1.GetOptions{})
		if err != nil {
			return false, err
		}

		currentCM.Spec.AlertmanagerConfig.DeploymentMode = configv1alpha1.AlertManagerDeployModeCustomConfig
		currentCM.Spec.AlertmanagerConfig.CustomConfig = configv1alpha1.AlertmanagerCustomConfig{
			LogLevel: configv1alpha1.LogLevelInfo,
		}

		_, err = clusterMonitorings.Update(ctx, currentCM, metav1.UpdateOptions{})
		if err != nil {
			t.Logf("Retrying update due to: %v", err)
			return false, nil // Retry on conflict.
		}

		return true, nil
	})

	if err != nil {
		t.Fatalf("Failed to update ClusterMonitoring: %v", err)
	}

	updatedCM, err := clusterMonitorings.Get(ctx, clusterMonitoringName, metav1.GetOptions{})
	if err != nil {
		t.Fatalf("Failed to get updated ClusterMonitoring: %v", err)
	}

	if updatedCM.Spec.AlertmanagerConfig.DeploymentMode != configv1alpha1.AlertManagerDeployModeCustomConfig {
		t.Errorf("Expected DeploymentMode to be CustomConfig, got: %s", updatedCM.Spec.AlertmanagerConfig.DeploymentMode)
	}

	if updatedCM.Spec.AlertmanagerConfig.CustomConfig.LogLevel != configv1alpha1.LogLevelInfo {
		t.Errorf("Expected LogLevel to be Info, got: %s", updatedCM.Spec.AlertmanagerConfig.CustomConfig.LogLevel)
	}

	t.Log("✅ ClusterMonitoring resource updated successfully")

	// TODO: Once the controller is integrated into the operator, verify that:
	//   - the controller processes the ClusterMonitoring resource,
	//   - appropriate Alertmanager resources are created/updated/deleted,
	//   - the controller logs show the resource was processed.
	// For now, this test only verifies the CRD CRUD operations.

	t.Log("✅ ClusterMonitoring e2e test completed successfully")
}
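One note on the polling helper: wait.PollImmediate is deprecated in recent k8s.io/apimachinery releases in favor of the context-aware variants. Assuming the vendored apimachinery here is new enough (an assumption, not checked against this repo's go.mod), the first poll could equivalently be written as:

	err = wait.PollUntilContextTimeout(ctx, 5*time.Second, 30*time.Second, true,
		func(ctx context.Context) (bool, error) {
			retrievedCM, err := clusterMonitorings.Get(ctx, clusterMonitoringName, metav1.GetOptions{})
			if err != nil {
				return false, nil // not available yet; keep polling
			}
			// Done once the spec round-trips with the expected deployment mode.
			return retrievedCM.Spec.AlertmanagerConfig.DeploymentMode ==
				configv1alpha1.AlertManagerDeployModeDefaultConfig, nil
		})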
