Skip to content

Commit

Permalink
Standardize logging pattern using ctrl.LoggerFrom(ctx) (#462)
Browse files Browse the repository at this point in the history
<!-- Thank you for contributing! Please make sure that your code changes
are covered with tests. In case of new features or big changes remember
to adjust the documentation.

In case of an existing issue, reference it using one of the following:

closes: #ISSUE
related: #ISSUE

How to write a good git commit message:
http://chris.beams.io/posts/git-commit/
-->

## What
*Describe what the change is solving*

## How
*Describe the solution*

## Breaking Changes
*Are there any breaking changes in this PR?*
No.
  • Loading branch information
hjkatz authored Oct 24, 2024
2 parents 63cd4d4 + 20c2ae1 commit 0fde978
Show file tree
Hide file tree
Showing 14 changed files with 147 additions and 97 deletions.
13 changes: 13 additions & 0 deletions internal/controller/base_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ import (
"k8s.io/client-go/tools/record"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
)

// BaseControllerOp is an enum for the different operations that can be performed by a BaseController
Expand Down Expand Up @@ -114,6 +116,17 @@ func (self *BaseController[T]) Reconcile(ctx context.Context, req ctrl.Request,
return ctrl.Result{}, nil
}

// NewEnqueueRequestForMapFunc wraps a map function to be used as an event handler.
// It also takes care to make sure that the controllers logger is passed through to the map function, so
// that we can use our common pattern of getting the logger from the context.
func (self *BaseController[T]) NewEnqueueRequestForMapFunc(f func(ctx context.Context, obj client.Object) []reconcile.Request) handler.EventHandler {
wrappedFunc := func(ctx context.Context, obj client.Object) []reconcile.Request {
ctx = ctrl.LoggerInto(ctx, self.Log)
return f(ctx, obj)
}
return handler.EnqueueRequestsFromMapFunc(wrappedFunc)
}

// handleErr is a helper function to handle errors in the controller. If an ErrResult function is not provided,
// it will use the default CtrlResultForErr function.
func (self *BaseController[T]) handleErr(op BaseControllerOp, obj T, err error) (ctrl.Result, error) {
Expand Down
21 changes: 11 additions & 10 deletions internal/controller/bindings/endpointbinding_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ import (
"k8s.io/client-go/tools/record"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

"github.com/go-logr/logr"
Expand Down Expand Up @@ -141,11 +140,11 @@ func (r *EndpointBindingReconciler) SetupWithManager(mgr ctrl.Manager) error {
For(&bindingsv1alpha1.EndpointBinding{}).
Watches(
&v1.Service{},
handler.EnqueueRequestsFromMapFunc(r.findEndpointBindingsForService),
r.controller.NewEnqueueRequestForMapFunc(r.findEndpointBindingsForService),
).
Watches(
&v1.Namespace{},
handler.EnqueueRequestsFromMapFunc(r.findEndpointBindingsForNamespace),
r.controller.NewEnqueueRequestForMapFunc(r.findEndpointBindingsForNamespace),
).
Complete(r)
}
Expand All @@ -159,7 +158,7 @@ func (r *EndpointBindingReconciler) Reconcile(ctx context.Context, req ctrl.Requ
}

func (r *EndpointBindingReconciler) create(ctx context.Context, cr *bindingsv1alpha1.EndpointBinding) error {
targetService, upstreamService := r.convertEndpointBindingToServices(cr)
targetService, upstreamService := r.convertEndpointBindingToServices(ctx, cr)

if err := r.createUpstreamService(ctx, cr, upstreamService); err != nil {
return err
Expand Down Expand Up @@ -208,7 +207,7 @@ func (r *EndpointBindingReconciler) createUpstreamService(ctx context.Context, o
func (r *EndpointBindingReconciler) update(ctx context.Context, cr *bindingsv1alpha1.EndpointBinding) error {
log := ctrl.LoggerFrom(ctx)

desiredTargetService, desiredUpstreamService := r.convertEndpointBindingToServices(cr)
desiredTargetService, desiredUpstreamService := r.convertEndpointBindingToServices(ctx, cr)

var existingTargetService v1.Service
var existingUpstreamService v1.Service
Expand Down Expand Up @@ -278,7 +277,7 @@ func (r *EndpointBindingReconciler) update(ctx context.Context, cr *bindingsv1al
func (r *EndpointBindingReconciler) delete(ctx context.Context, cr *bindingsv1alpha1.EndpointBinding) error {
log := ctrl.LoggerFrom(ctx)

targetService, upstreamService := r.convertEndpointBindingToServices(cr)
targetService, upstreamService := r.convertEndpointBindingToServices(ctx, cr)
if err := r.Client.Delete(ctx, targetService); err != nil {
if client.IgnoreNotFound(err) == nil {
return nil
Expand Down Expand Up @@ -307,7 +306,9 @@ func (r *EndpointBindingReconciler) errResult(op controller.BaseControllerOp, cr
}

// convertEndpointBindingToServices converts an EndpointBinding into 2 Services: Target(ExternalName) and Upstream(Pod Forwarders)
func (r *EndpointBindingReconciler) convertEndpointBindingToServices(endpointBinding *bindingsv1alpha1.EndpointBinding) (*v1.Service, *v1.Service) {
func (r *EndpointBindingReconciler) convertEndpointBindingToServices(ctx context.Context, endpointBinding *bindingsv1alpha1.EndpointBinding) (*v1.Service, *v1.Service) {
log := ctrl.LoggerFrom(ctx)

// Send traffic to any Node in the cluster
internalTrafficPolicy := v1.ServiceInternalTrafficPolicyCluster

Expand All @@ -319,7 +320,7 @@ func (r *EndpointBindingReconciler) convertEndpointBindingToServices(endpointBin
parts := strings.Split(label, "=")

if len(parts) != 2 {
r.Log.Error(fmt.Errorf("invalid Pod Forwarder label: %s", label), "invalid Pod Forwarder label")
log.Error(fmt.Errorf("invalid Pod Forwarder label: %s", label), "invalid Pod Forwarder label")
}

podForwarderSelector[parts[0]] = parts[1]
Expand Down Expand Up @@ -406,7 +407,7 @@ func (r *EndpointBindingReconciler) convertEndpointBindingToServices(endpointBin

func (r *EndpointBindingReconciler) findEndpointBindingsForNamespace(ctx context.Context, namespace client.Object) []reconcile.Request {
nsName := namespace.GetName()
log := r.Log.WithValues("namespace", nsName)
log := ctrl.LoggerFrom(ctx).WithValues("namespace", nsName)

log.V(3).Info("Finding endpoint bindings for namespace")
endpointBindings := &bindingsv1alpha1.EndpointBindingList{}
Expand Down Expand Up @@ -437,7 +438,7 @@ func (r *EndpointBindingReconciler) findEndpointBindingsForNamespace(ctx context
}

func (r *EndpointBindingReconciler) findEndpointBindingsForService(ctx context.Context, svc client.Object) []reconcile.Request {
log := r.Log.WithValues("service.name", svc.GetName(), "service.namespace", svc.GetNamespace())
log := ctrl.LoggerFrom(ctx).WithValues("service.name", svc.GetName(), "service.namespace", svc.GetNamespace())
log.V(3).Info("Finding endpoint bindings for service")

svcLabels := svc.GetLabels()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ SOFTWARE.
package bindings

import (
"context"
"testing"

bindingsv1alpha1 "github.com/ngrok/ngrok-operator/api/bindings/v1alpha1"
Expand Down Expand Up @@ -65,7 +66,7 @@ func Test_convertEndpointBindingToServices(t *testing.T) {
},
}

targetService, upstreamService := controller.convertEndpointBindingToServices(endpointBinding)
targetService, upstreamService := controller.convertEndpointBindingToServices(context.TODO(), endpointBinding)

assert.Equal(targetService.Name, "client-service")
assert.Equal(targetService.Namespace, "client-namespace")
Expand Down
72 changes: 46 additions & 26 deletions internal/controller/bindings/endpointbinding_poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/tools/record"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
)

Expand Down Expand Up @@ -58,34 +59,38 @@ type EndpointBindingPoller struct {

// Start implements the manager.Runnable interface.
func (r *EndpointBindingPoller) Start(ctx context.Context) error {
r.Log.Info("Starting the BindingConfiguration polling routine")
log := ctrl.LoggerFrom(ctx)

log.Info("Starting the BindingConfiguration polling routine")
r.stopCh = make(chan struct{})
defer close(r.stopCh)
go r.startPollingAPI(ctx)
<-ctx.Done()
r.Log.Info("Stopping the BindingConfiguration polling routine")
log.Info("Stopping the BindingConfiguration polling routine")
return nil
}

// startPollingAPI polls a mock API every over a polling interval and updates the BindingConfiguration's status.
func (r *EndpointBindingPoller) startPollingAPI(ctx context.Context) {
log := ctrl.LoggerFrom(ctx)

ticker := time.NewTicker(r.PollingInterval)
defer ticker.Stop()

// Reconcile on startup
if err := r.reconcileEndpointBindingsFromAPI(ctx); err != nil {
r.Log.Error(err, "Failed to update binding_endpoints from API")
log.Error(err, "Failed to update binding_endpoints from API")
}

for {
select {
case <-ticker.C:
r.Log.Info("Polling API for binding_endpoints")
log.Info("Polling API for binding_endpoints")
if err := r.reconcileEndpointBindingsFromAPI(ctx); err != nil {
r.Log.Error(err, "Failed to update binding_endpoints from API")
log.Error(err, "Failed to update binding_endpoints from API")
}
case <-r.stopCh:
r.Log.Info("Stopping API polling")
log.Info("Stopping API polling")
return
}
}
Expand All @@ -94,13 +99,16 @@ func (r *EndpointBindingPoller) startPollingAPI(ctx context.Context) {
// reconcileEndpointBindingsFromAPI fetches the desired binding_endpoints for this kubernetes operator binding
// then creates, updates, or deletes the EndpointBindings in-cluster
func (r *EndpointBindingPoller) reconcileEndpointBindingsFromAPI(ctx context.Context) error {
log := ctrl.LoggerFrom(ctx)

if r.reconcilingCancel != nil {
r.reconcilingCancel() // cancel the previous reconcile loop
}

// Fetch the mock endpoint data from the API
resp, err := fetchEndpoints()
if err != nil {
log.Error(err, "Failed to fetch binding_endpoints from API")
return err
}

Expand Down Expand Up @@ -137,7 +145,7 @@ func (r *EndpointBindingPoller) reconcileEndpointBindingsFromAPI(ctx context.Con
// reassign port allocations
r.portAllocator = currentPortAllocations

toCreate, toUpdate, toDelete := r.filterEndpointBindingActions(existingEndpointBindings, desiredEndpointBindings)
toCreate, toUpdate, toDelete := r.filterEndpointBindingActions(ctx, existingEndpointBindings, desiredEndpointBindings)

// create context + errgroup for managing/closing the future goroutine in the reconcile actions loops
errGroup, ctx := errgroup.WithContext(ctx)
Expand Down Expand Up @@ -167,6 +175,8 @@ type endpointBindingActionFn func(context.Context, bindingsv1alpha1.EndpointBind
// reconcileEndpointBindingAction runs a goroutine to try and process a list of EndpointBindings
// for their desired action over and over again until stopChan is closed or receives a value
func (r *EndpointBindingPoller) reconcileEndpointBindingAction(ctx context.Context, errGroup *errgroup.Group, endpointBindings []bindingsv1alpha1.EndpointBinding, actionMsg string, action endpointBindingActionFn) {
log := ctrl.LoggerFrom(ctx)

errGroup.Go(func() error {
// attempt reconciliation actions every so often
ticker := time.NewTicker(5 * time.Second)
Expand All @@ -179,10 +189,10 @@ func (r *EndpointBindingPoller) reconcileEndpointBindingAction(ctx context.Conte
select {
// stop go routine and return, there is a new reconcile poll happening actively
case <-ctx.Done():
r.Log.Error(ctx.Err(), "Reconcile context canceled, stopping EndpointBinding reconcile loop early", "action", actionMsg)
log.Error(ctx.Err(), "Reconcile context canceled, stopping EndpointBinding reconcile loop early", "action", actionMsg)
return nil
case <-ticker.C:
r.Log.V(9).Info("Received tick", "action", actionMsg, "remaining", remainingBindings)
log.V(9).Info("Received tick", "action", actionMsg, "remaining", remainingBindings)
if len(remainingBindings) == 0 {
return nil // all bindings have been processed
}
Expand All @@ -193,7 +203,7 @@ func (r *EndpointBindingPoller) reconcileEndpointBindingAction(ctx context.Conte
for _, binding := range remainingBindings {
if err := action(ctx, binding); err != nil {
name := hashURI(binding.Spec.EndpointURI)
r.Log.Error(err, "Failed to reconcile EndpointBinding", "action", actionMsg, "name", name, "uri", binding.Spec.EndpointURI)
log.Error(err, "Failed to reconcile EndpointBinding", "action", actionMsg, "name", name, "uri", binding.Spec.EndpointURI)
failedBindings = append(failedBindings, binding)
}
}
Expand All @@ -208,12 +218,14 @@ func (r *EndpointBindingPoller) reconcileEndpointBindingAction(ctx context.Conte
// filterEndpointBindingActions takse 2 sets of existing and desired EndpointBindings
// and returns 3 lists: toCreate, toUpdate, toDelete
// representing the actions needed to reconcile the existing set with the desired set
func (r *EndpointBindingPoller) filterEndpointBindingActions(existingEndpointBindings []bindingsv1alpha1.EndpointBinding, desiredEndpoints ngrokapi.AggregatedEndpoints) (toCreate []bindingsv1alpha1.EndpointBinding, toUpdate []bindingsv1alpha1.EndpointBinding, toDelete []bindingsv1alpha1.EndpointBinding) {
func (r *EndpointBindingPoller) filterEndpointBindingActions(ctx context.Context, existingEndpointBindings []bindingsv1alpha1.EndpointBinding, desiredEndpoints ngrokapi.AggregatedEndpoints) (toCreate []bindingsv1alpha1.EndpointBinding, toUpdate []bindingsv1alpha1.EndpointBinding, toDelete []bindingsv1alpha1.EndpointBinding) {
log := ctrl.LoggerFrom(ctx)

toCreate = []bindingsv1alpha1.EndpointBinding{}
toUpdate = []bindingsv1alpha1.EndpointBinding{}
toDelete = []bindingsv1alpha1.EndpointBinding{}

r.Log.V(9).Info("Filtering EndpointBindings", "existing", existingEndpointBindings, "desired", desiredEndpoints)
log.V(9).Info("Filtering EndpointBindings", "existing", existingEndpointBindings, "desired", desiredEndpoints)

for _, existingEndpointBinding := range existingEndpointBindings {
uri := existingEndpointBinding.Spec.EndpointURI
Expand Down Expand Up @@ -253,6 +265,8 @@ func (r *EndpointBindingPoller) filterEndpointBindingActions(existingEndpointBin

// TODO: Metadata
func (r *EndpointBindingPoller) createBinding(ctx context.Context, desired bindingsv1alpha1.EndpointBinding) error {
log := ctrl.LoggerFrom(ctx)

name := hashURI(desired.Spec.EndpointURI)

// allocate a port
Expand Down Expand Up @@ -284,27 +298,27 @@ func (r *EndpointBindingPoller) createBinding(ctx context.Context, desired bindi
},
}

r.Log.Info("Creating new EndpointBinding", "name", name, "uri", toCreate.Spec.EndpointURI)
log.Info("Creating new EndpointBinding", "name", name, "uri", toCreate.Spec.EndpointURI)
if err := r.Create(ctx, toCreate); err != nil {
if client.IgnoreAlreadyExists(err) == nil {
r.Log.Info("EndpointBinding already exists, skipping create...", "name", name, "uri", toCreate.Spec.EndpointURI)
log.Info("EndpointBinding already exists, skipping create...", "name", name, "uri", toCreate.Spec.EndpointURI)

if toCreate.Status.HashedName != "" && len(toCreate.Status.Endpoints) > 0 {
// Status is filled, no need to update
return nil
} else {
// intentionally blonk
// we want to fall through and fill in the status
r.Log.Info("EndpointBinding already exists, but status is empty, filling in status...", "name", name, "uri", toCreate.Spec.EndpointURI, "toCreate", toCreate)
log.Info("EndpointBinding already exists, but status is empty, filling in status...", "name", name, "uri", toCreate.Spec.EndpointURI, "toCreate", toCreate)

// refresh the toCreate object with existing data
if err := r.Get(ctx, client.ObjectKey{Namespace: r.Namespace, Name: name}, toCreate); err != nil {
r.Log.Error(err, "Failed to get existing EndpointBinding, skipping status update...", "name", name, "uri", toCreate.Spec.EndpointURI)
log.Error(err, "Failed to get existing EndpointBinding, skipping status update...", "name", name, "uri", toCreate.Spec.EndpointURI)
return nil
}
}
} else {
r.Log.Error(err, "Failed to create EndpointBinding", "name", name, "uri", toCreate.Spec.EndpointURI)
log.Error(err, "Failed to create EndpointBinding", "name", name, "uri", toCreate.Spec.EndpointURI)
r.Recorder.Event(toCreate, v1.EventTypeWarning, "Created", fmt.Sprintf("Failed to create EndpointBinding: %v", err))
return err
}
Expand Down Expand Up @@ -338,24 +352,26 @@ func (r *EndpointBindingPoller) createBinding(ctx context.Context, desired bindi
}

func (r *EndpointBindingPoller) updateBinding(ctx context.Context, desired bindingsv1alpha1.EndpointBinding) error {
log := ctrl.LoggerFrom(ctx)

desiredName := hashURI(desired.Spec.EndpointURI)

var existing bindingsv1alpha1.EndpointBinding
err := r.Get(ctx, client.ObjectKey{Namespace: r.Namespace, Name: desiredName}, &existing)
if err != nil {
if client.IgnoreNotFound(err) == nil {
// EndpointBinding doesn't exist, create it on the next polling loop
r.Log.Info("Unable to find existing EndpointBinding, skipping update...", "name", desiredName, "uri", desired.Spec.EndpointURI)
log.Info("Unable to find existing EndpointBinding, skipping update...", "name", desiredName, "uri", desired.Spec.EndpointURI)
return nil // not an error
} else {
// real error
r.Log.Error(err, "Failed to find existing EndpointBinding", "name", desiredName, "uri", desired.Spec.EndpointURI)
log.Error(err, "Failed to find existing EndpointBinding", "name", desiredName, "uri", desired.Spec.EndpointURI)
return err
}
}

if !endpointBindingNeedsUpdate(existing, desired) {
r.Log.Info("EndpointBinding already matches existing state, skipping update...", "name", desiredName, "uri", desired.Spec.EndpointURI)
log.Info("EndpointBinding already matches existing state, skipping update...", "name", desiredName, "uri", desired.Spec.EndpointURI)
return nil
}

Expand All @@ -367,9 +383,9 @@ func (r *EndpointBindingPoller) updateBinding(ctx context.Context, desired bindi
toUpdate.Spec.Target = desired.Spec.Target
toUpdate.Spec.EndpointURI = desired.Spec.EndpointURI

r.Log.Info("Updating EndpointBinding", "name", toUpdate.Name, "uri", toUpdate.Spec.EndpointURI)
log.Info("Updating EndpointBinding", "name", toUpdate.Name, "uri", toUpdate.Spec.EndpointURI)
if err := r.Update(ctx, toUpdate); err != nil {
r.Log.Error(err, "Failed updating EndpointBinding", "name", toUpdate.Name, "uri", toUpdate.Spec.EndpointURI)
log.Error(err, "Failed updating EndpointBinding", "name", toUpdate.Name, "uri", toUpdate.Spec.EndpointURI)
r.Recorder.Event(toUpdate, v1.EventTypeWarning, "Updated", fmt.Sprintf("Failed to update EndpointBinding: %v", err))
return err
}
Expand Down Expand Up @@ -402,11 +418,13 @@ func (r *EndpointBindingPoller) updateBinding(ctx context.Context, desired bindi
}

func (r *EndpointBindingPoller) deleteBinding(ctx context.Context, endpointBinding bindingsv1alpha1.EndpointBinding) error {
log := ctrl.LoggerFrom(ctx)

if err := r.Delete(ctx, &endpointBinding); err != nil {
r.Log.Error(err, "Failed to delete EndpointBinding", "name", endpointBinding.Name, "uri", endpointBinding.Spec.EndpointURI)
log.Error(err, "Failed to delete EndpointBinding", "name", endpointBinding.Name, "uri", endpointBinding.Spec.EndpointURI)
return err
} else {
r.Log.Info("Deleted EndpointBinding", "name", endpointBinding.Name, "uri", endpointBinding.Spec.EndpointURI)
log.Info("Deleted EndpointBinding", "name", endpointBinding.Name, "uri", endpointBinding.Spec.EndpointURI)

// unset the port allocation
r.portAllocator.Unset(endpointBinding.Spec.Port)
Expand All @@ -416,15 +434,17 @@ func (r *EndpointBindingPoller) deleteBinding(ctx context.Context, endpointBindi
}

func (r *EndpointBindingPoller) updateBindingStatus(ctx context.Context, desired *bindingsv1alpha1.EndpointBinding) error {
log := ctrl.LoggerFrom(ctx)

toUpdate := desired
toUpdate.Status = desired.Status

if err := r.Status().Update(ctx, toUpdate); err != nil {
r.Log.Error(err, "Failed to update EndpointBinding status", "name", toUpdate.Name, "uri", toUpdate.Spec.EndpointURI)
log.Error(err, "Failed to update EndpointBinding status", "name", toUpdate.Name, "uri", toUpdate.Spec.EndpointURI)
return err
}

r.Log.Info("Updated EndpointBinding status", "name", toUpdate.Name, "uri", toUpdate.Spec.EndpointURI)
log.Info("Updated EndpointBinding status", "name", toUpdate.Name, "uri", toUpdate.Spec.EndpointURI)
return nil
}

Expand Down
Loading

0 comments on commit 0fde978

Please sign in to comment.