Skip to content

Commit

Permalink
Merge pull request #95 from intergral/deepql
Browse files Browse the repository at this point in the history
fix(routtrip): fix round tripper not find correct path
  • Loading branch information
Umaaz authored Apr 23, 2024
2 parents bfa2730 + 884bc76 commit 22113b6
Show file tree
Hide file tree
Showing 9 changed files with 102 additions and 20 deletions.
9 changes: 7 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,12 @@
<!-- main START -->
<!-- 1.0.8 START -->
# 1.0.8 (23/04/2024)
- **[BUGFIX]**: fix deepql running in distributed mode [#95](https://github.com/intergral/deep/pull/95) [@Umaaz](https://github.com/Umaaz)
<!-- 1.0.8 START -->

<!-- 1.0.7 START -->
# 1.0.7 (18/04/2024)
- **[FEATURE]**: add support for deepql [#93](https://github.com/intergral/deep/pull/93) [@Umaaz](https://github.com/Umaaz)
<!-- main START -->
<!-- 1.0.7 START -->

<!-- 1.0.6 START -->
# 1.0.6 (14/03/2024)
Expand Down
4 changes: 2 additions & 2 deletions cmd/deep/app/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -367,14 +367,14 @@ func (t *App) initQuerier() (services.Service, error) {
// tracepointAPI handles requests to change the config for tracepoints
func (t *App) initQueryFrontend() (services.Service, error) {
// we create to 2 bridges (roundTrippers) one for each backend
roundTripper, tpTripper, v1, err := frontend.InitFrontend(t.cfg.Frontend.Config, frontend.CortexNoQuerierLimits{}, log.Logger, prometheus.DefaultRegisterer)
roundTripper, v1, err := frontend.InitFrontend(t.cfg.Frontend.Config, frontend.CortexNoQuerierLimits{}, log.Logger, prometheus.DefaultRegisterer)
if err != nil {
return nil, err
}
t.frontend = v1

// create query frontend
queryFrontend, err := frontend.New(t.cfg.Frontend, roundTripper, tpTripper, t.overrides, t.store, log.Logger, prometheus.DefaultRegisterer)
queryFrontend, err := frontend.New(t.cfg.Frontend, roundTripper, t.overrides, t.store, log.Logger, prometheus.DefaultRegisterer)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion examples/docker-compose/debug/docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ services:
- GF_AUTH_ANONYMOUS_ENABLED=true
- GF_AUTH_ANONYMOUS_ORG_ROLE=Admin
- GF_AUTH_DISABLE_LOGIN_FORM=true
- GF_INSTALL_PLUGINS=https://github.com/intergral/grafana-deep-tracepoint-panel/releases/download/v1.0.0/intergral-deep-tracepoint-panel-1.0.0.zip;intergral-deep-tracepoint-panel,https://github.com/intergral/grafana-deep-panel/releases/download/v1.0.1/intergral-deep-panel-1.0.1.zip;intergral-deep-panel,https://github.com/intergral/grafana-deep-datasource/releases/download/v1.0.2/intergral-deep-datasource-1.0.2.zip;intergral-deep-datasource
- GF_INSTALL_PLUGINS=https://github.com/intergral/plugin-signer/releases/download/intergral%2Fgrafana-deep-tracepoint-panel.v1.0.0/intergral-deep-tracepoint-panel-1.0.0.zip;intergral-deep-tracepoint-panel,https://github.com/intergral/plugin-signer/releases/download/intergral%2Fgrafana-deep-panel.v1.0.2/intergral-deep-panel-1.0.2.zip;intergral-deep-panel,https://github.com/intergral/plugin-signer/releases/download/intergral%2Fgrafana-deep-datasource.v1.0.3/intergral-deep-datasource-1.0.3.zip;intergral-deep-datasource
ports:
- "3000:3000"

Expand Down
19 changes: 17 additions & 2 deletions examples/docker-compose/distributed/docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -147,12 +147,27 @@ services:
grafana:
image: grafana/grafana-oss
volumes:
- ../shared/grafana-datasources.yaml:/etc/grafana/provisioning/datasources/datasources.yaml
- ./grafana-datasources.yaml:/etc/grafana/provisioning/datasources/datasources.yaml
- ../shared/dashboards:/etc/grafana/provisioning/dashboards/
environment:
- GF_AUTH_ANONYMOUS_ENABLED=true
- GF_AUTH_ANONYMOUS_ORG_ROLE=Admin
- GF_AUTH_DISABLE_LOGIN_FORM=true
- GF_INSTALL_PLUGINS=https://github.com/intergral/grafana-deep-panel/releases/download/v0.0.3/intergral-deep-panel-0.0.3.zip;intergral-deep-panel,https://github.com/intergral/grafana-deep-datasource/releases/download/v0.0.7/intergral-deep-datasource-0.0.7.zip;intergral-deep-datasource
- GF_INSTALL_PLUGINS=https://github.com/intergral/plugin-signer/releases/download/intergral%2Fgrafana-deep-tracepoint-panel.v1.0.0/intergral-deep-tracepoint-panel-1.0.0.zip;intergral-deep-tracepoint-panel,https://github.com/intergral/plugin-signer/releases/download/intergral%2Fgrafana-deep-panel.v1.0.2/intergral-deep-panel-1.0.2.zip;intergral-deep-panel,https://github.com/intergral/plugin-signer/releases/download/intergral%2Fgrafana-deep-datasource.v1.0.3/intergral-deep-datasource-1.0.3.zip;intergral-deep-datasource
ports:
- "3000:3000"

test_load:
image: intergral/deep-cli
depends_on:
- distributor
command:
- generate
- snapshot
- --endpoint=distributor:43315
- --count=3
- --sleep=5
- --iterations=-1
- --random-string
- --random-duration
- --duration-nanos=1000000000
3 changes: 2 additions & 1 deletion examples/docker-compose/distributed/grafana-datasources.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,5 @@ datasources:
apiVersion: 1
uid: deep
jsonData:
httpMethod: GET
experimental:
deepql: true
24 changes: 21 additions & 3 deletions modules/frontend/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import (
"net/http"
"time"

"github.com/intergral/deep/pkg/api"

"github.com/go-kit/log"
"github.com/prometheus/client_golang/prometheus"

Expand Down Expand Up @@ -103,12 +105,28 @@ func (CortexNoQuerierLimits) MaxQueriersPerUser(string) int { return 0 }
// Returned RoundTripper can be wrapped in more round-tripper middlewares, and then eventually registered
// into HTTP server using the Handler from this package. Returned RoundTripper is always non-nil
// (if there are no errors), and it uses the returned frontend (if any).
func InitFrontend(cfg v1.Config, limits v1.Limits, log log.Logger, reg prometheus.Registerer) (http.RoundTripper, http.RoundTripper, *v1.Frontend, error) {
func InitFrontend(cfg v1.Config, limits v1.Limits, log log.Logger, reg prometheus.Registerer) (http.RoundTripper, *v1.Frontend, error) {
statVersion.Set("v1")
// No scheduler = use original frontend.
fr, err := v1.New(cfg, limits, log, reg)
if err != nil {
return nil, nil, nil, err
return nil, nil, err
}
return transport.AdaptGrpcRoundTripperToHTTPRoundTripper(fr.QuerierRoundTrip), transport.AdaptGrpcRoundTripperToHTTPRoundTripper(fr.TracepointRoundTrip), fr, nil
// create both round trippers
searchRt := transport.AdaptGrpcRoundTripperToHTTPRoundTripper(fr.QuerierRoundTrip)
tracepointRt := transport.AdaptGrpcRoundTripperToHTTPRoundTripper(fr.TracepointRoundTrip)
// filter the round trippers based on the paths
tripper := transport.NewSplittingRoundTripper(&transport.SplitRule{
Rules: []string{
api.PathPrefixTracepoints,
},
RoundTripper: tracepointRt,
}, &transport.SplitRule{
Rules: []string{
api.PathPrefixQuerier,
},
RoundTripper: searchRt,
})

return tripper, fr, nil
}
4 changes: 2 additions & 2 deletions modules/frontend/frontend.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ type QueryFrontend struct {
}

// New returns a new QueryFrontend
func New(cfg Config, next http.RoundTripper, tpNext http.RoundTripper, o *overrides.Overrides, store storage.Store, logger log.Logger, registerer prometheus.Registerer) (*QueryFrontend, error) {
func New(cfg Config, next http.RoundTripper, o *overrides.Overrides, store storage.Store, logger log.Logger, registerer prometheus.Registerer) (*QueryFrontend, error) {
level.Info(logger).Log("msg", "creating middleware in query frontend")

if cfg.SnapshotByID.QueryShards < minQueryShards || cfg.SnapshotByID.QueryShards > maxQueryShards {
Expand Down Expand Up @@ -100,7 +100,7 @@ func New(cfg Config, next http.RoundTripper, tpNext http.RoundTripper, o *overri
search := searchMiddleware.Wrap(next)

tpMiddleware := newTracepointForwardMiddleware()
tpHandler := tpMiddleware.Wrap(tpNext)
tpHandler := tpMiddleware.Wrap(next)

return &QueryFrontend{
SnapshotByID: newHandler(snapshots, snapshotByIDCounter, logger),
Expand Down
13 changes: 6 additions & 7 deletions modules/frontend/frontend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ func (s *mockNextTripperware) RoundTrip(_ *http.Request) (*http.Response, error)

func TestFrontendRoundTripsSearch(t *testing.T) {
next := &mockNextTripperware{}
tpNext := &mockNextTripperware{}
f, err := New(Config{
SnapshotByID: SnapshotByIDConfig{
QueryShards: minQueryShards,
Expand All @@ -54,7 +53,7 @@ func TestFrontendRoundTripsSearch(t *testing.T) {
},
SLO: testSLOcfg,
},
}, next, tpNext, nil, nil, log.NewNopLogger(), nil)
}, next, nil, nil, log.NewNopLogger(), nil)
require.NoError(t, err)

req := httptest.NewRequest("GET", "/", nil)
Expand All @@ -77,7 +76,7 @@ func TestFrontendBadConfigFails(t *testing.T) {
},
SLO: testSLOcfg,
},
}, nil, nil, nil, nil, log.NewNopLogger(), nil)
}, nil, nil, nil, log.NewNopLogger(), nil)
assert.EqualError(t, err, "frontend query shards should be between 2 and 256 (both inclusive)")
assert.Nil(t, f)

Expand All @@ -93,7 +92,7 @@ func TestFrontendBadConfigFails(t *testing.T) {
},
SLO: testSLOcfg,
},
}, nil, nil, nil, nil, log.NewNopLogger(), nil)
}, nil, nil, nil, log.NewNopLogger(), nil)
assert.EqualError(t, err, "frontend query shards should be between 2 and 256 (both inclusive)")
assert.Nil(t, f)

