Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

plan,execution: late coalesce experiments #372

Draft
wants to merge 3 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion engine/bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,11 @@ func BenchmarkRangeQuery(b *testing.B) {
query string
storage *teststorage.TestStorage
}{
{
name: "experiment",
query: "sum(http_requests_total)",
storage: sixHourDataset,
},
{
name: "vector selector",
query: "http_requests_total",
Expand Down Expand Up @@ -298,7 +303,6 @@ func BenchmarkRangeQuery(b *testing.B) {
EnableAtModifier: true,
EnableNegativeOffset: true,
},
SelectorBatchSize: 256,
}

for _, tc := range cases {
Expand Down
10 changes: 10 additions & 0 deletions engine/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2671,6 +2671,16 @@ func TestInstantQuery(t *testing.T) {
query string
queryTime time.Time
}{
{
name: "experiment",
load: `load 30s
http_requests_total{pod="nginx-0", route="/"} 1+1x30
http_requests_total{pod="nginx-1", route="/"} 2+1x30
http_requests_total{pod="nginx-2", route="/"} 3+1x30
http_requests_total{pod="nginx-3", route="/"} 4+1x30`,
query: `sum(exp(http_requests_total))`,
queryTime: time.Unix(300, 0),
},
{
name: "offset and @ modifiers",
load: `load 30s
Expand Down
21 changes: 21 additions & 0 deletions execution/execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ func newOperator(expr parser.Expr, storage *engstore.SelectorPool, opts *query.O
return newRemoteExecution(e, opts, hints)
case logicalplan.Noop:
return noop.NewOperator(), nil
case logicalplan.Coalesce:
return newCoalesce(e, storage, opts, hints)
case logicalplan.UserDefinedExpr:
return e.MakeExecutionOperator(model.NewVectorPool(opts.StepsBatch), storage, opts, hints)
default:
Expand Down Expand Up @@ -107,6 +109,13 @@ func newVectorSelector(expr parser.Expr, storage *engstore.SelectorPool, opts *q
offset = e.Offset
batchsize = e.BatchSize
selector = storage.GetFilteredSelector(start, end, opts.Step.Milliseconds(), e.LabelMatchers, e.Filters, hints)

if e.Shards > 0 {
return exchange.NewConcurrent(
scan.NewVectorSelector(
model.NewVectorPool(opts.StepsBatch), selector, opts, offset, hints, batchsize, e.N, e.Shards),
2), nil
}
default:
return nil, errors.Wrapf(parse.ErrNotSupportedExpr, "got: %s", e)
}
Expand Down Expand Up @@ -465,6 +474,18 @@ func newRemoteExecution(e logicalplan.RemoteExecution, opts *query.Options, hint
return exchange.NewConcurrent(remoteExec, 2), nil
}

// newCoalesce builds one execution operator per shard expression of the
// logical Coalesce node and merges them with a coalesce exchange operator.
// The first error encountered while building a shard operator is returned
// as-is.
func newCoalesce(e logicalplan.Coalesce, storage *engstore.SelectorPool, opts *query.Options, hints storage.SelectHints) (model.VectorOperator, error) {
	shardOps := make([]model.VectorOperator, 0, len(e.Shards))
	for _, shard := range e.Shards {
		op, err := newOperator(shard, storage, opts, hints)
		if err != nil {
			return nil, err
		}
		shardOps = append(shardOps, op)
	}

	pool := model.NewVectorPool(opts.StepsBatch)
	return exchange.NewCoalesce(pool, opts, 0, shardOps...), nil
}

// Copy from https://github.com/prometheus/prometheus/blob/v2.39.1/promql/engine.go#L791.
func getTimeRangesForVectorSelector(n *parser.VectorSelector, opts *query.Options, evalRange int64) (int64, int64) {
start := opts.Start.UnixMilli()
Expand Down
33 changes: 27 additions & 6 deletions logicalplan/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@ package logicalplan

import (
"fmt"
"strings"

"github.com/prometheus/prometheus/model/labels"

"github.com/prometheus/prometheus/promql/parser"
"github.com/prometheus/prometheus/promql/parser/posrange"
)
Expand All @@ -17,18 +17,39 @@ import (
// This should help us avoid dealing with both types in the entire codebase.
type VectorSelector struct {
*parser.VectorSelector

// Filters holds extra label matchers; the execution layer hands them to the
// storage's filtered selector alongside the selector's own LabelMatchers.
Filters []*labels.Matcher
// BatchSize is forwarded to the scan operator as its batch size.
BatchSize int64

// Shards is the total number of shards the selector is split into.
// NOTE(review): execution appears to take the sharded scan path only when
// this is greater than zero — confirm against newVectorSelector.
Shards int
// N is the zero-based index of the shard this selector represents.
N int
}

func (f VectorSelector) String() string {
if f.BatchSize != 0 && len(f.Filters) != 0 {
return fmt.Sprintf("filter(%s, %s[batch=%d])", f.Filters, f.VectorSelector.String(), f.BatchSize)
var b strings.Builder
var needComma bool
b.WriteString(f.VectorSelector.String())
b.WriteRune('[')
if len(f.Filters) > 0 {
b.WriteString(fmt.Sprintf("filters=%s", f.Filters))
needComma = true
}
if f.BatchSize != 0 {
return fmt.Sprintf("%s[batch=%d]", f.VectorSelector.String(), f.BatchSize)
if f.BatchSize > 0 {
if needComma {
b.WriteRune(',')
}
b.WriteString(fmt.Sprintf("batch=%d", f.BatchSize))
needComma = true
}
return fmt.Sprintf("filter(%s, %s)", f.Filters, f.VectorSelector.String())
if f.Shards > 0 {
if needComma {
b.WriteRune(',')
}
b.WriteString(fmt.Sprintf("shard=%d/%d", f.N, f.Shards))
}
b.WriteRune(']')

return b.String()
}

// Pretty returns the same single-line rendering as String; the indentation
// level argument is not used.
func (f VectorSelector) Pretty(level int) string {
	return f.String()
}
Expand Down
3 changes: 1 addition & 2 deletions logicalplan/plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,8 @@ import (
"github.com/efficientgo/core/errors"
"github.com/prometheus/prometheus/model/timestamp"
"github.com/prometheus/prometheus/promql"
"github.com/prometheus/prometheus/util/annotations"

"github.com/prometheus/prometheus/promql/parser"
"github.com/prometheus/prometheus/util/annotations"

"github.com/thanos-io/promql-engine/query"
)
Expand Down
88 changes: 88 additions & 0 deletions logicalplan/sharded_aggregations.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
// Copyright (c) The Thanos Community Authors.
// Licensed under the Apache License 2.0.

package logicalplan

import (
"fmt"
"strings"

"github.com/prometheus/prometheus/promql/parser"
"github.com/prometheus/prometheus/promql/parser/posrange"
"github.com/prometheus/prometheus/util/annotations"

"github.com/thanos-io/promql-engine/query"
)

// Coalesce is a logical plan node that groups a list of shard expressions.
// The execution layer builds one operator per entry in Shards and merges
// them with a coalesce exchange operator.
type Coalesce struct {
// Shards holds one expression per shard.
Shards []parser.Expr
}

// String renders the node as `coalesce(<shard>, <shard>, ...)`, using each
// shard expression's own String output.
func (r Coalesce) String() string {
	rendered := make([]string, 0, len(r.Shards))
	for _, shard := range r.Shards {
		rendered = append(rendered, shard.String())
	}
	joined := strings.Join(rendered, ", ")
	return fmt.Sprintf("coalesce(%s)", joined)
}

// Pretty returns the same single-line rendering as String; the indentation
// level argument is not used.
func (r Coalesce) Pretty(level int) string {
	return r.String()
}

// PositionRange returns the zero-value position range.
func (r Coalesce) PositionRange() posrange.PositionRange {
	var none posrange.PositionRange
	return none
}

// Type reports the value type of the coalesced result, which is taken from
// the first shard expression. A Coalesce without any shards has no
// expression to derive a type from, so parser.ValueTypeNone is returned
// instead of indexing into an empty slice.
func (r Coalesce) Type() parser.ValueType {
	if len(r.Shards) == 0 {
		return parser.ValueTypeNone
	}
	return r.Shards[0].Type()
}

// PromQLExpr is an intentionally empty marker method; together with the
// other methods it lets Coalesce be used where a parser.Expr is expected.
func (r Coalesce) PromQLExpr() {
}

// ShardedAggregations is an optimizer that splits an aggregation over a
// vector selector into per-shard aggregations merged by a Coalesce node.
type ShardedAggregations struct {
	// Shards is the number of shards each rewritten aggregation is split into.
	Shards int
}

// Optimize rewrites every shardable aggregation whose direct child is a
// plain vector selector into the same aggregation applied on top of a
// Coalesce of per-shard aggregations:
//
//	op(X)  =>  op(coalesce(op(X[shard=0/n]), ..., op(X[shard=n-1/n])))
//
// The rewrite is only applied to aggregations whose result can be obtained
// by re-applying the same operator to partial results (see
// isShardableAggregation). When Shards is not positive the plan is returned
// unchanged, since there is nothing to split into. No annotations are
// produced.
func (m ShardedAggregations) Optimize(plan parser.Expr, _ *query.Options) (parser.Expr, annotations.Annotations) {
	if m.Shards <= 0 {
		return plan, nil
	}

	TraverseBottomUp(nil, &plan, func(parent, current *parser.Expr) (stop bool) {
		if parent == nil {
			return false
		}
		aggr, ok := (*parent).(*parser.AggregateExpr)
		if !ok || !isShardableAggregation(aggr.Op) {
			return false
		}
		vs, ok := (*current).(*parser.VectorSelector)
		if !ok {
			return false
		}

		// One inner aggregation per shard, each reading only its own slice
		// of the selector's series.
		coalesce := Coalesce{Shards: make([]parser.Expr, m.Shards)}
		for i := range coalesce.Shards {
			coalesce.Shards[i] = &parser.AggregateExpr{
				Op:       aggr.Op,
				Expr:     vectorSelectorForShard(vs, i, m.Shards),
				Param:    aggr.Param,
				Grouping: aggr.Grouping,
				Without:  aggr.Without,
				PosRange: aggr.PosRange,
			}
		}

		// The outer aggregation keeps the original operator and grouping and
		// combines the partial results.
		*parent = &parser.AggregateExpr{
			Op:       aggr.Op,
			Expr:     coalesce,
			Param:    aggr.Param,
			Grouping: aggr.Grouping,
			Without:  aggr.Without,
			PosRange: aggr.PosRange,
		}
		return true
	})
	return plan, nil
}

// isShardableAggregation reports whether applying op to per-shard partial
// results and then applying op again yields the same result as applying op
// once to all series. This holds for sum, min, max, topk, bottomk and group;
// it does not hold for count, avg, stddev, stdvar, quantile or count_values.
func isShardableAggregation(op parser.ItemType) bool {
	switch op {
	case parser.SUM, parser.MIN, parser.MAX, parser.TOPK, parser.BOTTOMK, parser.GROUP:
		return true
	default:
		return false
	}
}

// vectorSelectorForShard wraps expr in a logical VectorSelector that is
// tagged as shard n out of shards total shards.
func vectorSelectorForShard(expr *parser.VectorSelector, n, shards int) parser.Expr {
	sharded := VectorSelector{VectorSelector: expr}
	sharded.N, sharded.Shards = n, shards
	return &sharded
}
39 changes: 39 additions & 0 deletions logicalplan/sharded_aggregations_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
// Copyright (c) The Thanos Community Authors.
// Licensed under the Apache License 2.0.

package logicalplan

import (
"testing"

"github.com/efficientgo/core/testutil"
"github.com/prometheus/prometheus/promql/parser"

"github.com/thanos-io/promql-engine/query"
)

// TestShardedAggregations checks that the ShardedAggregations optimizer
// rewrites an aggregation over a vector selector into the same aggregation
// over a coalesce of per-shard aggregations. Plans are compared through
// their string rendering.
func TestShardedAggregations(t *testing.T) {
	cases := []struct {
		name     string
		expr     string
		expected string
	}{
		{
			name:     "sum",
			expr:     `sum(X)`,
			expected: `sum(coalesce(sum(X[shard=0/2]), sum(X[shard=1/2])))`,
		},
		{
			name:     "topk",
			expr:     `topk(10, X)`,
			expected: `topk(10, coalesce(topk(10, X[shard=0/2]), topk(10, X[shard=1/2])))`,
		},
	}

	optimizers := []Optimizer{ShardedAggregations{Shards: 2}}
	for _, tcase := range cases {
		// Key subtests by the case name so failures point at the right case.
		t.Run(tcase.name, func(t *testing.T) {
			expr, err := parser.ParseExpr(tcase.expr)
			testutil.Ok(t, err)

			plan := New(expr, &query.Options{})
			optimizedPlan, _ := plan.Optimize(optimizers)
			testutil.Equals(t, tcase.expected, optimizedPlan.Expr().String())
		})
	}
}
Loading