Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
217 changes: 199 additions & 18 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,15 @@ package main

import (
"context"
"crypto/tls"
"crypto/x509"
"flag"
"fmt"
"net/url"
"os"
goruntime "runtime"
"strings"
"sync"
"time"

"github.com/cloudflare/cfssl/log"
Expand All @@ -34,6 +37,7 @@ import (
"github.com/tigera/operator/pkg/awssgsetup"
"github.com/tigera/operator/pkg/common"
"github.com/tigera/operator/pkg/components"
"github.com/tigera/operator/pkg/controller/metrics"
"github.com/tigera/operator/pkg/controller/options"
"github.com/tigera/operator/pkg/controller/utils"
"github.com/tigera/operator/pkg/dns"
Expand All @@ -44,13 +48,17 @@ import (
"github.com/tigera/operator/pkg/render/istio"
"github.com/tigera/operator/pkg/render/logstorage"
"github.com/tigera/operator/pkg/render/logstorage/eck"
"github.com/tigera/operator/pkg/render/monitor"
"github.com/tigera/operator/pkg/tls/certificatemanagement"
"github.com/tigera/operator/version"

operatortigeraiov1 "github.com/tigera/operator/api/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apiextensions-apiserver/pkg/apis/apiextensions"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/client-go/kubernetes"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
Expand All @@ -61,14 +69,15 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
"sigs.k8s.io/controller-runtime/pkg/client/config"
"sigs.k8s.io/controller-runtime/pkg/log/zap"
ctrlmetrics "sigs.k8s.io/controller-runtime/pkg/metrics"
"sigs.k8s.io/controller-runtime/pkg/metrics/server"
"sigs.k8s.io/controller-runtime/pkg/webhook"
"sigs.k8s.io/yaml"
// +kubebuilder:scaffold:imports
)

var (
defaultMetricsPort int32 = 8484
defaultMetricsPort int32 = 9484
scheme = runtime.NewScheme()
setupLog = ctrl.Log.WithName("setup")
)
Expand Down Expand Up @@ -251,11 +260,33 @@ If a value other than 'all' is specified, the first CRD with a prefix of the spe
active.WaitUntilActive(cs, c, sigHandler, setupLog)
log.Info("Active operator: proceeding")

metricsOpts := server.Options{
BindAddress: metricsAddr(),
}
var certLoader *dynamicCertLoader
if metricsTLSEnabled() {
certLoader = newDynamicCertLoader()
metricsOpts.TLSOpts = []func(*tls.Config){
func(cfg *tls.Config) {
cfg.GetCertificate = certLoader.GetCertificate
cfg.ClientAuth = tls.RequireAndVerifyClientCert
cfg.GetConfigForClient = func(*tls.ClientHelloInfo) (*tls.Config, error) {
pool := certLoader.GetClientCAs()
return &tls.Config{
GetCertificate: certLoader.GetCertificate,
ClientAuth: tls.RequireAndVerifyClientCert,
ClientCAs: pool,
MinVersion: tls.VersionTLS12,
}, nil
}
cfg.MinVersion = tls.VersionTLS12
},
}
}

mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{
Scheme: scheme,
Metrics: server.Options{
BindAddress: metricsAddr(),
},
Scheme: scheme,
Metrics: metricsOpts,
WebhookServer: webhook.NewServer(webhook.Options{
Port: 9443,
}),
Expand Down Expand Up @@ -474,6 +505,17 @@ If a value other than 'all' is specified, the first CRD with a prefix of the spe
os.Exit(1)
}

// Register custom Prometheus metrics collector.
if metricsEnabled() {
collector := metrics.NewOperatorCollector(mgr.GetClient())
ctrlmetrics.Registry.MustRegister(collector)
}

// Start watching TLS secrets for the mTLS metrics endpoint.
if certLoader != nil {
go watchMetricsTLSSecrets(ctx, mgr, certLoader)
}

setupLog.Info("starting manager")
if err := mgr.Start(ctx); err != nil {
setupLog.Error(err, "problem running manager")
Expand Down Expand Up @@ -512,29 +554,34 @@ func setKubernetesServiceEnv(kubeconfigFile string) error {
return nil
}

// metricsAddr processes user-specified metrics host and port and sets
// default values accordingly.
// metricsAddr returns the bind address for the metrics endpoint.
// When METRICS_ENABLED is not "true", returns "0" to disable metrics.
// Otherwise, defaults to 0.0.0.0:9484 and allows overriding via
// METRICS_HOST and METRICS_PORT.
func metricsAddr() string {
metricsHost := os.Getenv("METRICS_HOST")
metricsPort := os.Getenv("METRICS_PORT")

// if neither are specified, disable metrics.
if metricsHost == "" && metricsPort == "" {
if !metricsEnabled() {
// the controller-runtime accepts '0' to denote that metrics should be disabled.
return "0"
}
// if just a host is specified, listen on port 8484 of that host.
if metricsHost != "" && metricsPort == "" {
// the controller-runtime will choose a random port if none is specified.
// so use the defaultMetricsPort in that case.

metricsHost := os.Getenv("METRICS_HOST")
if metricsHost == "" {
metricsHost = "0.0.0.0"
}

metricsPort := os.Getenv("METRICS_PORT")
if metricsPort == "" {
return fmt.Sprintf("%s:%d", metricsHost, defaultMetricsPort)
}

// finally, handle cases where just a port is specified or both are specified in the same case
// since controller-runtime correctly uses all interfaces if no host is specified.
return fmt.Sprintf("%s:%s", metricsHost, metricsPort)
}

// metricsEnabled returns true when the operator metrics endpoint is enabled.
func metricsEnabled() bool {
return strings.EqualFold(os.Getenv("METRICS_ENABLED"), "true")
}

func showCRDs(variant operatortigeraiov1.ProductVariant, outputType string) error {
first := true
for _, v := range crds.GetCRDs(variant, os.Getenv("CALICO_API_GROUP") == "projectcalico.org/v3") {
Expand Down Expand Up @@ -625,3 +672,137 @@ func verifyConfiguration(ctx context.Context, cs kubernetes.Interface, opts opti
return fmt.Errorf("refusing to run: configured as internal-es but secret/%s found which suggests external ES", logstorage.ExternalCertsSecret)
}
}

// metricsTLSEnabled returns true when the operator metrics endpoint should use mTLS.
func metricsTLSEnabled() bool {
return strings.EqualFold(os.Getenv("METRICS_SCHEME"), "https")
}

// dynamicCertLoader dynamically loads TLS certificates from Kubernetes secrets
// for the metrics endpoint. The monitor controller creates the server cert, and
// the client CA is loaded from the Prometheus client TLS secret.
type dynamicCertLoader struct {
mu sync.RWMutex
cert *tls.Certificate
clientCA *x509.CertPool
}

func newDynamicCertLoader() *dynamicCertLoader {
return &dynamicCertLoader{
clientCA: x509.NewCertPool(),
}
}

// GetCertificate returns the current server certificate for the metrics endpoint.
func (d *dynamicCertLoader) GetCertificate(*tls.ClientHelloInfo) (*tls.Certificate, error) {
d.mu.RLock()
defer d.mu.RUnlock()
if d.cert == nil {
return nil, fmt.Errorf("operator metrics TLS certificate not yet available")
}
return d.cert, nil
}

// GetClientCAs returns the current client CA pool.
func (d *dynamicCertLoader) GetClientCAs() *x509.CertPool {
d.mu.RLock()
defer d.mu.RUnlock()
return d.clientCA
}

// updateServerCert updates the server certificate from a Kubernetes TLS secret.
func (d *dynamicCertLoader) updateServerCert(secret *corev1.Secret) error {
certPEM, ok := secret.Data[corev1.TLSCertKey]
if !ok {
return fmt.Errorf("secret %s/%s missing %s", secret.Namespace, secret.Name, corev1.TLSCertKey)
}
keyPEM, ok := secret.Data[corev1.TLSPrivateKeyKey]
if !ok {
return fmt.Errorf("secret %s/%s missing %s", secret.Namespace, secret.Name, corev1.TLSPrivateKeyKey)
}
cert, err := tls.X509KeyPair(certPEM, keyPEM)
if err != nil {
return fmt.Errorf("failed to parse TLS keypair from %s/%s: %w", secret.Namespace, secret.Name, err)
}
d.mu.Lock()
defer d.mu.Unlock()
d.cert = &cert
return nil
}

// updateClientCA updates the client CA pool from a Kubernetes secret containing a certificate.
func (d *dynamicCertLoader) updateClientCA(secrets ...*corev1.Secret) {
pool := x509.NewCertPool()
for _, s := range secrets {
if s == nil {
continue
}
if certPEM, ok := s.Data[corev1.TLSCertKey]; ok {
pool.AppendCertsFromPEM(certPEM)
}
}
d.mu.Lock()
defer d.mu.Unlock()
d.clientCA = pool
}

// watchMetricsTLSSecrets periodically loads TLS secrets for the metrics endpoint.
// It runs until the context is canceled.
func watchMetricsTLSSecrets(ctx context.Context, mgr ctrl.Manager, loader *dynamicCertLoader) {
logger := ctrl.Log.WithName("metrics-tls")

// Wait for the cache to start before reading secrets.
if !mgr.GetCache().WaitForCacheSync(ctx) {
logger.Error(fmt.Errorf("cache sync failed"), "Cannot watch metrics TLS secrets")
return
}

c := mgr.GetClient()
ticker := time.NewTicker(10 * time.Second)
defer ticker.Stop()

operatorNs := common.OperatorNamespace()

serverCertLoaded := false
loadSecrets := func() {
// Load operator server TLS secret.
serverSecret := &corev1.Secret{}
if err := c.Get(ctx, types.NamespacedName{Name: monitor.OperatorMetricsSecretName, Namespace: operatorNs}, serverSecret); err != nil {
if !serverCertLoaded {
logger.Info("Metrics mTLS is enabled but the server certificate secret is not yet available. "+
"Create the secret manually or apply the Monitor CR to have it provisioned automatically.",
"secret", monitor.OperatorMetricsSecretName, "namespace", operatorNs)
} else {
logger.V(2).Info("Operator metrics TLS secret not yet available", "error", err)
}
} else {
if err := loader.updateServerCert(serverSecret); err != nil {
logger.Error(err, "Failed to update operator metrics server cert")
} else {
if !serverCertLoaded {
logger.Info("Operator metrics TLS certificate loaded successfully", "secret", monitor.OperatorMetricsSecretName)
}
serverCertLoaded = true
}
}

// Load client CA from the tigera-ca-private secret. Any cert signed by this CA
// will be trusted for mTLS client authentication.
caSecret := &corev1.Secret{}
if err := c.Get(ctx, types.NamespacedName{Name: certificatemanagement.CASecretName, Namespace: operatorNs}, caSecret); err == nil {
loader.updateClientCA(caSecret)
}
}

// Initial load.
loadSecrets()

for {
select {
case <-ctx.Done():
return
case <-ticker.C:
loadSecrets()
}
}
}
4 changes: 3 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ require (
sigs.k8s.io/yaml v1.6.0
)

require github.com/prometheus/client_golang v1.23.0

require (
al.essio.dev/pkg/shellescape v1.5.1 // indirect
dario.cat/mergo v1.0.2 // indirect
Expand Down Expand Up @@ -118,6 +120,7 @@ require (
github.com/josharian/intern v1.0.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/klauspost/compress v1.18.0 // indirect
github.com/kylelemons/godebug v1.1.0 // indirect
github.com/lann/builder v0.0.0-20180802200727-47ae307949d0 // indirect
github.com/lann/ps v0.0.0-20150810152359-62de8c46ede0 // indirect
github.com/lib/pq v1.10.9 // indirect
Expand All @@ -143,7 +146,6 @@ require (
github.com/pelletier/go-toml v1.9.5 // indirect
github.com/peterbourgon/diskv v2.0.1+incompatible // indirect
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
github.com/prometheus/client_golang v1.23.0 // indirect
github.com/prometheus/client_model v0.6.2 // indirect
github.com/prometheus/common v0.65.0 // indirect
github.com/prometheus/procfs v0.17.0 // indirect
Expand Down
Loading
Loading