Skip to content

Commit

Permalink
feat: add header to skip split by interval
Browse files Browse the repository at this point in the history
  • Loading branch information
trevorwhitney committed Dec 3, 2024
1 parent 2fb07a3 commit 573b65e
Show file tree
Hide file tree
Showing 5 changed files with 168 additions and 1 deletion.
3 changes: 3 additions & 0 deletions pkg/loki/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -431,6 +431,7 @@ func (t *Loki) initQuerier() (services.Service, error) {
toMerge := []middleware.Interface{
httpreq.ExtractQueryMetricsMiddleware(),
httpreq.ExtractQueryTagsMiddleware(),
httpreq.ExtractQueryNoSplitMiddleware(),
httpreq.PropagateHeadersMiddleware(httpreq.LokiEncodingFlagsHeader, httpreq.LokiDisablePipelineWrappersHeader),
serverutil.RecoveryHTTPMiddleware,
t.HTTPAuthMiddleware,
Expand Down Expand Up @@ -1128,6 +1129,7 @@ func (t *Loki) initQueryFrontend() (_ services.Service, err error) {

toMerge := []middleware.Interface{
httpreq.ExtractQueryTagsMiddleware(),
httpreq.ExtractQueryNoSplitMiddleware(),
httpreq.PropagateHeadersMiddleware(httpreq.LokiActorPathHeader, httpreq.LokiEncodingFlagsHeader, httpreq.LokiDisablePipelineWrappersHeader),
serverutil.RecoveryHTTPMiddleware,
t.HTTPAuthMiddleware,
Expand All @@ -1148,6 +1150,7 @@ func (t *Loki) initQueryFrontend() (_ services.Service, err error) {
if t.Cfg.Frontend.TailProxyURL != "" && !t.isModuleActive(Querier) {
httpMiddleware := middleware.Merge(
httpreq.ExtractQueryTagsMiddleware(),
httpreq.ExtractQueryNoSplitMiddleware(),
t.HTTPAuthMiddleware,
queryrange.StatsHTTPMiddleware,
)
Expand Down
6 changes: 6 additions & 0 deletions pkg/querier/queryrange/split_by_interval.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/prometheus/common/model"

"github.com/grafana/loki/v3/pkg/util/constants"
"github.com/grafana/loki/v3/pkg/util/httpreq"
"github.com/grafana/loki/v3/pkg/util/math"

"github.com/grafana/dskit/tenant"
Expand Down Expand Up @@ -182,6 +183,11 @@ func (h *splitByInterval) Do(ctx context.Context, r queryrangebase.Request) (que
return nil, httpgrpc.Errorf(http.StatusBadRequest, "%s", err.Error())
}

noSplit := httpreq.ExtractQueryNoSplitFromContext(ctx)
if noSplit {
return h.next.Do(ctx, r)
}

var interval time.Duration
switch r.(type) {
case *LokiSeriesRequest, *LabelRequest:
Expand Down
70 changes: 69 additions & 1 deletion pkg/querier/queryrange/split_by_interval_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/grafana/loki/v3/pkg/storage/config"
"github.com/grafana/loki/v3/pkg/storage/stores/index/seriesvolume"
"github.com/grafana/loki/v3/pkg/util"
"github.com/grafana/loki/v3/pkg/util/httpreq"
)

var nilMetrics = NewSplitByMetrics(nil)
Expand Down Expand Up @@ -1408,6 +1409,7 @@ func Test_splitByInterval_Do(t *testing.T) {
name string
req *LokiRequest
want *LokiResponse
ctx context.Context
}{
{
"backward",
Expand Down Expand Up @@ -1441,6 +1443,38 @@ func Test_splitByInterval_Do(t *testing.T) {
},
},
},
ctx,
},
{
"backward - no split header",
&LokiRequest{
StartTs: time.Unix(0, 0),
EndTs: time.Unix(0, (4 * time.Hour).Nanoseconds()),
Query: "",
Limit: 1000,
Step: 1,
Direction: logproto.BACKWARD,
Path: "/api/prom/query_range",
},
&LokiResponse{
Status: loghttp.QueryStatusSuccess,
Direction: logproto.BACKWARD,
Limit: 1000,
Version: 1,
Statistics: stats.Result{Summary: stats.Summary{Splits: 0}},
Data: LokiData{
ResultType: loghttp.ResultTypeStream,
Result: []logproto.Stream{
{
Labels: `{foo="bar", level="debug"}`,
Entries: []logproto.Entry{
{Timestamp: time.Unix(0, 0), Line: fmt.Sprintf("%d", 0)},
},
},
},
},
},
httpreq.InjectQueryNoSplit(ctx, true),
},
{
"forward",
Expand Down Expand Up @@ -1474,6 +1508,38 @@ func Test_splitByInterval_Do(t *testing.T) {
},
},
},
ctx,
},
{
"forward - no split header",
&LokiRequest{
StartTs: time.Unix(0, 0),
EndTs: time.Unix(0, (4 * time.Hour).Nanoseconds()),
Query: "",
Limit: 1000,
Step: 1,
Direction: logproto.FORWARD,
Path: "/api/prom/query_range",
},
&LokiResponse{
Status: loghttp.QueryStatusSuccess,
Direction: logproto.FORWARD,
Statistics: stats.Result{Summary: stats.Summary{Splits: 0}},
Limit: 1000,
Version: 1,
Data: LokiData{
ResultType: loghttp.ResultTypeStream,
Result: []logproto.Stream{
{
Labels: `{foo="bar", level="debug"}`,
Entries: []logproto.Entry{
{Timestamp: time.Unix(0, 0), Line: fmt.Sprintf("%d", 0)},
},
},
},
},
},
httpreq.InjectQueryNoSplit(ctx, true),
},
{
"forward limited",
Expand Down Expand Up @@ -1505,6 +1571,7 @@ func Test_splitByInterval_Do(t *testing.T) {
},
},
},
ctx,
},
{
"backward limited",
Expand Down Expand Up @@ -1536,12 +1603,13 @@ func Test_splitByInterval_Do(t *testing.T) {
},
},
},
ctx,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
res, err := split.Do(ctx, tt.req)
res, err := split.Do(tt.ctx, tt.req)
require.NoError(t, err)
require.Equal(t, tt.want, res)
})
Expand Down
29 changes: 29 additions & 0 deletions pkg/util/httpreq/headers.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ var (
// LokiActorPathHeader is the name of the header e.g. used to enqueue requests in hierarchical queues.
LokiActorPathHeader = "X-Loki-Actor-Path"
LokiDisablePipelineWrappersHeader = "X-Loki-Disable-Pipeline-Wrappers"
QueryNoSplitHTTPHeader = "X-Query-No-Split"

// LokiActorPathDelimiter is the delimiter used to serialise the hierarchy of the actor.
LokiActorPathDelimiter = "|"
Expand Down Expand Up @@ -55,3 +56,31 @@ func InjectActorPath(ctx context.Context, value string) context.Context {
func InjectHeader(ctx context.Context, key, value string) context.Context {
return context.WithValue(ctx, headerContextKey(key), value)
}

func ExtractQueryNoSplitMiddleware() middleware.Interface {
return middleware.Func(func(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
ctx := req.Context()

noSplit := ExtractQueryNoSplitFromHTTP(req)
ctx = InjectQueryNoSplit(ctx, noSplit)
req = req.WithContext(ctx)
next.ServeHTTP(w, req)
})
})
}

