Skip to content

Commit

Permalink
Add total samples stats for queries.
Browse files Browse the repository at this point in the history
Total Samples are only calculated from vector selector and matrix selector operators currently.
  • Loading branch information
sahnib committed Mar 27, 2023
1 parent 0d3a056 commit 9fbbbe2
Show file tree
Hide file tree
Showing 5 changed files with 94 additions and 44 deletions.
29 changes: 16 additions & 13 deletions engine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,8 @@ func (e *compatibilityEngine) NewInstantQuery(q storage.Queryable, opts *promql.
})
lplan = lplan.Optimize(e.logicalOptimizers)

exec, err := execution.New(lplan.Expr(), q, ts, ts, 0, opts.LookbackDelta)
querySamples := stats.NewQuerySamples(opts.EnablePerStepStats)
exec, err := execution.New(lplan.Expr(), q, ts, ts, 0, opts.LookbackDelta, querySamples)
if e.triggerFallback(err) {
e.metrics.queries.WithLabelValues("true").Inc()
return e.prom.NewInstantQuery(q, opts, qs, ts)
Expand All @@ -247,12 +248,13 @@ func (e *compatibilityEngine) NewInstantQuery(q storage.Queryable, opts *promql.
}

return &compatibilityQuery{
Query: &Query{exec: exec, opts: opts},
engine: e,
expr: expr,
ts: ts,
t: InstantQuery,
resultSort: resultSort,
Query: &Query{exec: exec, opts: opts},
engine: e,
expr: expr,
ts: ts,
t: InstantQuery,
resultSort: resultSort,
querySamples: querySamples,
}, nil
}

Expand Down Expand Up @@ -283,7 +285,8 @@ func (e *compatibilityEngine) NewRangeQuery(q storage.Queryable, opts *promql.Qu
})
lplan = lplan.Optimize(e.logicalOptimizers)

exec, err := execution.New(lplan.Expr(), q, start, end, step, opts.LookbackDelta)
querySamples := stats.NewQuerySamples(opts.EnablePerStepStats)
exec, err := execution.New(lplan.Expr(), q, start, end, step, opts.LookbackDelta, querySamples)
if e.triggerFallback(err) {
e.metrics.queries.WithLabelValues("true").Inc()
return e.prom.NewRangeQuery(q, opts, qs, start, end, step)
Expand All @@ -302,6 +305,8 @@ func (e *compatibilityEngine) NewRangeQuery(q storage.Queryable, opts *promql.Qu
engine: e,
expr: expr,
t: RangeQuery,

querySamples: querySamples,
}, nil
}

Expand Down Expand Up @@ -422,6 +427,8 @@ type compatibilityQuery struct {
t QueryType
resultSort resultSorter

querySamples *stats.QuerySamples

cancel context.CancelFunc
}

Expand Down Expand Up @@ -581,11 +588,7 @@ func (q *compatibilityQuery) Statement() promparser.Statement { return nil }

// Stats always returns empty query stats for now to avoid panic.
func (q *compatibilityQuery) Stats() *stats.Statistics {
var enablePerStepStats bool
if q.opts != nil {
enablePerStepStats = q.opts.EnablePerStepStats
}
return &stats.Statistics{Timers: stats.NewQueryTimers(), Samples: stats.NewQuerySamples(enablePerStepStats)}
return &stats.Statistics{Timers: stats.NewQueryTimers(), Samples: q.querySamples}
}

