Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: implement opentelemetry for distributed tracing #706

Open
wants to merge 9 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
123 changes: 72 additions & 51 deletions bin/experiment/experiment.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
package main

import (
"context"
"errors"
"flag"
"os"

// Uncomment to load all auth plugins
// _ "k8s.io/client-go/plugin/pkg/client/auth"

Expand Down Expand Up @@ -59,10 +63,11 @@ import (
k6Loadgen "github.com/litmuschaos/litmus-go/experiments/load/k6-loadgen/experiment"
springBootFaults "github.com/litmuschaos/litmus-go/experiments/spring-boot/spring-boot-faults/experiment"
vmpoweroff "github.com/litmuschaos/litmus-go/experiments/vmware/vm-poweroff/experiment"

"github.com/litmuschaos/litmus-go/pkg/clients"
cli "github.com/litmuschaos/litmus-go/pkg/clients"
"github.com/litmuschaos/litmus-go/pkg/log"
"github.com/litmuschaos/litmus-go/pkg/telemetry"
"github.com/sirupsen/logrus"
"go.opentelemetry.io/otel"
)

func init() {
Expand All @@ -75,8 +80,24 @@ func init() {
}

func main() {
ctx := context.Background()

// Set up Observability.
if otelExporterEndpoint := os.Getenv(telemetry.OTELExporterOTLPEndpoint); otelExporterEndpoint != "" {
shutdown, err := telemetry.InitOTelSDK(ctx, true, otelExporterEndpoint)
if err != nil {
return
}
defer func() {
err = errors.Join(err, shutdown(ctx))
}()
ctx = telemetry.GetTraceParentContext()
}

clients := cli.ClientSets{}

clients := clients.ClientSets{}
ctx, span := otel.Tracer(telemetry.TracerName).Start(ctx, "ExecuteExperiment")
defer span.End()

// parse the experiment name
experimentName := flag.String("name", "pod-delete", "name of the chaos experiment")
Expand All @@ -92,101 +113,101 @@ func main() {
// invoke the corresponding experiment based on the (-name) flag
switch *experimentName {
case "container-kill":
containerKill.ContainerKill(clients)
containerKill.ContainerKill(ctx, clients)
case "disk-fill":
diskFill.DiskFill(clients)
diskFill.DiskFill(ctx, clients)
case "kafka-broker-pod-failure":
kafkaBrokerPodFailure.KafkaBrokerPodFailure(clients)
kafkaBrokerPodFailure.KafkaBrokerPodFailure(ctx, clients)
case "kubelet-service-kill":
kubeletServiceKill.KubeletServiceKill(clients)
kubeletServiceKill.KubeletServiceKill(ctx, clients)
case "docker-service-kill":
dockerServiceKill.DockerServiceKill(clients)
dockerServiceKill.DockerServiceKill(ctx, clients)
case "node-cpu-hog":
nodeCPUHog.NodeCPUHog(clients)
nodeCPUHog.NodeCPUHog(ctx, clients)
case "node-drain":
nodeDrain.NodeDrain(clients)
nodeDrain.NodeDrain(ctx, clients)
case "node-io-stress":
nodeIOStress.NodeIOStress(clients)
nodeIOStress.NodeIOStress(ctx, clients)
case "node-memory-hog":
nodeMemoryHog.NodeMemoryHog(clients)
nodeMemoryHog.NodeMemoryHog(ctx, clients)
case "node-taint":
nodeTaint.NodeTaint(clients)
nodeTaint.NodeTaint(ctx, clients)
case "pod-autoscaler":
podAutoscaler.PodAutoscaler(clients)
podAutoscaler.PodAutoscaler(ctx, clients)
case "pod-cpu-hog-exec":
podCPUHogExec.PodCPUHogExec(clients)
podCPUHogExec.PodCPUHogExec(ctx, clients)
case "pod-delete":
podDelete.PodDelete(clients)
podDelete.PodDelete(ctx, clients)
case "pod-io-stress":
podIOStress.PodIOStress(clients)
podIOStress.PodIOStress(ctx, clients)
case "pod-memory-hog-exec":
podMemoryHogExec.PodMemoryHogExec(clients)
podMemoryHogExec.PodMemoryHogExec(ctx, clients)
case "pod-network-corruption":
podNetworkCorruption.PodNetworkCorruption(clients)
podNetworkCorruption.PodNetworkCorruption(ctx, clients)
case "pod-network-duplication":
podNetworkDuplication.PodNetworkDuplication(clients)
podNetworkDuplication.PodNetworkDuplication(ctx, clients)
case "pod-network-latency":
podNetworkLatency.PodNetworkLatency(clients)
podNetworkLatency.PodNetworkLatency(ctx, clients)
case "pod-network-loss":
podNetworkLoss.PodNetworkLoss(clients)
podNetworkLoss.PodNetworkLoss(ctx, clients)
case "pod-network-partition":
podNetworkPartition.PodNetworkPartition(clients)
podNetworkPartition.PodNetworkPartition(ctx, clients)
case "pod-memory-hog":
podMemoryHog.PodMemoryHog(clients)
podMemoryHog.PodMemoryHog(ctx, clients)
case "pod-cpu-hog":
podCPUHog.PodCPUHog(clients)
podCPUHog.PodCPUHog(ctx, clients)
case "cassandra-pod-delete":
cassandraPodDelete.CasssandraPodDelete(clients)
cassandraPodDelete.CasssandraPodDelete(ctx, clients)
case "aws-ssm-chaos-by-id":
awsSSMChaosByID.AWSSSMChaosByID(clients)
awsSSMChaosByID.AWSSSMChaosByID(ctx, clients)
case "aws-ssm-chaos-by-tag":
awsSSMChaosByTag.AWSSSMChaosByTag(clients)
awsSSMChaosByTag.AWSSSMChaosByTag(ctx, clients)
case "ec2-terminate-by-id":
ec2TerminateByID.EC2TerminateByID(clients)
ec2TerminateByID.EC2TerminateByID(ctx, clients)
case "ec2-terminate-by-tag":
ec2TerminateByTag.EC2TerminateByTag(clients)
ec2TerminateByTag.EC2TerminateByTag(ctx, clients)
case "ebs-loss-by-id":
ebsLossByID.EBSLossByID(clients)
ebsLossByID.EBSLossByID(ctx, clients)
case "ebs-loss-by-tag":
ebsLossByTag.EBSLossByTag(clients)
ebsLossByTag.EBSLossByTag(ctx, clients)
case "node-restart":
nodeRestart.NodeRestart(clients)
nodeRestart.NodeRestart(ctx, clients)
case "pod-dns-error":
podDNSError.PodDNSError(clients)
podDNSError.PodDNSError(ctx, clients)
case "pod-dns-spoof":
podDNSSpoof.PodDNSSpoof(clients)
podDNSSpoof.PodDNSSpoof(ctx, clients)
case "pod-http-latency":
podHttpLatency.PodHttpLatency(clients)
podHttpLatency.PodHttpLatency(ctx, clients)
case "pod-http-status-code":
podHttpStatusCode.PodHttpStatusCode(clients)
podHttpStatusCode.PodHttpStatusCode(ctx, clients)
case "pod-http-modify-header":
podHttpModifyHeader.PodHttpModifyHeader(clients)
podHttpModifyHeader.PodHttpModifyHeader(ctx, clients)
case "pod-http-modify-body":
podHttpModifyBody.PodHttpModifyBody(clients)
podHttpModifyBody.PodHttpModifyBody(ctx, clients)
case "pod-http-reset-peer":
podHttpResetPeer.PodHttpResetPeer(clients)
podHttpResetPeer.PodHttpResetPeer(ctx, clients)
case "vm-poweroff":
vmpoweroff.VMPoweroff(clients)
vmpoweroff.VMPoweroff(ctx, clients)
case "azure-instance-stop":
azureInstanceStop.AzureInstanceStop(clients)
azureInstanceStop.AzureInstanceStop(ctx, clients)
case "azure-disk-loss":
azureDiskLoss.AzureDiskLoss(clients)
azureDiskLoss.AzureDiskLoss(ctx, clients)
case "gcp-vm-disk-loss":
gcpVMDiskLoss.VMDiskLoss(clients)
gcpVMDiskLoss.VMDiskLoss(ctx, clients)
case "pod-fio-stress":
podFioStress.PodFioStress(clients)
podFioStress.PodFioStress(ctx, clients)
case "gcp-vm-instance-stop":
gcpVMInstanceStop.VMInstanceStop(clients)
gcpVMInstanceStop.VMInstanceStop(ctx, clients)
case "redfish-node-restart":
redfishNodeRestart.NodeRestart(clients)
redfishNodeRestart.NodeRestart(ctx, clients)
case "gcp-vm-instance-stop-by-label":
gcpVMInstanceStopByLabel.GCPVMInstanceStopByLabel(clients)
gcpVMInstanceStopByLabel.GCPVMInstanceStopByLabel(ctx, clients)
case "gcp-vm-disk-loss-by-label":
gcpVMDiskLossByLabel.GCPVMDiskLossByLabel(clients)
gcpVMDiskLossByLabel.GCPVMDiskLossByLabel(ctx, clients)
case "spring-boot-cpu-stress", "spring-boot-memory-stress", "spring-boot-exceptions", "spring-boot-app-kill", "spring-boot-faults", "spring-boot-latency":
springBootFaults.Experiment(clients, *experimentName)
springBootFaults.Experiment(ctx, clients, *experimentName)
case "k6-loadgen":
k6Loadgen.Experiment(clients)
k6Loadgen.Experiment(ctx, clients)
default:
log.Errorf("Unsupported -name %v, please provide the correct value of -name args", *experimentName)
return
Expand Down
26 changes: 23 additions & 3 deletions bin/helper/helper.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
package main

import (
"context"
"errors"
"flag"
"os"

// Uncomment to load all auth plugins
// _ "k8s.io/client-go/plugin/pkg/client/auth"

Expand All @@ -17,10 +21,11 @@ import (
networkChaos "github.com/litmuschaos/litmus-go/chaoslib/litmus/network-chaos/helper"
dnsChaos "github.com/litmuschaos/litmus-go/chaoslib/litmus/pod-dns-chaos/helper"
stressChaos "github.com/litmuschaos/litmus-go/chaoslib/litmus/stress-chaos/helper"

"github.com/litmuschaos/litmus-go/pkg/clients"
cli "github.com/litmuschaos/litmus-go/pkg/clients"
"github.com/litmuschaos/litmus-go/pkg/log"
"github.com/litmuschaos/litmus-go/pkg/telemetry"
"github.com/sirupsen/logrus"
"go.opentelemetry.io/otel"
)

func init() {
Expand All @@ -33,8 +38,23 @@ func init() {
}

func main() {
ctx := context.Background()
// Set up Observability.
if otelExporterEndpoint := os.Getenv(telemetry.OTELExporterOTLPEndpoint); otelExporterEndpoint != "" {
shutdown, err := telemetry.InitOTelSDK(ctx, true, otelExporterEndpoint)
if err != nil {
return
}
defer func() {
err = errors.Join(err, shutdown(ctx))
}()
ctx = telemetry.GetTraceParentContext()
}

clients := cli.ClientSets{}

clients := clients.ClientSets{}
ctx, span := otel.Tracer(telemetry.TracerName).Start(ctx, "ExecuteExperimentHelper")
defer span.End()

// parse the helper name
helperName := flag.String("name", "", "name of the helper pod")
Expand Down
2 changes: 1 addition & 1 deletion build/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Multi-stage docker build
# Build stage
FROM golang:1.20 AS builder
FROM golang:1.22 AS builder

ARG TARGETOS=linux
ARG TARGETARCH
Expand Down
11 changes: 6 additions & 5 deletions chaoslib/litmus/aws-ssm-chaos/lib/ssm-chaos.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
package lib

import (
"context"
"os"
"strings"
"time"

experimentTypes "github.com/litmuschaos/litmus-go/pkg/aws-ssm/aws-ssm-chaos/types"
clients "github.com/litmuschaos/litmus-go/pkg/clients"
"github.com/litmuschaos/litmus-go/pkg/clients"
"github.com/litmuschaos/litmus-go/pkg/cloud/aws/ssm"
"github.com/litmuschaos/litmus-go/pkg/events"
"github.com/litmuschaos/litmus-go/pkg/log"
Expand All @@ -17,7 +18,7 @@ import (
)

// InjectChaosInSerialMode will inject the aws ssm chaos in serial mode that is one after other
func InjectChaosInSerialMode(experimentsDetails *experimentTypes.ExperimentDetails, instanceIDList []string, clients clients.ClientSets, resultDetails *types.ResultDetails, eventsDetails *types.EventDetails, chaosDetails *types.ChaosDetails, inject chan os.Signal) error {
func InjectChaosInSerialMode(ctx context.Context, experimentsDetails *experimentTypes.ExperimentDetails, instanceIDList []string, clients clients.ClientSets, resultDetails *types.ResultDetails, eventsDetails *types.EventDetails, chaosDetails *types.ChaosDetails, inject chan os.Signal) error {

select {
case <-inject:
Expand Down Expand Up @@ -60,7 +61,7 @@ func InjectChaosInSerialMode(experimentsDetails *experimentTypes.ExperimentDetai

// run the probes during chaos
if len(resultDetails.ProbeDetails) != 0 && i == 0 {
if err = probe.RunProbes(chaosDetails, clients, resultDetails, "DuringChaos", eventsDetails); err != nil {
if err = probe.RunProbes(ctx, chaosDetails, clients, resultDetails, "DuringChaos", eventsDetails); err != nil {
return stacktrace.Propagate(err, "failed to run probes")
}
}
Expand All @@ -85,7 +86,7 @@ func InjectChaosInSerialMode(experimentsDetails *experimentTypes.ExperimentDetai
}

// InjectChaosInParallelMode will inject the aws ssm chaos in parallel mode that is all at once
func InjectChaosInParallelMode(experimentsDetails *experimentTypes.ExperimentDetails, instanceIDList []string, clients clients.ClientSets, resultDetails *types.ResultDetails, eventsDetails *types.EventDetails, chaosDetails *types.ChaosDetails, inject chan os.Signal) error {
func InjectChaosInParallelMode(ctx context.Context, experimentsDetails *experimentTypes.ExperimentDetails, instanceIDList []string, clients clients.ClientSets, resultDetails *types.ResultDetails, eventsDetails *types.EventDetails, chaosDetails *types.ChaosDetails, inject chan os.Signal) error {

select {
case <-inject:
Expand Down Expand Up @@ -125,7 +126,7 @@ func InjectChaosInParallelMode(experimentsDetails *experimentTypes.ExperimentDet

// run the probes during chaos
if len(resultDetails.ProbeDetails) != 0 {
if err = probe.RunProbes(chaosDetails, clients, resultDetails, "DuringChaos", eventsDetails); err != nil {
if err = probe.RunProbes(ctx, chaosDetails, clients, resultDetails, "DuringChaos", eventsDetails); err != nil {
return stacktrace.Propagate(err, "failed to run probes")
}
}
Expand Down
13 changes: 9 additions & 4 deletions chaoslib/litmus/aws-ssm-chaos/lib/ssm/aws-ssm-chaos-by-id.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package ssm

import (
"context"
"fmt"
"os"
"os/signal"
Expand All @@ -10,12 +11,14 @@ import (
"github.com/litmuschaos/litmus-go/chaoslib/litmus/aws-ssm-chaos/lib"
experimentTypes "github.com/litmuschaos/litmus-go/pkg/aws-ssm/aws-ssm-chaos/types"
"github.com/litmuschaos/litmus-go/pkg/cerrors"
clients "github.com/litmuschaos/litmus-go/pkg/clients"
"github.com/litmuschaos/litmus-go/pkg/clients"
"github.com/litmuschaos/litmus-go/pkg/cloud/aws/ssm"
"github.com/litmuschaos/litmus-go/pkg/log"
"github.com/litmuschaos/litmus-go/pkg/telemetry"
"github.com/litmuschaos/litmus-go/pkg/types"
"github.com/litmuschaos/litmus-go/pkg/utils/common"
"github.com/palantir/stacktrace"
"go.opentelemetry.io/otel"
)

var (
Expand All @@ -24,7 +27,9 @@ var (
)

// PrepareAWSSSMChaosByID contains the prepration and injection steps for the experiment
func PrepareAWSSSMChaosByID(experimentsDetails *experimentTypes.ExperimentDetails, clients clients.ClientSets, resultDetails *types.ResultDetails, eventsDetails *types.EventDetails, chaosDetails *types.ChaosDetails) error {
func PrepareAWSSSMChaosByID(ctx context.Context, experimentsDetails *experimentTypes.ExperimentDetails, clients clients.ClientSets, resultDetails *types.ResultDetails, eventsDetails *types.EventDetails, chaosDetails *types.ChaosDetails) error {
ctx, span := otel.Tracer(telemetry.TracerName).Start(ctx, "InjectAWSChaosByIDChaos")
defer span.End()

// inject channel is used to transmit signal notifications.
inject = make(chan os.Signal, 1)
Expand Down Expand Up @@ -60,11 +65,11 @@ func PrepareAWSSSMChaosByID(experimentsDetails *experimentTypes.ExperimentDetail

switch strings.ToLower(experimentsDetails.Sequence) {
case "serial":
if err = lib.InjectChaosInSerialMode(experimentsDetails, instanceIDList, clients, resultDetails, eventsDetails, chaosDetails, inject); err != nil {
if err = lib.InjectChaosInSerialMode(ctx, experimentsDetails, instanceIDList, clients, resultDetails, eventsDetails, chaosDetails, inject); err != nil {
return stacktrace.Propagate(err, "could not run chaos in serial mode")
}
case "parallel":
if err = lib.InjectChaosInParallelMode(experimentsDetails, instanceIDList, clients, resultDetails, eventsDetails, chaosDetails, inject); err != nil {
if err = lib.InjectChaosInParallelMode(ctx, experimentsDetails, instanceIDList, clients, resultDetails, eventsDetails, chaosDetails, inject); err != nil {
return stacktrace.Propagate(err, "could not run chaos in parallel mode")
}
default:
Expand Down
Loading
Loading