
Feat/header xprometheus #470

Merged · 1 commit · Feb 21, 2024
5 changes: 4 additions & 1 deletion CHANGELOG.md
@@ -9,6 +9,9 @@ Unreleased section should follow [Release Toolkit](https://github.com/newrelic/r

## Unreleased

### Enhancement
- Added the `X-Prometheus-Scrape-Timeout-Seconds` header to scrape requests so that exporters can give up on complex computations when needed.

## v2.20.1 - 2024-01-03

### ⛓️ Dependencies
@@ -105,7 +108,7 @@ In particular, it was not possible having `lowDataMode=true` to filter out every

## 2.16.3
## Changed
- Several dependencies updated
- The `use_bearer` config is now exposed in the config for static targets by @paologallinaharbur in https://github.com/newrelic/nri-prometheus/pull/327

## 2.16.2
1 change: 1 addition & 0 deletions configs/nri-prometheus-config.yml.sample
@@ -27,6 +27,7 @@ integrations:
audit: false

# The HTTP client timeout when fetching data from endpoints. Defaults to "5s" if it is not set.
# This timeout is also passed to the exporters, in seconds, as the X-Prometheus-Scrape-Timeout-Seconds header
# scrape_timeout: "5s"

# Length in time to distribute the scraping from the endpoints. Default to "30s" if it is not set.
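As the changelog entry and the config comment above note, the integration now forwards its scrape timeout to exporters through this header. Below is a minimal sketch of how an exporter *might* honor it; the handler, port, metric names, and fallback budget are hypothetical and are not part of this PR.

```go
package main

import (
	"net/http"
	"strconv"
	"time"
)

// handleMetrics is a hypothetical exporter endpoint that reads the
// X-Prometheus-Scrape-Timeout-Seconds header sent by nri-prometheus and
// skips expensive collection when the scrape budget is small.
func handleMetrics(w http.ResponseWriter, r *http.Request) {
	budget := 10 * time.Second // assumed fallback when the header is absent
	if v := r.Header.Get("X-Prometheus-Scrape-Timeout-Seconds"); v != "" {
		if secs, err := strconv.ParseFloat(v, 64); err == nil {
			budget = time.Duration(secs * float64(time.Second))
		}
	}

	if budget < 2*time.Second {
		// Under a tight deadline, serve only cheap or cached metrics.
		_, _ = w.Write([]byte("exporter_cheap_metric 1\n"))
		return
	}
	// Enough time: run the full (expensive) collection.
	_, _ = w.Write([]byte("exporter_expensive_metric 42\n"))
}

func main() {
	http.HandleFunc("/metrics", handleMetrics)
	_ = http.ListenAndServe(":9100", nil)
}
```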
6 changes: 4 additions & 2 deletions internal/integration/fetcher.go
@@ -10,6 +10,7 @@ import (
"fmt"
"io/ioutil"
"net/http"
"strconv"
"strings"
"sync"
"time"
@@ -155,7 +156,7 @@ type prometheusFetcher struct {
httpClient prometheus.HTTPDoer
bearerClient prometheus.HTTPDoer
// Provides IoC for better testability. Its usual value is 'prometheus.Get'.
getMetrics func(httpClient prometheus.HTTPDoer, url string, acceptHeader string) (prometheus.MetricFamiliesByName, error)
getMetrics func(httpClient prometheus.HTTPDoer, url string, acceptHeader string, fetchTimeout string) (prometheus.MetricFamiliesByName, error)
log *logrus.Entry
}

@@ -253,7 +254,8 @@ func (pf *prometheusFetcher) fetch(t endpoints.Target) (prometheus.MetricFamilie
httpClient = pf.bearerClient
}

mfs, err := pf.getMetrics(httpClient, t.URL.String(), pf.acceptHeader)
ft := strconv.FormatFloat(pf.fetchTimeout.Seconds(), 'f', -1, 64)
mfs, err := pf.getMetrics(httpClient, t.URL.String(), pf.acceptHeader, ft)
timer.ObserveDuration()
if err != nil {
pf.log.WithError(err).Warnf("fetching Prometheus metrics: %s (%s)", t.URL.String(), t.Object.Name)
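The `strconv.FormatFloat(pf.fetchTimeout.Seconds(), 'f', -1, 64)` call above renders the timeout with the shortest decimal representation, so whole seconds are sent as `"5"` rather than `"5.000000"`. A standalone sketch of that conversion (the sample durations are chosen only for illustration):

```go
package main

import (
	"fmt"
	"strconv"
	"time"
)

func main() {
	// Same formatting as in fetcher.go: 'f' notation, shortest precision (-1), float64.
	for _, d := range []time.Duration{5 * time.Second, 2500 * time.Millisecond} {
		fmt.Println(strconv.FormatFloat(d.Seconds(), 'f', -1, 64))
	}
	// Prints:
	// 5
	// 2.5
}
```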
50 changes: 47 additions & 3 deletions internal/integration/fetcher_test.go
@@ -4,7 +4,10 @@
package integration

import (
"bytes"
"fmt"
"io"
"net/http"
"net/url"
"strings"
"sync/atomic"
@@ -24,17 +27,58 @@ import (
const (
fetchDuration = 1 * time.Millisecond
fetchTimeout = time.Second * 5
testAccept = "this-is-a-test"
workerThreads = 4
queueLength = 100
)

type mockClient struct {
recordedHeader http.Header
}

func (m *mockClient) Do(req *http.Request) (*http.Response, error) {
m.recordedHeader = req.Header
return &http.Response{
StatusCode: 200,
Body: io.NopCloser(bytes.NewReader([]byte(""))),
}, nil
}

func TestXHeader(t *testing.T) {
mClient := mockClient{}
fetcher := NewFetcher(fetchDuration, fetchTimeout, "this-is-a-test", workerThreads, "", "", true, queueLength)
fetcher.(*prometheusFetcher).httpClient = &mClient

pairsCh := fetcher.Fetch([]endpoints.Target{
{
URL: url.URL{Scheme: "http", Path: "hello/metrics"},
},
})

select {
case <-pairsCh:
case <-time.After(fetchTimeout):
t.Fatal("can't fetch data")
}

accept := mClient.recordedHeader.Get(prometheus.AcceptHeader)
if accept != testAccept {
t.Errorf("Expected Accept header %q, got %q", testAccept, accept)
}

xPrometheus := mClient.recordedHeader.Get(prometheus.XPrometheusScrapeTimeoutHeader)
if xPrometheus != "5" {
t.Errorf("Expected xPrometheus header %q, got %q", "5", xPrometheus)
}
}

func TestFetcher(t *testing.T) {
t.Parallel()

// Given a fetcher
fetcher := NewFetcher(fetchDuration, fetchTimeout, "", workerThreads, "", "", true, queueLength)
var invokedURL string
fetcher.(*prometheusFetcher).getMetrics = func(client prometheus.HTTPDoer, url string, _ string) (names prometheus.MetricFamiliesByName, e error) {
fetcher.(*prometheusFetcher).getMetrics = func(client prometheus.HTTPDoer, url string, _ string, _ string) (names prometheus.MetricFamiliesByName, e error) {
invokedURL = url
return prometheus.MetricFamiliesByName{
"some-name": dto.MetricFamily{},
@@ -73,7 +117,7 @@ func TestFetcher_Error(t *testing.T) {

// That fails retrieving data from one of the metrics endpoint
invokedURLs := make([]string, 0)
fetcher.(*prometheusFetcher).getMetrics = func(client prometheus.HTTPDoer, url string, _ string) (names prometheus.MetricFamiliesByName, e error) {
fetcher.(*prometheusFetcher).getMetrics = func(client prometheus.HTTPDoer, url string, _ string, _ string) (names prometheus.MetricFamiliesByName, e error) {
if strings.Contains(url, "fail") {
return nil, errors.New("catapun")
}
@@ -125,7 +169,7 @@ func TestFetcher_ConcurrencyLimit(t *testing.T) {
// Given a Fetcher
fetcher := NewFetcher(time.Millisecond, fetchTimeout, "", workerThreads, "", "", true, queueLength)

fetcher.(*prometheusFetcher).getMetrics = func(client prometheus.HTTPDoer, url string, _ string) (names prometheus.MetricFamiliesByName, e error) {
fetcher.(*prometheusFetcher).getMetrics = func(client prometheus.HTTPDoer, url string, _ string, _ string) (names prometheus.MetricFamiliesByName, e error) {
defer atomic.AddInt32(&parallelTasks, -1)
atomic.AddInt32(&parallelTasks, 1)
reportedParallel <- atomic.LoadInt32(&parallelTasks)
12 changes: 10 additions & 2 deletions internal/pkg/prometheus/prometheus.go
@@ -36,15 +36,23 @@ func ResetTargetSize() {
targetSize.Reset()
}

const (
// XPrometheusScrapeTimeoutHeader is included in all requests. It informs exporters of the scrape timeout.
XPrometheusScrapeTimeoutHeader = "X-Prometheus-Scrape-Timeout-Seconds"
// AcceptHeader is included in all requests.
AcceptHeader = "Accept"
)

// Get scrapes the given URL and decodes the retrieved payload.
func Get(client HTTPDoer, url string, acceptHeader string) (MetricFamiliesByName, error) {
func Get(client HTTPDoer, url string, acceptHeader string, fetchTimeout string) (MetricFamiliesByName, error) {
mfs := MetricFamiliesByName{}
req, err := http.NewRequest("GET", url, nil)
if err != nil {
return mfs, err
}

req.Header.Add("Accept", acceptHeader)
req.Header.Add(AcceptHeader, acceptHeader)
req.Header.Add(XPrometheusScrapeTimeoutHeader, fetchTimeout)

resp, err := client.Do(req)
if err != nil {
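With the extended signature, callers inside this module pass the timeout string alongside the accept header; the test below exercises exactly this via `httptest`. A minimal usage sketch follows; the import path, URL, accept header, and timeout value are assumptions for illustration, and the internal package is only importable from within the nri-prometheus module itself.

```go
package main

import (
	"fmt"
	"net/http"

	// Assumed module-internal import path.
	"github.com/newrelic/nri-prometheus/internal/pkg/prometheus"
)

func main() {
	// Scrape a local exporter, advertising a 5-second scrape timeout.
	mfs, err := prometheus.Get(http.DefaultClient, "http://localhost:9100/metrics", "application/openmetrics-text", "5")
	if err != nil {
		panic(err)
	}
	for name := range mfs {
		fmt.Println(name) // one line per scraped metric family
	}
}
```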
11 changes: 9 additions & 2 deletions internal/pkg/prometheus/prometheus_test.go
@@ -15,18 +15,25 @@ import (
const testHeader = "application/openmetrics-text"

func TestGetHeader(t *testing.T) {
fetchTimeout := "15"

ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
accept := r.Header.Get("Accept")
accept := r.Header.Get(prometheus.AcceptHeader)
if accept != testHeader {
t.Errorf("Expected Accept header %s, got %q", testHeader, accept)
}

xPrometheus := r.Header.Get(prometheus.XPrometheusScrapeTimeoutHeader)
if xPrometheus != fetchTimeout {
t.Errorf("Expected xPrometheus header %s, got %q", xPrometheus, fetchTimeout)
}

_, _ = w.Write([]byte("metric_a 1\nmetric_b 2\n"))
}))
defer ts.Close()

expected := []string{"metric_a", "metric_b"}
mfs, err := prometheus.Get(http.DefaultClient, ts.URL, testHeader)
mfs, err := prometheus.Get(http.DefaultClient, ts.URL, testHeader, fetchTimeout)
actual := []string{}
for k := range mfs {
actual = append(actual, k)