diff --git a/cmd/main.go b/cmd/main.go index 3b44745047..23d5263076 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -30,6 +30,7 @@ import ( "github.com/tigera/operator/internal/controller" "github.com/tigera/operator/pkg/active" + "github.com/tigera/operator/pkg/apigroup" "github.com/tigera/operator/pkg/apis" "github.com/tigera/operator/pkg/awssgsetup" "github.com/tigera/operator/pkg/common" @@ -211,12 +212,17 @@ If a value other than 'all' is specified, the first CRD with a prefix of the spe os.Exit(1) } - v3CRDs, err := apis.UseV3CRDS(cs) + v3CRDs, err := apis.UseV3CRDS(cfg) if err != nil { log.Error(err, "Failed to determine CRD version to use") os.Exit(1) } + // Tell the component handler which API group to inject into workloads. + if v3CRDs { + apigroup.Set(apigroup.V3) + } + // Add the Calico API to the scheme, now that we know which backing CRD version to use. utilruntime.Must(apis.AddToScheme(scheme, v3CRDs)) diff --git a/pkg/apigroup/apigroup.go b/pkg/apigroup/apigroup.go new file mode 100644 index 0000000000..cdd5ca4f15 --- /dev/null +++ b/pkg/apigroup/apigroup.go @@ -0,0 +1,76 @@ +// Copyright (c) 2026 Tigera, Inc. All rights reserved. + +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package apigroup tracks which Calico API group the operator should configure +// on the workloads it manages. 
The value is set once at startup (or when a +// datastore migration completes) and read by the component handler to inject +// the CALICO_API_GROUP env var into all workload containers. +package apigroup + +import ( + "sync" + + corev1 "k8s.io/api/core/v1" +) + +// APIGroup identifies which Calico CRD API group to use. +type APIGroup int + +const ( + // Unknown means the API group hasn't been determined yet. + Unknown APIGroup = iota + // V1 uses crd.projectcalico.org/v1 (legacy, via aggregated API server). + V1 + // V3 uses projectcalico.org/v3 (native CRDs, no API server). + V3 +) + +const ( + envVarName = "CALICO_API_GROUP" + v3Value = "projectcalico.org/v3" +) + +var ( + mu sync.RWMutex + current APIGroup + envVars []corev1.EnvVar +) + +// Set records the active API group. If V3, subsequent calls to EnvVars will +// return a CALICO_API_GROUP env var for injection into workload containers. +func Set(g APIGroup) { + mu.Lock() + defer mu.Unlock() + current = g + if g == V3 { + envVars = []corev1.EnvVar{{Name: envVarName, Value: v3Value}} + } else { + envVars = nil + } +} + +// Get returns the current API group. +func Get() APIGroup { + mu.RLock() + defer mu.RUnlock() + return current +} + +// EnvVars returns the env vars to inject into workload containers, or nil if +// no explicit API group has been configured. +func EnvVars() []corev1.EnvVar { + mu.RLock() + defer mu.RUnlock() + return envVars +} diff --git a/pkg/apigroup/apigroup_test.go b/pkg/apigroup/apigroup_test.go new file mode 100644 index 0000000000..2f3c75cbe3 --- /dev/null +++ b/pkg/apigroup/apigroup_test.go @@ -0,0 +1,67 @@ +// Copyright (c) 2026 Tigera, Inc. All rights reserved. + +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package apigroup + +import ( + "testing" + + . "github.com/onsi/gomega" + corev1 "k8s.io/api/core/v1" +) + +func TestSetAndGet(t *testing.T) { + tests := []struct { + name string + set APIGroup + wantGet APIGroup + wantEnvVars []corev1.EnvVar + }{ + { + name: "default state is Unknown with nil env vars", + set: Unknown, + wantGet: Unknown, + wantEnvVars: nil, + }, + { + name: "V3 returns CALICO_API_GROUP env var", + set: V3, + wantGet: V3, + wantEnvVars: []corev1.EnvVar{ + {Name: "CALICO_API_GROUP", Value: "projectcalico.org/v3"}, + }, + }, + { + name: "V1 returns nil env vars", + set: V1, + wantGet: V1, + wantEnvVars: nil, + }, + { + name: "setting back to Unknown clears env vars", + set: Unknown, + wantGet: Unknown, + wantEnvVars: nil, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + g := NewWithT(t) + Set(tt.set) + g.Expect(Get()).To(Equal(tt.wantGet)) + g.Expect(EnvVars()).To(Equal(tt.wantEnvVars)) + }) + } +} diff --git a/pkg/apis/version.go b/pkg/apis/version.go index 72cffb2a87..d6f6cc2195 100644 --- a/pkg/apis/version.go +++ b/pkg/apis/version.go @@ -15,23 +15,47 @@ package apis import ( + "context" "os" v3 "github.com/tigera/api/pkg/apis/projectcalico/v3" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/client-go/dynamic" "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" ctrl "sigs.k8s.io/controller-runtime" ) var log = ctrl.Log.WithName("apis") -// UseV3CRDS detects whether we should use the crd.projectcalic.org/v1 or +// DatastoreMigrationGVR is 
the GroupVersionResource for DatastoreMigration CRs. +var DatastoreMigrationGVR = schema.GroupVersionResource{ + Group: "migration.projectcalico.org", + Version: "v1beta1", + Resource: "datastoremigrations", +} + +// UseV3CRDS detects whether we should use the crd.projectcalico.org/v1 or // projectcalico.org/v3 API group for Calico CRDs. -func UseV3CRDS(cs kubernetes.Interface) (bool, error) { +func UseV3CRDS(cfg *rest.Config) (bool, error) { if os.Getenv("CALICO_API_GROUP") != "" { log.Info("CALICO_API_GROUP environment variable is set, using its value to determine API group", "CALICO_API_GROUP", os.Getenv("CALICO_API_GROUP")) return os.Getenv("CALICO_API_GROUP") == "projectcalico.org/v3", nil } + // Check if a DatastoreMigration CR exists in a state that indicates v3 CRDs + // should be used. This handles operator restarts during or after migration. + if v3, err := checkDatastoreMigration(cfg); err != nil { + log.Info("Failed to check DatastoreMigration CR, falling through to API discovery", "error", err) + } else if v3 { + return true, nil + } + + cs, err := kubernetes.NewForConfig(cfg) + if err != nil { + return false, err + } apiGroups, err := cs.Discovery().ServerGroups() if err != nil { return false, err @@ -50,3 +74,28 @@ func UseV3CRDS(cs kubernetes.Interface) (bool, error) { log.Info("Detected API groups from API server", "v3present", v3present, "v1present", v1present) return v3present && !v1present, nil } + +// checkDatastoreMigration uses a dynamic client to look for a DatastoreMigration CR +// and returns true if one exists in a phase that indicates v3 CRDs should be used. 
+func checkDatastoreMigration(cfg *rest.Config) (bool, error) { + dc, err := dynamic.NewForConfig(cfg) + if err != nil { + return false, err + } + list, err := dc.Resource(DatastoreMigrationGVR).List(context.Background(), metav1.ListOptions{}) + if err != nil { + return false, err + } + for _, item := range list.Items { + status, ok := item.Object["status"].(map[string]any) + if !ok { + continue + } + phase, _ := status["phase"].(string) + if phase == "Converged" || phase == "Complete" { + log.Info("DatastoreMigration CR found in post-migration phase, using v3 CRDs", "name", item.GetName(), "phase", phase) + return true, nil + } + } + return false, nil +} diff --git a/pkg/controller/apiserver/apiserver_controller.go b/pkg/controller/apiserver/apiserver_controller.go index 430e83cc21..5689083913 100644 --- a/pkg/controller/apiserver/apiserver_controller.go +++ b/pkg/controller/apiserver/apiserver_controller.go @@ -25,6 +25,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/dynamic" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller" @@ -42,6 +43,7 @@ import ( webhooksvalidation "github.com/tigera/operator/pkg/common/validation/webhooks" "github.com/tigera/operator/pkg/controller/certificatemanager" "github.com/tigera/operator/pkg/controller/k8sapi" + "github.com/tigera/operator/pkg/controller/migration/datastoremigration" "github.com/tigera/operator/pkg/controller/options" "github.com/tigera/operator/pkg/controller/status" "github.com/tigera/operator/pkg/controller/utils" @@ -64,9 +66,14 @@ var log = logf.Log.WithName("controller_apiserver") // Add creates a new APIServer Controller and adds it to the Manager. The Manager will set fields on the Controller // and Start it when the Manager is Started. 
func Add(mgr manager.Manager, opts options.ControllerOptions) error { + dc, err := dynamic.NewForConfig(mgr.GetConfig()) + if err != nil { + return fmt.Errorf("failed to create dynamic client: %w", err) + } r := &ReconcileAPIServer{ client: mgr.GetClient(), scheme: mgr.GetScheme(), + dynamicClient: dc, status: status.New(mgr.GetClient(), "apiserver", opts.KubernetesVersion), tierWatchReady: &utils.ReadyFlag{}, opts: opts, @@ -182,6 +189,10 @@ func Add(mgr manager.Manager, opts options.ControllerOptions) error { return fmt.Errorf("apiserver-controller failed to create periodic reconcile watch: %w", err) } + // Watch DatastoreMigration CRs so the apiserver controller reacts promptly + // to migration phase changes (e.g., goes hands-off during Migrating). + go datastoremigration.WaitForWatchAndAdd(c, opts.K8sClientset) + log.V(5).Info("Controller created and Watches setup") return nil } @@ -194,6 +205,7 @@ type ReconcileAPIServer struct { // This client, initialized using mgr.Client() above, is a split client // that reads objects from the cache and writes to the apiserver client client.Client + dynamicClient dynamic.Interface scheme *runtime.Scheme status status.StatusManager tierWatchReady *utils.ReadyFlag @@ -209,6 +221,24 @@ func (r *ReconcileAPIServer) Reconcile(ctx context.Context, request reconcile.Re reqLogger := log.WithValues("Request.Namespace", request.Namespace, "Request.Name", request.Name) reqLogger.V(2).Info("Reconciling APIServer") + // Check if a datastore migration is in progress. If so, the migration controller + // owns the APIService and we should not reconcile to avoid fighting over it. 
+ migrationPhase := datastoremigration.GetPhase(r.dynamicClient) + if migrationPhase == datastoremigration.PhaseMigrating { + reqLogger.Info("DatastoreMigration is in Migrating phase, deferring APIServer reconciliation") + return reconcile.Result{RequeueAfter: utils.StandardRetry}, nil + } + if migrationPhase == datastoremigration.PhaseConverged && !r.opts.UseV3CRDs { + // Migration has converged but the operator is still running in v1 mode. + // Trigger a restart by updating our own deployment with the CALICO_API_GROUP env var. + reqLogger.Info("DatastoreMigration converged, triggering operator restart to switch to v3 CRD mode") + if err := r.setAPIGroupEnvVar(ctx); err != nil { + reqLogger.Error(err, "Failed to trigger operator restart") + return reconcile.Result{RequeueAfter: utils.StandardRetry}, nil + } + return reconcile.Result{}, nil + } + instance, msg, err := utils.GetAPIServer(ctx, r.client) if err != nil { if errors.IsNotFound(err) { @@ -567,6 +597,32 @@ func validateAPIServerResource(instance *operatorv1.APIServer) error { return nil } +// setAPIGroupEnvVar updates the operator's own Deployment to add the +// CALICO_API_GROUP env var, which triggers a rolling restart. On restart, +// UseV3CRDS() picks up the env var and the operator starts in v3 CRD mode. 
+func (r *ReconcileAPIServer) setAPIGroupEnvVar(ctx context.Context) error {
+	dep := &v1.Deployment{}
+	key := types.NamespacedName{Name: "tigera-operator", Namespace: common.OperatorNamespace()}
+	if err := r.client.Get(ctx, key, dep); err != nil {
+		return fmt.Errorf("failed to get operator deployment: %w", err)
+	}
+
+	// Add the env var to every container that doesn't already carry it.
+	// Track whether anything actually changed so we only call Update (and
+	// thus trigger a rolling restart) when needed. Note: returning early on
+	// the first container that already has the var would skip the remaining
+	// containers and drop any appends made so far without ever calling Update.
+	envVar := corev1.EnvVar{Name: "CALICO_API_GROUP", Value: "projectcalico.org/v3"}
+	updated := false
+	for i, c := range dep.Spec.Template.Spec.Containers {
+		present := false
+		for _, e := range c.Env {
+			if e.Name == envVar.Name {
+				present = true
+				break
+			}
+		}
+		if !present {
+			dep.Spec.Template.Spec.Containers[i].Env = append(dep.Spec.Template.Spec.Containers[i].Env, envVar)
+			updated = true
+		}
+	}
+	if !updated {
+		// Every container already has the env var; the restart is already
+		// pending (or has happened) and there is nothing to do.
+		return nil
+	}
+
+	if err := r.client.Update(ctx, dep); err != nil {
+		return fmt.Errorf("failed to update operator deployment: %w", err)
+	}
+	return nil
+}
+
 // maintainFinalizer manages this controller's finalizer on the Installation resource.
 // We add a finalizer to the Installation when the API server has been installed, and only remove that finalizer when
 // the API server has been deleted and its pods have stopped running.
This allows for a graceful cleanup of API server resources diff --git a/pkg/controller/installation/core_controller.go b/pkg/controller/installation/core_controller.go index d043064209..b0e76a9061 100644 --- a/pkg/controller/installation/core_controller.go +++ b/pkg/controller/installation/core_controller.go @@ -44,6 +44,7 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/intstr" + "k8s.io/client-go/dynamic" "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" "k8s.io/client-go/tools/cache" @@ -68,6 +69,7 @@ import ( "github.com/tigera/operator/pkg/controller/k8sapi" "github.com/tigera/operator/pkg/controller/migration" "github.com/tigera/operator/pkg/controller/migration/convert" + "github.com/tigera/operator/pkg/controller/migration/datastoremigration" "github.com/tigera/operator/pkg/controller/options" "github.com/tigera/operator/pkg/controller/status" "github.com/tigera/operator/pkg/controller/utils" @@ -279,6 +281,11 @@ func Add(mgr manager.Manager, opts options.ControllerOptions) error { return fmt.Errorf("tigera-installation-controller failed to create periodic reconcile watch: %w", err) } + // Watch DatastoreMigration CRs so the installation controller re-reconciles when + // migration state changes (e.g., Converged → triggers env var injection on components). + // This is a deferred watch since the CRD may not be installed. 
+ go datastoremigration.WaitForWatchAndAdd(c, opts.K8sClientset) + return nil } @@ -315,10 +322,16 @@ func newReconciler(mgr manager.Manager, opts options.ControllerOptions) (*Reconc typhaListWatch := cache.NewListWatchFromClient(opts.K8sClientset.AppsV1().RESTClient(), "deployments", "calico-system", fields.OneTermEqualSelector("metadata.name", "calico-typha")) typhaScaler := newTyphaAutoscaler(opts.K8sClientset, nodeIndexInformer, typhaListWatch, statusManager) + dc, err := dynamic.NewForConfig(mgr.GetConfig()) + if err != nil { + return nil, fmt.Errorf("failed to create dynamic client: %w", err) + } + r := &ReconcileInstallation{ config: mgr.GetConfig(), client: mgr.GetClient(), clientset: opts.K8sClientset, + dynamicClient: dc, scheme: mgr.GetScheme(), shutdownContext: opts.ShutdownContext, watches: make(map[runtime.Object]struct{}), @@ -355,6 +368,7 @@ func secondaryResources() []client.Object { &rbacv1.ClusterRole{ObjectMeta: metav1.ObjectMeta{Name: render.CalicoNodeObjectName}}, &rbacv1.ClusterRole{ObjectMeta: metav1.ObjectMeta{Name: render.CalicoCNIPluginObjectName}}, &rbacv1.ClusterRole{ObjectMeta: metav1.ObjectMeta{Name: kubecontrollers.KubeControllerRole}}, + &rbacv1.ClusterRole{ObjectMeta: metav1.ObjectMeta{Name: kubecontrollers.MigrationClusterRoleName}}, &rbacv1.ClusterRoleBinding{ObjectMeta: metav1.ObjectMeta{Name: render.CalicoNodeObjectName}}, &rbacv1.ClusterRoleBinding{ObjectMeta: metav1.ObjectMeta{Name: render.CalicoCNIPluginObjectName}}, &rbacv1.ClusterRoleBinding{ObjectMeta: metav1.ObjectMeta{Name: kubecontrollers.KubeControllerRole}}, @@ -374,6 +388,7 @@ type ReconcileInstallation struct { config *rest.Config client client.Client clientset *kubernetes.Clientset + dynamicClient dynamic.Interface scheme *runtime.Scheme shutdownContext context.Context watches map[runtime.Object]struct{} @@ -1121,6 +1136,14 @@ func (r *ReconcileInstallation) Reconcile(ctx context.Context, request reconcile } } + // If a DatastoreMigration CR exists, ensure the 
migration RBAC is created + // early so kube-controllers can start the migration without waiting for + // the rest of this reconcile to complete. + ch := r.newComponentHandler(reqLogger, r.client, r.scheme, instance) + if err := ch.CreateOrUpdateOrDelete(ctx, kubecontrollers.MigrationRBACComponent(datastoremigration.Exists(r.dynamicClient)), nil); err != nil { + reqLogger.Info("Failed to reconcile migration RBAC", "error", err) + } + // Determine if we need to migrate resources from the kube-system namespace. If // we do then we'll render the Calico components with additional node selectors to // prevent scheduling, later we will run a migration that migrates nodes one by one diff --git a/pkg/controller/migration/datastoremigration/utils.go b/pkg/controller/migration/datastoremigration/utils.go new file mode 100644 index 0000000000..d3fad2a5ef --- /dev/null +++ b/pkg/controller/migration/datastoremigration/utils.go @@ -0,0 +1,107 @@ +// Copyright (c) 2026 Tigera, Inc. All rights reserved. + +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package datastoremigration provides utilities for checking DatastoreMigration +// CR state from the operator's controllers. 
+package datastoremigration + +import ( + "context" + "time" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/client-go/dynamic" + "k8s.io/client-go/kubernetes" + "sigs.k8s.io/controller-runtime/pkg/handler" + logf "sigs.k8s.io/controller-runtime/pkg/log" + + "github.com/tigera/operator/pkg/apis" + "github.com/tigera/operator/pkg/ctrlruntime" +) + +var log = logf.Log.WithName("datastoremigration") + +// Phase constants for DatastoreMigration status. +const ( + PhasePending = "Pending" + PhaseMigrating = "Migrating" + PhaseWaitingForConflictResolution = "WaitingForConflictResolution" + PhaseConverged = "Converged" + PhaseComplete = "Complete" + PhaseFailed = "Failed" +) + +// get fetches the first DatastoreMigration CR and returns its phase and +// whether it exists. Returns ("", false) if the CRD is not installed or +// no CR exists. +func get(dc dynamic.Interface) (string, bool) { + if dc == nil { + return "", false + } + list, err := dc.Resource(apis.DatastoreMigrationGVR).List(context.Background(), metav1.ListOptions{Limit: 1}) + if err != nil || len(list.Items) == 0 { + return "", false + } + status, ok := list.Items[0].Object["status"].(map[string]any) + if !ok { + return "", true + } + phase, _ := status["phase"].(string) + return phase, true +} + +// GetPhase returns the phase of the first DatastoreMigration CR, or empty +// string if none exists or the CRD is not installed. +func GetPhase(dc dynamic.Interface) string { + phase, _ := get(dc) + return phase +} + +// Exists returns true if at least one DatastoreMigration CR exists. +func Exists(dc dynamic.Interface) bool { + _, exists := get(dc) + return exists +} + +// WaitForWatchAndAdd polls for the DatastoreMigration CRD and sets up a watch +// on the given controller once available. This triggers controller reconciliation +// when the migration phase changes. 
+func WaitForWatchAndAdd(c ctrlruntime.Controller, cs *kubernetes.Clientset) { + duration := 1 * time.Second + maxDuration := 30 * time.Second + for { + time.Sleep(duration) + duration = min(2*duration, maxDuration) + + _, err := cs.Discovery().ServerResourcesForGroupVersion("migration.projectcalico.org/v1beta1") + if err != nil { + continue + } + + obj := &unstructured.Unstructured{} + obj.SetGroupVersionKind(schema.GroupVersionKind{ + Group: "migration.projectcalico.org", + Version: "v1beta1", + Kind: "DatastoreMigration", + }) + if err := c.WatchObject(obj, &handler.EnqueueRequestForObject{}); err != nil { + log.V(2).Info("Failed to watch DatastoreMigration, will retry", "error", err) + continue + } + log.Info("Successfully watching DatastoreMigration CRs") + return + } +} diff --git a/pkg/controller/migration/datastoremigration/utils_test.go b/pkg/controller/migration/datastoremigration/utils_test.go new file mode 100644 index 0000000000..bb2a445b17 --- /dev/null +++ b/pkg/controller/migration/datastoremigration/utils_test.go @@ -0,0 +1,111 @@ +// Copyright (c) 2026 Tigera, Inc. All rights reserved. + +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package datastoremigration + +import ( + "testing" + + . 
"github.com/onsi/gomega" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + dynamicfake "k8s.io/client-go/dynamic/fake" + + "github.com/tigera/operator/pkg/apis" +) + +func newFakeClient(objects ...runtime.Object) *dynamicfake.FakeDynamicClient { + scheme := runtime.NewScheme() + gvrToListKind := map[schema.GroupVersionResource]string{ + apis.DatastoreMigrationGVR: "DatastoreMigrationList", + } + return dynamicfake.NewSimpleDynamicClientWithCustomListKinds(scheme, gvrToListKind, objects...) +} + +func migrationCR(name, phase string) *unstructured.Unstructured { + obj := &unstructured.Unstructured{} + obj.SetGroupVersionKind(schema.GroupVersionKind{ + Group: "migration.projectcalico.org", + Version: "v1beta1", + Kind: "DatastoreMigration", + }) + obj.SetName(name) + if phase != "" { + obj.Object["status"] = map[string]any{"phase": phase} + } + return obj +} + +func TestGetPhaseAndExists(t *testing.T) { + tests := []struct { + name string + objects []runtime.Object + nilClient bool + wantPhase string + wantExist bool + }{ + { + name: "nil client", + nilClient: true, + wantPhase: "", + wantExist: false, + }, + { + name: "no CRs", + wantPhase: "", + wantExist: false, + }, + { + name: "CR with no status", + objects: []runtime.Object{migrationCR("default", "")}, + wantPhase: "", + wantExist: true, + }, + { + name: "CR in Migrating phase", + objects: []runtime.Object{migrationCR("default", PhaseMigrating)}, + wantPhase: PhaseMigrating, + wantExist: true, + }, + { + name: "CR in Converged phase", + objects: []runtime.Object{migrationCR("default", PhaseConverged)}, + wantPhase: PhaseConverged, + wantExist: true, + }, + { + name: "CR in Complete phase", + objects: []runtime.Object{migrationCR("default", PhaseComplete)}, + wantPhase: PhaseComplete, + wantExist: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + g := NewWithT(t) + + if tt.nilClient { + 
g.Expect(GetPhase(nil)).To(Equal(tt.wantPhase)) + g.Expect(Exists(nil)).To(Equal(tt.wantExist)) + return + } + + dc := newFakeClient(tt.objects...) + g.Expect(GetPhase(dc)).To(Equal(tt.wantPhase)) + g.Expect(Exists(dc)).To(Equal(tt.wantExist)) + }) + } +} diff --git a/pkg/controller/utils/component.go b/pkg/controller/utils/component.go index eca9dffb57..5fa534c06e 100644 --- a/pkg/controller/utils/component.go +++ b/pkg/controller/utils/component.go @@ -42,6 +42,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" v3 "github.com/tigera/api/pkg/apis/projectcalico/v3" + "github.com/tigera/operator/pkg/apigroup" "github.com/tigera/operator/pkg/common" "github.com/tigera/operator/pkg/controller/status" "github.com/tigera/operator/pkg/render" @@ -75,19 +76,21 @@ type ComponentHandler interface { // this is useful for CRD management so that they are not removed automatically. func NewComponentHandler(log logr.Logger, cli client.Client, scheme *runtime.Scheme, cr metav1.Object) ComponentHandler { return &componentHandler{ - client: cli, - scheme: scheme, - cr: cr, - log: log, + client: cli, + scheme: scheme, + cr: cr, + log: log, + apiGroupEnvs: apigroup.EnvVars(), } } type componentHandler struct { - client client.Client - scheme *runtime.Scheme - cr metav1.Object - log logr.Logger - createOnly bool + client client.Client + scheme *runtime.Scheme + cr metav1.Object + log logr.Logger + createOnly bool + apiGroupEnvs []v1.EnvVar } func (c *componentHandler) SetCreateOnly() { @@ -444,6 +447,12 @@ func (c *componentHandler) CreateOrUpdateOrDelete(ctx context.Context, component objsToCreate, objsToDelete := component.Objects() osType := component.SupportedOSType() + if len(c.apiGroupEnvs) > 0 { + for _, obj := range objsToCreate { + c.injectAPIGroupEnv(obj) + } + } + var alreadyExistsErr error = nil for _, obj := range objsToCreate { @@ -1086,7 +1095,6 @@ func addComponentLabel(obj metav1.Object, cr metav1.Object) { owner, ok := cr.(runtime.Object) if 
ok && owner.GetObjectKind() != nil {
 		obj.GetLabels()["app.kubernetes.io/component"] = sanitizeLabel(owner.GetObjectKind().GroupVersionKind().GroupKind().String())
 	}
 }
 }
@@ -1129,3 +1137,50 @@ func (r *ReadyFlag) MarkAsReady() {
 	defer r.mu.Unlock()
 	r.isReady = true
 }
+
+// injectAPIGroupEnv adds the CALICO_API_GROUP env var to all containers in
+// workload objects. This ensures every component uses the correct API group
+// during and after a datastore migration.
+func (c *componentHandler) injectAPIGroupEnv(obj client.Object) {
+	var podSpec *v1.PodSpec
+	switch o := obj.(type) {
+	case *apps.Deployment:
+		podSpec = &o.Spec.Template.Spec
+	case *apps.DaemonSet:
+		podSpec = &o.Spec.Template.Spec
+	case *apps.StatefulSet:
+		podSpec = &o.Spec.Template.Spec
+	case *batchv1.Job:
+		podSpec = &o.Spec.Template.Spec
+	case *batchv1.CronJob:
+		podSpec = &o.Spec.JobTemplate.Spec.Template.Spec
+	default:
+		// Not a pod-spec-carrying workload (e.g. Service, ConfigMap, RBAC);
+		// nothing to inject.
+		return
+	}
+	for i := range podSpec.Containers {
+		podSpec.Containers[i].Env = mergeEnvVars(podSpec.Containers[i].Env, c.apiGroupEnvs)
+	}
+	for i := range podSpec.InitContainers {
+		podSpec.InitContainers[i].Env = mergeEnvVars(podSpec.InitContainers[i].Env, c.apiGroupEnvs)
+	}
+}
+
+// mergeEnvVars adds or updates env vars in existing. If an env var with the
+// same name already exists, its value is updated in place.
+func mergeEnvVars(existing []v1.EnvVar, toMerge []v1.EnvVar) []v1.EnvVar { + for _, env := range toMerge { + found := false + for i, e := range existing { + if e.Name == env.Name { + existing[i].Value = env.Value + existing[i].ValueFrom = env.ValueFrom + found = true + break + } + } + if !found { + existing = append(existing, env) + } + } + return existing +} diff --git a/pkg/render/kubecontrollers/kube-controllers.go b/pkg/render/kubecontrollers/kube-controllers.go index 036e950228..15d90b5e1c 100644 --- a/pkg/render/kubecontrollers/kube-controllers.go +++ b/pkg/render/kubecontrollers/kube-controllers.go @@ -404,6 +404,12 @@ func kubeControllersRoleCommonRules(cfg *KubeControllersConfiguration) []rbacv1. Resources: []string{"virtualmachineinstances", "virtualmachines"}, Verbs: []string{"get", "list", "watch"}, }, + { + // The datastore migration controller watches DatastoreMigration CRs and updates their status. + APIGroups: []string{"migration.projectcalico.org"}, + Resources: []string{"datastoremigrations", "datastoremigrations/status"}, + Verbs: []string{"get", "list", "watch", "create", "update", "patch"}, + }, } if cfg.Installation.KubernetesProvider.IsOpenShift() { diff --git a/pkg/render/kubecontrollers/kube-controllers_test.go b/pkg/render/kubecontrollers/kube-controllers_test.go index c7dcee5769..83cbccc357 100644 --- a/pkg/render/kubecontrollers/kube-controllers_test.go +++ b/pkg/render/kubecontrollers/kube-controllers_test.go @@ -266,7 +266,7 @@ var _ = Describe("kube-controllers rendering tests", func() { Expect(len(dp.Spec.Template.Spec.Volumes)).To(Equal(1)) clusterRole := rtest.GetResource(resources, kubecontrollers.KubeControllerRole, "", "rbac.authorization.k8s.io", "v1", "ClusterRole").(*rbacv1.ClusterRole) - Expect(clusterRole.Rules).To(HaveLen(25), "cluster role should have 25 rules") + Expect(clusterRole.Rules).To(HaveLen(26), "cluster role should have 26 rules") ms := rtest.GetResource(resources, kubecontrollers.KubeControllerMetrics, 
common.CalicoNamespace, "", "v1", "Service").(*corev1.Service) Expect(ms.Spec.ClusterIP).To(Equal("None"), "metrics service should be headless") @@ -353,7 +353,7 @@ var _ = Describe("kube-controllers rendering tests", func() { Expect(dp.Spec.Template.Spec.Volumes[0].ConfigMap.Name).To(Equal("tigera-ca-bundle")) clusterRole := rtest.GetResource(resources, kubecontrollers.EsKubeControllerRole, "", "rbac.authorization.k8s.io", "v1", "ClusterRole").(*rbacv1.ClusterRole) - Expect(clusterRole.Rules).To(HaveLen(23), "cluster role should have 23 rules") + Expect(clusterRole.Rules).To(HaveLen(24), "cluster role should have 24 rules") Expect(clusterRole.Rules).To(ContainElement( rbacv1.PolicyRule{ APIGroups: []string{""}, @@ -564,7 +564,7 @@ var _ = Describe("kube-controllers rendering tests", func() { Expect(dp.Spec.Template.Spec.Containers[0].Image).To(Equal("test-reg/tigera/kube-controllers:" + components.ComponentTigeraKubeControllers.Version)) clusterRole := rtest.GetResource(resources, kubecontrollers.EsKubeControllerRole, "", "rbac.authorization.k8s.io", "v1", "ClusterRole").(*rbacv1.ClusterRole) - Expect(clusterRole.Rules).To(HaveLen(23), "cluster role should have 23 rules") + Expect(clusterRole.Rules).To(HaveLen(24), "cluster role should have 24 rules") Expect(clusterRole.Rules).To(ContainElement( rbacv1.PolicyRule{ APIGroups: []string{""}, diff --git a/pkg/render/kubecontrollers/migration_rbac.go b/pkg/render/kubecontrollers/migration_rbac.go new file mode 100644 index 0000000000..68b997c075 --- /dev/null +++ b/pkg/render/kubecontrollers/migration_rbac.go @@ -0,0 +1,84 @@ +// Copyright (c) 2026 Tigera, Inc. All rights reserved. + +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package kubecontrollers + +import ( + rbacv1 "k8s.io/api/rbac/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "sigs.k8s.io/controller-runtime/pkg/client" + + "github.com/tigera/operator/pkg/common" + "github.com/tigera/operator/pkg/render" +) + +const ( + MigrationClusterRoleName = "calico-kube-controllers-migration" + migrationClusterRoleBindingName = "calico-kube-controllers-migration" +) + +// migrationRBACObjects returns the ClusterRole and ClusterRoleBinding that grant +// calico-kube-controllers broad access to both API groups during a datastore migration. 
+func migrationRBACObjects() []client.Object { + return []client.Object{ + &rbacv1.ClusterRole{ + TypeMeta: metav1.TypeMeta{Kind: "ClusterRole", APIVersion: "rbac.authorization.k8s.io/v1"}, + ObjectMeta: metav1.ObjectMeta{Name: MigrationClusterRoleName}, + Rules: []rbacv1.PolicyRule{ + { + APIGroups: []string{"projectcalico.org", "crd.projectcalico.org"}, + Resources: []string{"*"}, + Verbs: []string{"get", "list", "watch", "create", "update", "patch", "delete"}, + }, + { + APIGroups: []string{"apiregistration.k8s.io"}, + Resources: []string{"apiservices"}, + Verbs: []string{"get", "list", "watch", "create", "delete"}, + }, + { + APIGroups: []string{"apiextensions.k8s.io"}, + Resources: []string{"customresourcedefinitions"}, + Verbs: []string{"get", "list", "delete"}, + }, + }, + }, + &rbacv1.ClusterRoleBinding{ + TypeMeta: metav1.TypeMeta{Kind: "ClusterRoleBinding", APIVersion: "rbac.authorization.k8s.io/v1"}, + ObjectMeta: metav1.ObjectMeta{Name: migrationClusterRoleBindingName}, + RoleRef: rbacv1.RoleRef{ + APIGroup: "rbac.authorization.k8s.io", + Kind: "ClusterRole", + Name: MigrationClusterRoleName, + }, + Subjects: []rbacv1.Subject{ + { + Kind: "ServiceAccount", + Name: "calico-kube-controllers", + Namespace: common.CalicoNamespace, + }, + }, + }, + } +} + +// MigrationRBACComponent returns a render component that creates or deletes the +// migration RBAC. When migrationActive is true, kube-controllers needs broad +// access to both API groups to read v1 resources and write v3 resources. +// When false, the extra permissions are cleaned up. +func MigrationRBACComponent(migrationActive bool) render.Component { + if migrationActive { + return render.NewCreationPassthrough(migrationRBACObjects()...) + } + return render.NewDeletionPassthrough(migrationRBACObjects()...) 
+} diff --git a/test/mainline_test.go b/test/mainline_test.go index bddb15a3b2..711d1ebd07 100644 --- a/test/mainline_test.go +++ b/test/mainline_test.go @@ -300,7 +300,7 @@ func setupManagerNoControllers() (client.Client, *kubernetes.Clientset, manager. clientset, err := kubernetes.NewForConfig(cfg) Expect(err).NotTo(HaveOccurred()) - v3CRDs, err := apis.UseV3CRDS(clientset) + v3CRDs, err := apis.UseV3CRDS(cfg) Expect(err).NotTo(HaveOccurred()) // Create a scheme to use.