@@ -97,8 +97,7 @@ func TestAppender(t *testing.T) {
97
97
))
98
98
99
99
indexerAttrs := attribute .NewSet (
100
- attribute .String ("a" , "b" ),
101
- attribute .String ("c" , "d" ),
100
+ attribute .String ("a" , "b" ), attribute .String ("c" , "d" ),
102
101
)
103
102
104
103
indexer , err := docappender .New (client , docappender.Config {
@@ -228,7 +227,6 @@ func TestAppenderRetry(t *testing.T) {
228
227
var bytesTotal int64
229
228
var bytesUncompressed int64
230
229
var first atomic.Bool
231
-
232
230
client := docappendertest .NewMockElasticsearchClient (t , func (w http.ResponseWriter , r * http.Request ) {
233
231
bytesTotal += r .ContentLength
234
232
_ , result , stat := docappendertest .DecodeBulkRequestWithStats (r )
@@ -265,23 +263,20 @@ func TestAppenderRetry(t *testing.T) {
265
263
))
266
264
267
265
indexerAttrs := attribute .NewSet (
268
- attribute .String ("a" , "b" ),
269
- attribute .String ("c" , "d" ),
266
+ attribute .String ("a" , "b" ), attribute .String ("c" , "d" ),
270
267
)
271
268
272
269
indexer , err := docappender .New (client , docappender.Config {
273
- FlushInterval : 2 * time .Minute ,
270
+ FlushInterval : time .Minute ,
274
271
FlushBytes : 800 , // this is enough to flush after 9 documents
275
272
MaxRequests : 1 , // to ensure the test is stable
276
273
MaxDocumentRetries : 1 , // to test the document retry logic
277
- MeterProvider : sdkmetric .NewMeterProvider (
278
- sdkmetric .WithReader (rdr ),
279
- ),
280
- MetricAttributes : indexerAttrs ,
274
+ MeterProvider : sdkmetric .NewMeterProvider (sdkmetric .WithReader (rdr )),
275
+ MetricAttributes : indexerAttrs ,
281
276
})
282
277
283
278
require .NoError (t , err )
284
- // defer indexer.Close(context.Background())
279
+ defer indexer .Close (context .Background ())
285
280
286
281
const N = 10
287
282
for i := 0 ; i < N ; i ++ {
@@ -454,7 +449,6 @@ func TestAppenderAvailableAppenders(t *testing.T) {
454
449
for i := 0 ; i < N ; i ++ {
455
450
addMinimalDoc (t , indexer , "logs-foo-testing" )
456
451
}
457
-
458
452
timeout := time .NewTimer (2 * time .Second )
459
453
defer timeout .Stop ()
460
454
for i := 0 ; i < N ; i ++ {
@@ -878,7 +872,6 @@ func TestAppenderFlushRequestError(t *testing.T) {
878
872
879
873
// Closing the indexer flushes enqueued documents.
880
874
err = indexer .Close (context .Background ())
881
-
882
875
switch includeSource {
883
876
case docappender .False , docappender .True :
884
877
// include_source=false is implemented in ES so we just assert we're not
@@ -1774,7 +1767,6 @@ func TestAppenderScaling(t *testing.T) {
1774
1767
t .Cleanup (func () { indexer .Close (context .Background ()) })
1775
1768
return indexer
1776
1769
}
1777
-
1778
1770
sendDocuments := func (t * testing.T , indexer * docappender.Appender , docs int ) {
1779
1771
for i := 0 ; i < docs ; i ++ {
1780
1772
err := indexer .Add (context .Background (), "logs-foo-testing" , newJSONReader (map [string ]any {
@@ -1866,7 +1858,6 @@ func TestAppenderScaling(t *testing.T) {
1866
1858
}
1867
1859
}
1868
1860
}
1869
-
1870
1861
t .Run ("DownscaleIdle" , func (t * testing.T ) {
1871
1862
rdr := sdkmetric .NewManualReader (sdkmetric .WithTemporalitySelector (
1872
1863
func (ik sdkmetric.InstrumentKind ) metricdata.Temporality {
@@ -1893,7 +1884,6 @@ func TestAppenderScaling(t *testing.T) {
1893
1884
})
1894
1885
docs := int64 (20 )
1895
1886
sendDocuments (t , indexer , int (docs ))
1896
-
1897
1887
waitForScaleUp (t , indexer , 3 )
1898
1888
waitForScaleDown (t , indexer , rdr , 1 )
1899
1889
@@ -1954,7 +1944,6 @@ func TestAppenderScaling(t *testing.T) {
1954
1944
}
1955
1945
})
1956
1946
})
1957
-
1958
1947
t .Run ("DownscaleActiveLimit" , func (t * testing.T ) {
1959
1948
rdr := sdkmetric .NewManualReader (sdkmetric .WithTemporalitySelector (
1960
1949
func (ik sdkmetric.InstrumentKind ) metricdata.Temporality {
@@ -1982,11 +1971,9 @@ func TestAppenderScaling(t *testing.T) {
1982
1971
})
1983
1972
docs := int64 (14 )
1984
1973
sendDocuments (t , indexer , int (docs ))
1985
-
1986
1974
waitForScaleUp (t , indexer , 3 )
1987
1975
// Set the gomaxprocs to 4, which should result in an activeLimit of 1.
1988
1976
setGOMAXPROCS (t , 4 )
1989
-
1990
1977
// Wait for the indexers to scale down from 3 to 1. The downscale cool
1991
1978
// down of `1m` isn't respected, since the active limit is breached with
1992
1979
// the gomaxprocs change.
@@ -2051,7 +2038,6 @@ func TestAppenderScaling(t *testing.T) {
2051
2038
}
2052
2039
})
2053
2040
})
2054
-
2055
2041
t .Run ("UpscaleCooldown" , func (t * testing.T ) {
2056
2042
rdr := sdkmetric .NewManualReader (sdkmetric .WithTemporalitySelector (
2057
2043
func (ik sdkmetric.InstrumentKind ) metricdata.Temporality {
@@ -2079,10 +2065,8 @@ func TestAppenderScaling(t *testing.T) {
2079
2065
})
2080
2066
docs := int64 (50 )
2081
2067
sendDocuments (t , indexer , int (docs ))
2082
-
2083
2068
waitForScaleUp (t , indexer , 2 )
2084
2069
waitForBulkRequests (t , indexer , rdr , docs )
2085
-
2086
2070
assert .Equal (t , int64 (2 ), indexer .IndexersActive ())
2087
2071
2088
2072
ctx , cancel := context .WithTimeout (context .Background (), 5 * time .Second )
@@ -2144,7 +2128,6 @@ func TestAppenderScaling(t *testing.T) {
2144
2128
}
2145
2129
})
2146
2130
})
2147
-
2148
2131
t .Run ("Downscale429Rate" , func (t * testing.T ) {
2149
2132
rdr := sdkmetric .NewManualReader (sdkmetric .WithTemporalitySelector (
2150
2133
func (ik sdkmetric.InstrumentKind ) metricdata.Temporality {
@@ -2177,23 +2160,19 @@ func TestAppenderScaling(t *testing.T) {
2177
2160
FlushBytes : 1 ,
2178
2161
Scaling : docappender.ScalingConfig {
2179
2162
ScaleUp : docappender.ScaleActionConfig {
2180
- Threshold : 5 ,
2181
- CoolDown : 1 ,
2163
+ Threshold : 5 , CoolDown : 1 ,
2182
2164
},
2183
2165
ScaleDown : docappender.ScaleActionConfig {
2184
- Threshold : 100 ,
2185
- CoolDown : 100 * time .Millisecond ,
2166
+ Threshold : 100 , CoolDown : 100 * time .Millisecond ,
2186
2167
},
2187
2168
IdleInterval : 100 * time .Millisecond ,
2188
2169
},
2189
2170
MeterProvider : sdkmetric .NewMeterProvider (sdkmetric .WithReader (rdr )),
2190
2171
})
2191
2172
require .NoError (t , err )
2192
2173
t .Cleanup (func () { indexer .Close (context .Background ()) })
2193
-
2194
2174
docs := int64 (20 )
2195
2175
sendDocuments (t , indexer , int (docs ))
2196
-
2197
2176
waitForScaleUp (t , indexer , 3 )
2198
2177
waitForBulkRequests (t , indexer , rdr , docs )
2199
2178
@@ -2204,7 +2183,6 @@ func TestAppenderScaling(t *testing.T) {
2204
2183
mu .Unlock ()
2205
2184
docs += 5
2206
2185
sendDocuments (t , indexer , 5 )
2207
-
2208
2186
waitForScaleDown (t , indexer , rdr , 1 )
2209
2187
waitForBulkRequests (t , indexer , rdr , docs )
2210
2188
@@ -2215,15 +2193,11 @@ func TestAppenderScaling(t *testing.T) {
2215
2193
mu .Unlock ()
2216
2194
docs += 600
2217
2195
sendDocuments (t , indexer , 600 )
2218
-
2219
2196
waitForScaleUp (t , indexer , 3 )
2220
2197
waitForBulkRequests (t , indexer , rdr , docs )
2221
-
2222
2198
assert .Equal (t , int64 (3 ), indexer .IndexersActive ())
2223
-
2224
2199
var rm metricdata.ResourceMetrics
2225
2200
assert .NoError (t , rdr .Collect (context .Background (), & rm ))
2226
-
2227
2201
docappendertest .AssertOTelMetrics (t , rm .ScopeMetrics [0 ].Metrics , func (m metricdata.Metrics ) {
2228
2202
switch n := m .Name ; n {
2229
2203
case "elasticsearch.indexer.created" :
0 commit comments