@@ -126,7 +126,8 @@ func New(client elastictransport.Interface, cfg Config) (*Appender, error) {
126
126
// Register the Appender ID in the pool.
127
127
indexer .id = fmt .Sprintf ("%p" , indexer )
128
128
indexer .pool .Register (indexer .id )
129
- indexer .addUpDownCount (int64 (cfg .MaxRequests ), ms .availableBulkRequests )
129
+ attrs := metric .WithAttributeSet (indexer .config .MetricAttributes )
130
+ indexer .metrics .availableBulkRequests .Add (context .Background (), int64 (cfg .MaxRequests ), attrs )
130
131
// We create a cancellable context for the errgroup.Group for unblocking
131
132
// flushes when Close returns. We intentionally do not use errgroup.WithContext,
132
133
// because one flush failure should not cause the context to be cancelled.
@@ -226,20 +227,15 @@ func (a *Appender) Add(ctx context.Context, index string, document io.WriterTo)
226
227
a .docsAdded .Add (1 )
227
228
attrs := metric .WithAttributeSet (a .config .MetricAttributes )
228
229
a .metrics .docsAdded .Add (context .Background (), 1 , attrs )
230
+ a .metrics .docsActive .Add (context .Background (), 1 , attrs )
229
231
230
- a .addUpDownCount (1 , a .metrics .docsActive )
231
232
return nil
232
233
}
233
234
234
235
func (a * Appender ) IndexersActive () int64 {
235
236
return a .scalingInformation ().activeIndexers
236
237
}
237
238
238
- func (a * Appender ) addUpDownCount (delta int64 , m metric.Int64UpDownCounter , opts ... metric.AddOption ) {
239
- attrs := metric .WithAttributeSet (a .config .MetricAttributes )
240
- m .Add (context .Background (), delta , append (opts , attrs )... )
241
- }
242
-
243
239
func (a * Appender ) flush (ctx context.Context , bulkIndexer * BulkIndexer ) error {
244
240
n := bulkIndexer .Items ()
245
241
if n == 0 {
@@ -291,7 +287,8 @@ func (a *Appender) flush(ctx context.Context, bulkIndexer *BulkIndexer) error {
291
287
a .metrics .bytesUncompressedTotal .Add (context .Background (), int64 (flushed ), attrs )
292
288
}
293
289
if err != nil {
294
- a .addUpDownCount (- int64 (n ), a .metrics .docsActive )
290
+ attrs := metric .WithAttributeSet (a .config .MetricAttributes )
291
+ a .metrics .docsActive .Add (context .Background (), - int64 (n ), attrs )
295
292
logger .Error ("bulk indexing request failed" , zap .Error (err ))
296
293
if a .otelTracingEnabled () && span .IsRecording () {
297
294
span .RecordError (err )
@@ -350,7 +347,8 @@ func (a *Appender) flush(ctx context.Context, bulkIndexer *BulkIndexer) error {
350
347
}
351
348
docsFailed = int64 (len (resp .FailedDocs ))
352
349
totalFlushed := docsFailed + docsIndexed
353
- a .addUpDownCount (- totalFlushed , a .metrics .docsActive )
350
+ attrs := metric .WithAttributeSet (a .config .MetricAttributes )
351
+ a .metrics .docsActive .Add (context .Background (), - totalFlushed , attrs )
354
352
for _ , info := range resp .FailedDocs {
355
353
if info .Status >= 400 && info .Status < 500 {
356
354
if info .Status == http .StatusTooManyRequests {
@@ -498,8 +496,9 @@ func (a *Appender) runActiveIndexer() {
498
496
// to reset it to ensure we're using the right client.
499
497
active .SetClient (a .client )
500
498
501
- a .addUpDownCount (- 1 , a .metrics .availableBulkRequests )
502
- a .addUpDownCount (1 , a .metrics .inflightBulkrequests )
499
+ attrs := metric .WithAttributeSet (a .config .MetricAttributes )
500
+ a .metrics .availableBulkRequests .Add (context .Background (), - 1 , attrs )
501
+ a .metrics .inflightBulkrequests .Add (context .Background (), 1 , attrs )
503
502
flushTimer .Reset (a .config .FlushInterval )
504
503
}
505
504
if err := active .Add (item ); err != nil {
@@ -564,8 +563,9 @@ func (a *Appender) runActiveIndexer() {
564
563
})
565
564
indexer .Reset ()
566
565
a .pool .Put (a .id , indexer )
567
- a .addUpDownCount (1 , a .metrics .availableBulkRequests )
568
- a .addUpDownCount (- 1 , a .metrics .inflightBulkrequests , attrs )
566
+ attrs := metric .WithAttributeSet (a .config .MetricAttributes )
567
+ a .metrics .availableBulkRequests .Add (context .Background (), 1 , attrs )
568
+ a .metrics .inflightBulkrequests .Add (context .Background (), - 1 , attrs )
569
569
a .metrics .flushDuration .Record (context .Background (), took .Seconds (),
570
570
attrs ,
571
571
)
0 commit comments