
Commit 2b0fd76

Optimize batch usage
1 parent 4d757dd commit 2b0fd76

3 files changed: +146 -75 lines changed


aggregators/aggregator.go

Lines changed: 102 additions & 75 deletions
@@ -25,9 +25,40 @@ import (
 )
 
 const (
-	dbCommitThresholdBytes = 10 * 1024 * 1024 // commit every 10MB
-	aggregationIvlKey      = "aggregation_interval"
-	aggregationTypeKey     = "aggregation_type"
+	// Batch grows in multiples of 2 based on the initial size. For
+	// example, if the initial size is 1MB then the batch will grow as
+	// {2, 4, 8, 16, ...}. If a batch of size greater than 4MB is
+	// consistently committed then that batch will never be retained
+	// if the max retained size is smaller than 8MB, as the batch capacity
+	// will always grow to 8MB.
+	initialPebbleBatchSize     = 64 << 10 // 64KB
+	maxRetainedPebbleBatchSize = 8 << 20  // 8MB
+
+	// pebbleMemTableSize defines the max steady state size of a memtable.
+	// There can be more than 1 memtable in memory at a time as it takes
+	// time for an old memtable to flush. The memtable size also defines
+	// the size for large batches. A large batch is a batch which will
+	// take at least half of the memtable size. Note that Batch#Len
+	// is not the same as the memtable size that the batch will occupy,
+	// as data in batches is encoded differently. In general, the
+	// memtable size of the batch will be higher than the length of the
+	// batch data.
+	//
+	// On commit, data in a large batch may be kept by pebble and thus
+	// large batches will need to be reallocated. Note that large batch
+	// classification uses the memtable size that a batch will occupy
+	// rather than the length of the data slice backing the batch.
+	pebbleMemTableSize = 32 << 20 // 32MB
+
+	// dbCommitThresholdBytes is a soft limit and the batch is committed
+	// to the DB as soon as it crosses this threshold. To make sure that
+	// the commit threshold plays well with the max retained batch size,
+	// the threshold should be kept smaller than the sum of max retained
+	// batch size and encoded size of aggregated data to be committed.
+	dbCommitThresholdBytes = 8000 << 10 // 8000KB
+
+	aggregationIvlKey  = "aggregation_interval"
+	aggregationTypeKey = "aggregation_type"
 )
 
 var (
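Note: the growth behavior described in the new comment can be checked with a small sketch. This is illustrative only and not part of the commit; grownCapacity is a hypothetical helper, with constants mirroring the ones added above.

package main

import "fmt"

// grownCapacity models the doubling growth described in the comment:
// the batch buffer doubles from its initial size until it can hold
// the committed data.
func grownCapacity(initialSize, committedBytes int) int {
	c := initialSize
	for c < committedBytes {
		c *= 2
	}
	return c
}

func main() {
	const initial = 64 << 10    // initialPebbleBatchSize
	const maxRetained = 8 << 20 // maxRetainedPebbleBatchSize

	// A batch that consistently commits just over 4MB always grows to
	// 8MB capacity, so any retention cap below 8MB would force a fresh
	// allocation on every reuse; the 8MB cap added above avoids that.
	capacity := grownCapacity(initial, 4<<20+1)
	fmt.Printf("capacity=%dMB retained=%t\n", capacity>>20, capacity <= maxRetained)
}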
@@ -86,6 +117,7 @@ func New(opts ...Option) (*Aggregator, error) {
 				return &merger, nil
 			},
 		},
+		MemTableSize: pebbleMemTableSize,
 	}
 	writeOptions := pebble.Sync
 	if cfg.InMemory {
@@ -111,6 +143,7 @@ func New(opts ...Option) (*Aggregator, error) {
 		writeOptions:   writeOptions,
 		cfg:            cfg,
 		processingTime: time.Now().Truncate(cfg.AggregationIntervals[0]),
+		batch:          newBatch(pb),
 		closed:         make(chan struct{}),
 		metrics:        metrics,
 	}, nil
@@ -248,6 +281,8 @@ func (a *Aggregator) Run(ctx context.Context) error {
 	a.mu.Unlock()
 	defer close(a.runStopped)
 
+	harvestBatch := newBatch(a.db)
+	defer func() { harvestBatch.Close() }()
 	to := a.processingTime.Add(a.cfg.AggregationIntervals[0])
 	timer := time.NewTimer(time.Until(to.Add(a.cfg.HarvestDelay)))
 	defer timer.Stop()
@@ -261,14 +296,16 @@ func (a *Aggregator) Run(ctx context.Context) error {
 		}
 
 		a.mu.Lock()
-		batch := a.batch
-		a.batch = nil
+		harvestBatch, a.batch = a.batch, harvestBatch
 		a.processingTime = to
 		cachedEventsStats := a.cachedEvents.loadAndDelete(to)
 		a.mu.Unlock()
 
-		if err := a.commitAndHarvest(ctx, batch, to, cachedEventsStats); err != nil {
-			a.cfg.Logger.Warn("failed to commit and harvest metrics", zap.Error(err))
+		if err := commitAndReset(harvestBatch, a.writeOptions); err != nil {
+			a.cfg.Logger.Warn("failed to commit batch", zap.Error(err))
+		}
+		if err := a.harvest(ctx, harvestBatch, to, cachedEventsStats); err != nil {
+			a.cfg.Logger.Warn("failed to harvest aggregated metrics", zap.Error(err))
 		}
 		to = to.Add(a.cfg.AggregationIntervals[0])
 		timer.Reset(time.Until(to.Add(a.cfg.HarvestDelay)))
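Note: the swap with harvestBatch is a double-buffering pattern; the ingest path keeps filling one batch while the harvester commits and resets the other, so no batch is allocated per harvest cycle. A minimal sketch of the idea with hypothetical names (buffer stands in for *pebble.Batch):

package main

import (
	"fmt"
	"sync"
)

type buffer struct{ entries []string }

type aggregator struct {
	mu     sync.Mutex
	active *buffer // written to by the ingest path
}

// swapForHarvest exchanges the active buffer for a spare one under the
// lock; writers continue on the spare while the caller drains the
// previous buffer outside the critical section.
func (a *aggregator) swapForHarvest(spare *buffer) *buffer {
	a.mu.Lock()
	defer a.mu.Unlock()
	prev := a.active
	a.active = spare
	return prev
}

func main() {
	a := &aggregator{active: &buffer{entries: []string{"metric-1"}}}
	full := a.swapForHarvest(&buffer{})
	fmt.Printf("%d entries to commit, %d in the new active buffer\n",
		len(full.entries), len(a.active.entries))
	// full would now be committed, reset, and reused as the next spare.
}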
@@ -302,42 +339,47 @@ func (a *Aggregator) Close(ctx context.Context) error {
 	}
 
 	if a.db != nil {
-		a.cfg.Logger.Info("running final aggregation")
 		if a.batch != nil {
-			if err := a.batch.Commit(a.writeOptions); err != nil {
+			a.cfg.Logger.Info("running final aggregation")
+			if err := commitAndReset(a.batch, a.writeOptions); err != nil {
 				span.RecordError(err)
 				return fmt.Errorf("failed to commit batch: %w", err)
 			}
-			if err := a.batch.Close(); err != nil {
-				span.RecordError(err)
-				return fmt.Errorf("failed to close batch: %w", err)
+			var errs []error
+			for _, ivl := range a.cfg.AggregationIntervals {
+				// At any particular time there will be 1 harvest candidate for
+				// each aggregation interval. We will align the end time and
+				// process each of these.
+				//
+				// TODO (lahsivjar): It is possible to harvest the same
+				// time multiple times, not an issue but can be optimized.
+				to := a.processingTime.Truncate(ivl).Add(ivl)
+				if err := a.harvest(ctx, a.batch, to, a.cachedEvents.loadAndDelete(to)); err != nil {
+					span.RecordError(err)
+					errs = append(errs, fmt.Errorf(
+						"failed to harvest metrics for interval %s: %w", formatDuration(ivl), err),
+					)
+				}
 			}
-			a.batch = nil
-		}
-		var errs []error
-		for _, ivl := range a.cfg.AggregationIntervals {
-			// At any particular time there will be 1 harvest candidate for
-			// each aggregation interval. We will align the end time and
-			// process each of these.
-			//
-			// TODO (lahsivjar): It is possible to harvest the same
-			// time multiple times, not an issue but can be optimized.
-			to := a.processingTime.Truncate(ivl).Add(ivl)
-			if err := a.harvest(ctx, to, a.cachedEvents.loadAndDelete(to)); err != nil {
-				span.RecordError(err)
-				errs = append(errs, fmt.Errorf(
-					"failed to harvest metrics for interval %s: %w", formatDuration(ivl), err),
-				)
+			if len(errs) > 0 {
+				return fmt.Errorf("failed while running final harvest: %w", errors.Join(errs...))
 			}
 		}
-		if len(errs) > 0 {
-			return fmt.Errorf("failed while running final harvest: %w", errors.Join(errs...))
+		if err := a.batch.Close(); err != nil {
+			// Failing to close batch is a non-fatal error as we are simply failing to return
+			// the batch to the pool. This error should not be retried so it is ignored but
+			// recorded for telemetry.
+			span.RecordError(err)
+			a.cfg.Logger.Warn("failed to close batch, this is non-fatal and doesn't lead to data loss")
 		}
+		// No need to retry final aggregation.
+		a.batch = nil
+
 		if err := a.db.Close(); err != nil {
 			span.RecordError(err)
 			return fmt.Errorf("failed to close pebble: %w", err)
 		}
-		// All future operations are invalid after db is closed
+		// All future operations are invalid after db is closed.
 		a.db = nil
 	}
 	if err := a.metrics.CleanUp(); err != nil {
@@ -372,12 +414,6 @@ func (a *Aggregator) aggregate(
 	cmk CombinedMetricsKey,
 	cm *aggregationpb.CombinedMetrics,
 ) (int, error) {
-	if a.batch == nil {
-		// Batch is backed by a sync pool. After each commit we will release the batch
-		// back to the pool by calling Batch#Close and subsequently acquire a new batch.
-		a.batch = a.db.NewBatch()
-	}
-
 	op := a.batch.MergeDeferred(cmk.SizeBinary(), cm.SizeVT())
 	if err := cmk.MarshalBinaryToSizedBuffer(op.Key); err != nil {
 		return 0, fmt.Errorf("failed to marshal combined metrics key: %w", err)
@@ -391,52 +427,19 @@ func (a *Aggregator) aggregate(
 
 	bytesIn := cm.SizeVT()
 	if a.batch.Len() >= dbCommitThresholdBytes {
-		if err := a.batch.Commit(a.writeOptions); err != nil {
+		if err := commitAndReset(a.batch, a.writeOptions); err != nil {
 			return bytesIn, fmt.Errorf("failed to commit pebble batch: %w", err)
 		}
-		if err := a.batch.Close(); err != nil {
-			return bytesIn, fmt.Errorf("failed to close pebble batch: %w", err)
-		}
-		a.batch = nil
 	}
 	return bytesIn, nil
 }
 
-func (a *Aggregator) commitAndHarvest(
-	ctx context.Context,
-	batch *pebble.Batch,
-	to time.Time,
-	cachedEventsStats map[time.Duration]map[[16]byte]float64,
-) error {
-	ctx, span := a.cfg.Tracer.Start(ctx, "commitAndHarvest")
-	defer span.End()
-
-	var errs []error
-	if batch != nil {
-		if err := batch.Commit(a.writeOptions); err != nil {
-			span.RecordError(err)
-			errs = append(errs, fmt.Errorf("failed to commit batch before harvest: %w", err))
-		}
-		if err := batch.Close(); err != nil {
-			span.RecordError(err)
-			errs = append(errs, fmt.Errorf("failed to close batch before harvest: %w", err))
-		}
-	}
-	if err := a.harvest(ctx, to, cachedEventsStats); err != nil {
-		span.RecordError(err)
-		errs = append(errs, fmt.Errorf("failed to harvest aggregated metrics: %w", err))
-	}
-	if len(errs) > 0 {
-		return errors.Join(errs...)
-	}
-	return nil
-}
-
 // harvest collects the mature metrics for all aggregation intervals and
 // deletes the entries in db once the metrics are fully harvested. Harvest
 // takes an end time denoting the exclusive upper bound for harvesting.
 func (a *Aggregator) harvest(
 	ctx context.Context,
+	batch *pebble.Batch,
 	end time.Time,
 	cachedEventsStats map[time.Duration]map[[16]byte]float64,
 ) error {
@@ -449,7 +452,7 @@ func (a *Aggregator) harvest(
 		if end.Truncate(ivl).Equal(end) {
 			start := end.Add(-ivl).Add(-a.cfg.Lookback)
 			cmCount, err := a.harvestForInterval(
-				ctx, snap, start, end, ivl, cachedEventsStats[ivl],
+				ctx, batch, snap, start, end, ivl, cachedEventsStats[ivl],
 			)
 			if err != nil {
 				errs = append(errs, fmt.Errorf(
@@ -475,6 +478,7 @@
 // combined metrics if some of the combined metrics failed harvest.
 func (a *Aggregator) harvestForInterval(
 	ctx context.Context,
+	batch *pebble.Batch,
 	snap *pebble.Snapshot,
 	start, end time.Time,
 	ivl time.Duration,
@@ -499,7 +503,7 @@ func (a *Aggregator) harvestForInterval(
 		KeyTypes: pebble.IterKeyTypePointsOnly,
 	})
 	if err != nil {
-		return 0, fmt.Errorf("failed to create iter: %w", err)
+		return 0, fmt.Errorf("failed to create pebble iterator: %w", err)
 	}
 	defer iter.Close()
 
@@ -566,7 +570,12 @@ func (a *Aggregator) harvestForInterval(
 		a.metrics.EventsProcessed.Add(context.Background(), harvestStats.eventsTotal, commonAttrsOpt, outcomeAttrOpt)
 		cachedEventsStats[cmk.ID] -= harvestStats.eventsTotal
 	}
-	err = a.db.DeleteRange(lb, ub, a.writeOptions)
+	if err := batch.DeleteRange(lb, ub, a.writeOptions); err != nil {
+		errs = append(errs, fmt.Errorf("failed to delete harvested interval: %w", err))
+	}
+	if err := commitAndReset(batch, a.writeOptions); err != nil {
+		errs = append(errs, fmt.Errorf("failed to commit batch: %w", err))
+	}
 	if len(errs) > 0 {
 		err = errors.Join(err, fmt.Errorf(
 			"failed to process %d out of %d metrics:\n%w",
@@ -777,3 +786,21 @@ func (hs *harvestStats) addOverflows(cm *aggregationpb.CombinedMetrics, limits L
 		addOverflow(ksm.GetMetrics().GetOverflowGroups(), ksm)
 	}
 }
+
+// commitAndReset commits and resets a pebble batch. Note that the batch
+// is reset even if the commit fails, dropping any data in the batch and
+// preparing it for reuse.
+func commitAndReset(b *pebble.Batch, opts *pebble.WriteOptions) error {
+	defer b.Reset()
+	if err := b.Commit(opts); err != nil {
+		return fmt.Errorf("failed to commit batch: %w", err)
+	}
+	return nil
+}
+
+func newBatch(db *pebble.DB) *pebble.Batch {
+	return db.NewBatch(
+		pebble.WithInitialSizeBytes(initialPebbleBatchSize),
+		pebble.WithMaxRetainedSizeBytes(maxRetainedPebbleBatchSize),
+	)
+}
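Note: taken together, newBatch and commitAndReset give each batch a reusable lifecycle: allocate once, commit and reset at the soft threshold, and Close only at shutdown. A hedged usage sketch assuming the same package, so the helpers and dbCommitThresholdBytes are in scope (lifecycle and updates are hypothetical; WithInitialSizeBytes and WithMaxRetainedSizeBytes appear to come from the locally replaced pebble in go.mod below, not necessarily upstream pebble):

// lifecycle shows the intended batch reuse; error handling is minimal.
func lifecycle(db *pebble.DB, updates map[string][]byte) error {
	b := newBatch(db) // 64KB initial buffer, retained up to 8MB across resets
	defer b.Close()   // returned to the pool once, at the end
	for k, v := range updates {
		if err := b.Set([]byte(k), v, nil); err != nil {
			return err
		}
		if b.Len() >= dbCommitThresholdBytes {
			if err := commitAndReset(b, pebble.Sync); err != nil {
				return err
			}
		}
	}
	return commitAndReset(b, pebble.Sync) // flush whatever remains
}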

go.mod

Lines changed: 2 additions & 0 deletions
@@ -72,3 +72,5 @@ require (
 	gopkg.in/yaml.v3 v3.0.1 // indirect
 	howett.net/plist v1.0.0 // indirect
 )
+
+replace github.com/cockroachdb/pebble => /Users/lahsivjar/Projects/pebble
