Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
207 changes: 207 additions & 0 deletions controllers/aga/globalaccelerator_controller.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,207 @@
/*


Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package controllers

import (
"context"
"fmt"
"github.com/go-logr/logr"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/record"
"sigs.k8s.io/aws-load-balancer-controller/pkg/shared_constants"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

agaapi "sigs.k8s.io/aws-load-balancer-controller/apis/aga/v1beta1"
"sigs.k8s.io/aws-load-balancer-controller/pkg/config"
ctrlerrors "sigs.k8s.io/aws-load-balancer-controller/pkg/error"
"sigs.k8s.io/aws-load-balancer-controller/pkg/k8s"
lbcmetrics "sigs.k8s.io/aws-load-balancer-controller/pkg/metrics/lbc"
metricsutil "sigs.k8s.io/aws-load-balancer-controller/pkg/metrics/util"
"sigs.k8s.io/aws-load-balancer-controller/pkg/runtime"
)

const (
controllerName = "globalAccelerator"

// the groupVersion of used GlobalAccelerator resource.
agaResourcesGroupVersion = "aga.k8s.aws/v1beta1"
globalAcceleratorKind = "GlobalAccelerator"

// Metric stage constants
MetricStageFetchGlobalAccelerator = "fetch_globalAccelerator"
MetricStageAddFinalizers = "add_finalizers"
MetricStageReconcileGlobalAccelerator = "reconcile_globalaccelerator"

// Metric error constants
MetricErrorAddFinalizers = "add_finalizers_error"
MetricErrorRemoveFinalizers = "remove_finalizers_error"
MetricErrorReconcileGlobalAccelerator = "reconcile_globalaccelerator_error"
)

// NewGlobalAcceleratorReconciler constructs new globalAcceleratorReconciler
func NewGlobalAcceleratorReconciler(k8sClient client.Client, eventRecorder record.EventRecorder, finalizerManager k8s.FinalizerManager,
config config.ControllerConfig, logger logr.Logger, metricsCollector lbcmetrics.MetricCollector, reconcileCounters *metricsutil.ReconcileCounters) *globalAcceleratorReconciler {

return &globalAcceleratorReconciler{
k8sClient: k8sClient,
eventRecorder: eventRecorder,
finalizerManager: finalizerManager,
logger: logger,
metricsCollector: metricsCollector,
reconcileTracker: reconcileCounters.IncrementAGA,

maxConcurrentReconciles: config.GlobalAcceleratorMaxConcurrentReconciles,
}
}

// globalAcceleratorReconciler reconciles a GlobalAccelerator object
type globalAcceleratorReconciler struct {
k8sClient client.Client
eventRecorder record.EventRecorder
finalizerManager k8s.FinalizerManager
logger logr.Logger
metricsCollector lbcmetrics.MetricCollector
reconcileTracker func(namespaceName types.NamespacedName)

maxConcurrentReconciles int
}

// +kubebuilder:rbac:groups=aga.k8s.aws,resources=globalaccelerators,verbs=get;list;watch;patch
// +kubebuilder:rbac:groups=aga.k8s.aws,resources=globalaccelerators/status,verbs=update;patch
// +kubebuilder:rbac:groups=aga.k8s.aws,resources=globalaccelerators/finalizers,verbs=update;patch
func (r *globalAcceleratorReconciler) Reconcile(ctx context.Context, req reconcile.Request) (ctrl.Result, error) {
r.reconcileTracker(req.NamespacedName)
r.logger.V(1).Info("Reconcile request", "name", req.Name)
err := r.reconcile(ctx, req)
return runtime.HandleReconcileError(err, r.logger)
}

func (r *globalAcceleratorReconciler) reconcile(ctx context.Context, req reconcile.Request) error {
ga := &agaapi.GlobalAccelerator{}
var err error
fetchGlobalAcceleratorFn := func() {
err = r.k8sClient.Get(ctx, req.NamespacedName, ga)
}
r.metricsCollector.ObserveControllerReconcileLatency(controllerName, MetricStageFetchGlobalAccelerator, fetchGlobalAcceleratorFn)
if err != nil {
return client.IgnoreNotFound(err)
}

if ga.DeletionTimestamp != nil && !ga.DeletionTimestamp.IsZero() {
return r.cleanupGlobalAccelerator(ctx, ga)
}
return r.reconcileGlobalAccelerator(ctx, ga)
}

func (r *globalAcceleratorReconciler) reconcileGlobalAccelerator(ctx context.Context, ga *agaapi.GlobalAccelerator) error {
var err error
finalizerFn := func() {
if !k8s.HasFinalizer(ga, shared_constants.GlobalAcceleratorFinalizer) {
err = r.finalizerManager.AddFinalizers(ctx, ga, shared_constants.GlobalAcceleratorFinalizer)
}
}
r.metricsCollector.ObserveControllerReconcileLatency(controllerName, MetricStageAddFinalizers, finalizerFn)
if err != nil {
r.eventRecorder.Event(ga, corev1.EventTypeWarning, k8s.GlobalAcceleratorEventReasonFailedAddFinalizer, fmt.Sprintf("Failed add finalizer due to %v", err))
return ctrlerrors.NewErrorWithMetrics(controllerName, MetricErrorAddFinalizers, err, r.metricsCollector)
}

// TODO: Implement GlobalAccelerator resource management
// This would include:
// 1. Creating/updating AWS Global Accelerator
// 2. Managing listeners and endpoint groups
// 3. Handling endpoint discovery from Services/Ingresses/Gateways
reconcileResourceFn := func() {
err = r.reconcileGlobalAcceleratorResources(ctx, ga)
}
r.metricsCollector.ObserveControllerReconcileLatency(controllerName, MetricStageReconcileGlobalAccelerator, reconcileResourceFn)
if err != nil {
return ctrlerrors.NewErrorWithMetrics(controllerName, MetricErrorReconcileGlobalAccelerator, err, r.metricsCollector)
}

r.eventRecorder.Event(ga, corev1.EventTypeNormal, k8s.GlobalAcceleratorEventReasonSuccessfullyReconciled, "Successfully reconciled")
return nil
}

func (r *globalAcceleratorReconciler) cleanupGlobalAccelerator(ctx context.Context, ga *agaapi.GlobalAccelerator) error {
if k8s.HasFinalizer(ga, shared_constants.GlobalAcceleratorFinalizer) {
// TODO: Implement cleanup logic for AWS Global Accelerator resources
if err := r.cleanupGlobalAcceleratorResources(ctx, ga); err != nil {
r.eventRecorder.Event(ga, corev1.EventTypeWarning, k8s.GlobalAcceleratorEventReasonFailedCleanup, fmt.Sprintf("Failed cleanup due to %v", err))
return err
}
if err := r.finalizerManager.RemoveFinalizers(ctx, ga, shared_constants.GlobalAcceleratorFinalizer); err != nil {
r.eventRecorder.Event(ga, corev1.EventTypeWarning, k8s.GlobalAcceleratorEventReasonFailedRemoveFinalizer, fmt.Sprintf("Failed remove finalizer due to %v", err))
return ctrlerrors.NewErrorWithMetrics(controllerName, MetricErrorRemoveFinalizers, err, r.metricsCollector)
}
}
return nil
}

func (r *globalAcceleratorReconciler) reconcileGlobalAcceleratorResources(ctx context.Context, ga *agaapi.GlobalAccelerator) error {
// TODO: Implement the actual AWS Global Accelerator resource management
// This is a placeholder implementation
r.logger.Info("Reconciling GlobalAccelerator resources", "name", ga.Name, "namespace", ga.Namespace)

return nil
}

func (r *globalAcceleratorReconciler) cleanupGlobalAcceleratorResources(ctx context.Context, ga *agaapi.GlobalAccelerator) error {
// TODO: Implement the actual AWS Global Accelerator resource cleanup
// This is a placeholder implementation
r.logger.Info("Cleaning up GlobalAccelerator resources", "name", ga.Name, "namespace", ga.Namespace)
return nil
}

func (r *globalAcceleratorReconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager, clientSet *kubernetes.Clientset) error {
// Check if GlobalAccelerator CRD is available
resList, err := clientSet.ServerResourcesForGroupVersion(agaResourcesGroupVersion)
if err != nil {
r.logger.Info("GlobalAccelerator CRD is not available, skipping controller setup")
return nil
}
globalAcceleratorResourceAvailable := k8s.IsResourceKindAvailable(resList, globalAcceleratorKind)
if !globalAcceleratorResourceAvailable {
r.logger.Info("GlobalAccelerator CRD is not available, skipping controller setup")
return nil
}

if err := r.setupIndexes(ctx, mgr.GetFieldIndexer()); err != nil {
return err
}

// TODO: Add event handlers for Services, Ingresses, and Gateways
// that are referenced by GlobalAccelerator endpoints

return ctrl.NewControllerManagedBy(mgr).
For(&agaapi.GlobalAccelerator{}).
Named(controllerName).
WithOptions(controller.Options{
MaxConcurrentReconciles: r.maxConcurrentReconciles,
}).
Complete(r)
}

func (r *globalAcceleratorReconciler) setupIndexes(ctx context.Context, fieldIndexer client.FieldIndexer) error {
// TODO: Add field indexes for efficient lookups
return nil
}
12 changes: 1 addition & 11 deletions controllers/ingress/group_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"github.com/pkg/errors"
corev1 "k8s.io/api/core/v1"
networking "k8s.io/api/networking/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/record"
Expand Down Expand Up @@ -305,7 +304,7 @@ func (r *groupReconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager
if err != nil {
return err
}
ingressClassResourceAvailable := isResourceKindAvailable(resList, ingressClassKind)
ingressClassResourceAvailable := k8s.IsResourceKindAvailable(resList, ingressClassKind)
if err := r.setupIndexes(ctx, mgr.GetFieldIndexer(), ingressClassResourceAvailable); err != nil {
return err
}
Expand Down Expand Up @@ -401,15 +400,6 @@ func (r *groupReconciler) setupWatches(_ context.Context, c controller.Controlle
return nil
}

// isResourceKindAvailable checks whether specific kind is available.
func isResourceKindAvailable(resList *metav1.APIResourceList, kind string) bool {
for _, res := range resList.APIResources {
if res.Kind == kind {
return true
}
}
return false
}

func isIngressStatusEqual(a, b []networking.IngressLoadBalancerIngress) bool {
if len(a) != len(b) {
Expand Down
9 changes: 9 additions & 0 deletions helm/aws-load-balancer-controller/templates/rbac.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,15 @@ rules:
- apiGroups: ["gateway.networking.k8s.io"]
resources: [grpcroutes/status, httproutes/status, tcproutes/status, tlsroutes/status, udproutes/status]
verbs: [get, patch, update]
- apiGroups: ["aga.k8s.aws"]
resources: [globalaccelerators]
verbs: [get, list, patch, watch]
- apiGroups: ["aga.k8s.aws"]
resources: [globalaccelerators/finalizers]
verbs: [patch, update]
- apiGroups: ["aga.k8s.aws"]
resources: [globalaccelerators/status]
verbs: [patch, update]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
Expand Down
1 change: 1 addition & 0 deletions helm/aws-load-balancer-controller/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -375,6 +375,7 @@ controllerConfig:
# NLBHealthCheckAdvancedConfig: true
# ALBSingleSubnet: false
# LBCapacityReservation: true
# AGAController: true
# EnhancedDefaultBehavior: false

certDiscovery:
Expand Down
13 changes: 13 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,9 @@ import (
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
_ "k8s.io/client-go/plugin/pkg/client/auth/gcp"
"k8s.io/klog/v2"
agaapi "sigs.k8s.io/aws-load-balancer-controller/apis/aga/v1beta1"
elbv2api "sigs.k8s.io/aws-load-balancer-controller/apis/elbv2/v1beta1"
agacontroller "sigs.k8s.io/aws-load-balancer-controller/controllers/aga"
elbv2controller "sigs.k8s.io/aws-load-balancer-controller/controllers/elbv2"
"sigs.k8s.io/aws-load-balancer-controller/controllers/ingress"
"sigs.k8s.io/aws-load-balancer-controller/controllers/service"
Expand Down Expand Up @@ -79,6 +81,7 @@ var (
func init() {
_ = clientgoscheme.AddToScheme(scheme)

_ = agaapi.AddToScheme(scheme)
_ = elbv2api.AddToScheme(scheme)
_ = elbv2gw.AddToScheme(scheme)
_ = gwv1.AddToScheme(scheme)
Expand Down Expand Up @@ -226,6 +229,16 @@ func main() {
os.Exit(1)
}

// Setup GlobalAccelerator controller only if enabled
if controllerCFG.FeatureGates.Enabled(config.AGAController) {
agaReconciler := agacontroller.NewGlobalAcceleratorReconciler(mgr.GetClient(), mgr.GetEventRecorderFor("globalAccelerator"),
finalizerManager, controllerCFG, ctrl.Log.WithName("controllers").WithName("globalAccelerator"), lbcMetricsCollector, reconcileCounters)
if err := agaReconciler.SetupWithManager(ctx, mgr, clientSet); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "GlobalAccelerator")
os.Exit(1)
}
}

// Initialize common gateway configuration
if controllerCFG.FeatureGates.Enabled(config.NLBGatewayAPI) || controllerCFG.FeatureGates.Enabled(config.ALBGatewayAPI) {

Expand Down
6 changes: 6 additions & 0 deletions pkg/config/controller_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ const (
flagGatewayClassMaxConcurrentReconciles = "gateway-class-max-concurrent-reconciles"
flagALBGatewayMaxConcurrentReconciles = "alb-gateway-max-concurrent-reconciles"
flagNLBGatewayMaxConcurrentReconciles = "nlb-gateway-max-concurrent-reconciles"
flagGlobalAcceleratorMaxConcurrentReconciles = "globalaccelerator-max-concurrent-reconciles"
flagTargetGroupBindingMaxExponentialBackoffDelay = "targetgroupbinding-max-exponential-backoff-delay"
flagLbStabilizationMonitorInterval = "lb-stabilization-monitor-interval"
flagDefaultSSLPolicy = "default-ssl-policy"
Expand Down Expand Up @@ -119,6 +120,9 @@ type ControllerConfig struct {
// NLBGatewayMaxConcurrentReconciles Max concurrent reconcile loops for NLB Gateway objects
NLBGatewayMaxConcurrentReconciles int

// GlobalAcceleratorMaxConcurrentReconciles Max concurrent reconcile loops for GlobalAccelerator objects
GlobalAcceleratorMaxConcurrentReconciles int

// EnableBackendSecurityGroup specifies whether to use optimized security group rules
EnableBackendSecurityGroup bool

Expand Down Expand Up @@ -164,6 +168,8 @@ func (cfg *ControllerConfig) BindFlags(fs *pflag.FlagSet) {
"Maximum number of concurrently running reconcile loops for alb gateway")
fs.IntVar(&cfg.NLBGatewayMaxConcurrentReconciles, flagNLBGatewayMaxConcurrentReconciles, defaultMaxConcurrentReconciles,
"Maximum number of concurrently running reconcile loops for nlb gateway")
fs.IntVar(&cfg.GlobalAcceleratorMaxConcurrentReconciles, flagGlobalAcceleratorMaxConcurrentReconciles, defaultMaxConcurrentReconciles,
"Maximum number of concurrently running reconcile loops for globalAccelerator")
fs.DurationVar(&cfg.TargetGroupBindingMaxExponentialBackoffDelay, flagTargetGroupBindingMaxExponentialBackoffDelay, defaultMaxExponentialBackoffDelay,
"Maximum duration of exponential backoff for targetGroupBinding reconcile failures")
fs.DurationVar(&cfg.LBStabilizationMonitorInterval, flagLbStabilizationMonitorInterval, defaultLbStabilizationMonitorInterval,
Expand Down
2 changes: 2 additions & 0 deletions pkg/config/feature_gates.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ const (
SubnetDiscoveryByReachability Feature = "SubnetDiscoveryByReachability"
NLBGatewayAPI Feature = "NLBGatewayAPI"
ALBGatewayAPI Feature = "ALBGatewayAPI"
AGAController Feature = "AGAController"
EnhancedDefaultBehavior Feature = "EnhancedDefaultBehavior"
)

Expand Down Expand Up @@ -70,6 +71,7 @@ func NewFeatureGates() FeatureGates {
LBCapacityReservation: true,
NLBGatewayAPI: false,
ALBGatewayAPI: false,
AGAController: true,
EnableTCPUDPListenerType: false,
EnhancedDefaultBehavior: false,
},
Expand Down
7 changes: 7 additions & 0 deletions pkg/k8s/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,4 +45,11 @@ const (
// Load Balancer Configuration events
LoadBalancerConfigurationEventReasonFailedAddFinalizer = "FailedAddFinalizer"
LoadBalancerConfigurationEventReasonFailedRemoveFinalizer = "FailedRemoveFinalizer"

// GlobalAccelerator events
GlobalAcceleratorEventReasonFailedAddFinalizer = "FailedAddFinalizer"
GlobalAcceleratorEventReasonFailedRemoveFinalizer = "FailedRemoveFinalizer"
GlobalAcceleratorEventReasonFailedUpdateStatus = "FailedUpdateStatus"
GlobalAcceleratorEventReasonFailedCleanup = "FailedCleanup"
GlobalAcceleratorEventReasonSuccessfullyReconciled = "SuccessfullyReconciled"
)
10 changes: 10 additions & 0 deletions pkg/k8s/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,13 @@ func ToSliceOfNamespacedNames[T metav1.ObjectMetaAccessor](s []T) []types.Namesp
}
return result
}

// IsResourceKindAvailable checks whether specific kind is available.
func IsResourceKindAvailable(resList *metav1.APIResourceList, kind string) bool {
for _, res := range resList.APIResources {
if res.Kind == kind {
return true
}
}
return false
}
Loading
Loading