@@ -285,7 +285,37 @@ func TestAppenderRetry(t *testing.T) {
285
285
addMinimalDoc (t , indexer , "logs-foo-testing" )
286
286
}
287
287
288
- <- time .After (10 * time .Millisecond )
288
+ bulkRequests := func () int64 {
289
+ var rm metricdata.ResourceMetrics
290
+ assert .NoError (t , rdr .Collect (context .Background (), & rm ))
291
+
292
+ var res int64
293
+ for _ , m := range rm .ScopeMetrics [0 ].Metrics {
294
+ if m .Name == "elasticsearch.bulk_requests.count" {
295
+ counter := m .Data .(metricdata.Sum [int64 ])
296
+ for _ , dp := range counter .DataPoints {
297
+ res += dp .Value
298
+ }
299
+ }
300
+ }
301
+
302
+ return res
303
+ }
304
+ timeout := time .After (2 * time .Second )
305
+ loop:
306
+ for {
307
+ select {
308
+ case <- time .After (10 * time .Millisecond ):
309
+ // Because the internal channel is buffered to increase performance,
310
+ // the available indexer may not take documents right away, loop until
311
+ // the available bulk requests has been lowered.
312
+ if bulkRequests () == 1 {
313
+ break loop
314
+ }
315
+ case <- timeout :
316
+ t .Fatalf ("timed out waiting for the active bulk indexer to send one bulk request" )
317
+ }
318
+ }
289
319
290
320
var rm metricdata.ResourceMetrics
291
321
assert .NoError (t , rdr .Collect (context .Background (), & rm ))
@@ -1764,7 +1794,7 @@ func TestAppenderScaling(t *testing.T) {
1764
1794
for a .IndexersActive () < n {
1765
1795
require .LessOrEqual (t , a .IndexersActive (), limit )
1766
1796
select {
1767
- case <- time .After (1000 * time .Millisecond ):
1797
+ case <- time .After (10 * time .Millisecond ):
1768
1798
case <- timeout .C :
1769
1799
require .GreaterOrEqual (t , a .IndexersActive (), n )
1770
1800
}
@@ -1866,7 +1896,10 @@ func TestAppenderScaling(t *testing.T) {
1866
1896
sendDocuments (t , indexer , int (docs ))
1867
1897
1868
1898
waitForScaleUp (t , indexer , 3 )
1899
+ waitForBulkRequests (t , indexer , rdr , docs )
1900
+
1869
1901
waitForScaleDown (t , indexer , rdr , 1 )
1902
+ waitForBulkRequests (t , indexer , rdr , docs )
1870
1903
1871
1904
assert .Equal (t , int64 (1 ), indexer .IndexersActive ())
1872
1905
0 commit comments