Skip to content

Commit

Permalink
engine: fix range query duplicate label handling
Browse files Browse the repository at this point in the history
Prometheus seems to care about duplicate samples for a label set only at
the same timestamp. Since our operators might return samples for one
result in two different series, we need to merge them into one result
again.

Signed-off-by: Michael Hoffmann <[email protected]>
  • Loading branch information
MichaHoffmann committed Jan 14, 2024
1 parent 4c3bd49 commit 515a5a8
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 5 deletions.
29 changes: 24 additions & 5 deletions engine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
"github.com/thanos-io/promql-engine/execution/model"
"github.com/thanos-io/promql-engine/execution/parse"
"github.com/thanos-io/promql-engine/execution/warnings"
"github.com/thanos-io/promql-engine/extlabels"
"github.com/thanos-io/promql-engine/logicalplan"
"github.com/thanos-io/promql-engine/query"
)
Expand Down Expand Up @@ -404,22 +403,42 @@ loop:
}

// For range Query we expect always a Matrix value type.
// Note: We have to zip together series that have the same label set but
// appear at different timestamps.
// The engine already guarantees that we don't have a label conflict
// at the same timestamp, and Prometheus accepts series with the same
// labels that are populated at different timestamps just fine.
if q.t == RangeQuery {
matrix := make(promql.Matrix, 0, len(series))
for _, s := range series {
seenAt := make(map[uint64]int, len(series))
for i, s := range series {
if len(s.Floats)+len(s.Histograms) == 0 {
continue
}
matrix = append(matrix, s)
h := s.Metric.Hash()
if j, ok := seenAt[h]; ok {
matrix[j].Floats = append(matrix[j].Floats, s.Floats...)
matrix[j].Histograms = append(matrix[j].Histograms, s.Histograms...)
} else {
matrix = append(matrix, s)
seenAt[h] = i
}
}
sort.Sort(matrix)
if matrix.ContainsSameLabelset() {
return newErrResult(ret, extlabels.ErrDuplicateLabelSet)
if len(seenAt) < len(series) {
// If we had collisions, we need to re-sort by timestamp, since we might have appended
// the samples of the later series of a colliding pair first.
for _, s := range matrix {
sort.Slice(s.Floats, func(i, j int) bool { return s.Floats[i].T < s.Floats[j].T })
sort.Slice(s.Histograms, func(i, j int) bool { return s.Histograms[i].T < s.Histograms[j].T })
}
}
ret.Value = matrix
return ret
}

// We don't need to zip together results for instant queries since
// they only have results for one timestamp.
var result parser.Value
switch q.expr.Type() {
case parser.ValueTypeMatrix:
Expand Down
7 changes: 7 additions & 0 deletions engine/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,13 @@ func TestQueriesAgainstOldEngine(t *testing.T) {
(-group({__name__="http_requests_total"} @ 54.013) or {__name__="http_requests_total"} offset 1m32s)
)`,
},
{
name: "duplicate label - multiple top level series get merged ",
load: `load 1m
A 1 2 _ _ _ _ _ _ _ _ _
B _ _ _ _ _ _ _ _ _ 1 2`,
query: `exp({__name__=~"(A|B)"})`,
},
{
name: "timestamp fuzz 1",
load: `load 30s
Expand Down

0 comments on commit 515a5a8

Please sign in to comment.