Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (

"github.com/tigera/operator/internal/controller"
"github.com/tigera/operator/pkg/active"
"github.com/tigera/operator/pkg/apigroup"
"github.com/tigera/operator/pkg/apis"
"github.com/tigera/operator/pkg/awssgsetup"
"github.com/tigera/operator/pkg/common"
Expand Down Expand Up @@ -211,12 +212,17 @@ If a value other than 'all' is specified, the first CRD with a prefix of the spe
os.Exit(1)
}

v3CRDs, err := apis.UseV3CRDS(cs)
v3CRDs, err := apis.UseV3CRDS(cfg)
if err != nil {
log.Error(err, "Failed to determine CRD version to use")
os.Exit(1)
}

// Tell the component handler which API group to inject into workloads.
if v3CRDs {
apigroup.Set(apigroup.V3)
}

// Add the Calico API to the scheme, now that we know which backing CRD version to use.
utilruntime.Must(apis.AddToScheme(scheme, v3CRDs))

Expand Down
76 changes: 76 additions & 0 deletions pkg/apigroup/apigroup.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
// Copyright (c) 2026 Tigera, Inc. All rights reserved.

// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Package apigroup tracks which Calico API group the operator should configure
// on the workloads it manages. The value is set once at startup (or when a
// datastore migration completes) and read by the component handler to inject
// the CALICO_API_GROUP env var into all workload containers.
package apigroup

import (
"sync"

corev1 "k8s.io/api/core/v1"
)

// APIGroup identifies which Calico CRD API group to use.
type APIGroup int

const (
// Unknown means the API group hasn't been determined yet.
Unknown APIGroup = iota
// V1 uses crd.projectcalico.org/v1 (legacy, via aggregated API server).
V1
// V3 uses projectcalico.org/v3 (native CRDs, no API server).
V3
)

const (
envVarName = "CALICO_API_GROUP"
v3Value = "projectcalico.org/v3"
)

var (
mu sync.RWMutex
current APIGroup
envVars []corev1.EnvVar
)

// Set records the active API group. If V3, subsequent calls to EnvVars will
// return a CALICO_API_GROUP env var for injection into workload containers.
func Set(g APIGroup) {
mu.Lock()
defer mu.Unlock()
current = g
if g == V3 {
envVars = []corev1.EnvVar{{Name: envVarName, Value: v3Value}}
} else {
envVars = nil
}
}

// Get returns the current API group.
func Get() APIGroup {
mu.RLock()
defer mu.RUnlock()
return current
}

// EnvVars returns the env vars to inject into workload containers, or nil if
// no explicit API group has been configured.
func EnvVars() []corev1.EnvVar {
mu.RLock()
defer mu.RUnlock()
return envVars
}
67 changes: 67 additions & 0 deletions pkg/apigroup/apigroup_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
// Copyright (c) 2026 Tigera, Inc. All rights reserved.

// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package apigroup

import (
"testing"

. "github.com/onsi/gomega"
corev1 "k8s.io/api/core/v1"
)

func TestSetAndGet(t *testing.T) {
tests := []struct {
name string
set APIGroup
wantGet APIGroup
wantEnvVars []corev1.EnvVar
}{
{
name: "default state is Unknown with nil env vars",
set: Unknown,
wantGet: Unknown,
wantEnvVars: nil,
},
{
name: "V3 returns CALICO_API_GROUP env var",
set: V3,
wantGet: V3,
wantEnvVars: []corev1.EnvVar{
{Name: "CALICO_API_GROUP", Value: "projectcalico.org/v3"},
},
},
{
name: "V1 returns nil env vars",
set: V1,
wantGet: V1,
wantEnvVars: nil,
},
{
name: "setting back to Unknown clears env vars",
set: Unknown,
wantGet: Unknown,
wantEnvVars: nil,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
g := NewWithT(t)
Set(tt.set)
g.Expect(Get()).To(Equal(tt.wantGet))
g.Expect(EnvVars()).To(Equal(tt.wantEnvVars))
})
}
}
53 changes: 51 additions & 2 deletions pkg/apis/version.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,23 +15,47 @@
package apis

