Skip to content

Commit

Permalink
plan,execution: late coalesce
Browse files Browse the repository at this point in the history
  • Loading branch information
MichaHoffmann committed Dec 22, 2023
1 parent 7e2f0a1 commit ccd2596
Show file tree
Hide file tree
Showing 5 changed files with 157 additions and 3 deletions.
21 changes: 21 additions & 0 deletions execution/execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ func newOperator(expr parser.Expr, storage *engstore.SelectorPool, opts *query.O
return newRemoteExecution(e, opts, hints)
case logicalplan.Noop:
return noop.NewOperator(), nil
case logicalplan.Coalesce:
return newCoalesce(e, storage, opts, hints)
case logicalplan.UserDefinedExpr:
return e.MakeExecutionOperator(model.NewVectorPool(opts.StepsBatch), storage, opts, hints)
default:
Expand Down Expand Up @@ -107,6 +109,13 @@ func newVectorSelector(expr parser.Expr, storage *engstore.SelectorPool, opts *q
offset = e.Offset
batchsize = e.BatchSize
selector = storage.GetFilteredSelector(start, end, opts.Step.Milliseconds(), e.LabelMatchers, e.Filters, hints)

if e.Shards > 0 {
return exchange.NewConcurrent(
scan.NewVectorSelector(
model.NewVectorPool(opts.StepsBatch), selector, opts, offset, hints, batchsize, e.N, e.Shards),
2), nil
}
default:
return nil, errors.Wrapf(parse.ErrNotSupportedExpr, "got: %s", e)
}
Expand Down Expand Up @@ -465,6 +474,18 @@ func newRemoteExecution(e logicalplan.RemoteExecution, opts *query.Options, hint
return exchange.NewConcurrent(remoteExec, 2), nil
}

// newCoalesce builds one execution operator for every shard expression of the
// logical Coalesce node and hands them all to exchange.NewCoalesce.
// Construction stops at the first shard whose operator cannot be built, and
// that error is returned unchanged.
func newCoalesce(e logicalplan.Coalesce, storage *engstore.SelectorPool, opts *query.Options, hints storage.SelectHints) (model.VectorOperator, error) {
	shardOps := make([]model.VectorOperator, 0, len(e.Shards))
	for _, shard := range e.Shards {
		op, err := newOperator(shard, storage, opts, hints)
		if err != nil {
			return nil, err
		}
		shardOps = append(shardOps, op)
	}

	pool := model.NewVectorPool(opts.StepsBatch)
	return exchange.NewCoalesce(pool, opts, 0, shardOps...), nil
}

// Copy from https://github.com/prometheus/prometheus/blob/v2.39.1/promql/engine.go#L791.
func getTimeRangesForVectorSelector(n *parser.VectorSelector, opts *query.Options, evalRange int64) (int64, int64) {
start := opts.Start.UnixMilli()
Expand Down
4 changes: 3 additions & 1 deletion logicalplan/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"fmt"

"github.com/prometheus/prometheus/model/labels"

"github.com/prometheus/prometheus/promql/parser"
"github.com/prometheus/prometheus/promql/parser/posrange"
)
Expand All @@ -19,6 +18,9 @@ type VectorSelector struct {
*parser.VectorSelector
Filters []*labels.Matcher
BatchSize int64

N int
Shards int
}

func (f VectorSelector) String() string {
Expand Down
4 changes: 2 additions & 2 deletions logicalplan/plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,8 @@ import (
"github.com/efficientgo/core/errors"
"github.com/prometheus/prometheus/model/timestamp"
"github.com/prometheus/prometheus/promql"
"github.com/prometheus/prometheus/util/annotations"

"github.com/prometheus/prometheus/promql/parser"
"github.com/prometheus/prometheus/util/annotations"

"github.com/thanos-io/promql-engine/query"
)
Expand All @@ -24,6 +23,7 @@ var (
)

// DefaultOptimizers is the package's default list of plan optimizers.
// NOTE(review): the list order presumably determines the order in which the
// optimizers are applied — confirm against the plan's Optimize implementation.
var DefaultOptimizers = []Optimizer{
// ShardedAggregations is configured with 8 shards by default.
ShardedAggregations{Shards: 8},
SortMatchers{},
MergeSelectsOptimizer{},
}
Expand Down
92 changes: 92 additions & 0 deletions logicalplan/sharded_aggregations.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
// Copyright (c) The Thanos Community Authors.
// Licensed under the Apache License 2.0.

package logicalplan

import (
"fmt"
"strings"

"github.com/prometheus/prometheus/promql/parser"
"github.com/prometheus/prometheus/promql/parser/posrange"
"github.com/prometheus/prometheus/util/annotations"

"github.com/thanos-io/promql-engine/query"
)

// Coalesce is a logical plan node made up of several shard expressions. Its
// method set lets it be stored wherever a parser.Expr is expected, e.g. as the
// inner expression of an aggregation.
type Coalesce struct {
	// Shards holds the expressions that make up this node.
	Shards []parser.Expr
}

// String renders the node as "coalesce(<shard>, <shard>, ...)", using each
// shard's own String form.
func (r Coalesce) String() string {
	parts := make([]string, len(r.Shards))
	// The loop variable is deliberately not called r, so it cannot be confused
	// with (or shadow) the receiver.
	for i, shard := range r.Shards {
		parts[i] = shard.String()
	}
	return fmt.Sprintf("coalesce(%s)", strings.Join(parts, ", "))
}

// Pretty ignores the indentation level and returns the same text as String.
func (r Coalesce) Pretty(level int) string { return r.String() }

// PositionRange always returns the zero position range.
func (r Coalesce) PositionRange() posrange.PositionRange { return posrange.PositionRange{} }

// Type reports the value type of the first shard. A node without shards has
// no meaningful type, so parser.ValueTypeNone is returned instead of panicking
// on an out-of-range index.
func (r Coalesce) Type() parser.ValueType {
	if len(r.Shards) == 0 {
		return parser.ValueTypeNone
	}
	return r.Shards[0].Type()
}

// PromQLExpr is an empty marker method.
// NOTE(review): presumably required to satisfy parser.Expr — confirm.
func (r Coalesce) PromQLExpr() {}

// ShardedAggregations is an optimizer that rewrites sum(f(selector)) into
// sum(coalesce(f(selector shard 0), ..., f(selector shard N-1))).
type ShardedAggregations struct {
	// Shards is the number of per-shard copies to generate. Values below 1
	// disable the rewrite.
	Shards int
}

// Optimize walks the plan bottom-up and, for a sum aggregation whose direct
// child is a single-argument function call over a plain vector selector,
// replaces the aggregation with the same aggregation over a Coalesce of
// per-shard copies of that call. All other expressions are left untouched.
// The returned annotations are always nil.
func (m ShardedAggregations) Optimize(plan parser.Expr, _ *query.Options) (parser.Expr, annotations.Annotations) {
	// A negative count would make the make() call below panic and a zero
	// count would yield a Coalesce without shards, so skip the rewrite.
	if m.Shards <= 0 {
		return plan, nil
	}

	TraverseBottomUp(nil, &plan, func(parent, current *parser.Expr) (stop bool) {
		if parent == nil {
			return false
		}
		aggr, ok := (*parent).(*parser.AggregateExpr)
		if !ok {
			return false
		}
		// TODO: only care about sum now
		if aggr.Op != parser.SUM {
			return false
		}
		call, ok := (*current).(*parser.Call)
		if !ok {
			return false
		}
		if len(call.Args) != 1 {
			return false
		}
		// absent() depends on whether the whole input vector is empty, so it
		// cannot be evaluated independently per shard without changing the
		// result of the query.
		if call.Func != nil && call.Func.Name == "absent" {
			return false
		}
		vs, ok := call.Args[0].(*parser.VectorSelector)
		if !ok {
			return false
		}

		// Build one copy of the call per shard, each reading its own shard
		// of the selector.
		coalesce := Coalesce{make([]parser.Expr, m.Shards)}
		for i := range coalesce.Shards {
			coalesce.Shards[i] = &parser.Call{
				Func:     call.Func,
				PosRange: call.PosRange,
				Args:     []parser.Expr{vectorSelectorForShard(vs, i, m.Shards)},
			}
		}

		// Swap the aggregation for an equivalent one over the coalesced
		// shards; every other field is carried over unchanged.
		*parent = &parser.AggregateExpr{
			Op:       aggr.Op,
			Expr:     coalesce,
			Param:    aggr.Param,
			Grouping: aggr.Grouping,
			Without:  aggr.Without,
			PosRange: aggr.PosRange,
		}
		// Report stop=true to the traversal once a rewrite has been applied.
		return true
	})
	return plan, nil
}

// vectorSelectorForShard wraps the parsed selector in a logical VectorSelector
// tagged with shard index n and the total shard count.
func vectorSelectorForShard(expr *parser.VectorSelector, n, shards int) parser.Expr {
	sharded := new(VectorSelector)
	sharded.VectorSelector = expr
	sharded.N = n
	sharded.Shards = shards
	return sharded
}
39 changes: 39 additions & 0 deletions logicalplan/sharded_aggregations_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
// Copyright (c) The Thanos Community Authors.
// Licensed under the Apache License 2.0.

package logicalplan

import (
"testing"

"github.com/efficientgo/core/testutil"
"github.com/prometheus/prometheus/promql/parser"

"github.com/thanos-io/promql-engine/query"
)

// TestShardedAggregations runs only the ShardedAggregations optimizer (with
// two shards) over each query and compares the string form of the optimized
// plan with the expected text.
//
// NOTE(review): the single case has an empty `expected` string, which looks
// like a placeholder — fill in the real optimized plan text before relying on
// this test.
func TestShardedAggregations(t *testing.T) {
	type testCase struct {
		name     string
		expr     string
		expected string
	}
	testCases := []testCase{
		{
			name:     "sum exp",
			expr:     `sum(exp(X))`,
			expected: ``,
		},
	}

	shardedOnly := []Optimizer{ShardedAggregations{Shards: 2}}
	for _, tc := range testCases {
		t.Run(tc.expr, func(t *testing.T) {
			parsed, err := parser.ParseExpr(tc.expr)
			testutil.Ok(t, err)

			optimized, _ := New(parsed, &query.Options{}).Optimize(shardedOnly)
			testutil.Equals(t, tc.expected, optimized.Expr().String())
		})
	}
}

0 comments on commit ccd2596

Please sign in to comment.