diff --git a/go.mod b/go.mod index 2940785ad..92388fb88 100644 --- a/go.mod +++ b/go.mod @@ -10,6 +10,8 @@ require ( github.com/google/uuid v1.6.0 github.com/spf13/cobra v1.9.1 github.com/stretchr/testify v1.10.0 + go.opentelemetry.io/otel v1.28.0 + go.opentelemetry.io/otel/trace v1.28.0 go.uber.org/mock v0.5.0 golang.org/x/sync v0.12.0 google.golang.org/protobuf v1.36.5 @@ -78,8 +80,6 @@ require ( github.com/stretchr/objx v0.5.2 // indirect github.com/x448/float16 v0.8.4 // indirect github.com/xlab/treeprint v1.2.0 // indirect - go.opentelemetry.io/otel v1.28.0 // indirect - go.opentelemetry.io/otel/trace v1.28.0 // indirect golang.org/x/net v0.33.0 // indirect golang.org/x/oauth2 v0.23.0 // indirect golang.org/x/sys v0.28.0 // indirect diff --git a/pkg/engine/engine.go b/pkg/engine/engine.go index 0347fa1c9..7744cc76f 100644 --- a/pkg/engine/engine.go +++ b/pkg/engine/engine.go @@ -23,6 +23,7 @@ import ( "github.com/argoproj/gitops-engine/pkg/sync" "github.com/argoproj/gitops-engine/pkg/sync/common" "github.com/argoproj/gitops-engine/pkg/utils/kube" + "github.com/argoproj/gitops-engine/pkg/utils/tracing" ) const ( @@ -84,7 +85,7 @@ func (e *gitOpsEngine) Sync(ctx context.Context, return nil, err } opts = append(opts, sync.WithSkipHooks(!diffRes.Modified)) - syncCtx, cleanup, err := sync.NewSyncContext(revision, result, e.config, e.config, e.kubectl, namespace, e.cache.GetOpenAPISchema(), opts...) + syncCtx, cleanup, err := sync.NewSyncContext(revision, result, e.config, e.config, e.kubectl, namespace, e.cache.GetOpenAPISchema(), tracing.NopTracer{}, "", "", opts...) if err != nil { return nil, err } diff --git a/pkg/sync/sync_context.go b/pkg/sync/sync_context.go index 7d43899c9..c0d9151af 100644 --- a/pkg/sync/sync_context.go +++ b/pkg/sync/sync_context.go @@ -5,6 +5,7 @@ import ( "encoding/json" "fmt" "sort" + "strconv" "strings" "sync" "time" @@ -32,6 +33,7 @@ import ( "github.com/argoproj/gitops-engine/pkg/sync/hook" resourceutil "github.com/argoproj/gitops-engine/pkg/sync/resource" kubeutil "github.com/argoproj/gitops-engine/pkg/utils/kube" + "github.com/argoproj/gitops-engine/pkg/utils/tracing" ) type reconciledResource struct { @@ -209,6 +211,8 @@ func NewSyncContext( kubectl kubeutil.Kubectl, namespace string, openAPISchema openapi.Resources, + syncTracer tracing.Tracer, + syncTraceID, syncTraceRootSpanID string, opts ...SyncOpt, ) (SyncContext, func(), error) { dynamicIf, err := dynamic.NewForConfig(restConfig) @@ -246,6 +250,9 @@ func NewSyncContext( permissionValidator: func(_ *unstructured.Unstructured, _ *metav1.APIResource) error { return nil }, + syncTracer: syncTracer, + syncTraceID: syncTraceID, + syncTraceRootSpanID: syncTraceRootSpanID, } for _, opt := range opts { opt(ctx) @@ -357,6 +364,11 @@ type syncContext struct { // lock to protect concurrent updates of the result list lock sync.Mutex + // tracer for tracing the sync operation + syncTraceID string + syncTraceRootSpanID string + syncTracer tracing.Tracer + // syncNamespace is a function that will determine if the managed // namespace should be synced syncNamespace func(*unstructured.Unstructured, *unstructured.Unstructured) (bool, error) @@ -1262,6 +1274,8 @@ func (sc *syncContext) runTasks(tasks syncTasks, dryRun bool) runState { ss.Go(func(state runState) runState { logCtx := sc.log.WithValues("dryRun", dryRun, "task", t) logCtx.V(1).Info("Pruning") + span := sc.createSpan("pruneObject", dryRun) + defer span.Finish() result, message := sc.pruneObject(t.liveObj, sc.prune, dryRun) if result == common.ResultCodeSyncFailed { state = failed @@ -1270,6 +1284,7 @@ func (sc *syncContext) runTasks(tasks syncTasks, dryRun bool) runState { if !dryRun || sc.dryRun || result == common.ResultCodeSyncFailed { sc.setResourceResult(t, result, operationPhases[result], message) } + sc.setBaggageItemForTasks(&span, t, message, result, operationPhases[result]) return state }) } @@ -1289,19 +1304,27 @@ func (sc *syncContext) runTasks(tasks syncTasks, dryRun bool) runState { ss.Go(func(state runState) runState { sc.log.WithValues("dryRun", dryRun, "task", t).V(1).Info("Deleting") if !dryRun { + span := sc.createSpan("hooksDeletion", dryRun) + defer span.Finish() err := sc.deleteResource(t) + message := "deleted" + operationPhase := common.OperationRunning if err != nil { // it is possible to get a race condition here, such that the resource does not exist when // delete is requested, we treat this as a nop if !apierrors.IsNotFound(err) { state = failed - sc.setResourceResult(t, "", common.OperationError, fmt.Sprintf("failed to delete resource: %v", err)) + message = fmt.Sprintf("failed to delete resource: %v", err) + operationPhase = common.OperationError + sc.setResourceResult(t, "", operationPhase, message) } } else { // if there is anything that needs deleting, we are at best now in pending and // want to return and wait for sync to be invoked again state = pending + operationPhase = common.OperationSucceeded } + sc.setBaggageItemForTasks(&span, t, message, "", operationPhase) } return state }) @@ -1330,6 +1353,24 @@ func (sc *syncContext) runTasks(tasks syncTasks, dryRun bool) runState { return state } +func (sc *syncContext) createSpan(operation string, dryrun bool) tracing.Span { + // skip tracing if dryrun + if dryrun || sc.syncTracer == nil { + return tracing.NopTracer{}.StartSpan(operation) + } + return sc.syncTracer.StartSpanFromTraceParent(operation, sc.syncTraceID, sc.syncTraceRootSpanID) +} + +func (sc *syncContext) setBaggageItemForTasks(span *tracing.Span, t *syncTask, message string, result common.ResultCode, operationPhase common.OperationPhase) { + resourceKey := t.resourceKey() + (*span).SetBaggageItem("resource", resourceKey.String()) + (*span).SetBaggageItem("result", string(result)) + (*span).SetBaggageItem("operationPhase", string(operationPhase)) + (*span).SetBaggageItem("message", message) + (*span).SetBaggageItem("phase", string(t.phase)) + (*span).SetBaggageItem("wave", strconv.Itoa(t.wave())) +} + func (sc *syncContext) processCreateTasks(state runState, tasks syncTasks, dryRun bool) runState { ss := newStateSync(state) for _, task := range tasks { @@ -1341,11 +1382,14 @@ func (sc *syncContext) processCreateTasks(state runState, tasks syncTasks, dryRu logCtx := sc.log.WithValues("dryRun", dryRun, "task", t) logCtx.V(1).Info("Applying") validate := sc.validate && !resourceutil.HasAnnotationOption(t.targetObj, common.AnnotationSyncOptions, common.SyncOptionsDisableValidation) + span := sc.createSpan("applyObject", dryRun) + defer span.Finish() result, message := sc.applyObject(t, dryRun, validate) if result == common.ResultCodeSyncFailed { logCtx.WithValues("message", message).Info("Apply failed") state = failed } + var phase common.OperationPhase if !dryRun || sc.dryRun || result == common.ResultCodeSyncFailed { phase := operationPhases[result] // no resources are created in dry-run, so running phase means validation was @@ -1355,6 +1399,7 @@ func (sc *syncContext) processCreateTasks(state runState, tasks syncTasks, dryRu } sc.setResourceResult(t, result, phase, message) } + sc.setBaggageItemForTasks(&span, t, message, result, phase) return state }) } diff --git a/pkg/utils/tracing/api.go b/pkg/utils/tracing/api.go index 89670a67b..29148962f 100644 --- a/pkg/utils/tracing/api.go +++ b/pkg/utils/tracing/api.go @@ -8,9 +8,12 @@ package tracing type Tracer interface { StartSpan(operationName string) Span + StartSpanFromTraceParent(operationName string, parentTraceId, parentSpanId string) Span } type Span interface { SetBaggageItem(key string, value any) Finish() + SpanID() string + TraceID() string } diff --git a/pkg/utils/tracing/logging.go b/pkg/utils/tracing/logging.go index fd0619f99..2c4c30f83 100644 --- a/pkg/utils/tracing/logging.go +++ b/pkg/utils/tracing/logging.go @@ -30,6 +30,12 @@ func (l LoggingTracer) StartSpan(operationName string) Span { } } +// loggingSpan is not a real distributed tracing system. +// so no need to implement real StartSpanFromTraceParent method. +func (l LoggingTracer) StartSpanFromTraceParent(operationName string, _, _ string) Span { + return l.StartSpan(operationName) +} + type loggingSpan struct { logger logr.Logger operationName string @@ -54,3 +60,11 @@ func baggageToVals(baggage map[string]any) []any { } return result } + +func (s loggingSpan) TraceID() string { + return "" +} + +func (s loggingSpan) SpanID() string { + return "" +} diff --git a/pkg/utils/tracing/nop.go b/pkg/utils/tracing/nop.go index e39b67b99..3af4c725d 100644 --- a/pkg/utils/tracing/nop.go +++ b/pkg/utils/tracing/nop.go @@ -11,6 +11,10 @@ func (n NopTracer) StartSpan(_ string) Span { return nopSpan{} } +func (n NopTracer) StartSpanFromTraceParent(_, _, _ string) Span { + return nopSpan{} +} + type nopSpan struct{} func (n nopSpan) SetBaggageItem(_ string, _ any) { @@ -18,3 +22,11 @@ func (n nopSpan) SetBaggageItem(_ string, _ any) { func (n nopSpan) Finish() { } + +func (n nopSpan) TraceID() string { + return "" +} + +func (n nopSpan) SpanID() string { + return "" +} diff --git a/pkg/utils/tracing/opentelemetry.go b/pkg/utils/tracing/opentelemetry.go new file mode 100644 index 000000000..d4fcb643c --- /dev/null +++ b/pkg/utils/tracing/opentelemetry.go @@ -0,0 +1,56 @@ +package tracing + +import ( + "context" + + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" +) + +type OpenTelemetryTracer struct { + realTracer trace.Tracer +} + +func NewOpenTelemetryTracer(t trace.Tracer) Tracer { + return &OpenTelemetryTracer{ + realTracer: t, + } +} + +func (t OpenTelemetryTracer) StartSpan(operationName string) Span { + _, realspan := t.realTracer.Start(context.Background(), operationName) + return openTelemetrySpan{realSpan: realspan} +} + +func (t OpenTelemetryTracer) StartSpanFromTraceParent(operationName string, parentTraceId, parentSpanId string) Span { + traceID, _ := trace.TraceIDFromHex(parentTraceId) + parentSpanID, _ := trace.SpanIDFromHex(parentSpanId) + spanCtx := trace.NewSpanContext(trace.SpanContextConfig{ + TraceID: traceID, + SpanID: parentSpanID, + TraceFlags: trace.FlagsSampled, + }) + ctx := trace.ContextWithSpanContext(context.Background(), spanCtx) + _, realSpan := t.realTracer.Start(ctx, operationName) + return openTelemetrySpan{realSpan: realSpan} +} + +type openTelemetrySpan struct { + realSpan trace.Span +} + +func (s openTelemetrySpan) SetBaggageItem(key string, value any) { + s.realSpan.SetAttributes(attribute.Key(key).String(value.(string))) +} + +func (s openTelemetrySpan) Finish() { + s.realSpan.End() +} + +func (s openTelemetrySpan) TraceID() string { + return s.realSpan.SpanContext().TraceID().String() +} + +func (s openTelemetrySpan) SpanID() string { + return s.realSpan.SpanContext().SpanID().String() +}