@@ -97,8 +97,7 @@ func TestAppender(t *testing.T) {
97
97
))
98
98
99
99
indexerAttrs := attribute .NewSet (
100
- attribute .String ("a" , "b" ),
101
- attribute .String ("c" , "d" ),
100
+ attribute .String ("a" , "b" ), attribute .String ("c" , "d" ),
102
101
)
103
102
104
103
indexer , err := docappender .New (client , docappender.Config {
@@ -114,7 +113,7 @@ func TestAppender(t *testing.T) {
114
113
for i := 0 ; i < N ; i ++ {
115
114
addMinimalDoc (t , indexer , "logs-foo-testing" )
116
115
}
117
- <- time .After (2 * time .Second )
116
+ <- time .After (1 * time .Second )
118
117
119
118
// Appender has not been flushed, there is one active bulk indexer.
120
119
// assert.Equal(t, docappender.Stats{Added: N, Active: N, AvailableBulkRequests: 9, IndexersActive: 1}, indexer.Stats())
@@ -228,7 +227,6 @@ func TestAppenderRetry(t *testing.T) {
228
227
var bytesTotal int64
229
228
var bytesUncompressed int64
230
229
var first atomic.Bool
231
-
232
230
client := docappendertest .NewMockElasticsearchClient (t , func (w http.ResponseWriter , r * http.Request ) {
233
231
bytesTotal += r .ContentLength
234
232
_ , result , stat := docappendertest .DecodeBulkRequestWithStats (r )
@@ -265,33 +263,29 @@ func TestAppenderRetry(t *testing.T) {
265
263
))
266
264
267
265
indexerAttrs := attribute .NewSet (
268
- attribute .String ("a" , "b" ),
269
- attribute .String ("c" , "d" ),
266
+ attribute .String ("a" , "b" ), attribute .String ("c" , "d" ),
270
267
)
271
268
272
269
indexer , err := docappender .New (client , docappender.Config {
273
- FlushInterval : 2 * time .Minute ,
270
+ FlushInterval : time .Minute ,
274
271
FlushBytes : 800 , // this is enough to flush after 9 documents
275
272
MaxRequests : 1 , // to ensure the test is stable
276
273
MaxDocumentRetries : 1 , // to test the document retry logic
277
- MeterProvider : sdkmetric .NewMeterProvider (
278
- sdkmetric .WithReader (rdr ),
279
- ),
280
- MetricAttributes : indexerAttrs ,
274
+ MeterProvider : sdkmetric .NewMeterProvider (sdkmetric .WithReader (rdr )),
275
+ MetricAttributes : indexerAttrs ,
281
276
})
282
277
283
278
require .NoError (t , err )
284
- // defer indexer.Close(context.Background())
279
+ defer indexer .Close (context .Background ())
285
280
286
281
const N = 10
287
282
for i := 0 ; i < N ; i ++ {
288
283
addMinimalDoc (t , indexer , "logs-foo-testing" )
289
284
}
290
- <- time .After (20 * time .Second )
285
+ <- time .After (1 * time .Second )
291
286
292
287
var rm metricdata.ResourceMetrics
293
288
assert .NoError (t , rdr .Collect (context .Background (), & rm ))
294
- <- time .After (20 * time .Second )
295
289
296
290
var asserted atomic.Int64
297
291
assertCounter := docappendertest .NewAssertCounter (t , & asserted )
@@ -454,7 +448,6 @@ func TestAppenderAvailableAppenders(t *testing.T) {
454
448
for i := 0 ; i < N ; i ++ {
455
449
addMinimalDoc (t , indexer , "logs-foo-testing" )
456
450
}
457
-
458
451
timeout := time .NewTimer (2 * time .Second )
459
452
defer timeout .Stop ()
460
453
for i := 0 ; i < N ; i ++ {
@@ -878,7 +871,6 @@ func TestAppenderFlushRequestError(t *testing.T) {
878
871
879
872
// Closing the indexer flushes enqueued documents.
880
873
err = indexer .Close (context .Background ())
881
-
882
874
switch includeSource {
883
875
case docappender .False , docappender .True :
884
876
// include_source=false is implemented in ES so we just assert we're not
@@ -1774,7 +1766,6 @@ func TestAppenderScaling(t *testing.T) {
1774
1766
t .Cleanup (func () { indexer .Close (context .Background ()) })
1775
1767
return indexer
1776
1768
}
1777
-
1778
1769
sendDocuments := func (t * testing.T , indexer * docappender.Appender , docs int ) {
1779
1770
for i := 0 ; i < docs ; i ++ {
1780
1771
err := indexer .Add (context .Background (), "logs-foo-testing" , newJSONReader (map [string ]any {
@@ -1866,7 +1857,6 @@ func TestAppenderScaling(t *testing.T) {
1866
1857
}
1867
1858
}
1868
1859
}
1869
-
1870
1860
t .Run ("DownscaleIdle" , func (t * testing.T ) {
1871
1861
rdr := sdkmetric .NewManualReader (sdkmetric .WithTemporalitySelector (
1872
1862
func (ik sdkmetric.InstrumentKind ) metricdata.Temporality {
@@ -1893,7 +1883,6 @@ func TestAppenderScaling(t *testing.T) {
1893
1883
})
1894
1884
docs := int64 (20 )
1895
1885
sendDocuments (t , indexer , int (docs ))
1896
-
1897
1886
waitForScaleUp (t , indexer , 3 )
1898
1887
waitForScaleDown (t , indexer , rdr , 1 )
1899
1888
@@ -1954,7 +1943,6 @@ func TestAppenderScaling(t *testing.T) {
1954
1943
}
1955
1944
})
1956
1945
})
1957
-
1958
1946
t .Run ("DownscaleActiveLimit" , func (t * testing.T ) {
1959
1947
rdr := sdkmetric .NewManualReader (sdkmetric .WithTemporalitySelector (
1960
1948
func (ik sdkmetric.InstrumentKind ) metricdata.Temporality {
@@ -1982,11 +1970,9 @@ func TestAppenderScaling(t *testing.T) {
1982
1970
})
1983
1971
docs := int64 (14 )
1984
1972
sendDocuments (t , indexer , int (docs ))
1985
-
1986
1973
waitForScaleUp (t , indexer , 3 )
1987
1974
// Set the gomaxprocs to 4, which should result in an activeLimit of 1.
1988
1975
setGOMAXPROCS (t , 4 )
1989
-
1990
1976
// Wait for the indexers to scale down from 3 to 1. The downscale cool
1991
1977
// down of `1m` isn't respected, since the active limit is breached with
1992
1978
// the gomaxprocs change.
@@ -2051,7 +2037,6 @@ func TestAppenderScaling(t *testing.T) {
2051
2037
}
2052
2038
})
2053
2039
})
2054
-
2055
2040
t .Run ("UpscaleCooldown" , func (t * testing.T ) {
2056
2041
rdr := sdkmetric .NewManualReader (sdkmetric .WithTemporalitySelector (
2057
2042
func (ik sdkmetric.InstrumentKind ) metricdata.Temporality {
@@ -2079,12 +2064,9 @@ func TestAppenderScaling(t *testing.T) {
2079
2064
})
2080
2065
docs := int64 (50 )
2081
2066
sendDocuments (t , indexer , int (docs ))
2082
-
2083
2067
waitForScaleUp (t , indexer , 2 )
2084
2068
waitForBulkRequests (t , indexer , rdr , docs )
2085
-
2086
2069
assert .Equal (t , int64 (2 ), indexer .IndexersActive ())
2087
-
2088
2070
ctx , cancel := context .WithTimeout (context .Background (), 5 * time .Second )
2089
2071
defer cancel ()
2090
2072
assert .NoError (t , indexer .Close (ctx ))
@@ -2144,7 +2126,6 @@ func TestAppenderScaling(t *testing.T) {
2144
2126
}
2145
2127
})
2146
2128
})
2147
-
2148
2129
t .Run ("Downscale429Rate" , func (t * testing.T ) {
2149
2130
rdr := sdkmetric .NewManualReader (sdkmetric .WithTemporalitySelector (
2150
2131
func (ik sdkmetric.InstrumentKind ) metricdata.Temporality {
@@ -2177,23 +2158,19 @@ func TestAppenderScaling(t *testing.T) {
2177
2158
FlushBytes : 1 ,
2178
2159
Scaling : docappender.ScalingConfig {
2179
2160
ScaleUp : docappender.ScaleActionConfig {
2180
- Threshold : 5 ,
2181
- CoolDown : 1 ,
2161
+ Threshold : 5 , CoolDown : 1 ,
2182
2162
},
2183
2163
ScaleDown : docappender.ScaleActionConfig {
2184
- Threshold : 100 ,
2185
- CoolDown : 100 * time .Millisecond ,
2164
+ Threshold : 100 , CoolDown : 100 * time .Millisecond ,
2186
2165
},
2187
2166
IdleInterval : 100 * time .Millisecond ,
2188
2167
},
2189
2168
MeterProvider : sdkmetric .NewMeterProvider (sdkmetric .WithReader (rdr )),
2190
2169
})
2191
2170
require .NoError (t , err )
2192
2171
t .Cleanup (func () { indexer .Close (context .Background ()) })
2193
-
2194
2172
docs := int64 (20 )
2195
2173
sendDocuments (t , indexer , int (docs ))
2196
-
2197
2174
waitForScaleUp (t , indexer , 3 )
2198
2175
waitForBulkRequests (t , indexer , rdr , docs )
2199
2176
@@ -2204,7 +2181,6 @@ func TestAppenderScaling(t *testing.T) {
2204
2181
mu .Unlock ()
2205
2182
docs += 5
2206
2183
sendDocuments (t , indexer , 5 )
2207
-
2208
2184
waitForScaleDown (t , indexer , rdr , 1 )
2209
2185
waitForBulkRequests (t , indexer , rdr , docs )
2210
2186
@@ -2215,15 +2191,11 @@ func TestAppenderScaling(t *testing.T) {
2215
2191
mu .Unlock ()
2216
2192
docs += 600
2217
2193
sendDocuments (t , indexer , 600 )
2218
-
2219
2194
waitForScaleUp (t , indexer , 3 )
2220
2195
waitForBulkRequests (t , indexer , rdr , docs )
2221
-
2222
2196
assert .Equal (t , int64 (3 ), indexer .IndexersActive ())
2223
-
2224
2197
var rm metricdata.ResourceMetrics
2225
2198
assert .NoError (t , rdr .Collect (context .Background (), & rm ))
2226
-
2227
2199
docappendertest .AssertOTelMetrics (t , rm .ScopeMetrics [0 ].Metrics , func (m metricdata.Metrics ) {
2228
2200
switch n := m .Name ; n {
2229
2201
case "elasticsearch.indexer.created" :
0 commit comments