From 1b24d3d3cef040d53dca55dbf1052437686e91e2 Mon Sep 17 00:00:00 2001 From: Vandit Singh Date: Fri, 6 Sep 2024 14:17:01 +0530 Subject: [PATCH] rewrite load-generator in go Signed-off-by: Vandit Singh --- .github/dependabot.yml | 4 - .gitignore | 1 + .promu.yml | 2 + tools/load-generator/Dockerfile | 11 +- tools/load-generator/main.go | 227 ++++++++++++++++++++++++++ tools/load-generator/main.py | 140 ---------------- tools/load-generator/requirements.txt | 6 - 7 files changed, 234 insertions(+), 157 deletions(-) create mode 100644 tools/load-generator/main.go delete mode 100644 tools/load-generator/main.py delete mode 100644 tools/load-generator/requirements.txt diff --git a/.github/dependabot.yml b/.github/dependabot.yml index 81354ffd0..63e9f273c 100644 --- a/.github/dependabot.yml +++ b/.github/dependabot.yml @@ -12,7 +12,3 @@ updates: patterns: - "go.opentelemetry.io/*" open-pull-requests-limit: 20 - - package-ecosystem: "pip" - directory: "/tools/load-generator" - schedule: - interval: "monthly" diff --git a/.gitignore b/.gitignore index ce0afe612..c8b5cd7a1 100644 --- a/.gitignore +++ b/.gitignore @@ -1,6 +1,7 @@ /tools/amGithubNotifier/amGithubNotifier /tools/commentMonitor/commentMonitor /tools/fake-webserver/fake-webserver +/tools/load-generator/load-generator /tools/scaler/scaler /funcbench/funcbench /infra/infra diff --git a/.promu.yml b/.promu.yml index 1058f0b8a..cd60222d3 100644 --- a/.promu.yml +++ b/.promu.yml @@ -18,6 +18,8 @@ build: path: ./tools/fake-webserver - name: tools/scaler path: ./tools/scaler + - name: tools/load-generator + path: ./tools/load-generator/ flags: -a -tags netgo crossbuild: platforms: diff --git a/tools/load-generator/Dockerfile b/tools/load-generator/Dockerfile index 309c6758e..0aa272ff8 100644 --- a/tools/load-generator/Dockerfile +++ b/tools/load-generator/Dockerfile @@ -1,10 +1,7 @@ -FROM python:3 +FROM quay.io/prometheus/busybox:latest -COPY ./requirements.txt /usr/src/app/ -RUN pip install -r /usr/src/app/requirements.txt +LABEL maintainer="The Prometheus Authors " -COPY ./*.py /usr/src/app/ +COPY ./load-generator /bin/load-generator -WORKDIR /usr/src/app - -ENTRYPOINT ["python", "-u", "./main.py"] +ENTRYPOINT [ "/bin/load-generator" ] diff --git a/tools/load-generator/main.go b/tools/load-generator/main.go new file mode 100644 index 000000000..740ffcd31 --- /dev/null +++ b/tools/load-generator/main.go @@ -0,0 +1,227 @@ +package main + +import ( + "fmt" + "log" + "net/http" + "os" + "sync" + "time" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promhttp" + "gopkg.in/yaml.v2" +) + +var ( + namespace string + max404Errors = 30 + domainName = os.Getenv("DOMAIN_NAME") + queryDuration = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Name: "loadgen_query_duration_seconds", + Help: "Query duration", + Buckets: prometheus.LinearBuckets(0.05, 0.1, 15), + }, + []string{"prometheus", "group", "expr", "type"}, + ) + queryCount = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: "loadgen_queries_total", + Help: "Total amount of queries", + }, + []string{"prometheus", "group", "expr", "type"}, + ) + queryFailCount = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: "loadgen_failed_queries_total", + Help: "Amount of failed queries", + }, + []string{"prometheus", "group", "expr", "type"}, + ) +) + +type Querier struct { + target string + name string + groupID int + numberOfErrors int + interval time.Duration + queries []Query + queryType string + start time.Duration + end time.Duration + step string + url string +} + +type Query struct { + Expr string `yaml:"expr"` +} + +type QuerierGroup struct { + Name string `yaml:"name"` + Interval string `yaml:"interval"` + Queries []Query `yaml:"queries"` + Type string `yaml:"type,omitempty"` + Start string `yaml:"start,omitempty"` + End string `yaml:"end,omitempty"` + Step string `yaml:"step,omitempty"` +} + +type Config struct { + Querier struct { + Groups []QuerierGroup `yaml:"groups"` + } `yaml:"querier"` +} + +func (q *Querier) Run() { + fmt.Printf("Running querier %s %s for %s\n", q.target, q.name, q.url) + fmt.Printf("Waiting for 20 seconds to allow Prometheus server (%s) to be properly set up\n", q.url) + time.Sleep(20 * time.Second) + + for { + startTime := time.Now() + + for _, query := range q.queries { + q.executeQuery(query.Expr) + } + + wait := q.interval - time.Since(startTime) + if wait > 0 { + time.Sleep(wait) + } + } +} + +func (q *Querier) executeQuery(expr string) { + queryCount.WithLabelValues(q.target, q.name, expr, q.queryType).Inc() + start := time.Now() + + params := map[string]string{ + "query": expr, + } + + if q.queryType == "range" { + params["start"] = fmt.Sprintf("%.0f", start.Add(-q.start).Unix()) + params["end"] = fmt.Sprintf("%.0f", start.Add(-q.end).Unix()) + params["step"] = q.step + } + + resp, err := http.Get(fmt.Sprintf("%s?%s", q.url, params)) + duration := time.Since(start).Seconds() + + if err != nil { + log.Printf("WARNING :: GroupID#%d : Could not query Prometheus instance %s. \n %v", q.groupID, q.url, err) + queryFailCount.WithLabelValues(q.target, q.name, expr, q.queryType).Inc() + return + } + + if resp.StatusCode == http.StatusNotFound { + log.Printf("WARNING :: GroupID#%d : Querier returned 404 for Prometheus instance %s.", q.groupID, q.url) + q.numberOfErrors++ + if q.numberOfErrors == max404Errors { + log.Fatalf("ERROR :: GroupID#%d : Querier returned 404 for Prometheus instance %s %d times.", q.groupID, q.url, max404Errors) + } + } else if resp.StatusCode != http.StatusOK { + log.Printf("WARNING :: GroupID#%d : Querier returned %d for Prometheus instance %s.", q.groupID, resp.StatusCode, q.url) + } else { + log.Printf("GroupID#%d : query %s %s, status=%d, duration=%.3fs", q.groupID, q.target, expr, resp.StatusCode, duration) + queryDuration.WithLabelValues(q.target, q.name, expr, q.queryType).Observe(duration) + } +} + +func durationToSeconds(duration string) time.Duration { + var dur time.Duration + var err error + + switch { + case duration[len(duration)-1] == 's': + dur, err = time.ParseDuration(duration) + case duration[len(duration)-1] == 'm': + dur, err = time.ParseDuration(duration + "in") + case duration[len(duration)-1] == 'h': + dur, err = time.ParseDuration(duration + "our") + default: + log.Fatalf("unknown duration %s", duration) + } + + if err != nil { + log.Fatalf("Invalid duration: %v", err) + } + + return dur +} + +func main() { + if len(os.Args) < 3 { + log.Fatalf("unexpected arguments\nusage: ") + } + + namespace = os.Args[1] + prNumber := os.Args[2] + + file, err := os.Open("/etc/loadgen/config.yaml") + if err != nil { + log.Fatalf("Could not open config file: %v", err) + } + defer file.Close() + + var config Config + decoder := yaml.NewDecoder(file) + if err := decoder.Decode(&config); err != nil { + log.Fatalf("Could not parse config: %v", err) + } + + fmt.Println("Loaded configuration") + + var wg sync.WaitGroup + + for i, g := range config.Querier.Groups { + wg.Add(1) + go func(i int, g QuerierGroup) { + defer wg.Done() + querier := &Querier{ + target: "pr", + name: g.Name, + groupID: i, + interval: durationToSeconds(g.Interval), + queries: g.Queries, + queryType: g.Type, + start: durationToSeconds(g.Start), + end: durationToSeconds(g.End), + step: g.Step, + url: fmt.Sprintf("http://%s/%s/prometheus-%s/api/v1/query", domainName, prNumber, "pr"), + } + querier.Run() + }(i, g) + } + + for i, g := range config.Querier.Groups { + wg.Add(1) + go func(i int, g QuerierGroup) { + defer wg.Done() + querier := &Querier{ + target: "release", + name: g.Name, + groupID: i, + interval: durationToSeconds(g.Interval), + queries: g.Queries, + queryType: g.Type, + start: durationToSeconds(g.Start), + end: durationToSeconds(g.End), + step: g.Step, + url: fmt.Sprintf("http://%s/%s/prometheus-%s/api/v1/query", domainName, prNumber, "release"), + } + querier.Run() + }(i, g) + } + + http.Handle("/metrics", promhttp.Handler()) + log.Println("Starting HTTP server on :8080") + go func() { + log.Fatal(http.ListenAndServe(":8080", nil)) + }() + + wg.Wait() +} diff --git a/tools/load-generator/main.py b/tools/load-generator/main.py deleted file mode 100644 index 75c6f5450..000000000 --- a/tools/load-generator/main.py +++ /dev/null @@ -1,140 +0,0 @@ -#!/usr/bin/env python - -import os -import time -import sys -import requests -import yaml -import threading -from datetime import timedelta - -from prometheus_client import start_http_server, Histogram, Counter - -namespace = "" -max_404_errors = 30 -domain_name = os.environ["DOMAIN_NAME"] - -class Querier(object): - """ - Querier launches groups of queries against a Prometheus service. - """ - - query_duration = Histogram("loadgen_query_duration_seconds", "Query duration", - ["prometheus", "group", "expr", "type"], - buckets=(0.05, 0.1, 0.3, 0.7, 1.5, 2.5, 4, 6, 8, 10, 13, 16, 20, 24, 29, 36, 42, 50, 60)) - - query_count = Counter('loadgen_queries_total', 'Total amount of queries', - ["prometheus", "group", "expr", "type"], - ) - query_fail_count = Counter('loadgen_failed_queries_total', 'Amount of failed queries', - ["prometheus", "group", "expr", "type"], - ) - - def __init__(self, groupID, target, pr_number, qg): - self.target = target - self.name = qg["name"] - self.groupID = groupID - self.numberOfErrors = 0 - - self.interval = duration_seconds(qg["interval"]) - self.queries = qg["queries"] - self.type = qg.get("type", "instant") - self.start = duration_seconds(qg.get("start", "0h")) - self.end = duration_seconds(qg.get("end", "0h")) - self.step = qg.get("step", "15s") - - if self.type == "instant": - self.url = "http://%s/%s/prometheus-%s/api/v1/query" % (domain_name, pr_number, target) - else: - self.url = "http://%s/%s/prometheus-%s/api/v1/query_range" % (domain_name, pr_number, target) - - def run(self): - print("run querier %s %s for %s" % (self.target, self.name, self.url)) - print("Waiting for 20 seconds to allow prometheus server (%s) to be properly set-up" % (self.url)) - time.sleep(20) - - while True: - start = time.time() - - for q in self.queries: - self.query(q["expr"]) - - wait = self.interval - (time.time() - start) - time.sleep(max(wait, 0)) - - def query(self, expr): - try: - Querier.query_count.labels(self.target, self.name, expr, self.type).inc() - start = time.time() - - params = {"query": expr} - if self.type == "range": - params["start"] = start - self.start - params["end"] = start - self.end - params["step"] = self.step - - resp = requests.get(self.url, params) - dur = time.time() - start - - if resp.status_code == 404: - print("WARNING :: GroupId#%d : Querier returned 404 for prometheus instance %s." % (self.groupID, self.url)) - self.numberOfErrors += 1 - if self.numberOfErrors == max_404_errors: - print("ERROR :: GroupId#%d : Querier returned 404 for prometheus instance %s %d times." % (self.groupID, self.url, max_404_errors)) - os._exit(1) - elif resp.status_code != 200: - print("WARNING :: GroupId#%d : Querier returned %d for prometheus instance %s." % (self.groupID, resp.status_code, self.url)) - else: - print("GroupId#%d : query %s %s, status=%s, size=%d, dur=%.3f" % (self.groupID, self.target, expr, resp.status_code, len(resp.text), dur)) - Querier.query_duration.labels(self.target, self.name, expr, self.type).observe(dur) - - except IOError as e: - Querier.query_fail_count.labels(self.target, self.name, expr, self.type).inc() - print("WARNING :: GroupId#%d : Could not query prometheus instance %s. \n %s" % (self.groupID, self.url, e)) - - except Exception as e: - Querier.query_fail_count.labels(self.target, self.name, expr, self.type).inc() - print("WARNING :: GroupId#%d : Could not query prometheus instance %s. \n %s" % (self.groupID, self.url, e)) - -def duration_seconds(s): - num = int(s[:-1]) - - if s.endswith('s'): - return timedelta(seconds=num).total_seconds() - elif s.endswith('m'): - return timedelta(minutes=num).total_seconds() - elif s.endswith('h'): - return timedelta(hours=num).total_seconds() - - raise "unknown duration %s" % s - -def main(): - if len(sys.argv) < 3: - print("unexpected arguments") - print("usage: ") - exit(2) - - global namespace - namespace = sys.argv[1] - pr_number = sys.argv[2] - - config = yaml.load(open("/etc/loadgen/config.yaml", 'r').read(), Loader=yaml.FullLoader) - - print("loaded configuration") - - for i,g in enumerate(config["querier"]["groups"]): - p = threading.Thread(target=Querier(i, "pr", pr_number, g).run) - p.start() - - for i,g in enumerate(config["querier"]["groups"]): - p = threading.Thread(target=Querier(i, "release", pr_number, g).run) - p.start() - - start_http_server(8080) - print("started HTTP server on 8080") - - while True: - time.sleep(100) - -if __name__ == "__main__": - main() diff --git a/tools/load-generator/requirements.txt b/tools/load-generator/requirements.txt deleted file mode 100644 index e75acd68b..000000000 --- a/tools/load-generator/requirements.txt +++ /dev/null @@ -1,6 +0,0 @@ -PyYAML==6.0.1 -certifi==2024.6.2 -idna==3.7 -prometheus-client==0.20.0 -requests==2.32.3 -urllib3==2.2.2