Skip to content

Commit

Permalink
fix: prevent the panics when we try to retry with a different number …
Browse files Browse the repository at this point in the history
…of shards (#1547)

* prevent the panics when we try to retry with a different number of shards

Signed-off-by: Kemal Akkoyun <[email protected]>

* Simplify

Signed-off-by: Kemal Akkoyun <[email protected]>

---------

Signed-off-by: Kemal Akkoyun <[email protected]>
  • Loading branch information
kakkoyun authored Apr 12, 2023
1 parent a261ffa commit 55183f0
Show file tree
Hide file tree
Showing 3 changed files with 78 additions and 21 deletions.
50 changes: 37 additions & 13 deletions pkg/cache/stats_counter.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package cache

import (
"errors"
"fmt"
"time"

Expand All @@ -37,13 +38,13 @@ const (
// StatsCounter is an interface for recording cache stats for goburrow.Cache.
// The StatsCounter can be found at
// - https://github.com/goburrow/cache/blob/f6da914dd6e3546dffa8802919dbca80cd33abe3/stats.go#L67
var _ burrow.StatsCounter = (*burrowStatsCounter)(nil)
var _ burrow.StatsCounter = (*BurrowStatsCounter)(nil)

// burrowStatsCounter is a StatsCounter implementation for burrow cache.
// BurrowStatsCounter is a StatsCounter implementation for burrow cache.
// It is a wrapper around prometheus metrics.
// It has been intended to passed through the cache using cache.WithStatsCounter option
// - https://github.com/goburrow/cache/blob/f6da914dd6e3546dffa8802919dbca80cd33abe3/local.go#L552
type burrowStatsCounter struct {
type BurrowStatsCounter struct {
logger log.Logger
reg prometheus.Registerer

Expand All @@ -56,12 +57,12 @@ type burrowStatsCounter struct {
}

// Option add options for default Cache.
type Option func(c *burrowStatsCounter)
type Option func(c *BurrowStatsCounter)

// WithTrackLoadingCacheStats enables tracking of loading cache stats.
// It is disabled by default.
func WithTrackLoadingCacheStats() Option {
return func(c *burrowStatsCounter) {
return func(c *BurrowStatsCounter) {
c.trackLoadingCacheStats = true
c.load = promauto.With(c.reg).NewCounterVec(prometheus.CounterOpts{
Name: "cache_load_total",
Expand All @@ -79,9 +80,9 @@ func WithTrackLoadingCacheStats() Option {
//
// RecordLoadSuccess and RecordLoadError methods are called by Get methods on a successful and failed load respectively.
// Get method only called by LoadingCache implementation.
func NewBurrowStatsCounter(logger log.Logger, reg prometheus.Registerer, name string, options ...Option) *burrowStatsCounter {
func NewBurrowStatsCounter(logger log.Logger, reg prometheus.Registerer, name string, options ...Option) *BurrowStatsCounter {
reg = prometheus.WrapRegistererWith(prometheus.Labels{"cache": name}, reg)
s := &burrowStatsCounter{
s := &BurrowStatsCounter{
logger: logger,
reg: reg,

Expand All @@ -100,27 +101,50 @@ func NewBurrowStatsCounter(logger log.Logger, reg prometheus.Registerer, name st
return s
}

// Unregister removes all metrics from the registry.
func (c *BurrowStatsCounter) Unregister() error {
var err error
if ok := c.reg.Unregister(c.requests); !ok {
err = errors.Join(err, fmt.Errorf("unregistering requests counter: %w", err))
}
if ok := c.reg.Unregister(c.eviction); !ok {
err = errors.Join(err, fmt.Errorf("unregistering eviction counter: %w", err))
}
if c.trackLoadingCacheStats {
if ok := c.reg.Unregister(c.load); !ok {
err = errors.Join(err, fmt.Errorf("unregistering load counter: %w", err))
}
if ok := c.reg.Unregister(c.loadTotalTime); !ok {
err = errors.Join(err, fmt.Errorf("unregistering load total time histogram: %w", err))
}
}
if err != nil {
return fmt.Errorf("cleaning cache stats counter: %w", err)
}
return nil
}

// RecordHits records the number of hits.
// It is part of the burrow.StatsCounter interface.
//
// This method is called by Get and GetIfPresent methods on a cache hit.
func (c *burrowStatsCounter) RecordHits(hits uint64) {
func (c *BurrowStatsCounter) RecordHits(hits uint64) {
c.requests.WithLabelValues(lvHit).Add(float64(hits))
}

// RecordMisses records the number of misses.
// It is part of the burrow.StatsCounter interface.
//
// This method is called by Get and GetIfPresent methods method on a cache miss.
func (c *burrowStatsCounter) RecordMisses(misses uint64) {
func (c *BurrowStatsCounter) RecordMisses(misses uint64) {
c.requests.WithLabelValues(lvMiss).Add(float64(misses))
}

// RecordLoadSuccess records the number of successful loads.
// It is part of the burrow.StatsCounter interface.
//
// This method is called by Get methods on a successful load.
func (c *burrowStatsCounter) RecordLoadSuccess(loadTime time.Duration) {
func (c *BurrowStatsCounter) RecordLoadSuccess(loadTime time.Duration) {
if !c.trackLoadingCacheStats {
return
}
Expand All @@ -132,7 +156,7 @@ func (c *burrowStatsCounter) RecordLoadSuccess(loadTime time.Duration) {
// It is part of the burrow.StatsCounter interface.
//
// This method is called by Get methods on a failed load.
func (c *burrowStatsCounter) RecordLoadError(loadTime time.Duration) {
func (c *BurrowStatsCounter) RecordLoadError(loadTime time.Duration) {
if !c.trackLoadingCacheStats {
return
}
Expand All @@ -142,7 +166,7 @@ func (c *burrowStatsCounter) RecordLoadError(loadTime time.Duration) {

// RecordEviction records the number of evictions.
// It is part of the burrow.StatsCounter interface.
func (c *burrowStatsCounter) RecordEviction() {
func (c *BurrowStatsCounter) RecordEviction() {
c.eviction.Inc()
}

Expand All @@ -152,7 +176,7 @@ func (c *burrowStatsCounter) RecordEviction() {
// This method is called only by Stats method. And it is just for debugging purpose.
// Snapshot function is called manually and we don't plan to use it.
// For completeness, we implemented it.
func (c *burrowStatsCounter) Snapshot(s *burrow.Stats) {
func (c *BurrowStatsCounter) Snapshot(s *burrow.Stats) {
var err error
s.HitCount, err = currentCounterVecValue(c.requests, lvHit)
if err != nil {
Expand Down
3 changes: 3 additions & 0 deletions pkg/profiler/cpu/cpu.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,9 @@ func loadBpfProgram(logger log.Logger, reg prometheus.Registerer, debugEnabled,
// There's not enough free memory for these many unwind shards, let's retry with half
// as many.
if errors.Is(err, syscall.ENOMEM) {
if err := bpfMaps.close(); err != nil { // Only required when we want to retry.
return nil, nil, fmt.Errorf("failed to cleanup previously created bpfmaps: %w", err)
}
unwindShards /= 2
} else {
break
Expand Down
46 changes: 38 additions & 8 deletions pkg/profiler/cpu/maps.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ type bpfMaps struct {
programs *bpf.BPFMap

// Unwind stuff 🔬
processCache burrow.Cache
processCache *processCache
mappingInfoMemory profiler.EfficientBuffer

buildIDMapping map[string]uint64
Expand Down Expand Up @@ -225,6 +225,34 @@ func min[T constraints.Ordered](a, b T) T {
return b
}

type processCache struct {
burrow.Cache
statsCounter *cache.BurrowStatsCounter
}

func newProcessCache(logger log.Logger, reg prometheus.Registerer) *processCache {
statsCounter := cache.NewBurrowStatsCounter(logger, reg, "cpu_map")
return &processCache{
Cache: burrow.New(
burrow.WithMaximumSize(maxCachedProcesses),
burrow.WithStatsCounter(statsCounter),
),
statsCounter: statsCounter,
}
}

// close closes the cache and makes sure the stats counter is unregistered.
func (c *processCache) close() error {
// Unregister the stats counter before closing the cache,
// in case the cache could be initialized again.
err := c.statsCounter.Unregister()
// Close the cache.
if err := c.Cache.Close(); err != nil {
return errors.Join(err, fmt.Errorf("failed to close process cache: %w", err))
}
return err
}

func initializeMaps(logger log.Logger, reg prometheus.Registerer, m *bpf.Module, byteOrder binary.ByteOrder) (*bpfMaps, error) {
if m == nil {
return nil, fmt.Errorf("nil module")
Expand All @@ -234,13 +262,10 @@ func initializeMaps(logger log.Logger, reg prometheus.Registerer, m *bpf.Module,
unwindInfoMemory := make([]byte, maxUnwindTableSize*compactUnwindRowSizeBytes)

maps := &bpfMaps{
logger: log.With(logger, "component", "maps"),
module: m,
byteOrder: byteOrder,
processCache: burrow.New(
burrow.WithMaximumSize(maxCachedProcesses),
burrow.WithStatsCounter(cache.NewBurrowStatsCounter(logger, reg, "cpu_map")),
),
logger: log.With(logger, "component", "maps"),
module: m,
byteOrder: byteOrder,
processCache: newProcessCache(logger, reg),
mappingInfoMemory: mappingInfoMemory,
unwindInfoMemory: unwindInfoMemory,
buildIDMapping: make(map[string]uint64),
Expand All @@ -253,6 +278,11 @@ func initializeMaps(logger log.Logger, reg prometheus.Registerer, m *bpf.Module,
return maps, nil
}

// close closes all the resources associated with the maps.
func (m *bpfMaps) close() error {
return m.processCache.close()
}

// adjustMapSizes updates the amount of unwind shards.
//
// Note: It must be called before `BPFLoadObject()`.
Expand Down

0 comments on commit 55183f0

Please sign in to comment.