func (q *compatibilityQuery) Close() { q.Cancel() }
Expand Down
55 changes: 30 additions & 25 deletions execution/execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import (
"sort"
"time"

"github.com/prometheus/prometheus/util/stats"

"github.com/prometheus/prometheus/promql"

"github.com/thanos-community/promql-engine/execution/noop"
Expand Down Expand Up @@ -52,7 +54,8 @@ const stepsBatch = 10

// New creates new physical query execution for a given query expression which represents logical plan.
// TODO(bwplotka): Add definition (could be parameters for each execution operator) we can optimize - it would represent physical plan.
func New(expr parser.Expr, queryable storage.Queryable, mint, maxt time.Time, step, lookbackDelta time.Duration) (model.VectorOperator, error) {
func New(expr parser.Expr, queryable storage.Queryable, mint, maxt time.Time, step, lookbackDelta time.Duration,
querySamples *stats.QuerySamples) (model.VectorOperator, error) {
opts := &query.Options{
Start: mint,
End: maxt,
Expand All @@ -67,10 +70,10 @@ func New(expr parser.Expr, queryable storage.Queryable, mint, maxt time.Time, st
// TODO(fpetkovski): Adjust the step for sub-queries once they are supported.
Step: step.Milliseconds(),
}
return newOperator(expr, selectorPool, opts, hints)
return newOperator(expr, selectorPool, opts, hints, querySamples)
}

func newOperator(expr parser.Expr, storage *engstore.SelectorPool, opts *query.Options, hints storage.SelectHints) (model.VectorOperator, error) {
func newOperator(expr parser.Expr, storage *engstore.SelectorPool, opts *query.Options, hints storage.SelectHints, querySamples *stats.QuerySamples) (model.VectorOperator, error) {
switch e := expr.(type) {
case *parser.NumberLiteral:
return scan.NewNumberLiteralSelector(model.NewVectorPool(stepsBatch), opts, e.Val), nil
Expand All @@ -80,14 +83,14 @@ func newOperator(expr parser.Expr, storage *engstore.SelectorPool, opts *query.O
hints.Start = start
hints.End = end
filter := storage.GetSelector(start, end, opts.Step.Milliseconds(), e.LabelMatchers, hints)
return newShardedVectorSelector(filter, opts, e.Offset)
return newShardedVectorSelector(filter, opts, e.Offset, querySamples)

case *logicalplan.FilteredSelector:
start, end := getTimeRangesForVectorSelector(e.VectorSelector, opts, 0)
hints.Start = start
hints.End = end
selector := storage.GetFilteredSelector(start, end, opts.Step.Milliseconds(), e.LabelMatchers, e.Filters, hints)
return newShardedVectorSelector(selector, opts, e.Offset)
return newShardedVectorSelector(selector, opts, e.Offset, querySamples)

case *parser.Call:
hints.Func = e.Func.Name
Expand All @@ -97,7 +100,7 @@ func newOperator(expr parser.Expr, storage *engstore.SelectorPool, opts *query.O
if e.Func.Name == "histogram_quantile" {
nextOperators := make([]model.VectorOperator, len(e.Args))
for i := range e.Args {
next, err := newOperator(e.Args[i], storage, opts, hints)
next, err := newOperator(e.Args[i], storage, opts, hints, querySamples)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -142,7 +145,7 @@ func newOperator(expr parser.Expr, storage *engstore.SelectorPool, opts *query.O
operators := make([]model.VectorOperator, 0, numShards)
for i := 0; i < numShards; i++ {
operator := exchange.NewConcurrent(
scan.NewMatrixSelector(model.NewVectorPool(stepsBatch), filter, call, e, opts, t.Range, vs.Offset, i, numShards),
scan.NewMatrixSelector(model.NewVectorPool(stepsBatch), filter, call, e, opts, t.Range, vs.Offset, i, numShards, querySamples),
2,
)
operators = append(operators, operator)
Expand All @@ -155,7 +158,7 @@ func newOperator(expr parser.Expr, storage *engstore.SelectorPool, opts *query.O
// Does not have matrix arg so create functionOperator normally.
nextOperators := make([]model.VectorOperator, len(e.Args))
for i := range e.Args {
next, err := newOperator(e.Args[i], storage, opts, hints)
next, err := newOperator(e.Args[i], storage, opts, hints, querySamples)
if err != nil {
return nil, err
}
Expand All @@ -170,13 +173,13 @@ func newOperator(expr parser.Expr, storage *engstore.SelectorPool, opts *query.O
hints.By = !e.Without
var paramOp model.VectorOperator

next, err := newOperator(e.Expr, storage, opts, hints)
next, err := newOperator(e.Expr, storage, opts, hints, querySamples)
if err != nil {
return nil, err
}

if e.Param != nil {
paramOp, err = newOperator(e.Param, storage, opts, hints)
paramOp, err = newOperator(e.Param, storage, opts, hints, querySamples)
if err != nil {
return nil, err
}
Expand All @@ -196,20 +199,20 @@ func newOperator(expr parser.Expr, storage *engstore.SelectorPool, opts *query.O

case *parser.BinaryExpr:
if e.LHS.Type() == parser.ValueTypeScalar || e.RHS.Type() == parser.ValueTypeScalar {
return newScalarBinaryOperator(e, storage, opts, hints)
return newScalarBinaryOperator(e, storage, opts, hints, querySamples)
}

return newVectorBinaryOperator(e, storage, opts, hints)
return newVectorBinaryOperator(e, storage, opts, hints, querySamples)

case *parser.ParenExpr:
return newOperator(e.Expr, storage, opts, hints)
return newOperator(e.Expr, storage, opts, hints, querySamples)

case *parser.StringLiteral:
// TODO(saswatamcode): This requires separate model with strings.
return nil, errors.Wrapf(parse.ErrNotImplemented, "got: %s", e)

case *parser.UnaryExpr:
next, err := newOperator(e.Expr, storage, opts, hints)
next, err := newOperator(e.Expr, storage, opts, hints, querySamples)
if err != nil {
return nil, err
}
Expand All @@ -229,7 +232,7 @@ func newOperator(expr parser.Expr, storage *engstore.SelectorPool, opts *query.O
case *parser.NumberLiteral:
return scan.NewNumberLiteralSelector(model.NewVectorPool(stepsBatch), opts, t.Val), nil
}
next, err := newOperator(e.Expr, storage, opts.WithEndTime(opts.Start), hints)
next, err := newOperator(e.Expr, storage, opts.WithEndTime(opts.Start), hints, querySamples)
if err != nil {
return nil, err
}
Expand All @@ -246,7 +249,7 @@ func newOperator(expr parser.Expr, storage *engstore.SelectorPool, opts *query.O

operators := make([]model.VectorOperator, len(e.Expressions))
for i, expr := range e.Expressions {
operator, err := newOperator(expr, storage, opts, hints)
operator, err := newOperator(expr, storage, opts, hints, querySamples)
if err != nil {
return nil, err
}
Expand All @@ -268,7 +271,7 @@ func newOperator(expr parser.Expr, storage *engstore.SelectorPool, opts *query.O
// We need to set the lookback for the selector to 0 since the remote query already applies one lookback.
selectorOpts := *opts
selectorOpts.LookbackDelta = 0
remoteExec := remote.NewExecution(qry, model.NewVectorPool(stepsBatch), &selectorOpts)
remoteExec := remote.NewExecution(qry, model.NewVectorPool(stepsBatch), &selectorOpts, querySamples)
return exchange.NewConcurrent(remoteExec, 2), nil
case logicalplan.Noop:
return noop.NewOperator(), nil
Expand All @@ -288,7 +291,7 @@ func unpackVectorSelector(t *parser.MatrixSelector) (*parser.VectorSelector, []*
}
}

func newShardedVectorSelector(selector engstore.SeriesSelector, opts *query.Options, offset time.Duration) (model.VectorOperator, error) {
func newShardedVectorSelector(selector engstore.SeriesSelector, opts *query.Options, offset time.Duration, querySamples *stats.QuerySamples) (model.VectorOperator, error) {
numShards := runtime.GOMAXPROCS(0) / 2
if numShards < 1 {
numShards = 1
Expand All @@ -297,31 +300,33 @@ func newShardedVectorSelector(selector engstore.SeriesSelector, opts *query.Opti
for i := 0; i < numShards; i++ {
operator := exchange.NewConcurrent(
scan.NewVectorSelector(
model.NewVectorPool(stepsBatch), selector, opts, offset, i, numShards), 2)
model.NewVectorPool(stepsBatch), selector, opts, offset, i, numShards, querySamples), 2)
operators = append(operators, operator)
}

return exchange.NewCoalesce(model.NewVectorPool(stepsBatch), operators...), nil
}

func newVectorBinaryOperator(e *parser.BinaryExpr, selectorPool *engstore.SelectorPool, opts *query.Options, hints storage.SelectHints) (model.VectorOperator, error) {
leftOperator, err := newOperator(e.LHS, selectorPool, opts, hints)
func newVectorBinaryOperator(e *parser.BinaryExpr, selectorPool *engstore.SelectorPool, opts *query.Options,
hints storage.SelectHints, querySamples *stats.QuerySamples) (model.VectorOperator, error) {
leftOperator, err := newOperator(e.LHS, selectorPool, opts, hints, querySamples)
if err != nil {
return nil, err
}
rightOperator, err := newOperator(e.RHS, selectorPool, opts, hints)
rightOperator, err := newOperator(e.RHS, selectorPool, opts, hints, querySamples)
if err != nil {
return nil, err
}
return binary.NewVectorOperator(model.NewVectorPool(stepsBatch), leftOperator, rightOperator, e.VectorMatching, e.Op, e.ReturnBool)
}

func newScalarBinaryOperator(e *parser.BinaryExpr, selectorPool *engstore.SelectorPool, opts *query.Options, hints storage.SelectHints) (model.VectorOperator, error) {
lhs, err := newOperator(e.LHS, selectorPool, opts, hints)
func newScalarBinaryOperator(e *parser.BinaryExpr, selectorPool *engstore.SelectorPool, opts *query.Options,
hints storage.SelectHints, querySamples *stats.QuerySamples) (model.VectorOperator, error) {
lhs, err := newOperator(e.LHS, selectorPool, opts, hints, querySamples)
if err != nil {
return nil, err
}
rhs, err := newOperator(e.RHS, selectorPool, opts, hints)
rhs, err := newOperator(e.RHS, selectorPool, opts, hints, querySamples)
if err != nil {
return nil, err
}
Expand Down
6 changes: 4 additions & 2 deletions execution/remote/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"fmt"
"sync"

"github.com/prometheus/prometheus/util/stats"

"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/promql"

Expand All @@ -24,13 +26,13 @@ type Execution struct {
vectorSelector model.VectorOperator
}

func NewExecution(query promql.Query, pool *model.VectorPool, opts *query.Options) *Execution {
func NewExecution(query promql.Query, pool *model.VectorPool, opts *query.Options, querySamples *stats.QuerySamples) *Execution {
storage := newStorageFromQuery(query, opts)
return &Execution{
storage: storage,
query: query,
opts: opts,
vectorSelector: scan.NewVectorSelector(pool, storage, opts, 0, 0, 1),
vectorSelector: scan.NewVectorSelector(pool, storage, opts, 0, 0, 1, querySamples),
}
}

Expand Down
Loading

0 comments on commit 9fbbbe2

Please sign in to comment.