@@ -25,9 +25,40 @@ import (
 )
 
 const (
-	dbCommitThresholdBytes = 10 * 1024 * 1024 // commit every 10MB
-	aggregationIvlKey      = "aggregation_interval"
-	aggregationTypeKey     = "aggregation_type"
+	// Batch grows in multiples of 2 based on the initial size. For
+	// example, if the initial size is 1MB then the batch will grow as
+	// {2, 4, 8, 16, ...}. If a batch of size greater than 4MB is
+	// consistently committed then that batch will never be retained
+	// if the max retained size is smaller than 8MB, as the batch
+	// capacity will always grow to 8MB.
+	initialPebbleBatchSize     = 64 << 10 // 64KB
+	maxRetainedPebbleBatchSize = 8 << 20  // 8MB
+
+	// pebbleMemTableSize defines the max steady state size of a memtable.
+	// There can be more than 1 memtable in memory at a time as it takes
+	// time for old memtables to flush. The memtable size also defines
+	// the size for large batches. A large batch is a batch which will
+	// take at least half of the memtable size. Note that Batch#Len
+	// is not the same as the memtable size that the batch will occupy
+	// as data in batches is encoded differently. In general, the
+	// memtable size of the batch will be higher than the length of the
+	// batch data.
+	//
+	// On commit, data in the large batch may be kept by pebble and thus
+	// large batches will need to be reallocated. Note that the large batch
+	// classification uses the memtable size that a batch will occupy
+	// rather than the length of the data slice backing the batch.
+	pebbleMemTableSize = 32 << 20 // 32MB
+
+	// dbCommitThresholdBytes is a soft limit and the batch is committed
+	// to the DB as soon as it crosses this threshold. To make sure that
+	// the commit threshold plays well with the max retained batch size,
+	// the threshold should be kept smaller than the sum of the max retained
+	// batch size and the encoded size of the aggregated data to be committed.
+	dbCommitThresholdBytes = 8000 << 10 // 8000KB
+
+	aggregationIvlKey  = "aggregation_interval"
+	aggregationTypeKey = "aggregation_type"
 )
 
 var (
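
The growth behaviour described in the initialPebbleBatchSize comment above can be illustrated with a small, self-contained sketch. The capacityAfterGrowth helper and the payload values are hypothetical, assuming only that capacity doubles from the initial size until it covers the payload:

package main

import "fmt"

// capacityAfterGrowth doubles a batch's capacity, starting from the initial
// size, until it can hold the payload. Hypothetical helper mirroring the
// growth described in the comment on initialPebbleBatchSize.
func capacityAfterGrowth(initial, payload int) int {
	capacity := initial
	for capacity < payload {
		capacity *= 2
	}
	return capacity
}

func main() {
	const initial = 64 << 10 // initialPebbleBatchSize
	for _, payload := range []int{1 << 20, 5 << 20, 9 << 20} {
		fmt.Printf("%2dMB payload -> %2dMB capacity\n",
			payload>>20, capacityAfterGrowth(initial, payload)>>20)
	}
	// Output:
	//  1MB payload ->  1MB capacity
	//  5MB payload ->  8MB capacity
	//  9MB payload -> 16MB capacity
}

Under that assumption, any batch whose payload consistently exceeds 4MB grows to at least 8MB of capacity, which is presumably why maxRetainedPebbleBatchSize is set no lower than 8MB.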
@@ -86,6 +117,7 @@ func New(opts ...Option) (*Aggregator, error) {
 				return &merger, nil
 			},
 		},
+		MemTableSize: pebbleMemTableSize,
 	}
 	writeOptions := pebble.Sync
 	if cfg.InMemory {
@@ -111,6 +143,7 @@ func New(opts ...Option) (*Aggregator, error) {
 		writeOptions:   writeOptions,
 		cfg:            cfg,
 		processingTime: time.Now().Truncate(cfg.AggregationIntervals[0]),
+		batch:          newBatch(pb),
 		closed:         make(chan struct{}),
 		metrics:        metrics,
 	}, nil
@@ -246,6 +279,8 @@ func (a *Aggregator) Run(ctx context.Context) error {
 	a.mu.Unlock()
 	defer close(a.runStopped)
 
+	harvestBatch := newBatch(a.db)
+	defer func() { harvestBatch.Close() }()
 	to := a.processingTime.Add(a.cfg.AggregationIntervals[0])
 	timer := time.NewTimer(time.Until(to.Add(a.cfg.HarvestDelay)))
 	defer timer.Stop()
@@ -259,14 +294,16 @@ func (a *Aggregator) Run(ctx context.Context) error {
 		}
 
 		a.mu.Lock()
-		batch := a.batch
-		a.batch = nil
+		harvestBatch, a.batch = a.batch, harvestBatch
 		a.processingTime = to
 		cachedEventsStats := a.cachedEvents.loadAndDelete(to)
 		a.mu.Unlock()
 
-		if err := a.commitAndHarvest(ctx, batch, to, cachedEventsStats); err != nil {
-			a.cfg.Logger.Warn("failed to commit and harvest metrics", zap.Error(err))
+		if err := commitAndReset(harvestBatch, a.writeOptions); err != nil {
+			a.cfg.Logger.Warn("failed to commit batch", zap.Error(err))
+		}
+		if err := a.harvest(ctx, harvestBatch, to, cachedEventsStats); err != nil {
+			a.cfg.Logger.Warn("failed to harvest aggregated metrics", zap.Error(err))
 		}
 		to = to.Add(a.cfg.AggregationIntervals[0])
 		timer.Reset(time.Until(to.Add(a.cfg.HarvestDelay)))
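
The loop above reuses a single long-lived harvest batch by swapping it with the active batch under the mutex, instead of allocating and discarding a batch on every cycle. A minimal sketch of that swap pattern, with string slices standing in for *pebble.Batch and all names illustrative:

package main

import (
	"fmt"
	"sync"
)

// doubleBuffer sketches the swap in Run: producers append to active while the
// previously swapped-out buffer is committed and harvested, then rotate
// exchanges the two so neither side allocates a new buffer.
type doubleBuffer struct {
	mu      sync.Mutex
	active  []string
	harvest []string
}

func (d *doubleBuffer) add(v string) {
	d.mu.Lock()
	defer d.mu.Unlock()
	d.active = append(d.active, v)
}

// rotate hands back the filled buffer and installs the emptied previous
// harvest buffer as the new active one.
func (d *doubleBuffer) rotate() []string {
	d.mu.Lock()
	defer d.mu.Unlock()
	d.active, d.harvest = d.harvest[:0], d.active
	return d.harvest
}

func main() {
	var d doubleBuffer
	d.add("svc-a")
	d.add("svc-b")
	fmt.Println(d.rotate()) // [svc-a svc-b]
	d.add("svc-c")
	fmt.Println(d.rotate()) // [svc-c]
}

In the diff, commitAndReset plays the role of emptying the swapped-out batch while keeping its retained capacity, so the next swap hands the producer a ready-to-use batch.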
@@ -300,42 +337,47 @@ func (a *Aggregator) Close(ctx context.Context) error {
 	}
 
 	if a.db != nil {
-		a.cfg.Logger.Info("running final aggregation")
 		if a.batch != nil {
-			if err := a.batch.Commit(a.writeOptions); err != nil {
+			a.cfg.Logger.Info("running final aggregation")
+			if err := commitAndReset(a.batch, a.writeOptions); err != nil {
 				span.RecordError(err)
 				return fmt.Errorf("failed to commit batch: %w", err)
 			}
-			if err := a.batch.Close(); err != nil {
-				span.RecordError(err)
-				return fmt.Errorf("failed to close batch: %w", err)
+			var errs []error
+			for _, ivl := range a.cfg.AggregationIntervals {
+				// At any particular time there will be 1 harvest candidate for
+				// each aggregation interval. We will align the end time and
+				// process each of these.
+				//
+				// TODO (lahsivjar): It is possible to harvest the same
+				// time multiple times, not an issue but can be optimized.
+				to := a.processingTime.Truncate(ivl).Add(ivl)
+				if err := a.harvest(ctx, a.batch, to, a.cachedEvents.loadAndDelete(to)); err != nil {
+					span.RecordError(err)
+					errs = append(errs, fmt.Errorf(
+						"failed to harvest metrics for interval %s: %w", formatDuration(ivl), err),
+					)
+				}
 			}
-			a.batch = nil
-		}
-		var errs []error
-		for _, ivl := range a.cfg.AggregationIntervals {
-			// At any particular time there will be 1 harvest candidate for
-			// each aggregation interval. We will align the end time and
-			// process each of these.
-			//
-			// TODO (lahsivjar): It is possible to harvest the same
-			// time multiple times, not an issue but can be optimized.
-			to := a.processingTime.Truncate(ivl).Add(ivl)
-			if err := a.harvest(ctx, to, a.cachedEvents.loadAndDelete(to)); err != nil {
-				span.RecordError(err)
-				errs = append(errs, fmt.Errorf(
-					"failed to harvest metrics for interval %s: %w", formatDuration(ivl), err),
-				)
+			if len(errs) > 0 {
+				return fmt.Errorf("failed while running final harvest: %w", errors.Join(errs...))
 			}
 		}
-		if len(errs) > 0 {
-			return fmt.Errorf("failed while running final harvest: %w", errors.Join(errs...))
+		if err := a.batch.Close(); err != nil {
+			// Failing to close the batch is a non-fatal error as we are simply failing to return
+			// the batch to the pool. This error should not be retried so it is ignored but
+			// recorded for telemetry.
+			span.RecordError(err)
+			a.cfg.Logger.Warn("failed to close batch, this is non-fatal and doesn't lead to data loss")
 		}
+		// No need to retry final aggregation.
+		a.batch = nil
+
 		if err := a.db.Close(); err != nil {
 			span.RecordError(err)
 			return fmt.Errorf("failed to close pebble: %w", err)
 		}
-		// All future operations are invalid after db is closed
+		// All future operations are invalid after db is closed.
 		a.db = nil
 	}
 	if err := a.metrics.CleanUp(); err != nil {
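
The final-harvest loop in Close aligns one harvest end time per configured interval via processingTime.Truncate(ivl).Add(ivl). A small runnable sketch of that alignment; the processing time and intervals below are arbitrary examples, not values from this package:

package main

import (
	"fmt"
	"time"
)

func main() {
	// Hypothetical processing time and aggregation intervals, chosen only to
	// show how Truncate(ivl).Add(ivl) yields one harvest end per interval.
	processingTime := time.Date(2024, 1, 1, 10, 37, 0, 0, time.UTC)
	for _, ivl := range []time.Duration{time.Minute, 10 * time.Minute, time.Hour} {
		to := processingTime.Truncate(ivl).Add(ivl)
		fmt.Printf("%-6s -> %s\n", ivl, to.Format(time.RFC3339))
	}
	// Output:
	// 1m0s   -> 2024-01-01T10:38:00Z
	// 10m0s  -> 2024-01-01T10:40:00Z
	// 1h0m0s -> 2024-01-01T11:00:00Z
}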
@@ -370,12 +412,6 @@ func (a *Aggregator) aggregate(
 	cmk CombinedMetricsKey,
 	cm *aggregationpb.CombinedMetrics,
 ) (int, error) {
-	if a.batch == nil {
-		// Batch is backed by a sync pool. After each commit we will release the batch
-		// back to the pool by calling Batch#Close and subsequently acquire a new batch.
-		a.batch = a.db.NewBatch()
-	}
-
 	op := a.batch.MergeDeferred(cmk.SizeBinary(), cm.SizeVT())
 	if err := cmk.MarshalBinaryToSizedBuffer(op.Key); err != nil {
 		return 0, fmt.Errorf("failed to marshal combined metrics key: %w", err)
@@ -389,52 +425,19 @@ func (a *Aggregator) aggregate(
 
 	bytesIn := cm.SizeVT()
 	if a.batch.Len() >= dbCommitThresholdBytes {
-		if err := a.batch.Commit(a.writeOptions); err != nil {
+		if err := commitAndReset(a.batch, a.writeOptions); err != nil {
 			return bytesIn, fmt.Errorf("failed to commit pebble batch: %w", err)
 		}
-		if err := a.batch.Close(); err != nil {
-			return bytesIn, fmt.Errorf("failed to close pebble batch: %w", err)
-		}
-		a.batch = nil
 	}
 	return bytesIn, nil
 }
 
-func (a *Aggregator) commitAndHarvest(
-	ctx context.Context,
-	batch *pebble.Batch,
-	to time.Time,
-	cachedEventsStats map[time.Duration]map[[16]byte]float64,
-) error {
-	ctx, span := a.cfg.Tracer.Start(ctx, "commitAndHarvest")
-	defer span.End()
-
-	var errs []error
-	if batch != nil {
-		if err := batch.Commit(a.writeOptions); err != nil {
-			span.RecordError(err)
-			errs = append(errs, fmt.Errorf("failed to commit batch before harvest: %w", err))
-		}
-		if err := batch.Close(); err != nil {
-			span.RecordError(err)
-			errs = append(errs, fmt.Errorf("failed to close batch before harvest: %w", err))
-		}
-	}
-	if err := a.harvest(ctx, to, cachedEventsStats); err != nil {
-		span.RecordError(err)
-		errs = append(errs, fmt.Errorf("failed to harvest aggregated metrics: %w", err))
-	}
-	if len(errs) > 0 {
-		return errors.Join(errs...)
-	}
-	return nil
-}
-
 // harvest collects the mature metrics for all aggregation intervals and
 // deletes the entries in db once the metrics are fully harvested. Harvest
 // takes an end time denoting the exclusive upper bound for harvesting.
 func (a *Aggregator) harvest(
 	ctx context.Context,
+	batch *pebble.Batch,
 	end time.Time,
 	cachedEventsStats map[time.Duration]map[[16]byte]float64,
 ) error {
@@ -447,7 +450,7 @@ func (a *Aggregator) harvest(
 		if end.Truncate(ivl).Equal(end) {
 			start := end.Add(-ivl).Add(-a.cfg.Lookback)
 			cmCount, err := a.harvestForInterval(
-				ctx, snap, start, end, ivl, cachedEventsStats[ivl],
+				ctx, batch, snap, start, end, ivl, cachedEventsStats[ivl],
 			)
 			if err != nil {
 				errs = append(errs, fmt.Errorf(
@@ -473,6 +476,7 @@ func (a *Aggregator) harvest(
 // combined metrics if some of the combined metrics failed harvest.
 func (a *Aggregator) harvestForInterval(
 	ctx context.Context,
+	batch *pebble.Batch,
 	snap *pebble.Snapshot,
 	start, end time.Time,
 	ivl time.Duration,
@@ -491,11 +495,14 @@ func (a *Aggregator) harvestForInterval(
 	from.MarshalBinaryToSizedBuffer(lb)
 	to.MarshalBinaryToSizedBuffer(ub)
 
-	iter := snap.NewIter(&pebble.IterOptions{
+	iter, err := snap.NewIter(&pebble.IterOptions{
 		LowerBound: lb,
 		UpperBound: ub,
 		KeyTypes:   pebble.IterKeyTypePointsOnly,
 	})
+	if err != nil {
+		return 0, fmt.Errorf("failed to create pebble iterator: %w", err)
+	}
 	defer iter.Close()
 
 	var errs []error
@@ -557,7 +564,12 @@ func (a *Aggregator) harvestForInterval(
 		a.metrics.EventsProcessed.Add(context.Background(), harvestStats.eventsTotal, commonAttrsOpt, outcomeAttrOpt)
 		cachedEventsStats[cmk.ID] -= harvestStats.eventsTotal
 	}
-	err := a.db.DeleteRange(lb, ub, a.writeOptions)
+	if err := batch.DeleteRange(lb, ub, a.writeOptions); err != nil {
+		errs = append(errs, fmt.Errorf("failed to delete harvested interval: %w", err))
+	}
+	if err := commitAndReset(batch, a.writeOptions); err != nil {
+		errs = append(errs, fmt.Errorf("failed to commit batch: %w", err))
+	}
 	if len(errs) > 0 {
 		err = errors.Join(err, fmt.Errorf(
 			"failed to process %d out of %d metrics:\n%w",
@@ -768,3 +780,21 @@ func (hs *harvestStats) addOverflows(cm *aggregationpb.CombinedMetrics, limits L
 		addOverflow(ksm.GetMetrics().GetOverflowGroups(), ksm)
 	}
 }
+
+// commitAndReset commits and resets a pebble batch. Note that the batch
+// is reset even if the commit fails, dropping any data in the batch and
+// resetting it for reuse.
+func commitAndReset(b *pebble.Batch, opts *pebble.WriteOptions) error {
+	defer b.Reset()
+	if err := b.Commit(opts); err != nil {
+		return fmt.Errorf("failed to commit batch: %w", err)
+	}
+	return nil
+}
+
+func newBatch(db *pebble.DB) *pebble.Batch {
+	return db.NewBatch(
+		pebble.WithInitialSizeBytes(initialPebbleBatchSize),
+		pebble.WithMaxRetainedSizeBytes(maxRetainedPebbleBatchSize),
+	)
+}
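
A hedged usage sketch of the two helpers above. It assumes a file in the same package (the package clause and function name below are stand-ins) and the upstream cockroachdb/pebble module with its in-memory vfs; the key and value are illustrative and this is not part of the change itself:

// Hypothetical example file living alongside the code above.
package aggregators // assumed package name

import (
	"fmt"

	"github.com/cockroachdb/pebble"
	"github.com/cockroachdb/pebble/vfs"
)

func exampleBatchLifecycle() error {
	// In-memory store so the sketch has no on-disk side effects.
	db, err := pebble.Open("example", &pebble.Options{FS: vfs.NewMem()})
	if err != nil {
		return err
	}
	defer db.Close()

	b := newBatch(db) // sized by the initial/max retained constants above
	defer b.Close()   // failing to close only skips returning the batch

	if err := b.Set([]byte("key"), []byte("value"), nil); err != nil {
		return err
	}
	if b.Len() >= dbCommitThresholdBytes {
		// Soft limit crossed: flush now and keep reusing the same batch.
		if err := commitAndReset(b, pebble.NoSync); err != nil {
			return fmt.Errorf("batch contents were dropped: %w", err)
		}
	}
	return commitAndReset(b, pebble.NoSync) // final flush
}

Because commitAndReset always resets, a failed commit drops whatever the batch held; a caller that cannot tolerate that would need to retry the commit before the reset rather than after.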