Expand All @@ -109,7 +108,7 @@ func TestFrontendBadConfigFails(t *testing.T) {
},
SLO: testSLOcfg,
},
}, nil, nil, nil, nil, log.NewNopLogger(), nil)
}, nil, nil, nil, log.NewNopLogger(), nil)
assert.EqualError(t, err, "frontend search concurrent requests should be greater than 0")
assert.Nil(t, f)

Expand All @@ -125,7 +124,7 @@ func TestFrontendBadConfigFails(t *testing.T) {
},
SLO: testSLOcfg,
},
}, nil, nil, nil, nil, log.NewNopLogger(), nil)
}, nil, nil, nil, log.NewNopLogger(), nil)
assert.EqualError(t, err, "frontend search target bytes per request should be greater than 0")
assert.Nil(t, f)

Expand All @@ -143,7 +142,7 @@ func TestFrontendBadConfigFails(t *testing.T) {
},
SLO: testSLOcfg,
},
}, nil, nil, nil, nil, log.NewNopLogger(), nil)
}, nil, nil, nil, log.NewNopLogger(), nil)
assert.EqualError(t, err, "query backend after should be less than or equal to query ingester until")
assert.Nil(t, f)
}
44 changes: 44 additions & 0 deletions modules/frontend/transport/roundtripper.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,13 @@ package transport
import (
"bytes"
"context"
"errors"
"io"
"io/ioutil"
"net/http"
"strings"

"github.com/opentracing/opentracing-go"

frontend_v1pb "github.com/intergral/deep/modules/frontend/v1/frontendv1pb"
)
Expand Down Expand Up @@ -95,3 +99,43 @@ func fromHeader(hs http.Header) []*frontend_v1pb.Header {
}
return result
}

// SplitRule allows controlling which requests go to this roundtripper
type SplitRule struct {
Rules []string
RoundTripper http.RoundTripper
}

func (r *SplitRule) matches(url string) bool {
for _, rule := range r.Rules {
if strings.HasPrefix(url, rule) {
return true
}
}
return false
}

type splitRoundTripper struct {
http.RoundTripper

rules []*SplitRule
}

// NewSplittingRoundTripper creates a new round tripper with rules
func NewSplittingRoundTripper(rules ...*SplitRule) http.RoundTripper {
return &splitRoundTripper{
rules: rules,
}
}

func (s *splitRoundTripper) RoundTrip(r *http.Request) (*http.Response, error) {
span, _ := opentracing.StartSpanFromContext(r.Context(), "splitRoundTripper.RoundTrip")
defer span.Finish()

for _, rule := range s.rules {
if rule.matches(r.RequestURI) {
return rule.RoundTripper.RoundTrip(r)
}
}
return nil, errors.New("no matching round tripper")
}

0 comments on commit 22113b6

Please sign in to comment.