import (
"context"
"os"

v3 "github.com/tigera/api/pkg/apis/projectcalico/v3"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
ctrl "sigs.k8s.io/controller-runtime"
)

var log = ctrl.Log.WithName("apis")

// UseV3CRDS detects whether we should use the crd.projectcalic.org/v1 or
// DatastoreMigrationGVR is the GroupVersionResource for DatastoreMigration CRs.
var DatastoreMigrationGVR = schema.GroupVersionResource{
Group: "migration.projectcalico.org",
Version: "v1beta1",
Resource: "datastoremigrations",
}

// UseV3CRDS detects whether we should use the crd.projectcalico.org/v1 or
// projectcalico.org/v3 API group for Calico CRDs.
func UseV3CRDS(cs kubernetes.Interface) (bool, error) {
func UseV3CRDS(cfg *rest.Config) (bool, error) {
if os.Getenv("CALICO_API_GROUP") != "" {
log.Info("CALICO_API_GROUP environment variable is set, using its value to determine API group", "CALICO_API_GROUP", os.Getenv("CALICO_API_GROUP"))
return os.Getenv("CALICO_API_GROUP") == "projectcalico.org/v3", nil
}

// Check if a DatastoreMigration CR exists in a state that indicates v3 CRDs
// should be used. This handles operator restarts during or after migration.
if v3, err := checkDatastoreMigration(cfg); err != nil {
log.Info("Failed to check DatastoreMigration CR, falling through to API discovery", "error", err)
} else if v3 {
return true, nil
}

cs, err := kubernetes.NewForConfig(cfg)
if err != nil {
return false, err
}
apiGroups, err := cs.Discovery().ServerGroups()
if err != nil {
return false, err
Expand All @@ -50,3 +74,28 @@ func UseV3CRDS(cs kubernetes.Interface) (bool, error) {
log.Info("Detected API groups from API server", "v3present", v3present, "v1present", v1present)
return v3present && !v1present, nil
}

// checkDatastoreMigration uses a dynamic client to look for a DatastoreMigration CR
// and returns true if one exists in a phase that indicates v3 CRDs should be used.
func checkDatastoreMigration(cfg *rest.Config) (bool, error) {
dc, err := dynamic.NewForConfig(cfg)
if err != nil {
return false, err
}
list, err := dc.Resource(DatastoreMigrationGVR).List(context.Background(), metav1.ListOptions{})
if err != nil {
return false, err
}
for _, item := range list.Items {
status, ok := item.Object["status"].(map[string]any)
if !ok {
continue
}
phase, _ := status["phase"].(string)
if phase == "Converged" || phase == "Complete" {
log.Info("DatastoreMigration CR found in post-migration phase, using v3 CRDs", "name", item.GetName(), "phase", phase)
return true, nil
}
}
return false, nil
}
56 changes: 56 additions & 0 deletions pkg/controller/apiserver/apiserver_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/dynamic"

"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
Expand All @@ -42,6 +43,7 @@ import (
webhooksvalidation "github.com/tigera/operator/pkg/common/validation/webhooks"
"github.com/tigera/operator/pkg/controller/certificatemanager"
"github.com/tigera/operator/pkg/controller/k8sapi"
"github.com/tigera/operator/pkg/controller/migration/datastoremigration"
"github.com/tigera/operator/pkg/controller/options"
"github.com/tigera/operator/pkg/controller/status"
"github.com/tigera/operator/pkg/controller/utils"
Expand All @@ -64,9 +66,14 @@ var log = logf.Log.WithName("controller_apiserver")
// Add creates a new APIServer Controller and adds it to the Manager. The Manager will set fields on the Controller
// and Start it when the Manager is Started.
func Add(mgr manager.Manager, opts options.ControllerOptions) error {
dc, err := dynamic.NewForConfig(mgr.GetConfig())
if err != nil {
return fmt.Errorf("failed to create dynamic client: %w", err)
}
r := &ReconcileAPIServer{
client: mgr.GetClient(),
scheme: mgr.GetScheme(),
dynamicClient: dc,
status: status.New(mgr.GetClient(), "apiserver", opts.KubernetesVersion),
tierWatchReady: &utils.ReadyFlag{},
opts: opts,
Expand Down Expand Up @@ -182,6 +189,10 @@ func Add(mgr manager.Manager, opts options.ControllerOptions) error {
return fmt.Errorf("apiserver-controller failed to create periodic reconcile watch: %w", err)
}

// Watch DatastoreMigration CRs so the apiserver controller reacts promptly
// to migration phase changes (e.g., goes hands-off during Migrating).
go datastoremigration.WaitForWatchAndAdd(c, opts.K8sClientset)

log.V(5).Info("Controller created and Watches setup")
return nil
}
Expand All @@ -194,6 +205,7 @@ type ReconcileAPIServer struct {
// This client, initialized using mgr.Client() above, is a split client
// that reads objects from the cache and writes to the apiserver
client client.Client
dynamicClient dynamic.Interface
scheme *runtime.Scheme
status status.StatusManager
tierWatchReady *utils.ReadyFlag
Expand All @@ -209,6 +221,24 @@ func (r *ReconcileAPIServer) Reconcile(ctx context.Context, request reconcile.Re
reqLogger := log.WithValues("Request.Namespace", request.Namespace, "Request.Name", request.Name)
reqLogger.V(2).Info("Reconciling APIServer")

// Check if a datastore migration is in progress. If so, the migration controller
// owns the APIService and we should not reconcile to avoid fighting over it.
migrationPhase := datastoremigration.GetPhase(r.dynamicClient)
if migrationPhase == datastoremigration.PhaseMigrating {
reqLogger.Info("DatastoreMigration is in Migrating phase, deferring APIServer reconciliation")
return reconcile.Result{RequeueAfter: utils.StandardRetry}, nil
}
if migrationPhase == datastoremigration.PhaseConverged && !r.opts.UseV3CRDs {
// Migration has converged but the operator is still running in v1 mode.
// Trigger a restart by updating our own deployment with the CALICO_API_GROUP env var.
reqLogger.Info("DatastoreMigration converged, triggering operator restart to switch to v3 CRD mode")
if err := r.setAPIGroupEnvVar(ctx); err != nil {
reqLogger.Error(err, "Failed to trigger operator restart")
return reconcile.Result{RequeueAfter: utils.StandardRetry}, nil
}
return reconcile.Result{}, nil
}

instance, msg, err := utils.GetAPIServer(ctx, r.client)
if err != nil {
if errors.IsNotFound(err) {
Expand Down Expand Up @@ -566,6 +596,32 @@ func validateAPIServerResource(instance *operatorv1.APIServer) error {
return nil
}

// setAPIGroupEnvVar updates the operator's own Deployment to add the
// CALICO_API_GROUP env var, which triggers a rolling restart. On restart,
// UseV3CRDS() picks up the env var and the operator starts in v3 CRD mode.
func (r *ReconcileAPIServer) setAPIGroupEnvVar(ctx context.Context) error {
dep := &v1.Deployment{}
key := types.NamespacedName{Name: "tigera-operator", Namespace: common.OperatorNamespace()}
if err := r.client.Get(ctx, key, dep); err != nil {
return fmt.Errorf("failed to get operator deployment: %w", err)
}

envVar := corev1.EnvVar{Name: "CALICO_API_GROUP", Value: "projectcalico.org/v3"}
for i, c := range dep.Spec.Template.Spec.Containers {
for _, e := range c.Env {
if e.Name == "CALICO_API_GROUP" {
return nil
}
}
dep.Spec.Template.Spec.Containers[i].Env = append(dep.Spec.Template.Spec.Containers[i].Env, envVar)
}

if err := r.client.Update(ctx, dep); err != nil {
return fmt.Errorf("failed to update operator deployment: %w", err)
}
return nil
}

// maintainFinalizer manages this controller's finalizer on the Installation resource.
// We add a finalizer to the Installation when the API server has been installed, and only remove that finalizer when
// the API server has been deleted and its pods have stopped running. This allows for a graceful cleanup of API server resources
Expand Down
Loading
Loading