diff --git a/CHANGELOG.md b/CHANGELOG.md index f545f0de..2a19c136 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,9 @@ Unreleased section should follow [Release Toolkit](https://github.com/newrelic/r ## Unreleased +### Enhancement +- Added XPrometheusHeader to allow exporters to give up complex computation when needed. + ## v2.20.1 - 2024-01-03 ### ⛓️ Dependencies @@ -105,7 +108,7 @@ In particular, it was not possible having `lowDataMode=true` to filter out every ## 2.16.3 ## Changed -- Several dependencies updated +- Several dependencies updated - The `use_bearer` config is now exposed the config for static targets by @paologallinaharbur in https://github.com/newrelic/nri-prometheus/pull/327 ## 2.16.2 diff --git a/configs/nri-prometheus-config.yml.sample b/configs/nri-prometheus-config.yml.sample index e96637e9..6abd371f 100644 --- a/configs/nri-prometheus-config.yml.sample +++ b/configs/nri-prometheus-config.yml.sample @@ -27,6 +27,7 @@ integrations: audit: false # The HTTP client timeout when fetching data from endpoints. Defaults to "5s" if it is not set. + # This timeout in seconds is passed as well as a X-Prometheus-Scrape-Timeout-Seconds header to the exporters # scrape_timeout: "5s" # Length in time to distribute the scraping from the endpoints. Default to "30s" if it is not set. diff --git a/internal/integration/fetcher.go b/internal/integration/fetcher.go index 3a5ace55..a008b772 100644 --- a/internal/integration/fetcher.go +++ b/internal/integration/fetcher.go @@ -10,6 +10,7 @@ import ( "fmt" "io/ioutil" "net/http" + "strconv" "strings" "sync" "time" @@ -155,7 +156,7 @@ type prometheusFetcher struct { httpClient prometheus.HTTPDoer bearerClient prometheus.HTTPDoer // Provides IoC for better testability. Its usual value is 'prometheus.Get'. 
- getMetrics func(httpClient prometheus.HTTPDoer, url string, acceptHeader string) (prometheus.MetricFamiliesByName, error) + getMetrics func(httpClient prometheus.HTTPDoer, url string, acceptHeader string, fetchTimeout string) (prometheus.MetricFamiliesByName, error) log *logrus.Entry } @@ -253,7 +254,8 @@ func (pf *prometheusFetcher) fetch(t endpoints.Target) (prometheus.MetricFamilie httpClient = pf.bearerClient } - mfs, err := pf.getMetrics(httpClient, t.URL.String(), pf.acceptHeader) + ft := strconv.FormatFloat(pf.fetchTimeout.Seconds(), 'f', -1, 64) + mfs, err := pf.getMetrics(httpClient, t.URL.String(), pf.acceptHeader, ft) timer.ObserveDuration() if err != nil { pf.log.WithError(err).Warnf("fetching Prometheus metrics: %s (%s)", t.URL.String(), t.Object.Name) diff --git a/internal/integration/fetcher_test.go b/internal/integration/fetcher_test.go index 10d9427b..795a6cf7 100644 --- a/internal/integration/fetcher_test.go +++ b/internal/integration/fetcher_test.go @@ -4,7 +4,10 @@ package integration import ( + "bytes" "fmt" + "io" + "net/http" "net/url" "strings" "sync/atomic" @@ -24,17 +27,58 @@ import ( const ( fetchDuration = 1 * time.Millisecond fetchTimeout = time.Second * 5 + testAccept = "this-is-a-test" workerThreads = 4 queueLength = 100 ) +type mockClient struct { + recordedHeader http.Header +} + +func (m *mockClient) Do(req *http.Request) (*http.Response, error) { + m.recordedHeader = req.Header + return &http.Response{ + StatusCode: 200, + Body: io.NopCloser(bytes.NewReader([]byte(""))), + }, nil +} + +func TestXHeader(t *testing.T) { + mClient := mockClient{} + fetcher := NewFetcher(fetchDuration, fetchTimeout, "this-is-a-test", workerThreads, "", "", true, queueLength) + fetcher.(*prometheusFetcher).httpClient = &mClient + + pairsCh := fetcher.Fetch([]endpoints.Target{ + { + URL: url.URL{Scheme: "http", Path: "hello/metrics"}, + }, + }) + + select { + case <-pairsCh: + case <-time.After(fetchTimeout): + t.Fatal("can't fetch data") + } + + 
accept := mClient.recordedHeader.Get(prometheus.AcceptHeader) + if accept != testAccept { + t.Errorf("Expected Accept header %q, got %q", testAccept, accept) + } + + xPrometheus := mClient.recordedHeader.Get(prometheus.XPrometheusScrapeTimeoutHeader) + if xPrometheus != "5" { + t.Errorf("Expected xPrometheus header %q, got %q", "5", xPrometheus) + } +} + func TestFetcher(t *testing.T) { t.Parallel() // Given a fetcher fetcher := NewFetcher(fetchDuration, fetchTimeout, "", workerThreads, "", "", true, queueLength) var invokedURL string - fetcher.(*prometheusFetcher).getMetrics = func(client prometheus.HTTPDoer, url string, _ string) (names prometheus.MetricFamiliesByName, e error) { + fetcher.(*prometheusFetcher).getMetrics = func(client prometheus.HTTPDoer, url string, _ string, _ string) (names prometheus.MetricFamiliesByName, e error) { invokedURL = url return prometheus.MetricFamiliesByName{ "some-name": dto.MetricFamily{}, @@ -73,7 +117,7 @@ func TestFetcher_Error(t *testing.T) { // That fails retrieving data from one of the metrics endpoint invokedURLs := make([]string, 0) - fetcher.(*prometheusFetcher).getMetrics = func(client prometheus.HTTPDoer, url string, _ string) (names prometheus.MetricFamiliesByName, e error) { + fetcher.(*prometheusFetcher).getMetrics = func(client prometheus.HTTPDoer, url string, _ string, _ string) (names prometheus.MetricFamiliesByName, e error) { if strings.Contains(url, "fail") { return nil, errors.New("catapun") } @@ -125,7 +169,7 @@ func TestFetcher_ConcurrencyLimit(t *testing.T) { // Given a Fetcher fetcher := NewFetcher(time.Millisecond, fetchTimeout, "", workerThreads, "", "", true, queueLength) - fetcher.(*prometheusFetcher).getMetrics = func(client prometheus.HTTPDoer, url string, _ string) (names prometheus.MetricFamiliesByName, e error) { + fetcher.(*prometheusFetcher).getMetrics = func(client prometheus.HTTPDoer, url string, _ string, _ string) (names prometheus.MetricFamiliesByName, e error) { defer 
atomic.AddInt32(&parallelTasks, -1) atomic.AddInt32(&parallelTasks, 1) reportedParallel <- atomic.LoadInt32(&parallelTasks) diff --git a/internal/pkg/prometheus/prometheus.go b/internal/pkg/prometheus/prometheus.go index 96407968..9ca6185a 100644 --- a/internal/pkg/prometheus/prometheus.go +++ b/internal/pkg/prometheus/prometheus.go @@ -36,15 +36,23 @@ func ResetTargetSize() { targetSize.Reset() } +const ( + // XPrometheusScrapeTimeoutHeader included in all requests. It informs exporters about its timeout. + XPrometheusScrapeTimeoutHeader = "X-Prometheus-Scrape-Timeout-Seconds" + // AcceptHeader included in all requests + AcceptHeader = "Accept" +) + // Get scrapes the given URL and decodes the retrieved payload. -func Get(client HTTPDoer, url string, acceptHeader string) (MetricFamiliesByName, error) { +func Get(client HTTPDoer, url string, acceptHeader string, fetchTimeout string) (MetricFamiliesByName, error) { mfs := MetricFamiliesByName{} req, err := http.NewRequest("GET", url, nil) if err != nil { return mfs, err } - req.Header.Add("Accept", acceptHeader) + req.Header.Add(AcceptHeader, acceptHeader) + req.Header.Add(XPrometheusScrapeTimeoutHeader, fetchTimeout) resp, err := client.Do(req) if err != nil { diff --git a/internal/pkg/prometheus/prometheus_test.go b/internal/pkg/prometheus/prometheus_test.go index 8c93fbc8..ee3a5dbf 100644 --- a/internal/pkg/prometheus/prometheus_test.go +++ b/internal/pkg/prometheus/prometheus_test.go @@ -15,18 +15,25 @@ import ( const testHeader = "application/openmetrics-text" func TestGetHeader(t *testing.T) { + fetchTimeout := "15" + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - accept := r.Header.Get("Accept") + accept := r.Header.Get(prometheus.AcceptHeader) if accept != testHeader { t.Errorf("Expected Accept header %s, got %q", testHeader, accept) } + xPrometheus := r.Header.Get(prometheus.XPrometheusScrapeTimeoutHeader) + if xPrometheus != fetchTimeout { + t.Errorf("Expected xPrometheus 
header %s, got %q", fetchTimeout, xPrometheus) + } + + _, _ = w.Write([]byte("metric_a 1\nmetric_b 2\n")) })) defer ts.Close() expected := []string{"metric_a", "metric_b"} - mfs, err := prometheus.Get(http.DefaultClient, ts.URL, testHeader) + mfs, err := prometheus.Get(http.DefaultClient, ts.URL, testHeader, fetchTimeout) actual := []string{} for k := range mfs { actual = append(actual, k)