diff --git a/.github/dependabot.yml b/.github/dependabot.yml index 81354ffd0..63e9f273c 100644 --- a/.github/dependabot.yml +++ b/.github/dependabot.yml @@ -12,7 +12,3 @@ updates: patterns: - "go.opentelemetry.io/*" open-pull-requests-limit: 20 - - package-ecosystem: "pip" - directory: "/tools/load-generator" - schedule: - interval: "monthly" diff --git a/.promu.yml b/.promu.yml index 1058f0b8a..7916312d2 100644 --- a/.promu.yml +++ b/.promu.yml @@ -18,6 +18,8 @@ build: path: ./tools/fake-webserver - name: tools/scaler path: ./tools/scaler + - name: tools/load-generator + path: ./tools/load-generator flags: -a -tags netgo crossbuild: platforms: diff --git a/tools/load-generator/Dockerfile b/tools/load-generator/Dockerfile index 309c6758e..3d8f4972c 100644 --- a/tools/load-generator/Dockerfile +++ b/tools/load-generator/Dockerfile @@ -1,10 +1,8 @@ -FROM python:3 +FROM quay.io/prometheus/busybox:latest +LABEL maintainer="The Prometheus Authors " -COPY ./requirements.txt /usr/src/app/ -RUN pip install -r /usr/src/app/requirements.txt +COPY load-generator /bin/load-generator -COPY ./*.py /usr/src/app/ +EXPOSE 8080 -WORKDIR /usr/src/app - -ENTRYPOINT ["python", "-u", "./main.py"] +ENTRYPOINT ["/bin/load-generator"] \ No newline at end of file diff --git a/tools/load-generator/main.go b/tools/load-generator/main.go new file mode 100644 index 000000000..410c4bdc2 --- /dev/null +++ b/tools/load-generator/main.go @@ -0,0 +1,239 @@ +// Copyright 2024 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Command load-generator repeatedly runs the configured groups of queries
+// against the "pr" and "release" Prometheus instances of a benchmark run and
+// exposes its own metrics about those queries on :8080/metrics.
+package main
+
+import (
+	"fmt"
+	"io"
+	"log"
+	"net/http"
+	"os"
+	"strconv"
+	"sync"
+	"time"
+
+	"github.com/prometheus/client_golang/prometheus"
+	"github.com/prometheus/client_golang/prometheus/promhttp"
+	"github.com/prometheus/common/model"
+	"gopkg.in/yaml.v2"
+)
+
+const (
+	// max404Errors is the number of 404 responses a single querier tolerates
+	// before the whole process is terminated.
+	max404Errors = 30
+
+	// defaultStep is the resolution used for range queries whose group does
+	// not configure a step. Without it the request would carry an empty step
+	// parameter.
+	defaultStep = "15s"
+
+	// startupDelay is how long every querier waits before its first query so
+	// that the Prometheus servers have time to be set up.
+	startupDelay = 20 * time.Second
+
+	// queryTimeout bounds one HTTP exchange, including reading the response
+	// body, so that an unresponsive server cannot block a querier forever.
+	queryTimeout = 2 * time.Minute
+
+	// configPath is the location of the YAML file describing the query groups.
+	configPath = "/etc/loadgen/config.yaml"
+)
+
+var (
+	// domainName is the host part of every query URL. It is read from the
+	// DOMAIN_NAME environment variable; main refuses to start when it is empty.
+	domainName = os.Getenv("DOMAIN_NAME")
+
+	// httpClient is shared by all queriers. Unlike http.DefaultClient it has
+	// a timeout.
+	httpClient = &http.Client{Timeout: queryTimeout}
+
+	queryDuration = prometheus.NewHistogramVec(
+		prometheus.HistogramOpts{
+			Namespace: "loadgen",
+			Name:      "query_duration_seconds",
+			Help:      "Query duration",
+			Buckets:   prometheus.LinearBuckets(0.05, 0.1, 20),
+		},
+		[]string{"prometheus", "group", "expr", "type"},
+	)
+	queryCount = prometheus.NewCounterVec(
+		prometheus.CounterOpts{
+			Namespace: "loadgen",
+			Name:      "queries_total",
+			Help:      "Total amount of queries",
+		},
+		[]string{"prometheus", "group", "expr", "type"},
+	)
+	queryFailCount = prometheus.NewCounterVec(
+		prometheus.CounterOpts{
+			Namespace: "loadgen",
+			Name:      "failed_queries_total",
+			Help:      "Amount of failed queries",
+		},
+		[]string{"prometheus", "group", "expr", "type"},
+	)
+)
+
+// Querier sends the queries of one query group to one Prometheus target over
+// and over again. main starts exactly one goroutine per Querier; the type does
+// no locking of its own.
+type Querier struct {
+	// target is "pr" or "release"; it is part of the URL and a metric label.
+	target string
+	// name is the query group name, used as the "group" metric label.
+	name string
+	// groupID is the index of the group in the configuration (used in logs).
+	groupID int
+	// numberOfErrors counts the 404 responses received so far.
+	numberOfErrors int
+
+	// interval is the minimum time between two passes over queries.
+	interval time.Duration
+	queries  []Query
+	// qtype is "instant" or "range".
+	qtype string
+	// start and end are offsets into the past that delimit a range query.
+	start time.Duration
+	end   time.Duration
+	// step is the resolution of a range query.
+	step string
+	// url is the complete API endpoint the queries are sent to.
+	url string
+}
+
+// Query is a single expression of a query group.
+type Query struct {
+	Expr string `yaml:"expr"`
+}
+
+// QueryGroup is one entry of querier.groups in the configuration file.
+type QueryGroup struct {
+	Name     string  `yaml:"name"`
+	Interval string  `yaml:"interval"`
+	Queries  []Query `yaml:"queries"`
+	Type     string  `yaml:"type,omitempty"`
+	Start    string  `yaml:"start,omitempty"`
+	End      string  `yaml:"end,omitempty"`
+	Step     string  `yaml:"step,omitempty"`
+}
+
+// NewQuerier builds the Querier that runs query group qg against target
+// ("pr" or "release") of the benchmark identified by prNumber.
+//
+// An empty type defaults to "instant". Range groups without a step use
+// defaultStep. The process is terminated when a duration in qg cannot be
+// parsed or when the interval is missing or not positive, because run would
+// otherwise send queries without any pause.
+func NewQuerier(groupID int, target, prNumber string, qg QueryGroup) *Querier {
+	qtype := qg.Type
+	if qtype == "" {
+		qtype = "instant"
+	}
+
+	endpoint := "query"
+	step := qg.Step
+	if qtype == "range" {
+		endpoint = "query_range"
+		if step == "" {
+			step = defaultStep
+		}
+	}
+
+	interval := durationSeconds(qg.Interval)
+	if interval <= 0 {
+		log.Fatalf("Query group %q: interval must be a positive duration, got %q", qg.Name, qg.Interval)
+	}
+
+	return &Querier{
+		target:   target,
+		name:     qg.Name,
+		groupID:  groupID,
+		interval: interval,
+		queries:  qg.Queries,
+		qtype:    qtype,
+		start:    durationSeconds(qg.Start),
+		end:      durationSeconds(qg.End),
+		step:     step,
+		url:      fmt.Sprintf("http://%s/%s/prometheus-%s/api/v1/%s", domainName, prNumber, target, endpoint),
+	}
+}
+
+// run waits for startupDelay and then executes all queries of the group once
+// per interval. It never returns on its own; wg.Done is deferred only so the
+// caller's WaitGroup stays balanced.
+func (q *Querier) run(wg *sync.WaitGroup) {
+	defer wg.Done()
+	fmt.Printf("Running querier %s %s for %s\n", q.target, q.name, q.url)
+	time.Sleep(startupDelay)
+
+	for {
+		passStart := time.Now()
+
+		for _, query := range q.queries {
+			q.query(query.Expr)
+		}
+
+		// Sleep only for what is left of the interval after the pass.
+		if wait := q.interval - time.Since(passStart); wait > 0 {
+			time.Sleep(wait)
+		}
+	}
+}
+
+// query sends expr to the querier's endpoint once and records the outcome in
+// the loadgen metrics. Transport errors and unreadable responses increment the
+// failure counter. The process is terminated after max404Errors 404 responses.
+func (q *Querier) query(expr string) {
+	queryCount.WithLabelValues(q.target, q.name, expr, q.qtype).Inc()
+	start := time.Now()
+
+	req, err := http.NewRequest(http.MethodGet, q.url, nil)
+	if err != nil {
+		log.Printf("Error creating request: %v", err)
+		queryFailCount.WithLabelValues(q.target, q.name, expr, q.qtype).Inc()
+		return
+	}
+
+	params := req.URL.Query()
+	params.Set("query", expr)
+	if q.qtype == "range" {
+		// Both ends of the range are derived from the same instant.
+		params.Set("start", strconv.FormatInt(start.Add(-q.start).Unix(), 10))
+		params.Set("end", strconv.FormatInt(start.Add(-q.end).Unix(), 10))
+		params.Set("step", q.step)
+	}
+	req.URL.RawQuery = params.Encode()
+
+	resp, err := httpClient.Do(req)
+	if err != nil {
+		log.Printf("Error querying Prometheus: %v", err)
+		queryFailCount.WithLabelValues(q.target, q.name, expr, q.qtype).Inc()
+		return
+	}
+	defer resp.Body.Close()
+
+	// Consume the body for every status code: the measured duration then
+	// covers the whole response, and the connection can be reused. Only the
+	// size is needed, so the payload is not kept in memory.
+	size, err := io.Copy(io.Discard, resp.Body)
+	if err != nil {
+		log.Printf("Error reading response from Prometheus: %v", err)
+		queryFailCount.WithLabelValues(q.target, q.name, expr, q.qtype).Inc()
+		return
+	}
+
+	duration := time.Since(start)
+	queryDuration.WithLabelValues(q.target, q.name, expr, q.qtype).Observe(duration.Seconds())
+
+	switch resp.StatusCode {
+	case http.StatusOK:
+		log.Printf("GroupID#%d: query %s %s, status=%d, size=%d, duration=%.3f", q.groupID, q.target, expr, resp.StatusCode, size, duration.Seconds())
+	case http.StatusNotFound:
+		log.Printf("WARNING: GroupID#%d: Querier returned 404 for Prometheus instance %s.", q.groupID, q.url)
+		q.numberOfErrors++
+		if q.numberOfErrors >= max404Errors {
+			log.Fatalf("ERROR: GroupID#%d: Querier returned 404 for Prometheus instance %s %d times.", q.groupID, q.url, max404Errors)
+		}
+	default:
+		log.Printf("WARNING: GroupID#%d: Querier returned %d for Prometheus instance %s.", q.groupID, resp.StatusCode, q.url)
+	}
+}
+
+// durationSeconds parses a duration string such as "15s" or "1h" with
+// model.ParseDuration. The empty string yields 0. Despite its name it returns
+// a time.Duration, not a number of seconds. An unparsable value terminates
+// the process.
+func durationSeconds(s string) time.Duration {
+	if s == "" {
+		return 0
+	}
+	value, err := model.ParseDuration(s)
+	if err != nil {
+		log.Fatalf("Invalid duration %q: %v", s, err)
+	}
+	return time.Duration(value)
+}
+
+func main() {
+	if len(os.Args) < 3 {
+		fmt.Println("unexpected arguments")
+		fmt.Println("usage: load-generator NAMESPACE PR_NUMBER")
+		os.Exit(2)
+	}
+	// os.Args[1] (the namespace) is required on the command line but not used.
+	prNumber := os.Args[2]
+
+	if domainName == "" {
+		log.Fatal("DOMAIN_NAME environment variable must be set")
+	}
+
+	configFile, err := os.ReadFile(configPath)
+	if err != nil {
+		log.Fatalf("Failed to load config: %v", err)
+	}
+
+	var config struct {
+		Querier struct {
+			Groups []QueryGroup `yaml:"groups"`
+		} `yaml:"querier"`
+	}
+	if err := yaml.Unmarshal(configFile, &config); err != nil {
+		log.Fatalf("Failed to parse config: %v", err)
+	}
+
+	fmt.Println("Loaded configuration")
+
+	// Expose the metrics before any querier is started.
+	prometheus.MustRegister(queryDuration, queryCount, queryFailCount)
+	http.Handle("/metrics", promhttp.Handler())
+	go func() {
+		log.Println("Starting HTTP server on :8080")
+		log.Fatal(http.ListenAndServe(":8080", nil))
+	}()
+
+	var wg sync.WaitGroup
+
+	for i, group := range config.Querier.Groups {
+		for _, target := range []string{"pr", "release"} {
+			// NewQuerier is evaluated here, in the main goroutine, so
+			// configuration errors stop the program before any query is sent.
+			wg.Add(1)
+			go NewQuerier(i, target, prNumber, group).run(&wg)
+		}
+	}
+
+	wg.Wait()
+}
diff --git a/tools/load-generator/main.py b/tools/load-generator/main.py
deleted file mode 100644
index 75c6f5450..000000000
--- a/tools/load-generator/main.py
+++ /dev/null
@@ -1,140 +0,0 @@
-#!/usr/bin/env python
-
-import os
-import time
-import sys
-import requests -import yaml -import threading -from datetime import timedelta - -from prometheus_client import start_http_server, Histogram, Counter - -namespace = "" -max_404_errors = 30 -domain_name = os.environ["DOMAIN_NAME"] - -class Querier(object): - """ - Querier launches groups of queries against a Prometheus service. - """ - - query_duration = Histogram("loadgen_query_duration_seconds", "Query duration", - ["prometheus", "group", "expr", "type"], - buckets=(0.05, 0.1, 0.3, 0.7, 1.5, 2.5, 4, 6, 8, 10, 13, 16, 20, 24, 29, 36, 42, 50, 60)) - - query_count = Counter('loadgen_queries_total', 'Total amount of queries', - ["prometheus", "group", "expr", "type"], - ) - query_fail_count = Counter('loadgen_failed_queries_total', 'Amount of failed queries', - ["prometheus", "group", "expr", "type"], - ) - - def __init__(self, groupID, target, pr_number, qg): - self.target = target - self.name = qg["name"] - self.groupID = groupID - self.numberOfErrors = 0 - - self.interval = duration_seconds(qg["interval"]) - self.queries = qg["queries"] - self.type = qg.get("type", "instant") - self.start = duration_seconds(qg.get("start", "0h")) - self.end = duration_seconds(qg.get("end", "0h")) - self.step = qg.get("step", "15s") - - if self.type == "instant": - self.url = "http://%s/%s/prometheus-%s/api/v1/query" % (domain_name, pr_number, target) - else: - self.url = "http://%s/%s/prometheus-%s/api/v1/query_range" % (domain_name, pr_number, target) - - def run(self): - print("run querier %s %s for %s" % (self.target, self.name, self.url)) - print("Waiting for 20 seconds to allow prometheus server (%s) to be properly set-up" % (self.url)) - time.sleep(20) - - while True: - start = time.time() - - for q in self.queries: - self.query(q["expr"]) - - wait = self.interval - (time.time() - start) - time.sleep(max(wait, 0)) - - def query(self, expr): - try: - Querier.query_count.labels(self.target, self.name, expr, self.type).inc() - start = time.time() - - params = {"query": expr} - 
if self.type == "range": - params["start"] = start - self.start - params["end"] = start - self.end - params["step"] = self.step - - resp = requests.get(self.url, params) - dur = time.time() - start - - if resp.status_code == 404: - print("WARNING :: GroupId#%d : Querier returned 404 for prometheus instance %s." % (self.groupID, self.url)) - self.numberOfErrors += 1 - if self.numberOfErrors == max_404_errors: - print("ERROR :: GroupId#%d : Querier returned 404 for prometheus instance %s %d times." % (self.groupID, self.url, max_404_errors)) - os._exit(1) - elif resp.status_code != 200: - print("WARNING :: GroupId#%d : Querier returned %d for prometheus instance %s." % (self.groupID, resp.status_code, self.url)) - else: - print("GroupId#%d : query %s %s, status=%s, size=%d, dur=%.3f" % (self.groupID, self.target, expr, resp.status_code, len(resp.text), dur)) - Querier.query_duration.labels(self.target, self.name, expr, self.type).observe(dur) - - except IOError as e: - Querier.query_fail_count.labels(self.target, self.name, expr, self.type).inc() - print("WARNING :: GroupId#%d : Could not query prometheus instance %s. \n %s" % (self.groupID, self.url, e)) - - except Exception as e: - Querier.query_fail_count.labels(self.target, self.name, expr, self.type).inc() - print("WARNING :: GroupId#%d : Could not query prometheus instance %s. 
\n %s" % (self.groupID, self.url, e)) - -def duration_seconds(s): - num = int(s[:-1]) - - if s.endswith('s'): - return timedelta(seconds=num).total_seconds() - elif s.endswith('m'): - return timedelta(minutes=num).total_seconds() - elif s.endswith('h'): - return timedelta(hours=num).total_seconds() - - raise "unknown duration %s" % s - -def main(): - if len(sys.argv) < 3: - print("unexpected arguments") - print("usage: ") - exit(2) - - global namespace - namespace = sys.argv[1] - pr_number = sys.argv[2] - - config = yaml.load(open("/etc/loadgen/config.yaml", 'r').read(), Loader=yaml.FullLoader) - - print("loaded configuration") - - for i,g in enumerate(config["querier"]["groups"]): - p = threading.Thread(target=Querier(i, "pr", pr_number, g).run) - p.start() - - for i,g in enumerate(config["querier"]["groups"]): - p = threading.Thread(target=Querier(i, "release", pr_number, g).run) - p.start() - - start_http_server(8080) - print("started HTTP server on 8080") - - while True: - time.sleep(100) - -if __name__ == "__main__": - main()