diff --git a/appender.go b/appender.go index bfd2fda..0a2aa66 100644 --- a/appender.go +++ b/appender.go @@ -60,21 +60,9 @@ var ( // Up to `config.MaxRequests` bulk requests may be flushing/active concurrently, to allow the // server to make progress encoding while Elasticsearch is busy servicing flushed bulk requests. type Appender struct { - // legacy metrics for Stats() - bulkRequests int64 - docsAdded int64 - docsActive int64 - docsFailed int64 - docsFailedClient int64 - docsFailedServer int64 - docsIndexed int64 - tooManyRequests int64 - bytesTotal int64 - bytesUncompressedTotal int64 - availableBulkRequests int64 - activeCreated int64 - activeDestroyed int64 - blockedAdd int64 + // Used internally to calculate indexFailureRate for scaling reasons only. + docsAdded atomic.Int64 + tooManyRequests atomic.Int64 scalingInfo atomic.Value @@ -137,8 +125,8 @@ func New(client elastictransport.Interface, cfg Config) (*Appender, error) { // Register the Appender ID in the pool. indexer.id = fmt.Sprintf("%p", indexer) indexer.pool.Register(indexer.id) - indexer.addUpDownCount(int64(cfg.MaxRequests), &indexer.availableBulkRequests, ms.availableBulkRequests) - + attrs := metric.WithAttributeSet(indexer.config.MetricAttributes) + indexer.metrics.availableBulkRequests.Add(context.Background(), int64(cfg.MaxRequests), attrs) // We create a cancellable context for the errgroup.Group for unblocking // flushes when Close returns. We intentionally do not use errgroup.WithContext, // because one flush failure should not cause the context to be cancelled. @@ -197,26 +185,6 @@ func (a *Appender) Close(ctx context.Context) error { return nil } -// Stats returns the bulk indexing stats. -func (a *Appender) Stats() Stats { - return Stats{ - Added: atomic.LoadInt64(&a.docsAdded), - Active: atomic.LoadInt64(&a.docsActive), - BulkRequests: atomic.LoadInt64(&a.bulkRequests), - Failed: atomic.LoadInt64(&a.docsFailed), - FailedClient: atomic.LoadInt64(&a.docsFailedClient), - FailedServer: atomic.LoadInt64(&a.docsFailedServer), - Indexed: atomic.LoadInt64(&a.docsIndexed), - TooManyRequests: atomic.LoadInt64(&a.tooManyRequests), - BytesTotal: atomic.LoadInt64(&a.bytesTotal), - BytesUncompressedTotal: atomic.LoadInt64(&a.bytesUncompressedTotal), - AvailableBulkRequests: atomic.LoadInt64(&a.availableBulkRequests), - IndexersActive: a.scalingInformation().activeIndexers, - IndexersCreated: atomic.LoadInt64(&a.activeCreated), - IndexersDestroyed: atomic.LoadInt64(&a.activeDestroyed), - } -} - // Add enqueues document for appending to index. 
// // The document body will be copied to a buffer using io.Copy, and document may @@ -240,7 +208,8 @@ func (a *Appender) Add(ctx context.Context, index string, document io.WriterTo) Body: document, } if len(a.bulkItems) == cap(a.bulkItems) { - a.addCount(1, &a.blockedAdd, a.metrics.blockedAdd) + attrs := metric.WithAttributeSet(a.config.MetricAttributes) + a.metrics.blockedAdd.Add(context.Background(), 1, attrs) } select { @@ -250,29 +219,17 @@ func (a *Appender) Add(ctx context.Context, index string, document io.WriterTo) return ErrClosed case a.bulkItems <- item: } - a.addCount(1, &a.docsAdded, a.metrics.docsAdded) - a.addUpDownCount(1, &a.docsActive, a.metrics.docsActive) - return nil -} - -func (a *Appender) addCount(delta int64, lm *int64, m metric.Int64Counter, opts ...metric.AddOption) { - // legacy metric - if lm != nil { - atomic.AddInt64(lm, delta) - } + a.docsAdded.Add(1) attrs := metric.WithAttributeSet(a.config.MetricAttributes) - m.Add(context.Background(), delta, append(opts, attrs)...) -} + a.metrics.docsAdded.Add(context.Background(), 1, attrs) + a.metrics.docsActive.Add(context.Background(), 1, attrs) -func (a *Appender) addUpDownCount(delta int64, lm *int64, m metric.Int64UpDownCounter, opts ...metric.AddOption) { - // legacy metric - if lm != nil { - atomic.AddInt64(lm, delta) - } + return nil +} - attrs := metric.WithAttributeSet(a.config.MetricAttributes) - m.Add(context.Background(), delta, append(opts, attrs)...) +func (a *Appender) IndexersActive() int64 { + return a.scalingInformation().activeIndexers } func (a *Appender) flush(ctx context.Context, bulkIndexer *BulkIndexer) error { @@ -280,7 +237,10 @@ func (a *Appender) flush(ctx context.Context, bulkIndexer *BulkIndexer) error { if n == 0 { return nil } - defer a.addCount(1, &a.bulkRequests, a.metrics.bulkRequests) + defer func() { + attrs := metric.WithAttributeSet(a.config.MetricAttributes) + a.metrics.bulkRequests.Add(context.Background(), 1, attrs) + }() logger := a.config.Logger var span trace.Span @@ -313,16 +273,18 @@ func (a *Appender) flush(ctx context.Context, bulkIndexer *BulkIndexer) error { // Record the BulkIndexer buffer's length as the bytesTotal metric after // the request has been flushed. if flushed := bulkIndexer.BytesFlushed(); flushed > 0 { - a.addCount(int64(flushed), &a.bytesTotal, a.metrics.bytesTotal) + attrs := metric.WithAttributeSet(a.config.MetricAttributes) + a.metrics.bytesTotal.Add(context.Background(), int64(flushed), attrs) } // Record the BulkIndexer uncompressed bytes written to the buffer // as the bytesUncompressedTotal metric after the request has been flushed. 
if flushed := bulkIndexer.BytesUncompressedFlushed(); flushed > 0 { - a.addCount(int64(flushed), &a.bytesUncompressedTotal, a.metrics.bytesUncompressedTotal) + attrs := metric.WithAttributeSet(a.config.MetricAttributes) + a.metrics.bytesUncompressedTotal.Add(context.Background(), int64(flushed), attrs) } if err != nil { - a.addUpDownCount(-int64(n), &a.docsActive, a.metrics.docsActive) - atomic.AddInt64(&a.docsFailed, int64(n)) + attrs := metric.WithAttributeSet(a.config.MetricAttributes) + a.metrics.docsActive.Add(context.Background(), -int64(n), attrs) logger.Error("bulk indexing request failed", zap.Error(err)) if a.otelTracingEnabled() && span.IsRecording() { span.RecordError(err) @@ -330,28 +292,36 @@ func (a *Appender) flush(ctx context.Context, bulkIndexer *BulkIndexer) error { } if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) { - a.addCount(int64(n), nil, - a.metrics.docsIndexed, + a.metrics.docsIndexed.Add( + context.Background(), + int64(n), metric.WithAttributes(attribute.String("status", "Timeout")), + metric.WithAttributeSet(a.config.MetricAttributes), ) } // Bulk indexing may fail with different status codes. var errFailed ErrorFlushFailed if errors.As(err, &errFailed) { - var legacy *int64 var status string switch { case errFailed.tooMany: - legacy, status = &a.tooManyRequests, "TooMany" + a.tooManyRequests.Add(int64(n)) + status = "TooMany" case errFailed.clientError: - legacy, status = &a.docsFailedClient, "FailedClient" + status = "FailedClient" case errFailed.serverError: - legacy, status = &a.docsFailedServer, "FailedServer" + status = "FailedServer" } if status != "" { - a.addCount(int64(n), legacy, a.metrics.docsIndexed, - metric.WithAttributes(attribute.String("status", status), semconv.HTTPResponseStatusCode(errFailed.statusCode)), + a.metrics.docsIndexed.Add( + context.Background(), + int64(n), + metric.WithAttributes( + attribute.String("status", status), + semconv.HTTPResponseStatusCode(errFailed.statusCode), + ), + metric.WithAttributeSet(a.config.MetricAttributes), ) } } @@ -371,7 +341,8 @@ func (a *Appender) flush(ctx context.Context, bulkIndexer *BulkIndexer) error { } docsFailed = int64(len(resp.FailedDocs)) totalFlushed := docsFailed + docsIndexed - a.addUpDownCount(-totalFlushed, &a.docsActive, a.metrics.docsActive) + attrs := metric.WithAttributeSet(a.config.MetricAttributes) + a.metrics.docsActive.Add(context.Background(), -totalFlushed, attrs) for _, info := range resp.FailedDocs { if info.Status >= 400 && info.Status < 500 { if info.Status == http.StatusTooManyRequests { @@ -396,64 +367,79 @@ func (a *Appender) flush(ctx context.Context, bulkIndexer *BulkIndexer) error { key.Index, key.Error.Type, key.Error.Reason, ), zap.Int("documents", count)) } - if docsFailed > 0 { - atomic.AddInt64(&a.docsFailed, docsFailed) - } if resp.RetriedDocs > 0 { // docs are scheduled to be retried but not yet failed due to retry limit - a.addCount(resp.RetriedDocs, nil, a.metrics.docsRetried, + a.metrics.docsRetried.Add( + context.Background(), + resp.RetriedDocs, + metric.WithAttributeSet(a.config.MetricAttributes), metric.WithAttributes(attribute.Int("greatest_retry", resp.GreatestRetry)), ) } if docsIndexed > 0 { - a.addCount(docsIndexed, &a.docsIndexed, - a.metrics.docsIndexed, + a.metrics.docsIndexed.Add( + context.Background(), + docsIndexed, + metric.WithAttributeSet(a.config.MetricAttributes), metric.WithAttributes(attribute.String("status", "Success")), ) } if tooManyRequests > 0 { - a.addCount(tooManyRequests,
&a.tooManyRequests, - a.metrics.docsIndexed, + a.tooManyRequests.Add(tooManyRequests) + a.metrics.docsIndexed.Add( + context.Background(), + tooManyRequests, metric.WithAttributes(attribute.String("status", "TooMany")), + metric.WithAttributeSet(a.config.MetricAttributes), ) } if clientFailed > 0 { - a.addCount(clientFailed, &a.docsFailedClient, - a.metrics.docsIndexed, + a.metrics.docsIndexed.Add( + context.Background(), + clientFailed, metric.WithAttributes(attribute.String("status", "FailedClient")), + metric.WithAttributeSet(a.config.MetricAttributes), ) } if serverFailed > 0 { - a.addCount(serverFailed, &a.docsFailedServer, - a.metrics.docsIndexed, + a.metrics.docsIndexed.Add( + context.Background(), + serverFailed, metric.WithAttributes(attribute.String("status", "FailedServer")), + metric.WithAttributeSet(a.config.MetricAttributes), ) } if failureStoreDocs.Used > 0 { - a.addCount(failureStoreDocs.Used, nil, - a.metrics.docsIndexed, + a.metrics.docsIndexed.Add( + context.Background(), + failureStoreDocs.Used, metric.WithAttributes( attribute.String("status", "FailureStore"), attribute.String("failure_store", string(FailureStoreStatusUsed)), ), + metric.WithAttributeSet(a.config.MetricAttributes), ) } if failureStoreDocs.Failed > 0 { - a.addCount(failureStoreDocs.Failed, nil, - a.metrics.docsIndexed, + a.metrics.docsIndexed.Add( + context.Background(), + failureStoreDocs.Failed, metric.WithAttributes( attribute.String("status", "FailureStore"), attribute.String("failure_store", string(FailureStoreStatusFailed)), ), + metric.WithAttributeSet(a.config.MetricAttributes), ) } if failureStoreDocs.NotEnabled > 0 { - a.addCount(failureStoreDocs.NotEnabled, nil, - a.metrics.docsIndexed, + a.metrics.docsIndexed.Add( + context.Background(), + failureStoreDocs.NotEnabled, metric.WithAttributes( attribute.String("status", "FailureStore"), attribute.String("failure_store", string(FailureStoreStatusNotEnabled)), ), + metric.WithAttributeSet(a.config.MetricAttributes), ) } logger.Debug( @@ -504,8 +490,9 @@ func (a *Appender) runActiveIndexer() { // to reset it to ensure we're using the right client. 
active.SetClient(a.client) - a.addUpDownCount(-1, &a.availableBulkRequests, a.metrics.availableBulkRequests) - a.addUpDownCount(1, nil, a.metrics.inflightBulkrequests) + attrs := metric.WithAttributeSet(a.config.MetricAttributes) + a.metrics.availableBulkRequests.Add(context.Background(), -1, attrs) + a.metrics.inflightBulkrequests.Add(context.Background(), 1, attrs) flushTimer.Reset(a.config.FlushInterval) } if err := active.Add(item); err != nil { @@ -570,8 +557,9 @@ func (a *Appender) runActiveIndexer() { }) indexer.Reset() a.pool.Put(a.id, indexer) - a.addUpDownCount(1, &a.availableBulkRequests, a.metrics.availableBulkRequests) - a.addUpDownCount(-1, nil, a.metrics.inflightBulkrequests, attrs) + attrs := metric.WithAttributeSet(a.config.MetricAttributes) + a.metrics.availableBulkRequests.Add(context.Background(), 1, attrs) + a.metrics.inflightBulkrequests.Add(context.Background(), -1, attrs) a.metrics.flushDuration.Record(context.Background(), took.Seconds(), attrs, ) @@ -587,11 +575,13 @@ func (a *Appender) runActiveIndexer() { now := time.Now() info := a.scalingInformation() if a.maybeScaleDown(now, info, &timedFlush) { - a.addCount(1, &a.activeDestroyed, a.metrics.activeDestroyed) + attrs := metric.WithAttributeSet(a.config.MetricAttributes) + a.metrics.activeDestroyed.Add(context.Background(), 1, attrs) return } if a.maybeScaleUp(now, info, &fullFlush) { - a.addCount(1, &a.activeCreated, a.metrics.activeCreated) + attrs := metric.WithAttributeSet(a.config.MetricAttributes) + a.metrics.activeCreated.Add(context.Background(), 1, attrs) a.errgroup.Go(func() error { a.runActiveIndexer() return nil @@ -712,8 +702,7 @@ func (a *Appender) scalingInformation() scalingInfo { // indexFailureRate returns the decimal percentage of 429 / total docs. func (a *Appender) indexFailureRate() float64 { - return float64(atomic.LoadInt64(&a.tooManyRequests)) / - float64(atomic.LoadInt64(&a.docsAdded)) + return float64(a.tooManyRequests.Load()) / float64(a.docsAdded.Load()) } // otelTracingEnabled checks whether we should be doing tracing @@ -756,65 +745,6 @@ func (s scalingInfo) withinCoolDown(cooldown time.Duration, now time.Time) bool return s.lastAction.Add(cooldown).After(now) } -// Stats holds bulk indexing statistics. -type Stats struct { - // Active holds the active number of items waiting in the indexer's queue. - Active int64 - - // Added holds the number of items added to the indexer. - Added int64 - - // BulkRequests holds the number of bulk requests completed. - BulkRequests int64 - - // Failed holds the number of indexing operations that failed. It includes - // all failures. - Failed int64 - - // FailedClient holds the number of indexing operations that failed with a - // status_code >= 400 < 500, but not 429. - FailedClient int64 - - // FailedServer holds the number of indexing operations that failed with a - // status_code >= 500. - FailedServer int64 - - // Indexed holds the number of indexing operations that have completed - // successfully. - Indexed int64 - - // TooManyRequests holds the number of indexing operations that failed due - // to Elasticsearch responding with 429 Too many Requests. - TooManyRequests int64 - - // BytesTotal represents the total number of bytes written to the request - // body that is sent in the outgoing _bulk request to Elasticsearch. - // The number of bytes written will be smaller when compression is enabled. - // This implementation differs from the previous number reported by libbeat - // which counts bytes at the transport level. 
- BytesTotal int64 - - // BytesUncompressedTotal represents the total number of bytes written to - // the request body before compression. - // The number of bytes written will be equal to BytesTotal if compression is disabled. - BytesUncompressedTotal int64 - - // AvailableBulkRequests represents the number of bulk indexers - // available for making bulk index requests. - AvailableBulkRequests int64 - - // IndexersActive represents the number of active bulk indexers that are - // concurrently processing batches. - IndexersActive int64 - - // IndexersCreated represents the number of times new active indexers were - // created. - IndexersCreated int64 - - // IndexersDestroyed represents the number of times an active indexer was destroyed. - IndexersDestroyed int64 -} - func timeFunc(f func()) time.Duration { t0 := time.Now() if f != nil { diff --git a/appender_test.go b/appender_test.go index 0ca05e3..2404c9f 100644 --- a/appender_test.go +++ b/appender_test.go @@ -92,7 +92,7 @@ func TestAppender(t *testing.T) { rdr := sdkmetric.NewManualReader(sdkmetric.WithTemporalitySelector( func(ik sdkmetric.InstrumentKind) metricdata.Temporality { - return metricdata.DeltaTemporality + return metricdata.CumulativeTemporality }, )) @@ -109,53 +109,42 @@ func TestAppender(t *testing.T) { require.NoError(t, err) defer indexer.Close(context.Background()) - available := indexer.Stats().AvailableBulkRequests const N = 10 for i := 0; i < N; i++ { addMinimalDoc(t, indexer, "logs-foo-testing") } + <-time.After(1 * time.Second) - timeout := time.After(2 * time.Second) -loop: - for { - select { - case <-time.After(10 * time.Millisecond): - // Because the internal channel is buffered to increase performance, - // the available indexer may not take documents right away, loop until - // the available bulk requests has been lowered. - if indexer.Stats().AvailableBulkRequests < available { - break loop - } - case <-timeout: - t.Fatalf("timed out waiting for the active bulk indexer to pull from the available queue") - } - } // Appender has not been flushed, there is one active bulk indexer. - assert.Equal(t, docappender.Stats{Added: N, Active: N, AvailableBulkRequests: 9, IndexersActive: 1}, indexer.Stats()) + // assert.Equal(t, docappender.Stats{Added: N, Active: N, AvailableBulkRequests: 9, IndexersActive: 1}, indexer.Stats()) + var asserted atomic.Int64 + assertCounter := docappendertest.NewAssertCounter(t, &asserted) + + // Collect metrics before flushing. + var rm metricdata.ResourceMetrics + require.NoError(t, rdr.Collect(context.Background(), &rm)) + docappendertest.AssertOTelMetrics(t, rm.ScopeMetrics[0].Metrics, func(m metricdata.Metrics) { + switch m.Name { + case "elasticsearch.events.count": + assertCounter(m, N, indexerAttrs) + case "elasticsearch.events.queued": + assertCounter(m, N, indexerAttrs) + case "elasticsearch.bulk_requests.available": + assertCounter(m, 9, indexerAttrs) + case "elasticsearch.bulk_requests.inflight": + assertCounter(m, 1, indexerAttrs) + } + }) + assert.Equal(t, int64(4), asserted.Load()) // Closing the indexer flushes enqueued documents. 
err = indexer.Close(context.Background()) require.NoError(t, err) - stats := indexer.Stats() - failed := int64(3) - assert.Equal(t, docappender.Stats{ - Added: N, - Active: 0, - BulkRequests: 1, - Failed: failed, - FailedClient: 1, - FailedServer: 1, - Indexed: N - failed, - TooManyRequests: 1, - AvailableBulkRequests: 10, - BytesTotal: bytesTotal, - BytesUncompressedTotal: bytesUncompressed, - }, stats) - var rm metricdata.ResourceMetrics + // Collect metrics after flushing. assert.NoError(t, rdr.Collect(context.Background(), &rm)) - var asserted atomic.Int64 - assertCounter := docappendertest.NewAssertCounter(t, &asserted) + asserted.Store(0) + assertCounter = docappendertest.NewAssertCounter(t, &asserted) var processedAsserted int assertProcessedCounter := func(metric metricdata.Metrics, attrs attribute.Set) { @@ -168,16 +157,16 @@ loop: switch status.AsString() { case "Success": processedAsserted++ - assert.Equal(t, stats.Indexed, dp.Value) + assert.Equal(t, int64(N-3), dp.Value) case "FailedClient": processedAsserted++ - assert.Equal(t, stats.FailedClient, dp.Value) + assert.Equal(t, int64(1), dp.Value) case "FailedServer": processedAsserted++ - assert.Equal(t, stats.FailedServer, dp.Value) + assert.Equal(t, int64(1), dp.Value) case "TooMany": processedAsserted++ - assert.Equal(t, stats.TooManyRequests, dp.Value) + assert.Equal(t, int64(1), dp.Value) case "FailureStore": processedAsserted++ fs, exist := dp.Attributes.Value(attribute.Key("failure_store")) @@ -196,24 +185,29 @@ loop: } } } - // check the set of names and then check the counter or histogram + + // Check the set of names and then check the counter or histogram. unexpectedMetrics := []string{} docappendertest.AssertOTelMetrics(t, rm.ScopeMetrics[0].Metrics, func(m metricdata.Metrics) { switch m.Name { case "elasticsearch.events.count": - assertCounter(m, stats.Added, indexerAttrs) + assertCounter(m, N, indexerAttrs) case "elasticsearch.events.queued": - assertCounter(m, stats.Active, indexerAttrs) + assertCounter(m, 0, indexerAttrs) case "elasticsearch.bulk_requests.count": - assertCounter(m, stats.BulkRequests, indexerAttrs) + assertCounter(m, 1, indexerAttrs) case "elasticsearch.events.processed": assertProcessedCounter(m, indexerAttrs) case "elasticsearch.bulk_requests.available": - assertCounter(m, stats.AvailableBulkRequests, indexerAttrs) + assertCounter(m, 10, indexerAttrs) + case "elasticsearch.indexer.created": + assertCounter(m, 1, indexerAttrs) + case "elasticsearch.indexer.destroyed": + assertCounter(m, 1, indexerAttrs) case "elasticsearch.flushed.bytes": - assertCounter(m, stats.BytesTotal, indexerAttrs) + assertCounter(m, bytesTotal, indexerAttrs) case "elasticsearch.flushed.uncompressed.bytes": - assertCounter(m, stats.BytesUncompressedTotal, indexerAttrs) + assertCounter(m, bytesUncompressed, indexerAttrs) case "elasticsearch.buffer.latency", "elasticsearch.flushed.latency": // expect this metric name but no assertions done // as it's histogram and it's checked elsewhere @@ -264,7 +258,7 @@ func TestAppenderRetry(t *testing.T) { rdr := sdkmetric.NewManualReader(sdkmetric.WithTemporalitySelector( func(ik sdkmetric.InstrumentKind) metricdata.Temporality { - return metricdata.DeltaTemporality + return metricdata.CumulativeTemporality }, )) @@ -274,7 +268,7 @@ func TestAppenderRetry(t *testing.T) { indexer, err := docappender.New(client, docappender.Config{ FlushInterval: time.Minute, - FlushBytes: 750, // this is enough to flush after 9 documents + FlushBytes: 800, // this is enough to flush after 9 documents 
MaxRequests: 1, // to ensure the test is stable MaxDocumentRetries: 1, // to test the document retry logic MeterProvider: sdkmetric.NewMeterProvider(sdkmetric.WithReader(rdr)), @@ -288,67 +282,62 @@ func TestAppenderRetry(t *testing.T) { for i := 0; i < N; i++ { addMinimalDoc(t, indexer, "logs-foo-testing") } + <-time.After(1 * time.Second) - timeout := time.After(2 * time.Second) -loop: - for { - select { - case <-time.After(10 * time.Millisecond): - // Because the internal channel is buffered to increase performance, - // the available indexer may not take documents right away, loop until - // the available bulk requests has been lowered. - if indexer.Stats().BulkRequests == 1 { - break loop - } - case <-timeout: - t.Fatalf("timed out waiting for the active bulk indexer to send one bulk request") - } - } - - stats := indexer.Stats() var rm metricdata.ResourceMetrics assert.NoError(t, rdr.Collect(context.Background(), &rm)) + var asserted atomic.Int64 assertCounter := docappendertest.NewAssertCounter(t, &asserted) + // Check the set of names and then check the counter or histogram. var processedAsserted int - assertProcessedCounter := func(metric metricdata.Metrics, attrs attribute.Set) { - asserted.Add(1) - counter := metric.Data.(metricdata.Sum[int64]) - for _, dp := range counter.DataPoints { - metricdatatest.AssertHasAttributes(t, dp, attrs.ToSlice()...) - status, exist := dp.Attributes.Value(attribute.Key("status")) - assert.True(t, exist) - switch status.AsString() { - case "Success": - processedAsserted++ - assert.Equal(t, stats.Indexed, dp.Value) - case "FailedClient": - processedAsserted++ - assert.Equal(t, stats.FailedClient, dp.Value) - case "FailedServer": - processedAsserted++ - assert.Equal(t, stats.FailedServer, dp.Value) - case "TooMany": - processedAsserted++ - assert.Equal(t, stats.TooManyRequests, dp.Value) - default: - assert.FailNow(t, "Unexpected metric with status: "+status.AsString()) - } - } - } - // check the set of names and then check the counter or histogram unexpectedMetrics := []string{} docappendertest.AssertOTelMetrics(t, rm.ScopeMetrics[0].Metrics, func(m metricdata.Metrics) { switch m.Name { case "elasticsearch.events.count": - assertCounter(m, stats.Added, indexerAttrs) + assertCounter(m, int64(N), indexerAttrs) case "elasticsearch.events.queued": - assertCounter(m, stats.Active, indexerAttrs) + assertCounter(m, int64(1), indexerAttrs) case "elasticsearch.bulk_requests.count": - assertCounter(m, stats.BulkRequests, indexerAttrs) + assertCounter(m, int64(1), indexerAttrs) case "elasticsearch.events.processed": - assertProcessedCounter(m, indexerAttrs) + asserted.Add(1) + counter := m.Data.(metricdata.Sum[int64]) + for _, dp := range counter.DataPoints { + metricdatatest.AssertHasAttributes(t, dp, indexerAttrs.ToSlice()...) 
+ status, exist := dp.Attributes.Value(attribute.Key("status")) + assert.True(t, exist) + switch status.AsString() { + case "Success": + processedAsserted++ + assert.Equal(t, int64(7), dp.Value) + case "FailedClient": + processedAsserted++ + assert.Equal(t, int64(1), dp.Value) + case "FailedServer": + processedAsserted++ + assert.Equal(t, int64(1), dp.Value) + case "TooMany": + processedAsserted++ + assert.Equal(t, int64(1), dp.Value) + case "FailureStore": + processedAsserted++ + fs, exist := dp.Attributes.Value(attribute.Key("failure_store")) + assert.True(t, exist) + assert.Contains( + t, + []docappender.FailureStoreStatus{ + docappender.FailureStoreStatusUsed, + docappender.FailureStoreStatusFailed, + docappender.FailureStoreStatusNotEnabled, + }, + docappender.FailureStoreStatus(fs.AsString()), + ) + default: + assert.FailNow(t, "Unexpected metric with status: "+status.AsString()) + } + } case "elasticsearch.events.retried": assertCounter(m, 1, attribute.NewSet( attribute.String("a", "b"), @@ -356,11 +345,11 @@ loop: attribute.Int("greatest_retry", 1), )) case "elasticsearch.bulk_requests.available": - assertCounter(m, stats.AvailableBulkRequests, indexerAttrs) + assertCounter(m, int64(1), indexerAttrs) case "elasticsearch.flushed.bytes": - assertCounter(m, stats.BytesTotal, indexerAttrs) + assertCounter(m, bytesTotal, indexerAttrs) case "elasticsearch.flushed.uncompressed.bytes": - assertCounter(m, stats.BytesUncompressedTotal, indexerAttrs) + assertCounter(m, bytesUncompressed, indexerAttrs) case "elasticsearch.buffer.latency", "elasticsearch.flushed.latency": // expect this metric name but no assertions done // as it's histogram and it's checked elsewhere @@ -378,21 +367,56 @@ loop: // Closing the indexer flushes enqueued documents. err = indexer.Close(context.Background()) require.NoError(t, err) - stats = indexer.Stats() - failed := int64(2) - assert.Equal(t, docappender.Stats{ - Added: N, - Active: 0, - BulkRequests: 2, - Failed: failed, - FailedClient: 1, - FailedServer: 1, - Indexed: N - failed, - TooManyRequests: 0, - AvailableBulkRequests: 1, - BytesTotal: bytesTotal, - BytesUncompressedTotal: bytesUncompressed, - }, stats) + + // Collect metrics before flushing. + assert.NoError(t, rdr.Collect(context.Background(), &rm)) + + docappendertest.AssertOTelMetrics(t, rm.ScopeMetrics[0].Metrics, func(m metricdata.Metrics) { + switch m.Name { + case "elasticsearch.events.count": + assertCounter(m, int64(N), indexerAttrs) + case "elasticsearch.events.queued": + assertCounter(m, int64(0), indexerAttrs) + case "elasticsearch.events.processed": + asserted.Add(1) + counter := m.Data.(metricdata.Sum[int64]) + for _, dp := range counter.DataPoints { + metricdatatest.AssertHasAttributes(t, dp, indexerAttrs.ToSlice()...) 
+ status, exist := dp.Attributes.Value(attribute.Key("status")) + assert.True(t, exist) + switch status.AsString() { + case "Success": + assert.Equal(t, int64(8), dp.Value) + case "FailedClient": + assert.Equal(t, int64(1), dp.Value) + case "FailedServer": + assert.Equal(t, int64(1), dp.Value) + case "TooMany": + assert.Equal(t, int64(1), dp.Value) + case "FailureStore": + fs, exist := dp.Attributes.Value(attribute.Key("failure_store")) + assert.True(t, exist) + assert.Contains( + t, + []docappender.FailureStoreStatus{ + docappender.FailureStoreStatusUsed, + docappender.FailureStoreStatusFailed, + docappender.FailureStoreStatusNotEnabled, + }, + docappender.FailureStoreStatus(fs.AsString()), + ) + default: + assert.FailNow(t, "Unexpected metric with status: "+status.AsString()) + } + } + case "elasticsearch.bulk_requests.available": + assertCounter(m, int64(1), indexerAttrs) + case "elasticsearch.flushed.bytes": + assertCounter(m, bytesTotal, indexerAttrs) + case "elasticsearch.flushed.uncompressed.bytes": + assertCounter(m, bytesUncompressed, indexerAttrs) + } + }) } func TestAppenderAvailableAppenders(t *testing.T) { @@ -405,7 +429,18 @@ func TestAppenderAvailableAppenders(t *testing.T) { _, result := docappendertest.DecodeBulkRequest(r) json.NewEncoder(w).Encode(result) }) - indexer, err := docappender.New(client, docappender.Config{FlushInterval: time.Minute, FlushBytes: 1}) + + rdr := sdkmetric.NewManualReader(sdkmetric.WithTemporalitySelector( + func(ik sdkmetric.InstrumentKind) metricdata.Temporality { + return metricdata.DeltaTemporality + }, + )) + + indexer, err := docappender.New(client, docappender.Config{ + MeterProvider: sdkmetric.NewMeterProvider(sdkmetric.WithReader(rdr)), + FlushInterval: time.Minute, + FlushBytes: 1, + }) require.NoError(t, err) defer indexer.Close(context.Background()) @@ -422,23 +457,71 @@ func TestAppenderAvailableAppenders(t *testing.T) { t.Fatalf("timed out waiting for %d, received %d", N, i) } } - stats := indexer.Stats() + + var rm metricdata.ResourceMetrics + assert.NoError(t, rdr.Collect(context.Background(), &rm)) + // FlushBytes is set arbitrarily low, forcing a flush on each new // document. There should be no available bulk indexers. - assert.Equal(t, docappender.Stats{Added: N, Active: N, AvailableBulkRequests: 0, IndexersActive: 1}, stats) + docappendertest.AssertOTelMetrics(t, rm.ScopeMetrics[0].Metrics, func(m metricdata.Metrics) { + switch m.Name { + case "elasticsearch.events.count": + counter := m.Data.(metricdata.Sum[int64]) + for _, dp := range counter.DataPoints { + assert.Equal(t, int64(N), dp.Value) + } + case "elasticsearch.events.queued": + counter := m.Data.(metricdata.Sum[int64]) + for _, dp := range counter.DataPoints { + assert.Equal(t, int64(N), dp.Value) + } + case "elasticsearch.bulk_requests.inflight": + counter := m.Data.(metricdata.Sum[int64]) + for _, dp := range counter.DataPoints { + assert.Equal(t, int64(N), dp.Value) + } + case "elasticsearch.bulk_requests.count": + counter := m.Data.(metricdata.Sum[int64]) + for _, dp := range counter.DataPoints { + assert.Equal(t, int64(N), dp.Value) + } + case "elasticsearch.bulk_requests.available": + counter := m.Data.(metricdata.Sum[int64]) + for _, dp := range counter.DataPoints { + assert.Equal(t, int64(0), dp.Value) + } + } + }) close(unblockRequests) err = indexer.Close(context.Background()) require.NoError(t, err) - stats = indexer.Stats() - stats.BytesTotal = 0 // Asserted elsewhere. - stats.BytesUncompressedTotal = 0 // Asserted elsewhere. 
- assert.Equal(t, docappender.Stats{ - Added: N, - BulkRequests: N, - Indexed: N, - AvailableBulkRequests: 10, - }, stats) + + assert.NoError(t, rdr.Collect(context.Background(), &rm)) + docappendertest.AssertOTelMetrics(t, rm.ScopeMetrics[0].Metrics, func(m metricdata.Metrics) { + switch m.Name { + case "elasticsearch.events.count": + counter := m.Data.(metricdata.Sum[int64]) + for _, dp := range counter.DataPoints { + assert.Equal(t, int64(N), dp.Value) + } + case "elasticsearch.bulk_requests.count": + counter := m.Data.(metricdata.Sum[int64]) + for _, dp := range counter.DataPoints { + assert.Equal(t, int64(N), dp.Value) + } + case "elasticsearch.events.processed": + counter := m.Data.(metricdata.Sum[int64]) + for _, dp := range counter.DataPoints { + assert.Equal(t, int64(N), dp.Value) + } + case "elasticsearch.bulk_requests.available": + counter := m.Data.(metricdata.Sum[int64]) + for _, dp := range counter.DataPoints { + assert.Equal(t, int64(10), dp.Value) + } + } + }) } func TestAppenderEncoding(t *testing.T) { @@ -487,9 +570,17 @@ func TestAppenderCompressionLevel(t *testing.T) { bytesUncompressedTotal += stat.UncompressedBytes json.NewEncoder(w).Encode(result) }) + + rdr := sdkmetric.NewManualReader(sdkmetric.WithTemporalitySelector( + func(ik sdkmetric.InstrumentKind) metricdata.Temporality { + return metricdata.DeltaTemporality + }, + )) + indexer, err := docappender.New(client, docappender.Config{ CompressionLevel: gzip.BestSpeed, FlushInterval: time.Minute, + MeterProvider: sdkmetric.NewMeterProvider(sdkmetric.WithReader(rdr)), }) require.NoError(t, err) defer indexer.Close(context.Background()) @@ -499,18 +590,43 @@ func TestAppenderCompressionLevel(t *testing.T) { // Closing the indexer flushes enqueued documents. err = indexer.Close(context.Background()) require.NoError(t, err) - stats := indexer.Stats() - assert.Equal(t, docappender.Stats{ - Added: 1, - Active: 0, - BulkRequests: 1, - Failed: 0, - Indexed: 1, - TooManyRequests: 0, - AvailableBulkRequests: 10, - BytesTotal: bytesTotal, - BytesUncompressedTotal: bytesUncompressedTotal, - }, stats) + + var rm metricdata.ResourceMetrics + assert.NoError(t, rdr.Collect(context.Background(), &rm)) + docappendertest.AssertOTelMetrics(t, rm.ScopeMetrics[0].Metrics, func(m metricdata.Metrics) { + switch m.Name { + case "elasticsearch.events.count": + counter := m.Data.(metricdata.Sum[int64]) + for _, dp := range counter.DataPoints { + assert.Equal(t, int64(1), dp.Value) + } + case "elasticsearch.events.queued": + counter := m.Data.(metricdata.Sum[int64]) + for _, dp := range counter.DataPoints { + assert.Equal(t, int64(0), dp.Value) + } + case "elasticsearch.bulk_requests.count": + counter := m.Data.(metricdata.Sum[int64]) + for _, dp := range counter.DataPoints { + assert.Equal(t, int64(1), dp.Value) + } + case "elasticsearch.bulk_requests.available": + counter := m.Data.(metricdata.Sum[int64]) + for _, dp := range counter.DataPoints { + assert.Equal(t, int64(10), dp.Value) + } + case "elasticsearch.flushed.bytes": + counter := m.Data.(metricdata.Sum[int64]) + for _, dp := range counter.DataPoints { + assert.Equal(t, int64(bytesTotal), dp.Value) + } + case "elasticsearch.flushed.uncompressed.bytes": + counter := m.Data.(metricdata.Sum[int64]) + for _, dp := range counter.DataPoints { + assert.Equal(t, int64(bytesUncompressedTotal), dp.Value) + } + } + }) } func TestAppenderFlushInterval(t *testing.T) { @@ -709,8 +825,19 @@ func TestAppenderFlushRequestError(t *testing.T) { // failed requests with different failure scenarios. 
Since a bulk request // contains N documents, the appender should increment the categorized // failure by the same number of documents in the request. - for _, includeSource := range []docappender.Value{docappender.Unset, docappender.True, docappender.False} { - for _, sc := range []int{http.StatusBadRequest, http.StatusForbidden, http.StatusTooManyRequests, http.StatusInternalServerError, http.StatusServiceUnavailable, http.StatusGatewayTimeout} { + for _, includeSource := range []docappender.Value{ + docappender.Unset, + docappender.True, + docappender.False, + } { + for _, sc := range []int{ + http.StatusBadRequest, + http.StatusForbidden, + http.StatusTooManyRequests, + http.StatusInternalServerError, + http.StatusServiceUnavailable, + http.StatusGatewayTimeout, + } { t.Run(strconv.Itoa(sc)+"/"+strconv.Itoa(int(includeSource)), func(t *testing.T) { var bytesTotal int64 var bytesUncompressedTotal int64 @@ -730,8 +857,8 @@ func TestAppenderFlushRequestError(t *testing.T) { indexer, err := docappender.New(client, docappender.Config{ FlushInterval: time.Minute, - MeterProvider: sdkmetric.NewMeterProvider(sdkmetric.WithReader(rdr)), IncludeSourceOnError: includeSource, + MeterProvider: sdkmetric.NewMeterProvider(sdkmetric.WithReader(rdr)), }) require.NoError(t, err) defer indexer.Close(context.Background()) @@ -756,42 +883,84 @@ func TestAppenderFlushRequestError(t *testing.T) { default: t.Fatal("unknown include source setting") } - stats := indexer.Stats() - wantStats := docappender.Stats{ - Added: int64(docs), - Active: 0, - BulkRequests: 1, // Single bulk request - Failed: int64(docs), - AvailableBulkRequests: 10, - BytesTotal: bytesTotal, - BytesUncompressedTotal: bytesUncompressedTotal, - } + var status string switch { case sc == 429: status = "TooMany" - wantStats.TooManyRequests = int64(docs) case sc >= 500: status = "FailedServer" - wantStats.FailedServer = int64(docs) case sc >= 400 && sc != 429: status = "FailedClient" - wantStats.FailedClient = int64(docs) } - assert.Equal(t, wantStats, stats) var rm metricdata.ResourceMetrics assert.NoError(t, rdr.Collect(context.Background(), &rm)) var asserted atomic.Int64 - assertCounter := docappendertest.NewAssertCounter(t, &asserted) docappendertest.AssertOTelMetrics(t, rm.ScopeMetrics[0].Metrics, func(m metricdata.Metrics) { switch m.Name { + case "elasticsearch.events.count": + asserted.Add(1) + counter := m.Data.(metricdata.Sum[int64]) + for _, dp := range counter.DataPoints { + assert.Equal(t, int64(docs), dp.Value) + } + case "elasticsearch.events.queued": + asserted.Add(1) + counter := m.Data.(metricdata.Sum[int64]) + for _, dp := range counter.DataPoints { + assert.Equal(t, int64(0), dp.Value) + } + case "elasticsearch.bulk_requests.count": + asserted.Add(1) + counter := m.Data.(metricdata.Sum[int64]) + for _, dp := range counter.DataPoints { + assert.Equal(t, int64(1), dp.Value) + } + case "elasticsearch.bulk_requests.available": + asserted.Add(1) + counter := m.Data.(metricdata.Sum[int64]) + for _, dp := range counter.DataPoints { + assert.Equal(t, int64(10), dp.Value) + } + case "elasticsearch.flushed.bytes": + asserted.Add(1) + counter := m.Data.(metricdata.Sum[int64]) + for _, dp := range counter.DataPoints { + assert.Equal(t, int64(bytesTotal), dp.Value) + } + case "elasticsearch.flushed.uncompressed.bytes": + asserted.Add(1) + counter := m.Data.(metricdata.Sum[int64]) + for _, dp := range counter.DataPoints { + assert.Equal(t, int64(bytesUncompressedTotal), dp.Value) + } case "elasticsearch.events.processed": - 
assertCounter(m, 3, attribute.NewSet(attribute.String("status", status), semconv.HTTPResponseStatusCode(sc))) + asserted.Add(1) + indexerAttrs := attribute.NewSet( + attribute.String("status", status), + semconv.HTTPResponseStatusCode(sc), + ) + counter := m.Data.(metricdata.Sum[int64]) + for _, dp := range counter.DataPoints { + metricdatatest.AssertHasAttributes(t, dp, indexerAttrs.ToSlice()...) + status, exist := dp.Attributes.Value(attribute.Key("status")) + assert.True(t, exist) + switch status.AsString() { + case "FailedClient": + assert.Equal(t, int64(docs), dp.Value) + case "FailedServer": + assert.Equal(t, int64(docs), dp.Value) + case "TooMany": + assert.Equal(t, int64(docs), dp.Value) + default: + assert.FailNow(t, "Unexpected metric with status: "+status.AsString()) + } + } } }) - assert.Equal(t, int64(1), asserted.Load()) + assert.Equal(t, int64(7), asserted.Load()) }) } } @@ -1271,17 +1440,38 @@ func TestAppenderCloseInterruptAdd(t *testing.T) { case <-r.Context().Done(): } }) + + rdr := sdkmetric.NewManualReader(sdkmetric.WithTemporalitySelector( + func(ik sdkmetric.InstrumentKind) metricdata.Temporality { + return metricdata.DeltaTemporality + }, + )) + documentBufferSize := 100 indexer, err := docappender.New(client, docappender.Config{ // Set FlushBytes to 1 so a single document causes a flush. FlushBytes: 1, DocumentBufferSize: documentBufferSize, + MeterProvider: sdkmetric.NewMeterProvider(sdkmetric.WithReader(rdr)), }) require.NoError(t, err) defer indexer.Close(context.Background()) + var rm metricdata.ResourceMetrics + assert.NoError(t, rdr.Collect(context.Background(), &rm)) + + var availableBulkRequests int + docappendertest.AssertOTelMetrics(t, rm.ScopeMetrics[0].Metrics, func(m metricdata.Metrics) { + if m.Name == "elasticsearch.bulk_requests.available" { + counter := m.Data.(metricdata.Sum[int64]) + for _, dp := range counter.DataPoints { + availableBulkRequests = int(dp.Value) + } + } + }) + // Fill up all the bulk requests and the buffered channel. - for n := indexer.Stats().AvailableBulkRequests + int64(documentBufferSize); n >= 0; n-- { + for n := int64(availableBulkRequests) + int64(documentBufferSize); n >= 0; n-- { addMinimalDoc(t, indexer, "logs-foo-testing") } @@ -1386,9 +1576,8 @@ func TestAppenderUnknownResponseFields(t *testing.T) { assert.NoError(t, err) } +// This test ensures that all the channel items are consumed and indexed when the indexer is closed. func TestAppenderCloseBusyIndexer(t *testing.T) { - // This test ensures that all the channel items are consumed and indexed - // when the indexer is closed. 
var bytesTotal int64 var bytesUncompressedTotal int64 client := docappendertest.NewMockElasticsearchClient(t, func(w http.ResponseWriter, r *http.Request) { @@ -1397,7 +1586,16 @@ func TestAppenderCloseBusyIndexer(t *testing.T) { bytesUncompressedTotal = stat.UncompressedBytes json.NewEncoder(w).Encode(result) }) - indexer, err := docappender.New(client, docappender.Config{}) + + rdr := sdkmetric.NewManualReader(sdkmetric.WithTemporalitySelector( + func(ik sdkmetric.InstrumentKind) metricdata.Temporality { + return metricdata.DeltaTemporality + }, + )) + + indexer, err := docappender.New(client, docappender.Config{ + MeterProvider: sdkmetric.NewMeterProvider(sdkmetric.WithReader(rdr)), + }) require.NoError(t, err) t.Cleanup(func() { indexer.Close(context.Background()) }) @@ -1408,15 +1606,50 @@ func TestAppenderCloseBusyIndexer(t *testing.T) { assert.NoError(t, indexer.Close(context.Background())) - assert.Equal(t, docappender.Stats{ - Added: N, - Indexed: N, - BulkRequests: 1, - BytesTotal: bytesTotal, - BytesUncompressedTotal: bytesUncompressedTotal, - AvailableBulkRequests: 10, - IndexersActive: 0, - }, indexer.Stats()) + var rm metricdata.ResourceMetrics + assert.NoError(t, rdr.Collect(context.Background(), &rm)) + + docappendertest.AssertOTelMetrics(t, rm.ScopeMetrics[0].Metrics, func(m metricdata.Metrics) { + switch n := m.Name; n { + case "elasticsearch.events.count": + counter := m.Data.(metricdata.Sum[int64]) + for _, dp := range counter.DataPoints { + assert.Equal(t, int64(N), dp.Value) + } + case "elasticsearch.events.processed": + counter := m.Data.(metricdata.Sum[int64]) + for _, dp := range counter.DataPoints { + status, exist := dp.Attributes.Value(attribute.Key("status")) + assert.True(t, exist) + switch status.AsString() { + case "Success": + assert.Equal(t, int64(N), dp.Value) + default: + assert.FailNow(t, "Unexpected metric with status: "+status.AsString()) + } + } + case "elasticsearch.bulk_requests.count": + counter := m.Data.(metricdata.Sum[int64]) + for _, dp := range counter.DataPoints { + assert.Equal(t, int64(1), dp.Value) + } + case "elasticsearch.bulk_requests.available": + counter := m.Data.(metricdata.Sum[int64]) + for _, dp := range counter.DataPoints { + assert.Equal(t, int64(10), dp.Value) + } + case "elasticsearch.flushed.bytes": + counter := m.Data.(metricdata.Sum[int64]) + for _, dp := range counter.DataPoints { + assert.Equal(t, int64(bytesTotal), dp.Value) + } + case "elasticsearch.flushed.uncompressed.bytes": + counter := m.Data.(metricdata.Sum[int64]) + for _, dp := range counter.DataPoints { + assert.Equal(t, int64(bytesUncompressedTotal), dp.Value) + } + } + }) } func TestAppenderPipeline(t *testing.T) { @@ -1544,51 +1777,93 @@ func TestAppenderScaling(t *testing.T) { require.NoError(t, err) } } - waitForScaleUp := func(t *testing.T, indexer *docappender.Appender, n int64) { + + waitForScaleUp := func(t *testing.T, a *docappender.Appender, n int64) { timeout := time.NewTimer(5 * time.Second) - stats := indexer.Stats() limit := int64(runtime.GOMAXPROCS(0) / 4) - for stats.IndexersActive < n { - stats = indexer.Stats() - require.LessOrEqual(t, stats.IndexersActive, limit) + for a.IndexersActive() < n { + require.LessOrEqual(t, a.IndexersActive(), limit) select { case <-time.After(10 * time.Millisecond): case <-timeout.C: - stats = indexer.Stats() - require.GreaterOrEqual(t, stats.IndexersActive, n, "stats: %+v", stats) + require.GreaterOrEqual(t, a.IndexersActive(), n) } } - stats = indexer.Stats() - assert.Greater(t, stats.IndexersCreated, 
int64(0), "No upscales took place: %+v", stats) + assert.Greater(t, a.IndexersActive(), int64(0), "No upscales took place") } - waitForScaleDown := func(t *testing.T, indexer *docappender.Appender, n int64) { + + waitForScaleDown := func( + t *testing.T, + a *docappender.Appender, + rdr *sdkmetric.ManualReader, + n int64, + ) { timeout := time.NewTimer(5 * time.Second) - stats := indexer.Stats() - for stats.IndexersActive > n { - stats = indexer.Stats() - require.Greater(t, stats.IndexersActive, int64(0)) + for a.IndexersActive() > n { + require.Greater(t, a.IndexersActive(), int64(0)) select { case <-time.After(10 * time.Millisecond): case <-timeout.C: - stats = indexer.Stats() - require.LessOrEqual(t, stats.IndexersActive, n, "stats: %+v", stats) + require.LessOrEqual(t, a.IndexersActive(), n) + } + } + + var rm metricdata.ResourceMetrics + assert.NoError(t, rdr.Collect(context.Background(), &rm)) + + var indexersDestroyed int64 + for _, m := range rm.ScopeMetrics[0].Metrics { + if m.Name == "elasticsearch.indexer.destroyed" { + counter := m.Data.(metricdata.Sum[int64]) + for _, dp := range counter.DataPoints { + indexersDestroyed += dp.Value + } } } - stats = indexer.Stats() - assert.Greater(t, stats.IndexersDestroyed, int64(0), "No downscales took place: %+v", stats) - assert.Equal(t, stats.IndexersActive, int64(n), "%+v", stats) + + assert.Greater(t, indexersDestroyed, int64(0), "No downscales took place") + assert.Equal(t, a.IndexersActive(), int64(n)) } - waitForBulkRequests := func(t *testing.T, indexer *docappender.Appender, n int64) { + + waitForBulkRequests := func( + t *testing.T, + indexer *docappender.Appender, + rdr *sdkmetric.ManualReader, + n int64, + ) { + bulkRequests := func() int64 { + var rm metricdata.ResourceMetrics + assert.NoError(t, rdr.Collect(context.Background(), &rm)) + + var res int64 + for _, m := range rm.ScopeMetrics[0].Metrics { + if m.Name == "elasticsearch.bulk_requests.count" { + counter := m.Data.(metricdata.Sum[int64]) + for _, dp := range counter.DataPoints { + res += dp.Value + } + } + } + + return res + } + timeout := time.After(time.Second) - for indexer.Stats().BulkRequests < n { + for bulkRequests() < n { select { case <-time.After(time.Millisecond): case <-timeout: - t.Fatalf("timed out while waiting for documents to be indexed: %+v", indexer.Stats()) + t.Fatalf("timed out while waiting for documents to be indexed") } } } t.Run("DownscaleIdle", func(t *testing.T) { + rdr := sdkmetric.NewManualReader(sdkmetric.WithTemporalitySelector( + func(ik sdkmetric.InstrumentKind) metricdata.Temporality { + return metricdata.CumulativeTemporality + }, + )) + // Override the default GOMAXPROCS, ensuring the active indexers can scale up. 
setGOMAXPROCS(t, 12) indexer := newIndexer(t, docappender.Config{ @@ -1599,30 +1874,82 @@ func TestAppenderScaling(t *testing.T) { Threshold: 1, CoolDown: 1, }, ScaleDown: docappender.ScaleActionConfig{ - Threshold: 2, CoolDown: time.Millisecond, + Threshold: 2, + CoolDown: time.Millisecond, }, IdleInterval: 50 * time.Millisecond, }, + MeterProvider: sdkmetric.NewMeterProvider(sdkmetric.WithReader(rdr)), }) docs := int64(20) sendDocuments(t, indexer, int(docs)) waitForScaleUp(t, indexer, 3) - waitForScaleDown(t, indexer, 1) - stats := indexer.Stats() - stats.BytesTotal = 0 - stats.BytesUncompressedTotal = 0 - assert.Equal(t, docappender.Stats{ - Active: 0, - Added: docs, - Indexed: docs, - BulkRequests: docs, - IndexersCreated: 2, - IndexersDestroyed: 2, - AvailableBulkRequests: 10, - IndexersActive: 1, - }, stats) + waitForScaleDown(t, indexer, rdr, 1) + + assert.Equal(t, int64(1), indexer.IndexersActive()) + + var rm metricdata.ResourceMetrics + assert.NoError(t, rdr.Collect(context.Background(), &rm)) + + docappendertest.AssertOTelMetrics(t, rm.ScopeMetrics[0].Metrics, func(m metricdata.Metrics) { + switch n := m.Name; n { + case "elasticsearch.events.queued": + counter := m.Data.(metricdata.Sum[int64]) + for _, dp := range counter.DataPoints { + assert.Equal(t, int64(0), dp.Value) + } + case "elasticsearch.events.count": + counter := m.Data.(metricdata.Sum[int64]) + for _, dp := range counter.DataPoints { + assert.Equal(t, int64(docs), dp.Value) + } + case "elasticsearch.events.processed": + counter := m.Data.(metricdata.Sum[int64]) + for _, dp := range counter.DataPoints { + status, exist := dp.Attributes.Value(attribute.Key("status")) + assert.True(t, exist) + switch status.AsString() { + case "Success": + assert.Equal(t, int64(docs), dp.Value) + default: + assert.FailNow(t, "Unexpected metric with status: "+status.AsString()) + } + } + case "elasticsearch.bulk_requests.count": + counter := m.Data.(metricdata.Sum[int64]) + for _, dp := range counter.DataPoints { + assert.Equal(t, int64(docs), dp.Value) + } + case "elasticsearch.indexer.created": + counter := m.Data.(metricdata.Sum[int64]) + for _, dp := range counter.DataPoints { + assert.Equal(t, int64(2), dp.Value) + } + case "elasticsearch.indexer.destroyed": + counter := m.Data.(metricdata.Sum[int64]) + for _, dp := range counter.DataPoints { + assert.Equal(t, int64(2), dp.Value) + } + case "elasticsearch.bulk_requests.available": + counter := m.Data.(metricdata.Sum[int64]) + for _, dp := range counter.DataPoints { + assert.Equal(t, int64(10), dp.Value) + } + case "elasticsearch.bulk_requests.inflight": + counter := m.Data.(metricdata.Sum[int64]) + for _, dp := range counter.DataPoints { + assert.Equal(t, int64(0), dp.Value) + } + } + }) }) t.Run("DownscaleActiveLimit", func(t *testing.T) { + rdr := sdkmetric.NewManualReader(sdkmetric.WithTemporalitySelector( + func(ik sdkmetric.InstrumentKind) metricdata.Temporality { + return metricdata.CumulativeTemporality + }, + )) + // Override the default GOMAXPROCS, ensuring the active indexers can scale up. 
setGOMAXPROCS(t, 12) indexer := newIndexer(t, docappender.Config{ @@ -1630,13 +1957,16 @@ func TestAppenderScaling(t *testing.T) { FlushBytes: 1, Scaling: docappender.ScalingConfig{ ScaleUp: docappender.ScaleActionConfig{ - Threshold: 5, CoolDown: 1, + Threshold: 5, + CoolDown: 1, }, ScaleDown: docappender.ScaleActionConfig{ - Threshold: 100, CoolDown: time.Minute, + Threshold: 100, + CoolDown: time.Minute, }, IdleInterval: 100 * time.Millisecond, }, + MeterProvider: sdkmetric.NewMeterProvider(sdkmetric.WithReader(rdr)), }) docs := int64(14) sendDocuments(t, indexer, int(docs)) @@ -1646,25 +1976,74 @@ func TestAppenderScaling(t *testing.T) { // Wait for the indexers to scale down from 3 to 1. The downscale cool // down of `1m` isn't respected, since the active limit is breached with // the gomaxprocs change. - waitForScaleDown(t, indexer, 1) + waitForScaleDown(t, indexer, rdr, 1) // Wait for all the documents to be indexed. - waitForBulkRequests(t, indexer, docs) - - stats := indexer.Stats() - stats.BytesTotal = 0 - stats.BytesUncompressedTotal = 0 - assert.Equal(t, docappender.Stats{ - Active: 0, - Added: docs, - Indexed: docs, - BulkRequests: docs, - AvailableBulkRequests: 10, - IndexersActive: 1, - IndexersCreated: 2, - IndexersDestroyed: 2, - }, stats) + waitForBulkRequests(t, indexer, rdr, docs) + + assert.Equal(t, int64(1), indexer.IndexersActive()) + + var rm metricdata.ResourceMetrics + assert.NoError(t, rdr.Collect(context.Background(), &rm)) + + docappendertest.AssertOTelMetrics(t, rm.ScopeMetrics[0].Metrics, func(m metricdata.Metrics) { + switch n := m.Name; n { + case "elasticsearch.events.queued": + counter := m.Data.(metricdata.Sum[int64]) + for _, dp := range counter.DataPoints { + assert.Equal(t, int64(0), dp.Value) + } + case "elasticsearch.events.count": + counter := m.Data.(metricdata.Sum[int64]) + for _, dp := range counter.DataPoints { + assert.Equal(t, int64(docs), dp.Value) + } + case "elasticsearch.events.processed": + counter := m.Data.(metricdata.Sum[int64]) + for _, dp := range counter.DataPoints { + status, exist := dp.Attributes.Value(attribute.Key("status")) + assert.True(t, exist) + switch status.AsString() { + case "Success": + assert.Equal(t, int64(docs), dp.Value) + default: + assert.FailNow(t, "Unexpected metric with status: "+status.AsString()) + } + } + case "elasticsearch.bulk_requests.count": + counter := m.Data.(metricdata.Sum[int64]) + for _, dp := range counter.DataPoints { + assert.Equal(t, int64(docs), dp.Value) + } + case "elasticsearch.indexer.created": + counter := m.Data.(metricdata.Sum[int64]) + for _, dp := range counter.DataPoints { + assert.Equal(t, int64(2), dp.Value) + } + case "elasticsearch.indexer.destroyed": + counter := m.Data.(metricdata.Sum[int64]) + for _, dp := range counter.DataPoints { + assert.Equal(t, int64(2), dp.Value) + } + case "elasticsearch.bulk_requests.available": + counter := m.Data.(metricdata.Sum[int64]) + for _, dp := range counter.DataPoints { + assert.Equal(t, int64(10), dp.Value) + } + case "elasticsearch.bulk_requests.inflight": + counter := m.Data.(metricdata.Sum[int64]) + for _, dp := range counter.DataPoints { + assert.Equal(t, int64(0), dp.Value) + } + } + }) }) t.Run("UpscaleCooldown", func(t *testing.T) { + rdr := sdkmetric.NewManualReader(sdkmetric.WithTemporalitySelector( + func(ik sdkmetric.InstrumentKind) metricdata.Temporality { + return metricdata.CumulativeTemporality + }, + )) + // Override the default GOMAXPROCS, ensuring the active indexers can scale up. 
setGOMAXPROCS(t, 12) indexer := newIndexer(t, docappender.Config{ @@ -1681,32 +2060,79 @@ func TestAppenderScaling(t *testing.T) { }, IdleInterval: 100 * time.Millisecond, }, + MeterProvider: sdkmetric.NewMeterProvider(sdkmetric.WithReader(rdr)), }) docs := int64(50) sendDocuments(t, indexer, int(docs)) waitForScaleUp(t, indexer, 2) - // Wait for all the documents to be indexed. - waitForBulkRequests(t, indexer, docs) - - assert.Equal(t, int64(2), indexer.Stats().IndexersActive) + waitForBulkRequests(t, indexer, rdr, docs) + assert.Equal(t, int64(2), indexer.IndexersActive()) ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() assert.NoError(t, indexer.Close(ctx)) - stats := indexer.Stats() - stats.BytesTotal = 0 - stats.BytesUncompressedTotal = 0 - assert.Equal(t, docappender.Stats{ - Active: 0, - Added: docs, - Indexed: docs, - BulkRequests: docs, - AvailableBulkRequests: 10, - IndexersActive: 0, - IndexersCreated: 1, - IndexersDestroyed: 0, - }, stats) + + var rm metricdata.ResourceMetrics + assert.NoError(t, rdr.Collect(context.Background(), &rm)) + + docappendertest.AssertOTelMetrics(t, rm.ScopeMetrics[0].Metrics, func(m metricdata.Metrics) { + switch n := m.Name; n { + case "elasticsearch.events.queued": + counter := m.Data.(metricdata.Sum[int64]) + for _, dp := range counter.DataPoints { + assert.Equal(t, int64(0), dp.Value) + } + case "elasticsearch.events.count": + counter := m.Data.(metricdata.Sum[int64]) + for _, dp := range counter.DataPoints { + assert.Equal(t, int64(docs), dp.Value) + } + case "elasticsearch.events.processed": + counter := m.Data.(metricdata.Sum[int64]) + for _, dp := range counter.DataPoints { + status, exist := dp.Attributes.Value(attribute.Key("status")) + assert.True(t, exist) + switch status.AsString() { + case "Success": + assert.Equal(t, int64(docs), dp.Value) + default: + assert.FailNow(t, "Unexpected metric with status: "+status.AsString()) + } + } + case "elasticsearch.bulk_requests.count": + counter := m.Data.(metricdata.Sum[int64]) + for _, dp := range counter.DataPoints { + assert.Equal(t, int64(docs), dp.Value) + } + case "elasticsearch.indexer.created": + counter := m.Data.(metricdata.Sum[int64]) + for _, dp := range counter.DataPoints { + assert.Equal(t, int64(1), dp.Value) + } + case "elasticsearch.indexer.destroyed": + counter := m.Data.(metricdata.Sum[int64]) + for _, dp := range counter.DataPoints { + assert.Equal(t, int64(0), dp.Value) + } + case "elasticsearch.bulk_requests.available": + counter := m.Data.(metricdata.Sum[int64]) + for _, dp := range counter.DataPoints { + assert.Equal(t, int64(10), dp.Value) + } + case "elasticsearch.bulk_requests.inflight": + counter := m.Data.(metricdata.Sum[int64]) + for _, dp := range counter.DataPoints { + assert.Equal(t, int64(0), dp.Value) + } + } + }) }) t.Run("Downscale429Rate", func(t *testing.T) { + rdr := sdkmetric.NewManualReader(sdkmetric.WithTemporalitySelector( + func(ik sdkmetric.InstrumentKind) metricdata.Temporality { + return metricdata.CumulativeTemporality + }, + )) + // Override the default GOMAXPROCS, ensuring the active indexers can scale up. 
setGOMAXPROCS(t, 12) var mu sync.RWMutex @@ -1739,13 +2165,14 @@ func TestAppenderScaling(t *testing.T) { }, IdleInterval: 100 * time.Millisecond, }, + MeterProvider: sdkmetric.NewMeterProvider(sdkmetric.WithReader(rdr)), }) require.NoError(t, err) t.Cleanup(func() { indexer.Close(context.Background()) }) docs := int64(20) sendDocuments(t, indexer, int(docs)) waitForScaleUp(t, indexer, 3) - waitForBulkRequests(t, indexer, docs) + waitForBulkRequests(t, indexer, rdr, docs) // Make the mocked elasticsaerch return 429 responses and wait for the // active indexers to be scaled down to the minimum. @@ -1754,8 +2181,8 @@ func TestAppenderScaling(t *testing.T) { mu.Unlock() docs += 5 sendDocuments(t, indexer, 5) - waitForScaleDown(t, indexer, 1) - waitForBulkRequests(t, indexer, docs) + waitForScaleDown(t, indexer, rdr, 1) + waitForBulkRequests(t, indexer, rdr, docs) // index 600 documents and ensure that scale ups happen to the maximum after // the threshold is exceeded. @@ -1765,12 +2192,24 @@ func TestAppenderScaling(t *testing.T) { docs += 600 sendDocuments(t, indexer, 600) waitForScaleUp(t, indexer, 3) - waitForBulkRequests(t, indexer, docs) - - stats := indexer.Stats() - assert.Equal(t, int64(3), stats.IndexersActive) - assert.Equal(t, int64(4), stats.IndexersCreated) - assert.Equal(t, int64(2), stats.IndexersDestroyed) + waitForBulkRequests(t, indexer, rdr, docs) + assert.Equal(t, int64(3), indexer.IndexersActive()) + var rm metricdata.ResourceMetrics + assert.NoError(t, rdr.Collect(context.Background(), &rm)) + docappendertest.AssertOTelMetrics(t, rm.ScopeMetrics[0].Metrics, func(m metricdata.Metrics) { + switch n := m.Name; n { + case "elasticsearch.indexer.created": + counter := m.Data.(metricdata.Sum[int64]) + for _, dp := range counter.DataPoints { + assert.Equal(t, int64(4), dp.Value) + } + case "elasticsearch.indexer.destroyed": + counter := m.Data.(metricdata.Sum[int64]) + for _, dp := range counter.DataPoints { + assert.Equal(t, int64(2), dp.Value) + } + } + }) }) }
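Migration note for callers of the removed Stats() API: the same information is now exposed through the OTel instruments registered via Config.MeterProvider (plus the new IndexersActive() accessor). Below is a minimal sketch of how a consumer might pull those counters with an sdkmetric ManualReader, mirroring the setup the updated tests use. The import paths, package name, and helper function are assumptions for illustration and are not part of this diff; the metric name "elasticsearch.events.count" is the one asserted in the tests above.

```go
package docappenderexample

import (
	"context"
	"fmt"

	// Import paths are assumptions for this sketch; adjust to the versions pinned in go.mod.
	"github.com/elastic/elastic-transport-go/v8/elastictransport"
	docappender "github.com/elastic/go-docappender/v2"
	sdkmetric "go.opentelemetry.io/otel/sdk/metric"
	"go.opentelemetry.io/otel/sdk/metric/metricdata"
)

// addedDocs wires an Appender to a ManualReader and returns the total of the
// "elasticsearch.events.count" counter, the OTel replacement for Stats().Added.
func addedDocs(ctx context.Context, client elastictransport.Interface) (int64, error) {
	// A ManualReader lets the caller collect metrics on demand, as the updated tests do.
	rdr := sdkmetric.NewManualReader(sdkmetric.WithTemporalitySelector(
		func(sdkmetric.InstrumentKind) metricdata.Temporality {
			return metricdata.CumulativeTemporality
		},
	))
	indexer, err := docappender.New(client, docappender.Config{
		MeterProvider: sdkmetric.NewMeterProvider(sdkmetric.WithReader(rdr)),
	})
	if err != nil {
		return 0, err
	}
	defer indexer.Close(ctx)

	// ... enqueue documents with indexer.Add(ctx, index, document) as usual ...

	// Pull the current metric state.
	var rm metricdata.ResourceMetrics
	if err := rdr.Collect(ctx, &rm); err != nil {
		return 0, err
	}
	var added int64
	for _, sm := range rm.ScopeMetrics {
		for _, m := range sm.Metrics {
			if m.Name != "elasticsearch.events.count" {
				continue
			}
			if sum, ok := m.Data.(metricdata.Sum[int64]); ok {
				for _, dp := range sum.DataPoints {
					added += dp.Value
				}
			}
		}
	}
	// Scaling state is still available directly, without going through metrics.
	fmt.Printf("active indexers: %d\n", indexer.IndexersActive())
	return added, nil
}
```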