Skip to content

Commit

Permalink
rewrite load-generator in go
Browse files Browse the repository at this point in the history
Signed-off-by: Vandit Singh <[email protected]>
  • Loading branch information
Vandit1604 committed Sep 6, 2024
1 parent ad9f80b commit 1b24d3d
Show file tree
Hide file tree
Showing 7 changed files with 234 additions and 157 deletions.
4 changes: 0 additions & 4 deletions .github/dependabot.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,3 @@ updates:
patterns:
- "go.opentelemetry.io/*"
open-pull-requests-limit: 20
- package-ecosystem: "pip"
directory: "/tools/load-generator"
schedule:
interval: "monthly"
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
/tools/amGithubNotifier/amGithubNotifier
/tools/commentMonitor/commentMonitor
/tools/fake-webserver/fake-webserver
/tools/load-generator/load-generator
/tools/scaler/scaler
/funcbench/funcbench
/infra/infra
Expand Down
2 changes: 2 additions & 0 deletions .promu.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ build:
path: ./tools/fake-webserver
- name: tools/scaler
path: ./tools/scaler
- name: tools/load-generator
path: ./tools/load-generator/
flags: -a -tags netgo
crossbuild:
platforms:
Expand Down
11 changes: 4 additions & 7 deletions tools/load-generator/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,10 +1,7 @@
FROM python:3
FROM quay.io/prometheus/busybox:latest

COPY ./requirements.txt /usr/src/app/
RUN pip install -r /usr/src/app/requirements.txt
LABEL maintainer="The Prometheus Authors <[email protected]>"

COPY ./*.py /usr/src/app/
COPY ./load-generator /bin/load-generator

WORKDIR /usr/src/app

ENTRYPOINT ["python", "-u", "./main.py"]
ENTRYPOINT [ "/bin/load-generator" ]
227 changes: 227 additions & 0 deletions tools/load-generator/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,227 @@
package main

import (
"fmt"
"log"
"net/http"
"os"
"sync"
"time"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"gopkg.in/yaml.v2"
)

var (
namespace string
max404Errors = 30
domainName = os.Getenv("DOMAIN_NAME")
queryDuration = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Name: "loadgen_query_duration_seconds",
Help: "Query duration",
Buckets: prometheus.LinearBuckets(0.05, 0.1, 15),
},
[]string{"prometheus", "group", "expr", "type"},
)
queryCount = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "loadgen_queries_total",
Help: "Total amount of queries",
},
[]string{"prometheus", "group", "expr", "type"},
)
queryFailCount = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "loadgen_failed_queries_total",
Help: "Amount of failed queries",
},
[]string{"prometheus", "group", "expr", "type"},
)
)

type Querier struct {
target string
name string
groupID int
numberOfErrors int
interval time.Duration
queries []Query
queryType string
start time.Duration
end time.Duration
step string
url string
}

type Query struct {
Expr string `yaml:"expr"`
}

type QuerierGroup struct {
Name string `yaml:"name"`
Interval string `yaml:"interval"`
Queries []Query `yaml:"queries"`
Type string `yaml:"type,omitempty"`
Start string `yaml:"start,omitempty"`
End string `yaml:"end,omitempty"`
Step string `yaml:"step,omitempty"`
}

type Config struct {
Querier struct {
Groups []QuerierGroup `yaml:"groups"`
} `yaml:"querier"`
}

func (q *Querier) Run() {
fmt.Printf("Running querier %s %s for %s\n", q.target, q.name, q.url)
fmt.Printf("Waiting for 20 seconds to allow Prometheus server (%s) to be properly set up\n", q.url)
time.Sleep(20 * time.Second)

for {
startTime := time.Now()

for _, query := range q.queries {
q.executeQuery(query.Expr)
}

wait := q.interval - time.Since(startTime)
if wait > 0 {
time.Sleep(wait)
}
}
}

func (q *Querier) executeQuery(expr string) {
queryCount.WithLabelValues(q.target, q.name, expr, q.queryType).Inc()
start := time.Now()

params := map[string]string{
"query": expr,
}

if q.queryType == "range" {
params["start"] = fmt.Sprintf("%.0f", start.Add(-q.start).Unix())
params["end"] = fmt.Sprintf("%.0f", start.Add(-q.end).Unix())
params["step"] = q.step
}

resp, err := http.Get(fmt.Sprintf("%s?%s", q.url, params))
duration := time.Since(start).Seconds()

if err != nil {
log.Printf("WARNING :: GroupID#%d : Could not query Prometheus instance %s. \n %v", q.groupID, q.url, err)
queryFailCount.WithLabelValues(q.target, q.name, expr, q.queryType).Inc()
return
}

if resp.StatusCode == http.StatusNotFound {
log.Printf("WARNING :: GroupID#%d : Querier returned 404 for Prometheus instance %s.", q.groupID, q.url)
q.numberOfErrors++
if q.numberOfErrors == max404Errors {
log.Fatalf("ERROR :: GroupID#%d : Querier returned 404 for Prometheus instance %s %d times.", q.groupID, q.url, max404Errors)
}
} else if resp.StatusCode != http.StatusOK {
log.Printf("WARNING :: GroupID#%d : Querier returned %d for Prometheus instance %s.", q.groupID, resp.StatusCode, q.url)
} else {
log.Printf("GroupID#%d : query %s %s, status=%d, duration=%.3fs", q.groupID, q.target, expr, resp.StatusCode, duration)
queryDuration.WithLabelValues(q.target, q.name, expr, q.queryType).Observe(duration)
}
}

func durationToSeconds(duration string) time.Duration {
var dur time.Duration
var err error

switch {
case duration[len(duration)-1] == 's':
dur, err = time.ParseDuration(duration)
case duration[len(duration)-1] == 'm':
dur, err = time.ParseDuration(duration + "in")
case duration[len(duration)-1] == 'h':
dur, err = time.ParseDuration(duration + "our")
default:
log.Fatalf("unknown duration %s", duration)
}

if err != nil {
log.Fatalf("Invalid duration: %v", err)
}

return dur
}

func main() {
if len(os.Args) < 3 {
log.Fatalf("unexpected arguments\nusage: <load_generator> <namespace> <pr_number>")
}

namespace = os.Args[1]
prNumber := os.Args[2]

file, err := os.Open("/etc/loadgen/config.yaml")
if err != nil {
log.Fatalf("Could not open config file: %v", err)
}
defer file.Close()

var config Config
decoder := yaml.NewDecoder(file)
if err := decoder.Decode(&config); err != nil {
log.Fatalf("Could not parse config: %v", err)
}

fmt.Println("Loaded configuration")

var wg sync.WaitGroup

for i, g := range config.Querier.Groups {
wg.Add(1)
go func(i int, g QuerierGroup) {
defer wg.Done()
querier := &Querier{
target: "pr",
name: g.Name,
groupID: i,
interval: durationToSeconds(g.Interval),
queries: g.Queries,
queryType: g.Type,
start: durationToSeconds(g.Start),
end: durationToSeconds(g.End),
step: g.Step,
url: fmt.Sprintf("http://%s/%s/prometheus-%s/api/v1/query", domainName, prNumber, "pr"),
}
querier.Run()
}(i, g)
}

for i, g := range config.Querier.Groups {
wg.Add(1)
go func(i int, g QuerierGroup) {
defer wg.Done()
querier := &Querier{
target: "release",
name: g.Name,
groupID: i,
interval: durationToSeconds(g.Interval),
queries: g.Queries,
queryType: g.Type,
start: durationToSeconds(g.Start),
end: durationToSeconds(g.End),
step: g.Step,
url: fmt.Sprintf("http://%s/%s/prometheus-%s/api/v1/query", domainName, prNumber, "release"),
}
querier.Run()
}(i, g)
}

http.Handle("/metrics", promhttp.Handler())
log.Println("Starting HTTP server on :8080")
go func() {
log.Fatal(http.ListenAndServe(":8080", nil))
}()

wg.Wait()
}
Loading

0 comments on commit 1b24d3d

Please sign in to comment.