diff --git a/.chloggen/exporter-helper-cleanup-error-logs.yaml b/.chloggen/exporter-helper-cleanup-error-logs.yaml new file mode 100755 index 00000000000..28ed9ea5bc0 --- /dev/null +++ b/.chloggen/exporter-helper-cleanup-error-logs.yaml @@ -0,0 +1,27 @@ +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver) +component: exporters + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Cleanup log messages for export failures + +# One or more tracking issues or pull requests related to the change +issues: [9219] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: | + 1. Ensure an error message is logged every time and only once when data is dropped/rejected due to export failure. + 2. Update the wording. Specifically, don't use "dropped" term when an error is reported back to the pipeline. + Keep the "dropped" wording for failures happened after the enabled queue. + 3. Properly report any error reported by a queue. For example, a persistent storage error must be reported as a storage error, not as "queue overflow". + +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [user] diff --git a/exporter/exporterhelper/common.go b/exporter/exporterhelper/common.go index 2c5a4e96692..b5e7aa39a33 100644 --- a/exporter/exporterhelper/common.go +++ b/exporter/exporterhelper/common.go @@ -38,20 +38,6 @@ func (b *baseRequestSender) setNextSender(nextSender requestSender) { b.nextSender = nextSender } -type errorLoggingRequestSender struct { - baseRequestSender - logger *zap.Logger - message string -} - -func (l *errorLoggingRequestSender) send(ctx context.Context, req Request) error { - err := l.baseRequestSender.send(ctx, req) - if err != nil { - l.logger.Error(l.message, zap.Int("dropped_items", req.ItemsCount()), zap.Error(err)) - } - return err -} - type obsrepSenderFactory func(obsrep *ObsReport) requestSender // Option apply changes to baseExporter. @@ -86,10 +72,7 @@ func WithTimeout(timeoutSettings TimeoutSettings) Option { func WithRetry(config configretry.BackOffConfig) Option { return func(o *baseExporter) { if !config.Enabled { - o.retrySender = &errorLoggingRequestSender{ - logger: o.set.Logger, - message: "Exporting failed. Try enabling retry_on_failure config option to retry on retryable errors", - } + o.exportFailureMessage += " Try enabling retry_on_failure config option to retry on retryable errors." return } o.retrySender = newRetrySender(config, o.set) @@ -105,13 +88,14 @@ func WithQueue(config QueueSettings) Option { panic("queueing is not available for the new request exporters yet") } if !config.Enabled { - o.queueSender = &errorLoggingRequestSender{ - logger: o.set.Logger, - message: "Exporting failed. Dropping data. Try enabling sending_queue to survive temporary failures.", - } + o.exportFailureMessage += " Try enabling sending_queue to survive temporary failures." return } - o.queueSender = newQueueSender(config, o.set, o.signal, o.marshaler, o.unmarshaler) + consumeErrHandler := func(err error, req Request) { + o.set.Logger.Error("Exporting failed. Dropping data."+o.exportFailureMessage, + zap.Error(err), zap.Int("dropped_items", req.ItemsCount())) + } + o.queueSender = newQueueSender(config, o.set, o.signal, o.marshaler, o.unmarshaler, consumeErrHandler) } } @@ -137,6 +121,9 @@ type baseExporter struct { set exporter.CreateSettings obsrep *ObsReport + // Message for the user to be added with an export failure message. + exportFailureMessage string + // Chain of senders that the exporter helper applies before passing the data to the actual exporter. // The data is handled by each sender in the respective order starting from the queueSender. // Most of the senders are optional, and initialized with a no-op path-through sender. @@ -182,7 +169,12 @@ func newBaseExporter(set exporter.CreateSettings, signal component.DataType, req // send sends the request using the first sender in the chain. func (be *baseExporter) send(ctx context.Context, req Request) error { - return be.queueSender.send(ctx, req) + err := be.queueSender.send(ctx, req) + if err != nil { + be.set.Logger.Error("Exporting failed. Rejecting data."+be.exportFailureMessage, + zap.Error(err), zap.Int("rejected_items", req.ItemsCount())) + } + return err } // connectSenders connects the senders in the predefined order. diff --git a/exporter/exporterhelper/queue_sender.go b/exporter/exporterhelper/queue_sender.go index 423b7657e10..1ee3c1ad5d1 100644 --- a/exporter/exporterhelper/queue_sender.go +++ b/exporter/exporterhelper/queue_sender.go @@ -17,7 +17,6 @@ import ( "go.uber.org/zap" "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/collector/consumer/consumererror" "go.opentelemetry.io/collector/exporter" "go.opentelemetry.io/collector/exporter/exporterhelper/internal" "go.opentelemetry.io/collector/internal/obsreportconfig" @@ -86,7 +85,7 @@ type queueSender struct { } func newQueueSender(config QueueSettings, set exporter.CreateSettings, signal component.DataType, - marshaler RequestMarshaler, unmarshaler RequestUnmarshaler) *queueSender { + marshaler RequestMarshaler, unmarshaler RequestUnmarshaler, consumeErrHandler func(error, Request)) *queueSender { isPersistent := config.StorageID != nil var queue internal.Queue[Request] @@ -114,21 +113,15 @@ func newQueueSender(config QueueSettings, set exporter.CreateSettings, signal co logger: set.TelemetrySettings.Logger, meter: set.TelemetrySettings.MeterProvider.Meter(scopeName), } - qs.consumers = internal.NewQueueConsumers(queue, config.NumConsumers, qs.consume) - return qs -} - -// consume is the function that is executed by the queue consumers to send the data to the next consumerSender. -func (qs *queueSender) consume(ctx context.Context, req Request) error { - err := qs.nextSender.send(ctx, req) - if err != nil && !consumererror.IsPermanent(err) { - qs.logger.Error( - "Exporting failed. No more retries left. Dropping data.", - zap.Error(err), - zap.Int("dropped_items", req.ItemsCount()), - ) + consumeFunc := func(ctx context.Context, req Request) error { + err := qs.nextSender.send(ctx, req) + if err != nil { + consumeErrHandler(err, req) + } + return err } - return err + qs.consumers = internal.NewQueueConsumers(queue, config.NumConsumers, consumeFunc) + return qs } // Start is invoked during service startup. @@ -210,11 +203,7 @@ func (qs *queueSender) send(ctx context.Context, req Request) error { span := trace.SpanFromContext(c) if err := qs.queue.Offer(c, req); err != nil { - qs.logger.Error( - "Dropping data because sending_queue is full. Try increasing queue_size.", - zap.Int("dropped_items", req.ItemsCount()), - ) - span.AddEvent("Dropped item, sending_queue is full.", trace.WithAttributes(qs.traceAttribute)) + span.AddEvent("Failed to enqueue item.", trace.WithAttributes(qs.traceAttribute)) return err } diff --git a/exporter/exporterhelper/queue_sender_test.go b/exporter/exporterhelper/queue_sender_test.go index 831098c3308..3f3b2fc8105 100644 --- a/exporter/exporterhelper/queue_sender_test.go +++ b/exporter/exporterhelper/queue_sender_test.go @@ -11,6 +11,8 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.uber.org/zap" + "go.uber.org/zap/zaptest/observer" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/component/componenttest" @@ -81,17 +83,23 @@ func TestQueuedRetry_DoNotPreserveCancellation(t *testing.T) { require.Zero(t, be.queueSender.(*queueSender).queue.Size()) } -func TestQueuedRetry_DropOnFull(t *testing.T) { +func TestQueuedRetry_RejectOnFull(t *testing.T) { qCfg := NewDefaultQueueSettings() qCfg.QueueSize = 0 qCfg.NumConsumers = 0 - be, err := newBaseExporter(defaultSettings, "", false, nil, nil, newNoopObsrepSender, WithQueue(qCfg)) + set := exportertest.NewNopCreateSettings() + logger, observed := observer.New(zap.ErrorLevel) + set.Logger = zap.New(logger) + be, err := newBaseExporter(set, "", false, nil, nil, newNoopObsrepSender, WithQueue(qCfg)) require.NoError(t, err) require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) t.Cleanup(func() { assert.NoError(t, be.Shutdown(context.Background())) }) require.Error(t, be.send(context.Background(), newMockRequest(2, nil))) + assert.Len(t, observed.All(), 1) + assert.Equal(t, "Exporting failed. Rejecting data.", observed.All()[0].Message) + assert.Equal(t, "sending queue is full", observed.All()[0].ContextMap()["error"]) } func TestQueuedRetryHappyPath(t *testing.T) { @@ -223,8 +231,11 @@ func TestQueueSettings_Validate(t *testing.T) { func TestQueueRetryWithDisabledQueue(t *testing.T) { qs := NewDefaultQueueSettings() qs.Enabled = false - be, err := newBaseExporter(exportertest.NewNopCreateSettings(), component.DataTypeLogs, false, nil, nil, newObservabilityConsumerSender, WithQueue(qs)) - require.IsType(t, &errorLoggingRequestSender{}, be.queueSender) + set := exportertest.NewNopCreateSettings() + logger, observed := observer.New(zap.ErrorLevel) + set.Logger = zap.New(logger) + be, err := newBaseExporter(set, component.DataTypeLogs, false, nil, nil, newObservabilityConsumerSender, + WithQueue(qs)) require.NoError(t, err) require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) ocs := be.obsrepSender.(*observabilityConsumerSender) @@ -232,6 +243,8 @@ func TestQueueRetryWithDisabledQueue(t *testing.T) { ocs.run(func() { require.Error(t, be.send(context.Background(), mockR)) }) + assert.Len(t, observed.All(), 1) + assert.Equal(t, "Exporting failed. Rejecting data. Try enabling sending_queue to survive temporary failures.", observed.All()[0].Message) ocs.awaitAsyncProcessing() mockR.checkNumRequests(t, 1) ocs.checkSendItemsCount(t, 0) @@ -239,6 +252,21 @@ func TestQueueRetryWithDisabledQueue(t *testing.T) { require.NoError(t, be.Shutdown(context.Background())) } +func TestQueueFailedRequestDropped(t *testing.T) { + set := exportertest.NewNopCreateSettings() + logger, observed := observer.New(zap.ErrorLevel) + set.Logger = zap.New(logger) + be, err := newBaseExporter(set, component.DataTypeLogs, false, nil, nil, newNoopObsrepSender, WithQueue(NewDefaultQueueSettings())) + require.NoError(t, err) + require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) + mockR := newMockRequest(2, errors.New("some error")) + require.NoError(t, be.send(context.Background(), mockR)) + require.NoError(t, be.Shutdown(context.Background())) + mockR.checkNumRequests(t, 1) + assert.Len(t, observed.All(), 1) + assert.Equal(t, "Exporting failed. Dropping data.", observed.All()[0].Message) +} + func TestQueuedRetryPersistenceEnabled(t *testing.T) { tt, err := componenttest.SetupTelemetry(defaultID) require.NoError(t, err) @@ -331,7 +359,7 @@ func TestQueuedRetryPersistentEnabled_NoDataLossOnShutdown(t *testing.T) { } func TestQueueSenderNoStartShutdown(t *testing.T) { - qs := newQueueSender(NewDefaultQueueSettings(), exportertest.NewNopCreateSettings(), "", nil, nil) + qs := newQueueSender(NewDefaultQueueSettings(), exportertest.NewNopCreateSettings(), "", nil, nil, nil) assert.NoError(t, qs.Shutdown(context.Background())) } diff --git a/exporter/exporterhelper/retry_sender.go b/exporter/exporterhelper/retry_sender.go index 1de3a23c587..1bf24157898 100644 --- a/exporter/exporterhelper/retry_sender.go +++ b/exporter/exporterhelper/retry_sender.go @@ -93,19 +93,14 @@ func (rs *retrySender) send(ctx context.Context, req Request) error { // Immediately drop data on permanent errors. if consumererror.IsPermanent(err) { - rs.logger.Error( - "Exporting failed. The error is not retryable. Dropping data.", - zap.Error(err), - zap.Int("dropped_items", req.ItemsCount()), - ) - return err + return fmt.Errorf("not retryable error: %w", err) } req = extractPartialRequest(req, err) backoffDelay := expBackoff.NextBackOff() if backoffDelay == backoff.Stop { - return fmt.Errorf("max elapsed time expired %w", err) + return fmt.Errorf("no more retries left: %w", err) } throttleErr := throttleRetry{} diff --git a/exporter/exporterhelper/retry_sender_test.go b/exporter/exporterhelper/retry_sender_test.go index 99efd945794..06154d5fbc2 100644 --- a/exporter/exporterhelper/retry_sender_test.go +++ b/exporter/exporterhelper/retry_sender_test.go @@ -17,6 +17,8 @@ import ( "go.opencensus.io/metric/metricdata" "go.opencensus.io/metric/metricproducer" "go.opencensus.io/tag" + "go.uber.org/zap" + "go.uber.org/zap/zaptest/observer" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/component/componenttest" @@ -239,8 +241,10 @@ func TestQueueRetryWithNoQueue(t *testing.T) { func TestQueueRetryWithDisabledRetires(t *testing.T) { rCfg := configretry.NewDefaultBackOffConfig() rCfg.Enabled = false - be, err := newBaseExporter(exportertest.NewNopCreateSettings(), component.DataTypeLogs, false, nil, nil, newObservabilityConsumerSender, WithRetry(rCfg)) - require.IsType(t, &errorLoggingRequestSender{}, be.retrySender) + set := exportertest.NewNopCreateSettings() + logger, observed := observer.New(zap.ErrorLevel) + set.Logger = zap.New(logger) + be, err := newBaseExporter(set, component.DataTypeLogs, false, nil, nil, newObservabilityConsumerSender, WithRetry(rCfg)) require.NoError(t, err) require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) ocs := be.obsrepSender.(*observabilityConsumerSender) @@ -248,6 +252,9 @@ func TestQueueRetryWithDisabledRetires(t *testing.T) { ocs.run(func() { require.Error(t, be.send(context.Background(), mockR)) }) + assert.Len(t, observed.All(), 1) + assert.Equal(t, "Exporting failed. Rejecting data. "+ + "Try enabling retry_on_failure config option to retry on retryable errors.", observed.All()[0].Message) ocs.awaitAsyncProcessing() mockR.checkNumRequests(t, 1) ocs.checkSendItemsCount(t, 0)