func ExtractQueryNoSplitFromHTTP(req *http.Request) bool {
tags := req.Header.Get(string(QueryNoSplitHTTPHeader))

Check failure on line 74 in pkg/util/httpreq/headers.go

View workflow job for this annotation

GitHub Actions / check / golangciLint

unnecessary conversion (unconvert)
return tags != "" && strings.ToLower(tags) == "true"
}

func ExtractQueryNoSplitFromContext(ctx context.Context) bool {
// if the cast fails then v will be an empty string
v, _ := ctx.Value(QueryNoSplitHTTPHeader).(bool)
return v
}

func InjectQueryNoSplit(ctx context.Context, noSplit bool) context.Context {
return context.WithValue(ctx, QueryNoSplitHTTPHeader, noSplit)

Check warning on line 85 in pkg/util/httpreq/headers.go

View workflow job for this annotation

GitHub Actions / check / golangciLint

context-keys-type: should not use basic type string as key in context.WithValue (revive)
}
61 changes: 61 additions & 0 deletions pkg/util/httpreq/headers_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package httpreq

import (
"net/http"
"net/http/httptest"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestQueryNoSplit(t *testing.T) {
for _, tc := range []struct {
desc string
in string
exp bool
error bool
}{
{
desc: "true",
in: `true`,
exp: true,
},
{
desc: "false",
in: `false`,
exp: false,
},
{
desc: "absent",
in: ``,
exp: false,
},
{
desc: "garbage",
in: `garbage`,
exp: false,
},
} {
t.Run(tc.desc, func(t *testing.T) {
req := httptest.NewRequest("GET", "http://testing.com", nil)
if tc.in != "" {
req.Header.Set(string(QueryNoSplitHTTPHeader), tc.in)

Check failure on line 43 in pkg/util/httpreq/headers_test.go

View workflow job for this annotation

GitHub Actions / check / golangciLint

unnecessary conversion (unconvert)
}

w := httptest.NewRecorder()
checked := false
mware := ExtractQueryNoSplitMiddleware().Wrap(http.HandlerFunc(func(_ http.ResponseWriter, req *http.Request) {
noSplit := req.Context().Value(QueryNoSplitHTTPHeader)
noSplitBool, ok := noSplit.(bool)
require.True(t, ok)
require.Equal(t, tc.exp, noSplitBool)
checked = true
}))

mware.ServeHTTP(w, req)

assert.True(t, true, checked)
})
}
}

0 comments on commit 573b65e

Please sign in to comment.