storage/prometheus: close querier at the end
We are trying to reuse memory and hitting a bug where the querier is
closed too soon. Prometheus closes the querier at the end of Exec(). We
should do that too.

Signed-off-by: Giedrius Statkevičius <[email protected]>
GiedriusS committed Sep 19, 2024
1 parent de6d9f8 commit bf53324
Showing 9 changed files with 376 additions and 45 deletions.
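For context before the per-file diffs: this change makes the query object own its storage scanners and defers closing the underlying querier until the query's Close() is called, after the caller has consumed the result. Below is a minimal, self-contained Go sketch of that lifecycle; the query and scanners types, fields, and output strings are illustrative stand-ins, not the actual promql-engine API.

package main

import "fmt"

// scanners stands in for the engine's storage scanners, which wrap a
// storage querier; Close releases that querier.
type scanners struct{ closed bool }

func (s *scanners) Close() error {
	s.closed = true
	return nil
}

// query stands in for compatibilityQuery: it keeps the scanners it was
// built with for its whole lifetime.
type query struct{ scanners *scanners }

// Exec runs the query but deliberately does not close the scanners, so
// the result can still be read from the underlying querier.
func (q *query) Exec() string { return "result" }

// Close releases the querier; callers invoke this once they are done
// with the result.
func (q *query) Close() error { return q.scanners.Close() }

func main() {
	q := &query{scanners: &scanners{}}
	res := q.Exec()
	fmt.Println(res, "closed after Exec:", q.scanners.closed) // result closed after Exec: false
	_ = q.Close()
	fmt.Println("closed after Close:", q.scanners.closed) // closed after Close: true
}

This mirrors the new TestQuerierClosedAfterQueryClosed test in engine_test.go below, which asserts that the querier is closed only after q.Close() is called.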
2 changes: 1 addition & 1 deletion engine/distributed_test.go
@@ -367,7 +367,7 @@ func TestDistributedEngineWarnings(t *testing.T) {
end = time.UnixMilli(600)
step = 30 * time.Second
)
q, err := ng.NewRangeQuery(context.Background(), nil, nil, "test", start, end, step)
q, err := ng.NewRangeQuery(context.Background(), querier, nil, "test", start, end, step)
testutil.Ok(t, err)

res := q.Exec(context.Background())
67 changes: 49 additions & 18 deletions engine/engine.go
@@ -273,9 +273,14 @@ func (e *Engine) NewInstantQuery(ctx context.Context, q storage.Queryable, opts
}
lplan, warns := logicalplan.NewFromAST(expr, qOpts, planOpts).Optimize(e.logicalOptimizers)

scanners, err := e.storageScanners(q, qOpts, lplan)
if err != nil {
return nil, errors.Wrap(err, "creating storage scanners")
}

ctx = warnings.NewContext(ctx)
defer func() { warns.Merge(warnings.FromContext(ctx)) }()
exec, err := execution.New(ctx, lplan.Root(), e.storageScanners(q), qOpts)
exec, err := execution.New(ctx, lplan.Root(), scanners, qOpts)
if e.triggerFallback(err) {
e.metrics.queries.WithLabelValues("true").Inc()
return e.prom.NewInstantQuery(ctx, q, opts, qs, ts)
@@ -292,6 +297,7 @@ func (e *Engine) NewInstantQuery(ctx context.Context, q storage.Queryable, opts
warns: warns,
t: InstantQuery,
resultSort: resultSort,
scanners: scanners,
}, nil
}

@@ -320,7 +326,13 @@ func (e *Engine) NewInstantQueryFromPlan(ctx context.Context, q storage.Queryabl

ctx = warnings.NewContext(ctx)
defer func() { warns.Merge(warnings.FromContext(ctx)) }()
exec, err := execution.New(ctx, lplan.Root(), e.storageScanners(q), qOpts)

scnrs, err := e.storageScanners(q, qOpts, lplan)
if err != nil {
return nil, errors.Wrap(err, "creating storage scanners")
}

exec, err := execution.New(ctx, lplan.Root(), scnrs, qOpts)
if e.triggerFallback(err) {
e.metrics.queries.WithLabelValues("true").Inc()
return e.prom.NewInstantQuery(ctx, q, opts, root.String(), ts)
@@ -339,6 +351,7 @@ func (e *Engine) NewInstantQueryFromPlan(ctx context.Context, q storage.Queryabl
t: InstantQuery,
// TODO(fpetkovski): Infer the sort order from the plan, ideally without copying the newResultSort function.
resultSort: noSortResultSort{},
scanners: scnrs,
}, nil
}

@@ -375,7 +388,12 @@ func (e *Engine) NewRangeQuery(ctx context.Context, q storage.Queryable, opts pr

ctx = warnings.NewContext(ctx)
defer func() { warns.Merge(warnings.FromContext(ctx)) }()
exec, err := execution.New(ctx, lplan.Root(), e.storageScanners(q), qOpts)
scnrs, err := e.storageScanners(q, qOpts, lplan)
if err != nil {
return nil, errors.Wrap(err, "creating storage scanners")
}

exec, err := execution.New(ctx, lplan.Root(), scnrs, qOpts)
if e.triggerFallback(err) {
e.metrics.queries.WithLabelValues("true").Inc()
return e.prom.NewRangeQuery(ctx, q, opts, qs, start, end, step)
@@ -386,11 +404,12 @@ func (e *Engine) NewRangeQuery(ctx context.Context, q storage.Queryable, opts pr
}

return &compatibilityQuery{
Query: &Query{exec: exec, opts: opts},
engine: e,
plan: lplan,
warns: warns,
t: RangeQuery,
Query: &Query{exec: exec, opts: opts},
engine: e,
plan: lplan,
warns: warns,
t: RangeQuery,
scanners: scnrs,
}, nil
}

@@ -416,9 +435,14 @@ func (e *Engine) NewRangeQueryFromPlan(ctx context.Context, q storage.Queryable,
}
lplan, warns := logicalplan.New(root, qOpts, planOpts).Optimize(e.logicalOptimizers)

scnrs, err := e.storageScanners(q, qOpts, lplan)
if err != nil {
return nil, errors.Wrap(err, "creating storage scanners")
}

ctx = warnings.NewContext(ctx)
defer func() { warns.Merge(warnings.FromContext(ctx)) }()
exec, err := execution.New(ctx, lplan.Root(), e.storageScanners(q), qOpts)
exec, err := execution.New(ctx, lplan.Root(), scnrs, qOpts)
if e.triggerFallback(err) {
e.metrics.queries.WithLabelValues("true").Inc()
return e.prom.NewRangeQuery(ctx, q, opts, lplan.Root().String(), start, end, step)
@@ -428,11 +452,12 @@ func (e *Engine) NewRangeQueryFromPlan(ctx context.Context, q storage.Queryable,
return nil, err
}
return &compatibilityQuery{
Query: &Query{exec: exec, opts: opts},
engine: e,
plan: lplan,
warns: warns,
t: RangeQuery,
Query: &Query{exec: exec, opts: opts},
engine: e,
plan: lplan,
warns: warns,
t: RangeQuery,
scanners: scnrs,
}, nil
}

@@ -452,11 +477,11 @@ func (e *Engine) makeQueryOpts(start time.Time, end time.Time, step time.Duratio
return qOpts
}

func (e *Engine) storageScanners(queryable storage.Queryable) engstorage.Scanners {
func (e *Engine) storageScanners(queryable storage.Queryable, qOpts *query.Options, lplan logicalplan.Plan) (engstorage.Scanners, error) {
if e.scanners == nil {
return promstorage.NewPrometheusScanners(queryable)
return promstorage.NewPrometheusScanners(queryable, qOpts, lplan)
}
return e.scanners
return e.scanners, nil
}

func (e *Engine) triggerFallback(err error) bool {
@@ -495,6 +520,8 @@ type compatibilityQuery struct {
t QueryType
resultSort resultSorter
cancel context.CancelFunc

scanners engstorage.Scanners
}

func (q *compatibilityQuery) Exec(ctx context.Context) (ret *promql.Result) {
@@ -671,7 +698,11 @@ func (q *compatibilityQuery) Stats() *stats.Statistics {
return &stats.Statistics{Timers: stats.NewQueryTimers(), Samples: samples}
}

func (q *compatibilityQuery) Close() { q.Cancel() }
func (q *compatibilityQuery) Close() {
if err := q.scanners.Close(); err != nil {
level.Warn(q.engine.logger).Log("msg", "error closing storage scanners, some memory might have leaked", "err", err)
}
}

func (q *compatibilityQuery) String() string { return q.plan.Root().String() }

88 changes: 81 additions & 7 deletions engine/engine_test.go
@@ -111,6 +111,71 @@ func TestVectorSelectorWithGaps(t *testing.T) {
testutil.WithGoCmp(comparer).Equals(t, oldResult, newResult, queryExplanation(q1))
}

type queryableCloseChecker struct {
closed bool

storage.Queryable
}

func (q *queryableCloseChecker) Querier(mint, maxt int64) (storage.Querier, error) {
qr, err := q.Queryable.Querier(mint, maxt)
if err != nil {
return nil, err
}
return &querierCloseChecker{Querier: qr, closed: &q.closed}, nil
}

type querierCloseChecker struct {
storage.Querier

closed *bool
}

func (q *querierCloseChecker) Close() error {
*q.closed = true
return q.Querier.Close()
}

// TestQuerierClosedAfterQueryClosed tests that the querier is only closed
// after the query is closed.
func TestQuerierClosedAfterQueryClosed(t *testing.T) {
t.Parallel()
opts := promql.EngineOpts{
Timeout: 1 * time.Hour,
MaxSamples: 1e10,
EnableNegativeOffset: true,
EnableAtModifier: true,
}

load := `load 30s
http_requests_total{pod="nginx-1", route="/"} 41.00+0.20x40
http_requests_total{pod="nginx-2", route="/"} 51+21.71x40`

storage := promqltest.LoadedStorage(t, load)
defer storage.Close()

optimizers := logicalplan.AllOptimizers
newEngine := engine.New(engine.Opts{
EngineOpts: opts,
DisableFallback: true,
LogicalOptimizers: optimizers,
// Set to 1 to make sure batching is tested.
SelectorBatchSize: 1,
})
ctx := context.Background()
qr := &queryableCloseChecker{
Queryable: storage,
}
q1, err := newEngine.NewInstantQuery(ctx, qr, nil, "sum(http_requests_total)", time.Unix(0, 0))
testutil.Ok(t, err)
_ = q1.Exec(ctx)

require.Equal(t, false, qr.closed)
q1.Close()

require.Equal(t, true, qr.closed)
}

func TestQueriesAgainstOldEngine(t *testing.T) {
t.Parallel()
start := time.Unix(0, 0)
@@ -2132,15 +2197,21 @@ type scannersWithWarns struct {
promScanners *prometheus.Scanners
}

func newScannersWithWarns(warn error) *scannersWithWarns {
return &scannersWithWarns{
warn: warn,
promScanners: prometheus.NewPrometheusScanners(&storage.MockQueryable{
MockQuerier: storage.NoopQuerier(),
}),
func newScannersWithWarns(warn error, qOpts *query.Options, lplan logicalplan.Plan) (*scannersWithWarns, error) {
scanners, err := prometheus.NewPrometheusScanners(&storage.MockQueryable{
MockQuerier: storage.NoopQuerier(),
}, qOpts, lplan)
if err != nil {
return nil, err
}
return &scannersWithWarns{
warn: warn,
promScanners: scanners,
}, nil
}

func (s *scannersWithWarns) Close() error { return nil }

func (s scannersWithWarns) NewVectorSelector(ctx context.Context, opts *query.Options, hints storage.SelectHints, selector logicalplan.VectorSelector) (model.VectorOperator, error) {
warnings.AddToContext(s.warn, ctx)
return s.promScanners.NewVectorSelector(ctx, opts, hints, selector)
@@ -2156,7 +2227,10 @@ func TestWarningsPlanCreation(t *testing.T) {
opts = engine.Opts{EngineOpts: promql.EngineOpts{Timeout: 1 * time.Hour}}
expectedWarn = errors.New("test warning")
)
newEngine := engine.NewWithScanners(opts, newScannersWithWarns(expectedWarn))

scnrs, err := newScannersWithWarns(expectedWarn, &query.Options{}, nil)
testutil.Ok(t, err)
newEngine := engine.NewWithScanners(opts, scnrs)
q1, err := newEngine.NewRangeQuery(context.Background(), nil, nil, "http_requests_total", time.UnixMilli(0), time.UnixMilli(600), 30*time.Second)
testutil.Ok(t, err)

8 changes: 8 additions & 0 deletions logicalplan/plan.go
@@ -127,6 +127,14 @@ func Traverse(expr *Node, transform func(*Node)) {
}
}

func TraverseWithParents(parents []*Node, current *Node, transform func(parents []*Node, node *Node)) {
children := (*current).Children()
transform(parents, current)
for _, c := range children {
TraverseWithParents(append(parents, current), c, transform)
}
}

func TraverseBottomUp(parent *Node, current *Node, transform func(parent *Node, node *Node) bool) bool {
var stop bool
for _, c := range (*current).Children() {
1 change: 1 addition & 0 deletions storage/interface.go
@@ -14,6 +14,7 @@ import (
)

type Scanners interface {
Close() error
NewVectorSelector(ctx context.Context, opts *query.Options, hints storage.SelectHints, selector logicalplan.VectorSelector) (model.VectorOperator, error)
NewMatrixSelector(ctx context.Context, opts *query.Options, hints storage.SelectHints, selector logicalplan.MatrixSelector, call logicalplan.FunctionCall) (model.VectorOperator, error)
}
10 changes: 5 additions & 5 deletions storage/prometheus/pool.go
@@ -17,28 +17,28 @@ var sep = []byte{'\xff'}
type SelectorPool struct {
selectors map[uint64]*seriesSelector

queryable storage.Queryable
querier storage.Querier
}

func NewSelectorPool(queryable storage.Queryable) *SelectorPool {
func NewSelectorPool(querier storage.Querier) *SelectorPool {
return &SelectorPool{
selectors: make(map[uint64]*seriesSelector),
queryable: queryable,
querier: querier,
}
}

func (p *SelectorPool) GetSelector(mint, maxt, step int64, matchers []*labels.Matcher, hints storage.SelectHints) SeriesSelector {
key := hashMatchers(matchers, mint, maxt, hints)
if _, ok := p.selectors[key]; !ok {
p.selectors[key] = newSeriesSelector(p.queryable, mint, maxt, step, matchers, hints)
p.selectors[key] = newSeriesSelector(p.querier, matchers, hints)
}
return p.selectors[key]
}

func (p *SelectorPool) GetFilteredSelector(mint, maxt, step int64, matchers, filters []*labels.Matcher, hints storage.SelectHints) SeriesSelector {
key := hashMatchers(matchers, mint, maxt, hints)
if _, ok := p.selectors[key]; !ok {
p.selectors[key] = newSeriesSelector(p.queryable, mint, maxt, step, matchers, hints)
p.selectors[key] = newSeriesSelector(p.querier, matchers, hints)
}

return NewFilteredSelector(p.selectors[key], NewFilter(filters))