diff --git a/internal/controller/base_controller.go b/internal/controller/base_controller.go index 9e165d14..130ff355 100644 --- a/internal/controller/base_controller.go +++ b/internal/controller/base_controller.go @@ -14,6 +14,8 @@ import ( "k8s.io/client-go/tools/record" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/handler" + "sigs.k8s.io/controller-runtime/pkg/reconcile" ) // BaseControllerOp is an enum for the different operations that can be performed by a BaseController @@ -114,6 +116,17 @@ func (self *BaseController[T]) Reconcile(ctx context.Context, req ctrl.Request, return ctrl.Result{}, nil } +// NewEnqueueRequestForMapFunc wraps a map function to be used as an event handler. +// It also takes care to make sure that the controllers logger is passed through to the map function, so +// that we can use our common pattern of getting the logger from the context. +func (self *BaseController[T]) NewEnqueueRequestForMapFunc(f func(ctx context.Context, obj client.Object) []reconcile.Request) handler.EventHandler { + wrappedFunc := func(ctx context.Context, obj client.Object) []reconcile.Request { + ctx = ctrl.LoggerInto(ctx, self.Log) + return f(ctx, obj) + } + return handler.EnqueueRequestsFromMapFunc(wrappedFunc) +} + // handleErr is a helper function to handle errors in the controller. If an ErrResult function is not provided, // it will use the default CtrlResultForErr function. func (self *BaseController[T]) handleErr(op BaseControllerOp, obj T, err error) (ctrl.Result, error) { diff --git a/internal/controller/bindings/endpointbinding_controller.go b/internal/controller/bindings/endpointbinding_controller.go index dc4012b3..e54d78b8 100644 --- a/internal/controller/bindings/endpointbinding_controller.go +++ b/internal/controller/bindings/endpointbinding_controller.go @@ -38,7 +38,6 @@ import ( "k8s.io/client-go/tools/record" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" - "sigs.k8s.io/controller-runtime/pkg/handler" "sigs.k8s.io/controller-runtime/pkg/reconcile" "github.com/go-logr/logr" @@ -141,11 +140,11 @@ func (r *EndpointBindingReconciler) SetupWithManager(mgr ctrl.Manager) error { For(&bindingsv1alpha1.EndpointBinding{}). Watches( &v1.Service{}, - handler.EnqueueRequestsFromMapFunc(r.findEndpointBindingsForService), + r.controller.NewEnqueueRequestForMapFunc(r.findEndpointBindingsForService), ). Watches( &v1.Namespace{}, - handler.EnqueueRequestsFromMapFunc(r.findEndpointBindingsForNamespace), + r.controller.NewEnqueueRequestForMapFunc(r.findEndpointBindingsForNamespace), ). Complete(r) } @@ -159,7 +158,7 @@ func (r *EndpointBindingReconciler) Reconcile(ctx context.Context, req ctrl.Requ } func (r *EndpointBindingReconciler) create(ctx context.Context, cr *bindingsv1alpha1.EndpointBinding) error { - targetService, upstreamService := r.convertEndpointBindingToServices(cr) + targetService, upstreamService := r.convertEndpointBindingToServices(ctx, cr) if err := r.createUpstreamService(ctx, cr, upstreamService); err != nil { return err @@ -208,7 +207,7 @@ func (r *EndpointBindingReconciler) createUpstreamService(ctx context.Context, o func (r *EndpointBindingReconciler) update(ctx context.Context, cr *bindingsv1alpha1.EndpointBinding) error { log := ctrl.LoggerFrom(ctx) - desiredTargetService, desiredUpstreamService := r.convertEndpointBindingToServices(cr) + desiredTargetService, desiredUpstreamService := r.convertEndpointBindingToServices(ctx, cr) var existingTargetService v1.Service var existingUpstreamService v1.Service @@ -278,7 +277,7 @@ func (r *EndpointBindingReconciler) update(ctx context.Context, cr *bindingsv1al func (r *EndpointBindingReconciler) delete(ctx context.Context, cr *bindingsv1alpha1.EndpointBinding) error { log := ctrl.LoggerFrom(ctx) - targetService, upstreamService := r.convertEndpointBindingToServices(cr) + targetService, upstreamService := r.convertEndpointBindingToServices(ctx, cr) if err := r.Client.Delete(ctx, targetService); err != nil { if client.IgnoreNotFound(err) == nil { return nil @@ -307,7 +306,9 @@ func (r *EndpointBindingReconciler) errResult(op controller.BaseControllerOp, cr } // convertEndpointBindingToServices converts an EndpointBinding into 2 Services: Target(ExternalName) and Upstream(Pod Forwarders) -func (r *EndpointBindingReconciler) convertEndpointBindingToServices(endpointBinding *bindingsv1alpha1.EndpointBinding) (*v1.Service, *v1.Service) { +func (r *EndpointBindingReconciler) convertEndpointBindingToServices(ctx context.Context, endpointBinding *bindingsv1alpha1.EndpointBinding) (*v1.Service, *v1.Service) { + log := ctrl.LoggerFrom(ctx) + // Send traffic to any Node in the cluster internalTrafficPolicy := v1.ServiceInternalTrafficPolicyCluster @@ -319,7 +320,7 @@ func (r *EndpointBindingReconciler) convertEndpointBindingToServices(endpointBin parts := strings.Split(label, "=") if len(parts) != 2 { - r.Log.Error(fmt.Errorf("invalid Pod Forwarder label: %s", label), "invalid Pod Forwarder label") + log.Error(fmt.Errorf("invalid Pod Forwarder label: %s", label), "invalid Pod Forwarder label") } podForwarderSelector[parts[0]] = parts[1] @@ -406,7 +407,7 @@ func (r *EndpointBindingReconciler) convertEndpointBindingToServices(endpointBin func (r *EndpointBindingReconciler) findEndpointBindingsForNamespace(ctx context.Context, namespace client.Object) []reconcile.Request { nsName := namespace.GetName() - log := r.Log.WithValues("namespace", nsName) + log := ctrl.LoggerFrom(ctx).WithValues("namespace", nsName) log.V(3).Info("Finding endpoint bindings for namespace") endpointBindings := &bindingsv1alpha1.EndpointBindingList{} @@ -437,7 +438,7 @@ func (r *EndpointBindingReconciler) findEndpointBindingsForNamespace(ctx context } func (r *EndpointBindingReconciler) findEndpointBindingsForService(ctx context.Context, svc client.Object) []reconcile.Request { - log := r.Log.WithValues("service.name", svc.GetName(), "service.namespace", svc.GetNamespace()) + log := ctrl.LoggerFrom(ctx).WithValues("service.name", svc.GetName(), "service.namespace", svc.GetNamespace()) log.V(3).Info("Finding endpoint bindings for service") svcLabels := svc.GetLabels() diff --git a/internal/controller/bindings/endpointbinding_controller_test.go b/internal/controller/bindings/endpointbinding_controller_test.go index dfded2b3..2efe8f3e 100644 --- a/internal/controller/bindings/endpointbinding_controller_test.go +++ b/internal/controller/bindings/endpointbinding_controller_test.go @@ -25,6 +25,7 @@ SOFTWARE. package bindings import ( + "context" "testing" bindingsv1alpha1 "github.com/ngrok/ngrok-operator/api/bindings/v1alpha1" @@ -65,7 +66,7 @@ func Test_convertEndpointBindingToServices(t *testing.T) { }, } - targetService, upstreamService := controller.convertEndpointBindingToServices(endpointBinding) + targetService, upstreamService := controller.convertEndpointBindingToServices(context.TODO(), endpointBinding) assert.Equal(targetService.Name, "client-service") assert.Equal(targetService.Namespace, "client-namespace") diff --git a/internal/controller/bindings/endpointbinding_poller.go b/internal/controller/bindings/endpointbinding_poller.go index ce43d6d9..0a468c8b 100644 --- a/internal/controller/bindings/endpointbinding_poller.go +++ b/internal/controller/bindings/endpointbinding_poller.go @@ -16,6 +16,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/client-go/tools/record" + ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" ) @@ -58,34 +59,38 @@ type EndpointBindingPoller struct { // Start implements the manager.Runnable interface. func (r *EndpointBindingPoller) Start(ctx context.Context) error { - r.Log.Info("Starting the BindingConfiguration polling routine") + log := ctrl.LoggerFrom(ctx) + + log.Info("Starting the BindingConfiguration polling routine") r.stopCh = make(chan struct{}) defer close(r.stopCh) go r.startPollingAPI(ctx) <-ctx.Done() - r.Log.Info("Stopping the BindingConfiguration polling routine") + log.Info("Stopping the BindingConfiguration polling routine") return nil } // startPollingAPI polls a mock API every over a polling interval and updates the BindingConfiguration's status. func (r *EndpointBindingPoller) startPollingAPI(ctx context.Context) { + log := ctrl.LoggerFrom(ctx) + ticker := time.NewTicker(r.PollingInterval) defer ticker.Stop() // Reconcile on startup if err := r.reconcileEndpointBindingsFromAPI(ctx); err != nil { - r.Log.Error(err, "Failed to update binding_endpoints from API") + log.Error(err, "Failed to update binding_endpoints from API") } for { select { case <-ticker.C: - r.Log.Info("Polling API for binding_endpoints") + log.Info("Polling API for binding_endpoints") if err := r.reconcileEndpointBindingsFromAPI(ctx); err != nil { - r.Log.Error(err, "Failed to update binding_endpoints from API") + log.Error(err, "Failed to update binding_endpoints from API") } case <-r.stopCh: - r.Log.Info("Stopping API polling") + log.Info("Stopping API polling") return } } @@ -94,6 +99,8 @@ func (r *EndpointBindingPoller) startPollingAPI(ctx context.Context) { // reconcileEndpointBindingsFromAPI fetches the desired binding_endpoints for this kubernetes operator binding // then creates, updates, or deletes the EndpointBindings in-cluster func (r *EndpointBindingPoller) reconcileEndpointBindingsFromAPI(ctx context.Context) error { + log := ctrl.LoggerFrom(ctx) + if r.reconcilingCancel != nil { r.reconcilingCancel() // cancel the previous reconcile loop } @@ -101,6 +108,7 @@ func (r *EndpointBindingPoller) reconcileEndpointBindingsFromAPI(ctx context.Con // Fetch the mock endpoint data from the API resp, err := fetchEndpoints() if err != nil { + log.Error(err, "Failed to fetch binding_endpoints from API") return err } @@ -137,7 +145,7 @@ func (r *EndpointBindingPoller) reconcileEndpointBindingsFromAPI(ctx context.Con // reassign port allocations r.portAllocator = currentPortAllocations - toCreate, toUpdate, toDelete := r.filterEndpointBindingActions(existingEndpointBindings, desiredEndpointBindings) + toCreate, toUpdate, toDelete := r.filterEndpointBindingActions(ctx, existingEndpointBindings, desiredEndpointBindings) // create context + errgroup for managing/closing the future goroutine in the reconcile actions loops errGroup, ctx := errgroup.WithContext(ctx) @@ -167,6 +175,8 @@ type endpointBindingActionFn func(context.Context, bindingsv1alpha1.EndpointBind // reconcileEndpointBindingAction runs a goroutine to try and process a list of EndpointBindings // for their desired action over and over again until stopChan is closed or receives a value func (r *EndpointBindingPoller) reconcileEndpointBindingAction(ctx context.Context, errGroup *errgroup.Group, endpointBindings []bindingsv1alpha1.EndpointBinding, actionMsg string, action endpointBindingActionFn) { + log := ctrl.LoggerFrom(ctx) + errGroup.Go(func() error { // attempt reconciliation actions every so often ticker := time.NewTicker(5 * time.Second) @@ -179,10 +189,10 @@ func (r *EndpointBindingPoller) reconcileEndpointBindingAction(ctx context.Conte select { // stop go routine and return, there is a new reconcile poll happening actively case <-ctx.Done(): - r.Log.Error(ctx.Err(), "Reconcile context canceled, stopping EndpointBinding reconcile loop early", "action", actionMsg) + log.Error(ctx.Err(), "Reconcile context canceled, stopping EndpointBinding reconcile loop early", "action", actionMsg) return nil case <-ticker.C: - r.Log.V(9).Info("Received tick", "action", actionMsg, "remaining", remainingBindings) + log.V(9).Info("Received tick", "action", actionMsg, "remaining", remainingBindings) if len(remainingBindings) == 0 { return nil // all bindings have been processed } @@ -193,7 +203,7 @@ func (r *EndpointBindingPoller) reconcileEndpointBindingAction(ctx context.Conte for _, binding := range remainingBindings { if err := action(ctx, binding); err != nil { name := hashURI(binding.Spec.EndpointURI) - r.Log.Error(err, "Failed to reconcile EndpointBinding", "action", actionMsg, "name", name, "uri", binding.Spec.EndpointURI) + log.Error(err, "Failed to reconcile EndpointBinding", "action", actionMsg, "name", name, "uri", binding.Spec.EndpointURI) failedBindings = append(failedBindings, binding) } } @@ -208,12 +218,14 @@ func (r *EndpointBindingPoller) reconcileEndpointBindingAction(ctx context.Conte // filterEndpointBindingActions takse 2 sets of existing and desired EndpointBindings // and returns 3 lists: toCreate, toUpdate, toDelete // representing the actions needed to reconcile the existing set with the desired set -func (r *EndpointBindingPoller) filterEndpointBindingActions(existingEndpointBindings []bindingsv1alpha1.EndpointBinding, desiredEndpoints ngrokapi.AggregatedEndpoints) (toCreate []bindingsv1alpha1.EndpointBinding, toUpdate []bindingsv1alpha1.EndpointBinding, toDelete []bindingsv1alpha1.EndpointBinding) { +func (r *EndpointBindingPoller) filterEndpointBindingActions(ctx context.Context, existingEndpointBindings []bindingsv1alpha1.EndpointBinding, desiredEndpoints ngrokapi.AggregatedEndpoints) (toCreate []bindingsv1alpha1.EndpointBinding, toUpdate []bindingsv1alpha1.EndpointBinding, toDelete []bindingsv1alpha1.EndpointBinding) { + log := ctrl.LoggerFrom(ctx) + toCreate = []bindingsv1alpha1.EndpointBinding{} toUpdate = []bindingsv1alpha1.EndpointBinding{} toDelete = []bindingsv1alpha1.EndpointBinding{} - r.Log.V(9).Info("Filtering EndpointBindings", "existing", existingEndpointBindings, "desired", desiredEndpoints) + log.V(9).Info("Filtering EndpointBindings", "existing", existingEndpointBindings, "desired", desiredEndpoints) for _, existingEndpointBinding := range existingEndpointBindings { uri := existingEndpointBinding.Spec.EndpointURI @@ -253,6 +265,8 @@ func (r *EndpointBindingPoller) filterEndpointBindingActions(existingEndpointBin // TODO: Metadata func (r *EndpointBindingPoller) createBinding(ctx context.Context, desired bindingsv1alpha1.EndpointBinding) error { + log := ctrl.LoggerFrom(ctx) + name := hashURI(desired.Spec.EndpointURI) // allocate a port @@ -284,10 +298,10 @@ func (r *EndpointBindingPoller) createBinding(ctx context.Context, desired bindi }, } - r.Log.Info("Creating new EndpointBinding", "name", name, "uri", toCreate.Spec.EndpointURI) + log.Info("Creating new EndpointBinding", "name", name, "uri", toCreate.Spec.EndpointURI) if err := r.Create(ctx, toCreate); err != nil { if client.IgnoreAlreadyExists(err) == nil { - r.Log.Info("EndpointBinding already exists, skipping create...", "name", name, "uri", toCreate.Spec.EndpointURI) + log.Info("EndpointBinding already exists, skipping create...", "name", name, "uri", toCreate.Spec.EndpointURI) if toCreate.Status.HashedName != "" && len(toCreate.Status.Endpoints) > 0 { // Status is filled, no need to update @@ -295,16 +309,16 @@ func (r *EndpointBindingPoller) createBinding(ctx context.Context, desired bindi } else { // intentionally blonk // we want to fall through and fill in the status - r.Log.Info("EndpointBinding already exists, but status is empty, filling in status...", "name", name, "uri", toCreate.Spec.EndpointURI, "toCreate", toCreate) + log.Info("EndpointBinding already exists, but status is empty, filling in status...", "name", name, "uri", toCreate.Spec.EndpointURI, "toCreate", toCreate) // refresh the toCreate object with existing data if err := r.Get(ctx, client.ObjectKey{Namespace: r.Namespace, Name: name}, toCreate); err != nil { - r.Log.Error(err, "Failed to get existing EndpointBinding, skipping status update...", "name", name, "uri", toCreate.Spec.EndpointURI) + log.Error(err, "Failed to get existing EndpointBinding, skipping status update...", "name", name, "uri", toCreate.Spec.EndpointURI) return nil } } } else { - r.Log.Error(err, "Failed to create EndpointBinding", "name", name, "uri", toCreate.Spec.EndpointURI) + log.Error(err, "Failed to create EndpointBinding", "name", name, "uri", toCreate.Spec.EndpointURI) r.Recorder.Event(toCreate, v1.EventTypeWarning, "Created", fmt.Sprintf("Failed to create EndpointBinding: %v", err)) return err } @@ -338,6 +352,8 @@ func (r *EndpointBindingPoller) createBinding(ctx context.Context, desired bindi } func (r *EndpointBindingPoller) updateBinding(ctx context.Context, desired bindingsv1alpha1.EndpointBinding) error { + log := ctrl.LoggerFrom(ctx) + desiredName := hashURI(desired.Spec.EndpointURI) var existing bindingsv1alpha1.EndpointBinding @@ -345,17 +361,17 @@ func (r *EndpointBindingPoller) updateBinding(ctx context.Context, desired bindi if err != nil { if client.IgnoreNotFound(err) == nil { // EndpointBinding doesn't exist, create it on the next polling loop - r.Log.Info("Unable to find existing EndpointBinding, skipping update...", "name", desiredName, "uri", desired.Spec.EndpointURI) + log.Info("Unable to find existing EndpointBinding, skipping update...", "name", desiredName, "uri", desired.Spec.EndpointURI) return nil // not an error } else { // real error - r.Log.Error(err, "Failed to find existing EndpointBinding", "name", desiredName, "uri", desired.Spec.EndpointURI) + log.Error(err, "Failed to find existing EndpointBinding", "name", desiredName, "uri", desired.Spec.EndpointURI) return err } } if !endpointBindingNeedsUpdate(existing, desired) { - r.Log.Info("EndpointBinding already matches existing state, skipping update...", "name", desiredName, "uri", desired.Spec.EndpointURI) + log.Info("EndpointBinding already matches existing state, skipping update...", "name", desiredName, "uri", desired.Spec.EndpointURI) return nil } @@ -367,9 +383,9 @@ func (r *EndpointBindingPoller) updateBinding(ctx context.Context, desired bindi toUpdate.Spec.Target = desired.Spec.Target toUpdate.Spec.EndpointURI = desired.Spec.EndpointURI - r.Log.Info("Updating EndpointBinding", "name", toUpdate.Name, "uri", toUpdate.Spec.EndpointURI) + log.Info("Updating EndpointBinding", "name", toUpdate.Name, "uri", toUpdate.Spec.EndpointURI) if err := r.Update(ctx, toUpdate); err != nil { - r.Log.Error(err, "Failed updating EndpointBinding", "name", toUpdate.Name, "uri", toUpdate.Spec.EndpointURI) + log.Error(err, "Failed updating EndpointBinding", "name", toUpdate.Name, "uri", toUpdate.Spec.EndpointURI) r.Recorder.Event(toUpdate, v1.EventTypeWarning, "Updated", fmt.Sprintf("Failed to update EndpointBinding: %v", err)) return err } @@ -402,11 +418,13 @@ func (r *EndpointBindingPoller) updateBinding(ctx context.Context, desired bindi } func (r *EndpointBindingPoller) deleteBinding(ctx context.Context, endpointBinding bindingsv1alpha1.EndpointBinding) error { + log := ctrl.LoggerFrom(ctx) + if err := r.Delete(ctx, &endpointBinding); err != nil { - r.Log.Error(err, "Failed to delete EndpointBinding", "name", endpointBinding.Name, "uri", endpointBinding.Spec.EndpointURI) + log.Error(err, "Failed to delete EndpointBinding", "name", endpointBinding.Name, "uri", endpointBinding.Spec.EndpointURI) return err } else { - r.Log.Info("Deleted EndpointBinding", "name", endpointBinding.Name, "uri", endpointBinding.Spec.EndpointURI) + log.Info("Deleted EndpointBinding", "name", endpointBinding.Name, "uri", endpointBinding.Spec.EndpointURI) // unset the port allocation r.portAllocator.Unset(endpointBinding.Spec.Port) @@ -416,15 +434,17 @@ func (r *EndpointBindingPoller) deleteBinding(ctx context.Context, endpointBindi } func (r *EndpointBindingPoller) updateBindingStatus(ctx context.Context, desired *bindingsv1alpha1.EndpointBinding) error { + log := ctrl.LoggerFrom(ctx) + toUpdate := desired toUpdate.Status = desired.Status if err := r.Status().Update(ctx, toUpdate); err != nil { - r.Log.Error(err, "Failed to update EndpointBinding status", "name", toUpdate.Name, "uri", toUpdate.Spec.EndpointURI) + log.Error(err, "Failed to update EndpointBinding status", "name", toUpdate.Name, "uri", toUpdate.Spec.EndpointURI) return err } - r.Log.Info("Updated EndpointBinding status", "name", toUpdate.Name, "uri", toUpdate.Spec.EndpointURI) + log.Info("Updated EndpointBinding status", "name", toUpdate.Name, "uri", toUpdate.Spec.EndpointURI) return nil } diff --git a/internal/controller/bindings/endpointbinding_poller_test.go b/internal/controller/bindings/endpointbinding_poller_test.go index bbe1a928..8e190c33 100644 --- a/internal/controller/bindings/endpointbinding_poller_test.go +++ b/internal/controller/bindings/endpointbinding_poller_test.go @@ -1,6 +1,7 @@ package bindings import ( + "context" "testing" "github.com/go-logr/logr" @@ -152,11 +153,11 @@ func Test_EndpointBindingPoller_filterEndpointBindingActions(t *testing.T) { t.Parallel() assert := assert.New(t) - gotCreate, gotUpdate, gotDelete := examplePoller.filterEndpointBindingActions(test.existing, test.desired) + gotCreate, gotUpdate, gotDelete := examplePoller.filterEndpointBindingActions(context.TODO(), test.existing, test.desired) - assert.Equal(test.wantCreate, gotCreate) - assert.Equal(test.wantUpdate, gotUpdate) - assert.Equal(test.wantDelete, gotDelete) + assert.ElementsMatch(test.wantCreate, gotCreate) + assert.ElementsMatch(test.wantUpdate, gotUpdate) + assert.ElementsMatch(test.wantDelete, gotDelete) }) } } diff --git a/internal/controller/gateway/gateway_controller.go b/internal/controller/gateway/gateway_controller.go index bf379ba8..f609e1bb 100644 --- a/internal/controller/gateway/gateway_controller.go +++ b/internal/controller/gateway/gateway_controller.go @@ -63,7 +63,7 @@ type GatewayReconciler struct { // +kubebuilder:rbac:groups=gateway.networking.k8s.io,resources=gatewayclasses/status,verbs=get;list;watch;update func (r *GatewayReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { - log := r.Log.WithValues("Gateway", req.NamespacedName) + log := ctrl.LoggerFrom(ctx).WithValues("Gateway", req.NamespacedName) ctx = ctrl.LoggerInto(ctx, log) gw := new(gatewayv1.Gateway) diff --git a/internal/controller/gateway/httproute_controller.go b/internal/controller/gateway/httproute_controller.go index 49b06553..e3822ff6 100644 --- a/internal/controller/gateway/httproute_controller.go +++ b/internal/controller/gateway/httproute_controller.go @@ -55,7 +55,7 @@ type HTTPRouteReconciler struct { // +kubebuilder:rbac:groups="",resources=namespaces,verbs=get;list;watch;update func (r *HTTPRouteReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { - log := r.Log.WithValues("HTTPRoute", req.NamespacedName) + log := ctrl.LoggerFrom(ctx).WithValues("HTTPRoute", req.NamespacedName) ctx = ctrl.LoggerInto(ctx, log) httproute := new(gatewayv1.HTTPRoute) diff --git a/internal/controller/ingress/httpsedge_controller.go b/internal/controller/ingress/httpsedge_controller.go index bc9a8e86..68e98e68 100644 --- a/internal/controller/ingress/httpsedge_controller.go +++ b/internal/controller/ingress/httpsedge_controller.go @@ -450,17 +450,19 @@ func (r *HTTPSEdgeReconciler) getMatchingRouteFromEdgeStatus(edge *ingressv1alph } //nolint:unused -func (r *HTTPSEdgeReconciler) listHTTPSEdgesForIPPolicy(obj client.Object) []reconcile.Request { - r.Log.Info("Listing HTTPSEdges for ip policy to determine if they need to be reconciled") +func (r *HTTPSEdgeReconciler) listHTTPSEdgesForIPPolicy(ctx context.Context, obj client.Object) []reconcile.Request { + log := ctrl.LoggerFrom(ctx) + + log.Info("Listing HTTPSEdges for ip policy to determine if they need to be reconciled") policy, ok := obj.(*ingressv1alpha1.IPPolicy) if !ok { - r.Log.Error(nil, "failed to convert object to IPPolicy", "object", obj) + log.Error(nil, "failed to convert object to IPPolicy", "object", obj) return []reconcile.Request{} } edges := &ingressv1alpha1.HTTPSEdgeList{} if err := r.Client.List(context.Background(), edges); err != nil { - r.Log.Error(err, "failed to list HTTPSEdges for ippolicy", "name", policy.Name, "namespace", policy.Namespace) + log.Error(err, "failed to list HTTPSEdges for ippolicy", "name", policy.Name, "namespace", policy.Namespace) return []reconcile.Request{} } @@ -486,7 +488,7 @@ func (r *HTTPSEdgeReconciler) listHTTPSEdgesForIPPolicy(obj client.Object) []rec } } - r.Log.Info("IPPolicy change triggered HTTPSEdge reconciliation", "count", len(recs), "policy", policy.Name, "namespace", policy.Namespace) + log.Info("IPPolicy change triggered HTTPSEdge reconciliation", "count", len(recs), "policy", policy.Name, "namespace", policy.Namespace) return recs } diff --git a/internal/controller/ingress/ingress_controller.go b/internal/controller/ingress/ingress_controller.go index ad901948..4d652e15 100644 --- a/internal/controller/ingress/ingress_controller.go +++ b/internal/controller/ingress/ingress_controller.go @@ -66,7 +66,7 @@ func (r *IngressReconciler) SetupWithManager(mgr ctrl.Manager) error { // being watched (in our case, ingress objects). If you tail the controller // logs and delete, update, edit ingress objects, you see the events come in. func (r *IngressReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { - log := r.Log.WithValues("ingress", req.NamespacedName) + log := ctrl.LoggerFrom(ctx).WithValues("ingress", req.NamespacedName) ctx = ctrl.LoggerInto(ctx, log) ingress := &netv1.Ingress{} diff --git a/internal/controller/ingress/ippolicy_controller.go b/internal/controller/ingress/ippolicy_controller.go index a9b6db6a..dd63b307 100644 --- a/internal/controller/ingress/ippolicy_controller.go +++ b/internal/controller/ingress/ippolicy_controller.go @@ -152,6 +152,8 @@ func (r *IPPolicyReconciler) delete(ctx context.Context, policy *ingressv1alpha1 } func (r *IPPolicyReconciler) createOrUpdateIPPolicyRules(ctx context.Context, policy *ingressv1alpha1.IPPolicy) error { + log := ctrl.LoggerFrom(ctx) + remoteRules, err := r.getRemotePolicyRules(ctx, policy.Status.ID) if err != nil { return err @@ -160,29 +162,29 @@ func (r *IPPolicyReconciler) createOrUpdateIPPolicyRules(ctx context.Context, po for iter.Next() { for _, d := range iter.NeedsDelete() { - r.Log.V(3).Info("Deleting IP Policy Rule", "id", d.ID, "policy.id", policy.Status.ID, "cidr", d.CIDR, "action", d.Action) + log.V(3).Info("Deleting IP Policy Rule", "id", d.ID, "policy.id", policy.Status.ID, "cidr", d.CIDR, "action", d.Action) if err := r.IPPolicyRulesClient.Delete(ctx, d.ID); err != nil { return err } - r.Log.V(3).Info("Deleted IP Policy Rule", "id", d.ID) + log.V(3).Info("Deleted IP Policy Rule", "id", d.ID) } for _, c := range iter.NeedsCreate() { - r.Log.V(3).Info("Creating IP Policy Rule", "policy.id", policy.Status.ID, "cidr", c.CIDR, "action", c.Action) + log.V(3).Info("Creating IP Policy Rule", "policy.id", policy.Status.ID, "cidr", c.CIDR, "action", c.Action) rule, err := r.IPPolicyRulesClient.Create(ctx, c) if err != nil { return err } - r.Log.V(3).Info("Created IP Policy Rule", "id", rule.ID, "policy.id", policy.Status.ID, "cidr", rule.CIDR, "action", rule.Action) + log.V(3).Info("Created IP Policy Rule", "id", rule.ID, "policy.id", policy.Status.ID, "cidr", rule.CIDR, "action", rule.Action) } for _, u := range iter.NeedsUpdate() { - r.Log.V(3).Info("Updating IP Policy Rule", "id", u.ID, "policy.id", policy.Status.ID, "cidr", u.CIDR, "metadata", u.Metadata, "description", u.Description) + log.V(3).Info("Updating IP Policy Rule", "id", u.ID, "policy.id", policy.Status.ID, "cidr", u.CIDR, "metadata", u.Metadata, "description", u.Description) rule, err := r.IPPolicyRulesClient.Update(ctx, u) if err != nil { return err } - r.Log.V(3).Info("Updated IP Policy Rule", "id", rule.ID, "policy.id", policy.Status.ID) + log.V(3).Info("Updated IP Policy Rule", "id", rule.ID, "policy.id", policy.Status.ID) } } diff --git a/internal/controller/ingress/service_controller.go b/internal/controller/ingress/service_controller.go index 09e1d0e1..04bad465 100644 --- a/internal/controller/ingress/service_controller.go +++ b/internal/controller/ingress/service_controller.go @@ -171,7 +171,7 @@ func (r *ServiceReconciler) SetupWithManager(mgr ctrl.Manager) error { // being watched (in our case, service objects). If you tail the controller // logs and delete, update, edit service objects, you see the events come in. func (r *ServiceReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { - log := r.Log.WithValues("service", req.NamespacedName) + log := ctrl.LoggerFrom(ctx).WithValues("service", req.NamespacedName) ctx = ctrl.LoggerInto(ctx, log) svc := &corev1.Service{} @@ -281,10 +281,12 @@ func (r *ServiceReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct } func (r *ServiceReconciler) findServicesForModuleSet(ctx context.Context, moduleSet client.Object) []reconcile.Request { + log := r.Log + moduleSetNamespace := moduleSet.GetNamespace() moduleSetName := moduleSet.GetName() - r.Log.V(3).Info("Finding services for module set", "namespace", moduleSetNamespace, "name", moduleSetName) + log.V(3).Info("Finding services for module set", "namespace", moduleSetNamespace, "name", moduleSetName) services := &corev1.ServiceList{} listOpts := &client.ListOptions{ Namespace: moduleSetNamespace, @@ -292,7 +294,7 @@ func (r *ServiceReconciler) findServicesForModuleSet(ctx context.Context, module } err := r.Client.List(ctx, services, listOpts) if err != nil { - r.Log.Error(err, "Failed to list services for module set") + log.Error(err, "Failed to list services for module set") return []reconcile.Request{} } @@ -306,16 +308,18 @@ func (r *ServiceReconciler) findServicesForModuleSet(ctx context.Context, module Name: svcName, }, } - r.Log.V(3).Info("Triggering reconciliation for service", "namespace", svcNamespace, "name", svcName) + log.V(3).Info("Triggering reconciliation for service", "namespace", svcNamespace, "name", svcName) } return requests } func (r *ServiceReconciler) findServicesForTrafficPolicy(ctx context.Context, policy client.Object) []reconcile.Request { + log := r.Log + policyNamespace := policy.GetNamespace() policyName := policy.GetName() - r.Log.V(3).Info("Finding services for traffic policy", "namespace", policyNamespace, "name", policyName) + log.V(3).Info("Finding services for traffic policy", "namespace", policyNamespace, "name", policyName) services := &corev1.ServiceList{} listOpts := &client.ListOptions{ Namespace: policyNamespace, @@ -323,7 +327,7 @@ func (r *ServiceReconciler) findServicesForTrafficPolicy(ctx context.Context, po } err := r.Client.List(ctx, services, listOpts) if err != nil { - r.Log.Error(err, "Failed to list services for traffic policy") + log.Error(err, "Failed to list services for traffic policy") return []reconcile.Request{} } @@ -337,12 +341,14 @@ func (r *ServiceReconciler) findServicesForTrafficPolicy(ctx context.Context, po Name: svcName, }, } - r.Log.V(3).Info("Triggering reconciliation for service", "namespace", svcNamespace, "name", svcName) + log.V(3).Info("Triggering reconciliation for service", "namespace", svcNamespace, "name", svcName) } return requests } func (r *ServiceReconciler) buildTunnelAndEdge(ctx context.Context, svc *corev1.Service) ([]client.Object, error) { + log := ctrl.LoggerFrom(ctx) + port := svc.Spec.Ports[0].Port objects := make([]client.Object, 0) @@ -371,13 +377,13 @@ func (r *ServiceReconciler) buildTunnelAndEdge(ctx context.Context, svc *corev1. // Get the modules from the service annotations moduleSets, err := getNgrokModuleSetForService(ctx, r.Client, svc) if err != nil { - r.Log.Error(err, "Failed to get module sets") + log.Error(err, "Failed to get module sets") return objects, err } policy, err := getNgrokTrafficPolicyForService(ctx, r.Client, svc) if err != nil { - r.Log.Error(err, "Failed to get traffic policy") + log.Error(err, "Failed to get traffic policy") return objects, err } diff --git a/internal/controller/ingress/tcpedge_controller.go b/internal/controller/ingress/tcpedge_controller.go index 513a7867..aa94c958 100644 --- a/internal/controller/ingress/tcpedge_controller.go +++ b/internal/controller/ingress/tcpedge_controller.go @@ -39,7 +39,6 @@ import ( "k8s.io/utils/ptr" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" - "sigs.k8s.io/controller-runtime/pkg/handler" "sigs.k8s.io/controller-runtime/pkg/reconcile" "github.com/go-logr/logr" @@ -83,7 +82,7 @@ func (r *TCPEdgeReconciler) SetupWithManager(mgr ctrl.Manager) error { For(&ingressv1alpha1.TCPEdge{}). Watches( &ingressv1alpha1.IPPolicy{}, - handler.EnqueueRequestsFromMapFunc(r.listTCPEdgesForIPPolicy), + r.controller.NewEnqueueRequestForMapFunc(r.listTCPEdgesForIPPolicy), ). Complete(r) } @@ -134,7 +133,8 @@ func (r *TCPEdgeReconciler) create(ctx context.Context, edge *ingressv1alpha1.TC if err != nil { return err } - r.Log.Info("Created new TCPEdge", "edge.ID", resp.ID, "name", edge.Name, "namespace", edge.Namespace) + + log.Info("Created new TCPEdge", "edge.ID", resp.ID, "name", edge.Name, "namespace", edge.Namespace) return r.updateEdge(ctx, edge, resp) } @@ -410,16 +410,18 @@ func (r *TCPEdgeReconciler) updateIPRestrictionModule(ctx context.Context, edge } func (r *TCPEdgeReconciler) listTCPEdgesForIPPolicy(ctx context.Context, obj client.Object) []reconcile.Request { - r.Log.Info("Listing TCPEdges for ip policy to determine if they need to be reconciled") + log := ctrl.LoggerFrom(ctx) + + log.Info("Listing TCPEdges for ip policy to determine if they need to be reconciled") policy, ok := obj.(*ingressv1alpha1.IPPolicy) if !ok { - r.Log.Error(nil, "failed to convert object to IPPolicy", "object", obj) + log.Error(nil, "failed to convert object to IPPolicy", "object", obj) return []reconcile.Request{} } edges := &ingressv1alpha1.TCPEdgeList{} if err := r.Client.List(ctx, edges); err != nil { - r.Log.Error(err, "failed to list TCPEdges for ippolicy", "name", policy.Name, "namespace", policy.Namespace) + log.Error(err, "failed to list TCPEdges for ippolicy", "name", policy.Name, "namespace", policy.Namespace) return []reconcile.Request{} } @@ -442,27 +444,29 @@ func (r *TCPEdgeReconciler) listTCPEdgesForIPPolicy(ctx context.Context, obj cli } } - r.Log.Info("IPPolicy change triggered TCPEdge reconciliation", "count", len(recs), "policy", policy.Name, "namespace", policy.Namespace) + log.Info("IPPolicy change triggered TCPEdge reconciliation", "count", len(recs), "policy", policy.Name, "namespace", policy.Namespace) return recs } func (r *TCPEdgeReconciler) updatePolicyModule(ctx context.Context, edge *ingressv1alpha1.TCPEdge, remoteEdge *ngrok.TCPEdge) error { + log := ctrl.LoggerFrom(ctx) + policy := edge.Spec.Policy client := r.NgrokClientset.EdgeModules().TCP().RawPolicy() // Early return if nothing to be done if policy == nil { if remoteEdge.Policy == nil { - r.Log.Info("Module matches desired state, skipping update", "module", "Policy", "comparison", routeModuleComparisonBothNil) + log.Info("Module matches desired state, skipping update", "module", "Policy", "comparison", routeModuleComparisonBothNil) return nil } - r.Log.Info("Deleting Policy module") + log.Info("Deleting Policy module") return client.Delete(ctx, edge.Status.ID) } - r.Log.Info("Updating Policy module") + log.Info("Updating Policy module") _, err := client.Replace(ctx, &ngrokapi.EdgeRawTCPPolicyReplace{ ID: remoteEdge.ID, Module: policy, diff --git a/internal/controller/ingress/tlsedge_controller.go b/internal/controller/ingress/tlsedge_controller.go index 041701fd..e9404e5a 100644 --- a/internal/controller/ingress/tlsedge_controller.go +++ b/internal/controller/ingress/tlsedge_controller.go @@ -40,7 +40,6 @@ import ( "k8s.io/utils/ptr" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" - "sigs.k8s.io/controller-runtime/pkg/handler" "sigs.k8s.io/controller-runtime/pkg/reconcile" "github.com/go-logr/logr" @@ -94,11 +93,11 @@ func (r *TLSEdgeReconciler) SetupWithManager(mgr ctrl.Manager) error { For(&ingressv1alpha1.TLSEdge{}). Watches( &ingressv1alpha1.IPPolicy{}, - handler.EnqueueRequestsFromMapFunc(r.listTLSEdgesForIPPolicy), + r.controller.NewEnqueueRequestForMapFunc(r.listTLSEdgesForIPPolicy), ). Watches( &ingressv1alpha1.Domain{}, - handler.EnqueueRequestsFromMapFunc(r.listTLSEdgesForDomain), + r.controller.NewEnqueueRequestForMapFunc(r.listTLSEdgesForDomain), ) return controller.Complete(r) @@ -119,6 +118,8 @@ func (r *TLSEdgeReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct } func (r *TLSEdgeReconciler) create(ctx context.Context, edge *ingressv1alpha1.TLSEdge) error { + log := ctrl.LoggerFrom(ctx) + if err := r.reconcileDomains(ctx, edge); err != nil { return err } @@ -138,7 +139,7 @@ func (r *TLSEdgeReconciler) create(ctx context.Context, edge *ingressv1alpha1.TL } // No edge has been created for this edge, create one - r.Log.Info("Creating new TLSEdge", "namespace", edge.Namespace, "name", edge.Name) + log.Info("Creating new TLSEdge", "namespace", edge.Namespace, "name", edge.Name) resp, err = r.NgrokClientset.TLSEdges().Create(ctx, &ngrok.TLSEdgeCreate{ Hostports: edge.Spec.Hostports, Description: edge.Spec.Description, @@ -150,7 +151,7 @@ func (r *TLSEdgeReconciler) create(ctx context.Context, edge *ingressv1alpha1.TL if err != nil { return err } - r.Log.Info("Created new TLSEdge", "edge.ID", resp.ID, "name", edge.Name, "namespace", edge.Namespace) + log.Info("Created new TLSEdge", "edge.ID", resp.ID, "name", edge.Name, "namespace", edge.Namespace) return r.updateEdge(ctx, edge, resp) } @@ -440,16 +441,18 @@ func (r *TLSEdgeReconciler) updateIPRestrictionModule(ctx context.Context, edge } func (r *TLSEdgeReconciler) listTLSEdgesForIPPolicy(ctx context.Context, obj client.Object) []reconcile.Request { - r.Log.Info("Listing TLSEdges for ip policy to determine if they need to be reconciled") + log := ctrl.LoggerFrom(ctx) + + log.Info("Listing TLSEdges for ip policy to determine if they need to be reconciled") policy, ok := obj.(*ingressv1alpha1.IPPolicy) if !ok { - r.Log.Error(nil, "failed to convert object to IPPolicy", "object", obj) + log.Error(nil, "failed to convert object to IPPolicy", "object", obj) return []reconcile.Request{} } edges := &ingressv1alpha1.TLSEdgeList{} if err := r.Client.List(ctx, edges); err != nil { - r.Log.Error(err, "failed to list TLSEdges for ippolicy", "name", policy.Name, "namespace", policy.Namespace) + log.Error(err, "failed to list TLSEdges for ippolicy", "name", policy.Name, "namespace", policy.Namespace) return []reconcile.Request{} } @@ -472,19 +475,21 @@ func (r *TLSEdgeReconciler) listTLSEdgesForIPPolicy(ctx context.Context, obj cli } } - r.Log.Info("IPPolicy change triggered TLSEdge reconciliation", "count", len(recs), "policy", policy.Name, "namespace", policy.Namespace) + log.Info("IPPolicy change triggered TLSEdge reconciliation", "count", len(recs), "policy", policy.Name, "namespace", policy.Namespace) return recs } func (r *TLSEdgeReconciler) listTLSEdgesForDomain(ctx context.Context, obj client.Object) []reconcile.Request { - r.Log.Info("Listing TLSEdges for domain to determine if they need to be reconciled") + log := ctrl.LoggerFrom(ctx) + + log.Info("Listing TLSEdges for domain to determine if they need to be reconciled") domain, ok := obj.(*ingressv1alpha1.Domain) if !ok { - r.Log.Error(nil, "failed to convert object to Domain", "object", obj) + log.Error(nil, "failed to convert object to Domain", "object", obj) return []reconcile.Request{} } - log := ctrl.LoggerFrom(ctx).WithValues("domain", domain.Name, "namespace", domain.Namespace) + log = log.WithValues("domain", domain.Name, "namespace", domain.Namespace) edges := &ingressv1alpha1.TLSEdgeList{} if err := r.Client.List(ctx, edges); err != nil { @@ -519,22 +524,24 @@ func (r *TLSEdgeReconciler) listTLSEdgesForDomain(ctx context.Context, obj clien } func (r *TLSEdgeReconciler) updatePolicyModule(ctx context.Context, edge *ingressv1alpha1.TLSEdge, remoteEdge *ngrok.TLSEdge) error { + log := ctrl.LoggerFrom(ctx) + policy := edge.Spec.Policy client := r.NgrokClientset.EdgeModules().TLS().RawPolicy() // Early return if nothing to be done if policy == nil { if remoteEdge.Policy == nil { - r.Log.Info("Module matches desired state, skipping update", "module", "Policy", "comparison", routeModuleComparisonBothNil) + log.Info("Module matches desired state, skipping update", "module", "Policy", "comparison", routeModuleComparisonBothNil) return nil } - r.Log.Info("Deleting Policy module") + log.Info("Deleting Policy module") return client.Delete(ctx, edge.Status.ID) } - r.Log.Info("Updating Policy module") + log.Info("Updating Policy module") _, err := client.Replace(ctx, &ngrokapi.EdgeRawTLSPolicyReplace{ ID: remoteEdge.ID, Module: policy, diff --git a/internal/controller/ngrok/kubernetesoperator_controller.go b/internal/controller/ngrok/kubernetesoperator_controller.go index baf0d75c..04f85e81 100644 --- a/internal/controller/ngrok/kubernetesoperator_controller.go +++ b/internal/controller/ngrok/kubernetesoperator_controller.go @@ -70,7 +70,7 @@ type KubernetesOperatorReconciler struct { Recorder record.EventRecorder NgrokClientset ngrokapi.Clientset - // Namespace where the BindingConfiguration lives + // Namespace where the ngrok-operator is managing its resources Namespace string } @@ -109,13 +109,6 @@ func (r *KubernetesOperatorReconciler) SetupWithManager(mgr ctrl.Manager) error // +kubebuilder:rbac:groups=ngrok.k8s.ngrok.com,resources=kubernetesoperators/status,verbs=get;update;patch // +kubebuilder:rbac:groups=ngrok.k8s.ngrok.com,resources=kubernetesoperators/finalizers,verbs=update -// Reconcile is part of the main kubernetes reconciliation loop which aims to -// move the current state of the cluster closer to the desired state. -// TODO(user): Modify the Reconcile function to compare the state specified by -// the BindingConfiguration object against the actual cluster state, and then -// perform operations to make the cluster state reflect the state specified by -// the user. -// // For more details, check Reconcile and its Result here: // - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.18.4/pkg/reconcile func (r *KubernetesOperatorReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {