From 555e5a750b3ddfa643334e67120ec8966520b2ea Mon Sep 17 00:00:00 2001 From: Denis Mishin Date: Tue, 25 Apr 2023 15:27:09 -0400 Subject: [PATCH] add stress test (#609) --- cmd/root.go | 3 + config/stress-test/config.yaml | 13 ++ config/stress-test/deployment.yaml | 56 +++++ config/stress-test/kustomization.yaml | 9 + config/stress-test/namespace.yaml | 4 + config/stress-test/rbac/kustomization.yaml | 4 + config/stress-test/rbac/role.yaml | 16 ++ config/stress-test/rbac/role_binding.yaml | 11 + config/stress-test/rbac/service_account.yaml | 4 + config/stress-test/service.yaml | 12 + go.mod | 6 +- internal/stress/cmd/command.go | 173 ++++++++++++++ internal/stress/echo.go | 37 +++ internal/stress/ingress.go | 232 +++++++++++++++++++ internal/stress/traffic.go | 69 ++++++ 15 files changed, 646 insertions(+), 3 deletions(-) create mode 100644 config/stress-test/config.yaml create mode 100644 config/stress-test/deployment.yaml create mode 100644 config/stress-test/kustomization.yaml create mode 100644 config/stress-test/namespace.yaml create mode 100644 config/stress-test/rbac/kustomization.yaml create mode 100644 config/stress-test/rbac/role.yaml create mode 100644 config/stress-test/rbac/role_binding.yaml create mode 100644 config/stress-test/rbac/service_account.yaml create mode 100644 config/stress-test/service.yaml create mode 100644 internal/stress/cmd/command.go create mode 100644 internal/stress/echo.go create mode 100644 internal/stress/ingress.go create mode 100644 internal/stress/traffic.go diff --git a/cmd/root.go b/cmd/root.go index c5abedab..5a484563 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -4,6 +4,8 @@ import ( "fmt" "github.com/spf13/cobra" + + stress_cmd "github.com/pomerium/ingress-controller/internal/stress/cmd" ) // RootCommand generates default secrets @@ -18,6 +20,7 @@ func RootCommand() (*cobra.Command, error) { "gen-secrets": GenSecretsCommand, "controller": ControllerCommand, "all-in-one": AllInOneCommand, + "stress-test": stress_cmd.Command, } { cmd, err := fn() if err != nil { diff --git a/config/stress-test/config.yaml b/config/stress-test/config.yaml new file mode 100644 index 00000000..4e7bda06 --- /dev/null +++ b/config/stress-test/config.yaml @@ -0,0 +1,13 @@ +apiVersion: v1 +kind: ConfigMap +metadata: + name: stress-test +data: + # how many ingresses to create + ingress-count: "100" + # what is the domain name to use for the ingresses + ingress-domain: "" + # how long to wait for the ingress to be ready. + # this may be proportional to the number of ingresses + # the test would crash and start from scratch if the readiness timeout is not long enough + readiness-timeout: "5m" diff --git a/config/stress-test/deployment.yaml b/config/stress-test/deployment.yaml new file mode 100644 index 00000000..b2826381 --- /dev/null +++ b/config/stress-test/deployment.yaml @@ -0,0 +1,56 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + name: stress-test +spec: + replicas: 1 + template: + spec: + serviceAccountName: pomerium-stress-test + containers: + - name: stress-test + args: + - "stress-test" + image: pomerium/ingress-controller:main + imagePullPolicy: Always + resources: + limits: + memory: "256Mi" + cpu: "500m" + env: + - name: SERVICE_NAME + value: "stress-test-echo" + - name: SERVICE_NAMESPACE + valueFrom: + fieldRef: + apiVersion: v1 + fieldPath: metadata.namespace + - name: SERVICE_PORT_NAMES + value: "echo1,echo2" + - name: CONTAINER_PORT_NUMBERS + value: "8081,8082" + - name: INGRESS_CLASS + value: "pomerium" + - name: INGRESS_DOMAIN + valueFrom: + configMapKeyRef: + optional: false + name: stress-test + key: ingress-domain + - name: INGRESS_COUNT + valueFrom: + configMapKeyRef: + optional: false + name: stress-test + key: ingress-count + - name: READINESS_TIMEOUT + valueFrom: + configMapKeyRef: + optional: false + name: stress-test + key: readiness-timeout + ports: + - containerPort: 8081 + name: echo1 + - containerPort: 8082 + name: echo2 diff --git a/config/stress-test/kustomization.yaml b/config/stress-test/kustomization.yaml new file mode 100644 index 00000000..0c9cf62c --- /dev/null +++ b/config/stress-test/kustomization.yaml @@ -0,0 +1,9 @@ +namespace: pomerium-stress-test +commonLabels: + app.kubernetes.io/name: pomerium-stress-test +resources: + - namespace.yaml + - ./rbac + - config.yaml + - deployment.yaml + - service.yaml diff --git a/config/stress-test/namespace.yaml b/config/stress-test/namespace.yaml new file mode 100644 index 00000000..8ff99f03 --- /dev/null +++ b/config/stress-test/namespace.yaml @@ -0,0 +1,4 @@ +apiVersion: v1 +kind: Namespace +metadata: + name: pomerium-stress-test diff --git a/config/stress-test/rbac/kustomization.yaml b/config/stress-test/rbac/kustomization.yaml new file mode 100644 index 00000000..25a37bf7 --- /dev/null +++ b/config/stress-test/rbac/kustomization.yaml @@ -0,0 +1,4 @@ +resources: + - role.yaml + - role_binding.yaml + - service_account.yaml diff --git a/config/stress-test/rbac/role.yaml b/config/stress-test/rbac/role.yaml new file mode 100644 index 00000000..21731a72 --- /dev/null +++ b/config/stress-test/rbac/role.yaml @@ -0,0 +1,16 @@ +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: Role +metadata: + name: pomerium-stress-test +rules: + - apiGroups: + - networking.k8s.io + resources: + - ingresses + verbs: + - get + - list + - create + - update + - delete diff --git a/config/stress-test/rbac/role_binding.yaml b/config/stress-test/rbac/role_binding.yaml new file mode 100644 index 00000000..d68c53cc --- /dev/null +++ b/config/stress-test/rbac/role_binding.yaml @@ -0,0 +1,11 @@ +apiVersion: rbac.authorization.k8s.io/v1 +kind: RoleBinding +metadata: + name: pomerium-stress-test +roleRef: + apiGroup: rbac.authorization.k8s.io + kind: Role + name: pomerium-stress-test +subjects: + - kind: ServiceAccount + name: pomerium-stress-test diff --git a/config/stress-test/rbac/service_account.yaml b/config/stress-test/rbac/service_account.yaml new file mode 100644 index 00000000..1287dfcb --- /dev/null +++ b/config/stress-test/rbac/service_account.yaml @@ -0,0 +1,4 @@ +apiVersion: v1 +kind: ServiceAccount +metadata: + name: pomerium-stress-test diff --git a/config/stress-test/service.yaml b/config/stress-test/service.yaml new file mode 100644 index 00000000..d26f308b --- /dev/null +++ b/config/stress-test/service.yaml @@ -0,0 +1,12 @@ +apiVersion: v1 +kind: Service +metadata: + name: stress-test-echo +spec: + ports: + - port: 8081 + name: echo1 + targetPort: echo1 + - port: 8082 + name: echo2 + targetPort: echo2 diff --git a/go.mod b/go.mod index a1bea47e..91fd5b07 100644 --- a/go.mod +++ b/go.mod @@ -3,6 +3,7 @@ module github.com/pomerium/ingress-controller go 1.20 require ( + github.com/cenkalti/backoff/v4 v4.2.0 github.com/client9/misspell v0.3.4 github.com/envoyproxy/go-control-plane v0.11.0 github.com/go-logr/logr v1.2.4 @@ -17,7 +18,9 @@ require ( github.com/iancoleman/strcase v0.2.0 github.com/martinlindhe/base36 v1.1.1 github.com/open-policy-agent/opa v0.51.0 + github.com/pomerium/csrf v1.7.0 github.com/pomerium/pomerium v0.20.1-0.20230421153948-65e0fcb667a6 + github.com/rs/zerolog v1.29.0 github.com/sergi/go-diff v1.3.1 github.com/spf13/cobra v1.7.0 github.com/spf13/pflag v1.0.5 @@ -96,7 +99,6 @@ require ( github.com/breml/errchkjson v0.3.1 // indirect github.com/butuzov/ireturn v0.1.1 // indirect github.com/caddyserver/certmagic v0.17.2 // indirect - github.com/cenkalti/backoff/v4 v4.2.0 // indirect github.com/census-instrumentation/opencensus-proto v0.4.1 // indirect github.com/cespare/xxhash/v2 v2.2.0 // indirect github.com/charithe/durationcheck v0.0.10 // indirect @@ -239,7 +241,6 @@ require ( github.com/pkg/errors v0.9.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/polyfloyd/go-errorlint v1.4.0 // indirect - github.com/pomerium/csrf v1.7.0 // indirect github.com/pomerium/datasource v0.18.2-0.20221108160055-c6134b5ed524 // indirect github.com/pomerium/webauthn v0.0.0-20221118023040-00a9c430578b // indirect github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c // indirect @@ -255,7 +256,6 @@ require ( github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 // indirect github.com/rivo/uniseg v0.2.0 // indirect github.com/rs/cors v1.8.3 // indirect - github.com/rs/zerolog v1.29.0 // indirect github.com/ryancurrah/gomodguard v1.3.0 // indirect github.com/ryanrolds/sqlclosecheck v0.4.0 // indirect github.com/sanposhiho/wastedassign/v2 v2.0.7 // indirect diff --git a/internal/stress/cmd/command.go b/internal/stress/cmd/command.go new file mode 100644 index 00000000..b78970da --- /dev/null +++ b/internal/stress/cmd/command.go @@ -0,0 +1,173 @@ +// Package cmd provides the stress test command +package cmd + +import ( + "context" + "fmt" + "os" + "os/signal" + "path/filepath" + "strconv" + "strings" + "syscall" + "time" + + "github.com/rs/zerolog" + "github.com/spf13/cobra" + "golang.org/x/sync/errgroup" + "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" + "k8s.io/client-go/tools/clientcmd" + "k8s.io/client-go/util/homedir" + + "github.com/pomerium/ingress-controller/internal/stress" +) + +// Command returns the stress test command +func Command() (*cobra.Command, error) { + return &cobra.Command{ + Use: "stress-test", + Short: "stress test the ingress controller", + RunE: func(cmd *cobra.Command, args []string) error { + setupLogger() + return run(withInterruptSignal(context.Background())) + }, + }, nil +} + +func setupLogger() { + logger := zerolog.New(os.Stdout) + zerolog.SetGlobalLevel(zerolog.DebugLevel) + zerolog.DefaultContextLogger = &logger +} + +func testConfigFromEnv() (*stress.IngressLoadTestConfig, error) { + svcName := os.Getenv("SERVICE_NAME") + if svcName == "" { + return nil, fmt.Errorf("SERVICE_NAME environment variable not set") + } + + svcNamespace := os.Getenv("SERVICE_NAMESPACE") + if svcNamespace == "" { + return nil, fmt.Errorf("SERVICE_NAMESPACE environment variable not set") + } + + servicePortNames := strings.Split(os.Getenv("SERVICE_PORT_NAMES"), ",") + if len(servicePortNames) != 2 { + return nil, fmt.Errorf("SERVICE_PORT_NAMES environment variable must have exacly two comma separated values") + } + + domain := os.Getenv("INGRESS_DOMAIN") + if domain == "" { + return nil, fmt.Errorf("INGRESS_DOMAIN environment variable not set") + } + + ingressCount, err := strconv.Atoi(os.Getenv("INGRESS_COUNT")) + if err != nil { + return nil, fmt.Errorf("failed to parse INGRESS_COUNT: %w", err) + } + + ingressClass := os.Getenv("INGRESS_CLASS") + if ingressClass == "" { + return nil, fmt.Errorf("INGRESS_CLASS environment variable not set") + } + + readinessTimeout, err := time.ParseDuration(os.Getenv("READINESS_TIMEOUT")) + if err != nil { + return nil, fmt.Errorf("failed to parse READINESS_TIMEOUT: %w", err) + } + + return &stress.IngressLoadTestConfig{ + ReadinessTimeout: readinessTimeout, + IngressClass: ingressClass, + IngressCount: ingressCount, + ServicePortNames: servicePortNames, + ServiceName: types.NamespacedName{Name: svcName, Namespace: svcNamespace}, + Domain: domain, + }, nil +} + +func getKubeClient() (*kubernetes.Clientset, error) { + var kubeconfig *rest.Config + kubeconfigPath := filepath.Join(homedir.HomeDir(), ".kube", "config") + if _, err := os.Stat(kubeconfigPath); err == nil { + kubeconfig, err = clientcmd.BuildConfigFromFlags("", kubeconfigPath) + if err != nil { + return nil, fmt.Errorf("failed to build kubeconfig from %s: %w", kubeconfigPath, err) + } + } else { + kubeconfig, err = rest.InClusterConfig() + if err != nil { + return nil, fmt.Errorf("failed to get in cluster kubeconfig: %w", err) + } + } + + client, err := kubernetes.NewForConfig(kubeconfig) + if err != nil { + return nil, fmt.Errorf("failed to create kubernetes client: %w", err) + } + + return client, nil +} + +func run(ctx context.Context) error { + eg, ctx := errgroup.WithContext(ctx) + eg.Go(func() error { + return runTest(ctx) + }) + eg.Go(func() error { + return runEcho(ctx) + }) + return eg.Wait() +} + +func runEcho(ctx context.Context) error { + ports := strings.Split(os.Getenv("CONTAINER_PORT_NUMBERS"), ",") + if len(ports) != 2 { + return fmt.Errorf("CONTAINER_PORT_NUMBERS should have exactly two comma separated values") + } + eg, ctx := errgroup.WithContext(ctx) + for _, port := range ports { + port := port + eg.Go(func() error { + n, err := strconv.Atoi(port) + if err != nil { + return fmt.Errorf("failed to parse port number: %w", err) + } + return stress.RunHTTPEchoServer(ctx, fmt.Sprintf(":%d", n)) + }) + } + return eg.Wait() +} + +func runTest(ctx context.Context) error { + cfg, err := testConfigFromEnv() + if err != nil { + return err + } + + client, err := getKubeClient() + if err != nil { + return err + } + + cfg.Client = client + srv := stress.IngressLoadTest{IngressLoadTestConfig: *cfg} + err = srv.Run(ctx) + if err != nil { + zerolog.Ctx(ctx).Error().Err(err).Msg("error running stress test...") + } + return err +} + +func withInterruptSignal(ctx context.Context) context.Context { + ctx, cancel := context.WithCancel(ctx) + go func() { + ch := make(chan os.Signal, 2) + signal.Notify(ch, os.Interrupt, syscall.SIGTERM) + <-ch + cancel() + }() + return ctx +} diff --git a/internal/stress/echo.go b/internal/stress/echo.go new file mode 100644 index 00000000..d4ed9395 --- /dev/null +++ b/internal/stress/echo.go @@ -0,0 +1,37 @@ +package stress + +import ( + "context" + "net/http" + "time" + + "github.com/rs/zerolog" +) + +// RunHTTPEchoServer runs a HTTP server that responds with a 200 OK to all requests +func RunHTTPEchoServer(ctx context.Context, addr string) error { + log := zerolog.Ctx(ctx).With().Str("addr", addr).Logger() + log.Info().Msg("starting echo server...") + s := &http.Server{ + Addr: addr, + ReadHeaderTimeout: 10 * time.Second, + Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + _, _ = w.Write([]byte("OK\n")) + }), + } + go func() { + <-ctx.Done() + log.Info().Msg("stopping echo server...") + shutdownCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + _ = s.Shutdown(shutdownCtx) + cancel() + }() + err := s.ListenAndServe() + if err != nil && err != http.ErrServerClosed { + log.Err(err).Msg("echo server terminated with error") + return err + } + log.Info().Msg("echo server stopped") + return nil +} diff --git a/internal/stress/ingress.go b/internal/stress/ingress.go new file mode 100644 index 00000000..8aa1ee59 --- /dev/null +++ b/internal/stress/ingress.go @@ -0,0 +1,232 @@ +// Package stress provides a set of stress tests for the ingress controller +package stress + +import ( + "context" + "crypto/tls" + "encoding/json" + "fmt" + "net/http" + "net/url" + "time" + + "github.com/rs/zerolog" + networkingv1 "k8s.io/api/networking/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/kubernetes" +) + +// IngressLoadTestConfig is the configuration for the ingress load test +type IngressLoadTestConfig struct { + // ReadinessTimeout is the timeout to wait for the ingress to become ready + ReadinessTimeout time.Duration + // IngressClass is the ingress controller class + IngressClass string + // IngressCount is the number of ingresses to create and mutate + IngressCount int + // Domain is the domain to use for the ingresses + Domain string + // ServiceName is the name of the service to point the ingresses at + ServiceName types.NamespacedName + // ServicePortNames is the list of ports to use on the service, should be more then one + // so that updates to the ingress can be tested + ServicePortNames []string + // Client is kubernetes client + Client *kubernetes.Clientset +} + +// IngressLoadTest is the ingress load test +type IngressLoadTest struct { + IngressLoadTestConfig + + log zerolog.Logger + ingresses []networkingv1.Ingress +} + +// Run runs the ingress load test +func (l *IngressLoadTest) Run(ctx context.Context) (err error) { + l.log = zerolog.Ctx(ctx).With().Str("component", "ingress-load-test").Logger() + + if err := l.cleanup(ctx); err != nil { + return fmt.Errorf("cleanup before run: %w", err) + } + + defer func() { + if err != nil { + l.log.Error().Err(err).Msg("run failed, cleaning up") + } else { + l.log.Info().Msg("cleaning up...") + } + + cleanupCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + if err := l.cleanup(cleanupCtx); err != nil { + l.log.Error().Err(err).Msg("failed to cleanup") + } else { + l.log.Info().Msg("clean up completed") + } + }() + + if err := l.createIngress(ctx); err != nil { + return fmt.Errorf("create ingress: %w", err) + } + + if err := l.waitIngressAvailable(ctx, 0); err != nil { + return fmt.Errorf("wait ingress available: %w", err) + } + + i := 0 + for ctx.Err() == nil { + i++ + l.log.Info().Int("iteration", i).Msg("starting iteration") + + if err := l.updateIngress(ctx, i); err != nil { + return fmt.Errorf("update ingress: %w", err) + } + if err := l.waitIngressAvailable(ctx, i); err != nil { + return fmt.Errorf("wait ingress available: %w", err) + } + } + + return ctx.Err() +} + +func (l *IngressLoadTest) getHTTPClient() *http.Client { + return &http.Client{ + Transport: &http.Transport{ + TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, + }, + } +} + +func (l *IngressLoadTest) getURLs() []string { + var urls []string + for _, ingress := range l.ingresses { + urls = append(urls, (&url.URL{Scheme: "https", Host: ingress.Spec.Rules[0].Host, Path: "/"}).String()) + } + return urls +} + +func (l *IngressLoadTest) waitIngressAvailable(ctx context.Context, i int) error { + ctx, cancel := context.WithTimeout(ctx, l.ReadinessTimeout) + defer cancel() + + httpClient := l.getHTTPClient() + + l.log.Info().Msg("waiting for ingresses to become available") + now := time.Now() + err := AwaitReadyMulti(ctx, httpClient, l.getURLs(), getHeaderForIteration(i)) + if err != nil { + l.log.Err(err).Msg("failed to wait for ingress to become available") + return fmt.Errorf("failed to wait for ingress to become available: %w", err) + } + l.log.Info().Dur("elapsed", time.Since(now)).Msg("ingresses are available") + return nil +} + +func getHeaderForIteration(i int) map[string]string { + return map[string]string{ + "x-pomerium-stress-test-iteration": fmt.Sprintf("%d", i), + } +} + +func getAnnotationForIteration(i int) map[string]string { + txt, err := json.Marshal(getHeaderForIteration(i)) + if err != nil { + panic(err) + } + return map[string]string{ + "ingress.pomerium.io/set_response_headers": string(txt), + "ingress.pomerium.io/allow_public_unauthenticated_access": "true", + } +} + +func (l *IngressLoadTest) updateIngress(ctx context.Context, i int) error { + l.log.Info().Msg("updating ingresses") + now := time.Now() + // update the ingress by setting the service port to the next one in the list + // and adding an annotation that updates the response header + for _, ingress := range l.ingresses { + obj, err := l.Client.NetworkingV1().Ingresses(ingress.Namespace).Get(ctx, ingress.Name, metav1.GetOptions{}) + if err != nil { + return fmt.Errorf("get ingress %s: %w", ingress.Name, err) + } + obj.Spec.Rules[0].IngressRuleValue.HTTP.Paths[0].Backend.Service.Port.Name = l.ServicePortNames[i%len(l.ServicePortNames)] + obj.Annotations = getAnnotationForIteration(i) + if _, err := l.Client.NetworkingV1().Ingresses(ingress.Namespace).Update(ctx, obj, metav1.UpdateOptions{}); err != nil { + return fmt.Errorf("update ingress %s: %w", ingress.Name, err) + } + } + + l.log.Info().Dur("elapsed", time.Since(now)).Msg("ingresses updated") + return nil +} + +// createIngress creates ingresses +func (l *IngressLoadTest) createIngress(ctx context.Context) error { + l.log.Info().Msg("creating ingresses...") + pathType := networkingv1.PathTypePrefix + for i := 0; i < l.IngressCount; i++ { + spec := networkingv1.IngressSpec{ + IngressClassName: &l.IngressClass, + Rules: []networkingv1.IngressRule{ + { + Host: fmt.Sprintf("ingress-%d.%s", i, l.Domain), + IngressRuleValue: networkingv1.IngressRuleValue{ + HTTP: &networkingv1.HTTPIngressRuleValue{ + Paths: []networkingv1.HTTPIngressPath{ + { + Path: "/", + PathType: &pathType, + Backend: networkingv1.IngressBackend{ + Service: &networkingv1.IngressServiceBackend{ + Name: l.ServiceName.Name, + Port: networkingv1.ServiceBackendPort{ + Name: l.ServicePortNames[0], + }}}}}}}}}} + + annotations := getAnnotationForIteration(0) + + obj, err := l.Client.NetworkingV1().Ingresses(l.ServiceName.Namespace).Create(ctx, &networkingv1.Ingress{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("ingress-%d", i), + Labels: map[string]string{ + // default label indicating this ingress was created by this test + "ingress-stress-test": "true", + }, + Annotations: annotations, + }, + Spec: spec, + }, metav1.CreateOptions{}) + if err != nil { + return fmt.Errorf("failed to create ingress %+v: %w", spec, err) + } + l.ingresses = append(l.ingresses, *obj) + } + l.log.Info().Int("count", len(l.ingresses)).Msg("created ingresses") + return nil +} + +// cleanup deletes all ingresses created by this test based on the labels +func (l *IngressLoadTest) cleanup(ctx context.Context) error { + l.log.Info().Msg("cleaning up ingresses") + + ingresses, err := l.Client.NetworkingV1().Ingresses(l.ServiceName.Namespace).List(ctx, metav1.ListOptions{ + LabelSelector: "ingress-stress-test=true", + }) + if err != nil { + return fmt.Errorf("failed to list ingresses: %w", err) + } + + // delete all ingresses + for _, ingress := range ingresses.Items { + if err := l.Client.NetworkingV1().Ingresses(l.ServiceName.Namespace).Delete(ctx, ingress.Name, metav1.DeleteOptions{}); err != nil { + return fmt.Errorf("failed to delete ingress %s: %w", ingress.Name, err) + } + } + + l.log.Info().Int("count", len(ingresses.Items)).Msg("deleted ingresses") + return nil +} diff --git a/internal/stress/traffic.go b/internal/stress/traffic.go new file mode 100644 index 00000000..4e920ddc --- /dev/null +++ b/internal/stress/traffic.go @@ -0,0 +1,69 @@ +package stress + +import ( + "context" + "fmt" + "net/http" + "time" + + "github.com/cenkalti/backoff/v4" + "github.com/rs/zerolog" + "golang.org/x/sync/errgroup" +) + +// AwaitReadyMulti concurrently waits for multiple HTTP servers to respond with a given status code to a given URL +func AwaitReadyMulti(ctx context.Context, client *http.Client, urls []string, expectHeaders map[string]string) error { + eg, ctx := errgroup.WithContext(ctx) + + for _, url := range urls { + url := url + eg.Go(func() error { + return AwaitReady(ctx, client, url, expectHeaders) + }) + } + return eg.Wait() +} + +// AwaitReady waits for a HTTP server to respond with a given status code to a given URL +func AwaitReady(ctx context.Context, client *http.Client, url string, expectHeaders map[string]string) error { + bo := backoff.NewExponentialBackOff() + bo.MaxElapsedTime = 0 + + for { + select { + case <-ctx.Done(): + return ctx.Err() + case <-time.After(bo.NextBackOff()): + } + + err := tryRequest(ctx, client, url, expectHeaders) + if err == nil { + return nil + } + zerolog.Ctx(ctx).Error().Err(err).Str("url", url).Msg("waiting for status") + } +} + +func tryRequest(ctx context.Context, client *http.Client, url string, expectHeaders map[string]string) error { + req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil) + if err != nil { + return fmt.Errorf("new request: %w", err) + } + + resp, err := client.Do(req) + if err != nil { + return err + } + _ = resp.Body.Close() + if resp.StatusCode/100 != 2 { + return fmt.Errorf("unexpected status code: %d", resp.StatusCode) + } + + for k, v := range expectHeaders { + if resp.Header.Get(k) != v { + return fmt.Errorf("unexpected header value for %s: want %s, got %s", k, v, resp.Header.Get(k)) + } + } + + return nil +}