-
Notifications
You must be signed in to change notification settings - Fork 694
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: Litmus 3.0: Added enhancements in subscriber for Litmus 3.0 #4027
Merged
Merged
Changes from 3 commits
Commits
Show all changes
5 commits
Select commit
Hold shift + click to select a range
92bb9b1
feat: Litmus 3.0: Added subscriber base setup
amityt fc849f1
feat: Litmus 3.0: Added events for chaosengine and workflows
amityt ea27b4d
feat: Litmus 3.0: Added logs and minor changes
amityt 78348ca
updated logs
amityt 3764b60
Merge branch 'master' into 3.0.0-subscriber
Saranya-jena File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,92 @@ | ||
module subscriber | ||
|
||
go 1.18 | ||
|
||
require ( | ||
github.com/argoproj/argo-workflows/v3 v3.3.1 | ||
github.com/ghodss/yaml v1.0.1-0.20190212211648-25d852aebe32 | ||
github.com/gorilla/websocket v1.5.0 | ||
github.com/kelseyhightower/envconfig v1.4.0 | ||
github.com/litmuschaos/chaos-operator v0.0.0-20230109130222-de7c74a937a9 | ||
github.com/sirupsen/logrus v1.8.1 | ||
gopkg.in/yaml.v2 v2.4.0 | ||
k8s.io/api v0.23.3 | ||
k8s.io/apimachinery v0.23.3 | ||
k8s.io/client-go v12.0.0+incompatible | ||
) | ||
|
||
require ( | ||
github.com/PuerkitoBio/purell v1.1.1 // indirect | ||
github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578 // indirect | ||
github.com/davecgh/go-spew v1.1.1 // indirect | ||
github.com/emicklei/go-restful v2.15.0+incompatible // indirect | ||
github.com/go-logr/logr v1.2.2 // indirect | ||
github.com/go-openapi/jsonpointer v0.19.5 // indirect | ||
github.com/go-openapi/jsonreference v0.19.6 // indirect | ||
github.com/go-openapi/swag v0.19.15 // indirect | ||
github.com/gogo/protobuf v1.3.2 // indirect | ||
github.com/golang/protobuf v1.5.2 // indirect | ||
github.com/google/go-cmp v0.5.7 // indirect | ||
github.com/google/gofuzz v1.2.0 // indirect | ||
github.com/googleapis/gnostic v0.5.5 // indirect | ||
github.com/grpc-ecosystem/grpc-gateway v1.16.0 // indirect | ||
github.com/hashicorp/golang-lru v0.5.3 // indirect | ||
github.com/imdario/mergo v0.3.12 // indirect | ||
github.com/josharian/intern v1.0.0 // indirect | ||
github.com/json-iterator/go v1.1.12 // indirect | ||
github.com/mailru/easyjson v0.7.7 // indirect | ||
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect | ||
github.com/modern-go/reflect2 v1.0.2 // indirect | ||
github.com/pkg/errors v0.9.1 // indirect | ||
github.com/spf13/pflag v1.0.5 // indirect | ||
github.com/stretchr/testify v1.8.2 // indirect | ||
golang.org/x/net v0.6.0 // indirect | ||
golang.org/x/oauth2 v0.0.0-20211104180415-d3ed0bb246c8 // indirect | ||
golang.org/x/sys v0.5.0 // indirect | ||
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211 // indirect | ||
golang.org/x/text v0.7.0 // indirect | ||
golang.org/x/time v0.0.0-20220210224613-90d013bbcef8 // indirect | ||
google.golang.org/appengine v1.6.7 // indirect | ||
google.golang.org/genproto v0.0.0-20220218161850-94dd64e39d7c // indirect | ||
google.golang.org/grpc v1.44.0 // indirect | ||
google.golang.org/protobuf v1.28.1 // indirect | ||
gopkg.in/inf.v0 v0.9.1 // indirect | ||
gopkg.in/yaml.v3 v3.0.1 // indirect | ||
k8s.io/klog/v2 v2.40.1 // indirect | ||
k8s.io/kube-openapi v0.0.0-20220124234850-424119656bbf // indirect | ||
k8s.io/utils v0.0.0-20220210201930-3a6ce19ff2f9 // indirect | ||
sigs.k8s.io/controller-runtime v0.11.1 // indirect | ||
sigs.k8s.io/structured-merge-diff/v4 v4.2.1 // indirect | ||
sigs.k8s.io/yaml v1.3.0 // indirect | ||
) | ||
|
||
// Pinned to kubernetes-1.21.2 | ||
replace ( | ||
github.com/docker/docker => github.com/moby/moby v0.7.3-0.20190826074503-38ab9da00309 | ||
|
||
github.com/emicklei/go-restful => github.com/emicklei/go-restful v2.16.0+incompatible | ||
|
||
golang.org/x/net => golang.org/x/net v0.0.0-20220906165146-f3363e06e74c | ||
golang.org/x/text => golang.org/x/text v0.3.8 | ||
k8s.io/api => k8s.io/api v0.21.2 | ||
k8s.io/apiextensions-apiserver => k8s.io/apiextensions-apiserver v0.21.2 | ||
k8s.io/apimachinery => k8s.io/apimachinery v0.21.2 | ||
k8s.io/apiserver => k8s.io/apiserver v0.21.2 | ||
k8s.io/cli-runtime => k8s.io/cli-runtime v0.21.2 | ||
k8s.io/client-go => k8s.io/client-go v0.21.2 | ||
k8s.io/cloud-provider => k8s.io/cloud-provider v0.21.2 | ||
k8s.io/code-generator => k8s.io/code-generator v0.21.2 | ||
k8s.io/component-base => k8s.io/component-base v0.21.2 | ||
k8s.io/cri-api => k8s.io/cri-api v0.21.2 | ||
k8s.io/csi-translation-lib => k8s.io/csi-translation-lib v0.21.2 | ||
k8s.io/infra-bootstrap => k8s.io/infra-bootstrap v0.21.2 | ||
k8s.io/kube-aggregator => k8s.io/kube-aggregator v0.21.2 | ||
k8s.io/kube-controller-manager => k8s.io/kube-controller-manager v0.21.2 | ||
k8s.io/kube-proxy => k8s.io/kube-proxy v0.21.2 | ||
k8s.io/kube-scheduler => k8s.io/kube-scheduler v0.21.2 | ||
k8s.io/kubectl => k8s.io/kubectl v0.21.2 | ||
k8s.io/kubelet => k8s.io/kubelet v0.21.2 | ||
k8s.io/legacy-cloud-providers => k8s.io/legacy-cloud-providers v0.21.2 | ||
k8s.io/metrics => k8s.io/metrics v0.21.2 | ||
k8s.io/sample-apiserver => k8s.io/sample-apiserver v0.21.2 | ||
) |
Large diffs are not rendered by default.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
@@ -0,0 +1,211 @@ | ||||||
package events | ||||||
|
||||||
import ( | ||||||
"context" | ||||||
"errors" | ||||||
"fmt" | ||||||
"strconv" | ||||||
"subscriber/pkg/k8s" | ||||||
"subscriber/pkg/types" | ||||||
|
||||||
"github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1" | ||||||
chaosTypes "github.com/litmuschaos/chaos-operator/api/litmuschaos/v1alpha1" | ||||||
"github.com/litmuschaos/chaos-operator/pkg/client/clientset/versioned" | ||||||
litmusV1alpha1 "github.com/litmuschaos/chaos-operator/pkg/client/clientset/versioned/typed/litmuschaos/v1alpha1" | ||||||
"github.com/litmuschaos/chaos-operator/pkg/client/informers/externalversions" | ||||||
"github.com/sirupsen/logrus" | ||||||
v1 "k8s.io/apimachinery/pkg/apis/meta/v1" | ||||||
"k8s.io/apimachinery/pkg/runtime/schema" | ||||||
mergeType "k8s.io/apimachinery/pkg/types" | ||||||
"k8s.io/client-go/tools/cache" | ||||||
) | ||||||
|
||||||
// ChaosEventWatcher initializes the Litmus ChaosEngine event watcher | ||||||
func ChaosEventWatcher(stopCh chan struct{}, stream chan types.WorkflowEvent, infraData map[string]string) { | ||||||
startTime, err := strconv.Atoi(infraData["START_TIME"]) | ||||||
if err != nil { | ||||||
logrus.WithError(err).Fatal("failed to parse startTime") | ||||||
} | ||||||
|
||||||
cfg, err := k8s.GetKubeConfig() | ||||||
if err != nil { | ||||||
logrus.WithError(err).Fatal("could not get config") | ||||||
} | ||||||
|
||||||
// ClientSet to create Informer | ||||||
clientSet, err := versioned.NewForConfig(cfg) | ||||||
if err != nil { | ||||||
logrus.WithError(err).Fatal("could not generate dynamic client for config") | ||||||
} | ||||||
|
||||||
// Create a factory object to watch workflows depending on default scope | ||||||
f := externalversions.NewSharedInformerFactoryWithOptions(clientSet, resyncPeriod, | ||||||
externalversions.WithTweakListOptions(func(list *v1.ListOptions) { | ||||||
list.LabelSelector = fmt.Sprintf("infra_id=%s,type=standalone_workflow", InfraID) | ||||||
})) | ||||||
|
||||||
informer := f.Litmuschaos().V1alpha1().ChaosEngines().Informer() | ||||||
if InfraScope == "namespace" { | ||||||
f = externalversions.NewSharedInformerFactoryWithOptions(clientSet, resyncPeriod, externalversions.WithNamespace(InfraNamespace), | ||||||
externalversions.WithTweakListOptions(func(list *v1.ListOptions) { | ||||||
list.LabelSelector = fmt.Sprintf("infra_id=%s,type=standalone_workflow", InfraID) | ||||||
})) | ||||||
informer = f.Litmuschaos().V1alpha1().ChaosEngines().Informer() | ||||||
} | ||||||
|
||||||
go startWatchEngine(stopCh, informer, stream, int64(startTime)) | ||||||
} | ||||||
|
||||||
// handles the different events events - add, update and delete | ||||||
func startWatchEngine(stopCh <-chan struct{}, s cache.SharedIndexInformer, stream chan types.WorkflowEvent, startTime int64) { | ||||||
handlers := cache.ResourceEventHandlerFuncs{ | ||||||
AddFunc: func(obj interface{}) { | ||||||
chaosEventHandler(obj, "ADD", stream, startTime) | ||||||
}, | ||||||
UpdateFunc: func(oldObj, obj interface{}) { | ||||||
chaosEventHandler(obj, "UPDATE", stream, startTime) | ||||||
}, | ||||||
} | ||||||
|
||||||
s.AddEventHandler(handlers) | ||||||
s.Run(stopCh) | ||||||
} | ||||||
|
||||||
// responsible for extracting the required data from the event and streaming | ||||||
func chaosEventHandler(obj interface{}, eventType string, stream chan types.WorkflowEvent, startTime int64) { | ||||||
workflowObj := obj.(*chaosTypes.ChaosEngine) | ||||||
if workflowObj.Labels["workflow_id"] == "" { | ||||||
logrus.WithFields(map[string]interface{}{ | ||||||
"uid": string(workflowObj.ObjectMeta.UID), | ||||||
"wf_id": workflowObj.Labels["workflow_id"], | ||||||
"infra_id": workflowObj.Labels["infra_id"], | ||||||
}).Printf("CHAOSENGINE RUN IGNORED [INVALID METADATA]") | ||||||
return | ||||||
} | ||||||
|
||||||
if workflowObj.ObjectMeta.CreationTimestamp.Unix() < startTime { | ||||||
return | ||||||
} | ||||||
|
||||||
cfg, err := k8s.GetKubeConfig() | ||||||
if err != nil { | ||||||
logrus.WithError(err).Fatal("could not get config") | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
} | ||||||
|
||||||
chaosClient, err := litmusV1alpha1.NewForConfig(cfg) | ||||||
if err != nil { | ||||||
logrus.WithError(err).Fatal("could not get Chaos ClientSet") | ||||||
} | ||||||
|
||||||
nodes := make(map[string]types.Node) | ||||||
logrus.Print("STANDALONE CHAOSENGINE EVENT ", workflowObj.UID, " ", eventType) | ||||||
var cd *types.ChaosData = nil | ||||||
|
||||||
//extracts chaos data | ||||||
cd, err = getChaosData(v1alpha1.NodeStatus{StartedAt: workflowObj.ObjectMeta.CreationTimestamp}, workflowObj.Name, workflowObj.Namespace, chaosClient) | ||||||
if err != nil { | ||||||
logrus.WithError(err).Print("FAILED PARSING CHAOS ENGINE CRD") | ||||||
} | ||||||
|
||||||
// considering chaos events has only 1 artifact with manifest as raw data | ||||||
finTime := int64(-1) | ||||||
if workflowObj.Status.EngineStatus == chaosTypes.EngineStatusCompleted || workflowObj.Status.EngineStatus == chaosTypes.EngineStatusStopped { | ||||||
if len(workflowObj.Status.Experiments) > 0 { | ||||||
finTime = workflowObj.Status.Experiments[0].LastUpdateTime.Unix() | ||||||
} | ||||||
} | ||||||
|
||||||
nodes[workflowObj.Name] = types.Node{ | ||||||
Name: workflowObj.Name, | ||||||
Phase: "Succeeded", | ||||||
StartedAt: StrConvTime(workflowObj.CreationTimestamp.Unix()), | ||||||
FinishedAt: StrConvTime(finTime), | ||||||
Children: []string{workflowObj.Name + "-engine"}, | ||||||
Type: "Steps", | ||||||
} | ||||||
details := types.Node{ | ||||||
Name: workflowObj.Name, | ||||||
Phase: mapStatus(workflowObj.Status.EngineStatus), | ||||||
Type: "ChaosEngine", | ||||||
StartedAt: StrConvTime(workflowObj.CreationTimestamp.Unix()), | ||||||
FinishedAt: StrConvTime(finTime), | ||||||
Children: []string{}, | ||||||
ChaosExp: cd, | ||||||
Message: string(workflowObj.Status.EngineStatus), | ||||||
} | ||||||
|
||||||
nodes[workflowObj.Name+"-engine"] = details | ||||||
workflow := types.WorkflowEvent{ | ||||||
WorkflowType: "chaosengine", | ||||||
WorkflowID: workflowObj.Labels["workflow_id"], | ||||||
EventType: eventType, | ||||||
UID: string(workflowObj.ObjectMeta.UID), | ||||||
Namespace: workflowObj.ObjectMeta.Namespace, | ||||||
Name: workflowObj.ObjectMeta.Name, | ||||||
CreationTimestamp: StrConvTime(workflowObj.ObjectMeta.CreationTimestamp.Unix()), | ||||||
Phase: details.Phase, | ||||||
Message: details.Message, | ||||||
StartedAt: details.StartedAt, | ||||||
FinishedAt: details.FinishedAt, | ||||||
Nodes: nodes, | ||||||
} | ||||||
|
||||||
//stream | ||||||
stream <- workflow | ||||||
} | ||||||
|
||||||
//StopChaosEngineState is used to patch all the chaosEngines with engineState=stop | ||||||
func StopChaosEngineState(namespace string, workflowRunID *string) error { | ||||||
ctx := context.TODO() | ||||||
|
||||||
//Define the GVR | ||||||
resourceType := schema.GroupVersionResource{ | ||||||
Group: "litmuschaos.io", | ||||||
Version: "v1alpha1", | ||||||
Resource: "chaosengines", | ||||||
} | ||||||
|
||||||
//Generate the dynamic client | ||||||
_, dynamicClient, err := k8s.GetDynamicAndDiscoveryClient() | ||||||
if err != nil { | ||||||
return errors.New("failed to get dynamic client, error: " + err.Error()) | ||||||
} | ||||||
|
||||||
listOption := v1.ListOptions{} | ||||||
|
||||||
if workflowRunID != nil { | ||||||
listOption.LabelSelector = fmt.Sprintf("workflow_run_id=%s", *workflowRunID) | ||||||
} | ||||||
|
||||||
//List all chaosEngines present in the particular namespace | ||||||
chaosEngines, err := dynamicClient.Resource(resourceType).Namespace(namespace).List(context.TODO(), listOption) | ||||||
if err != nil { | ||||||
return errors.New("failed to list chaosengines: " + err.Error()) | ||||||
} | ||||||
|
||||||
//Foe every chaosEngine patch the engineState to Stop | ||||||
for _, val := range chaosEngines.Items { | ||||||
patch := []byte(`{"spec":{"engineState":"stop"}}`) | ||||||
patched, err := dynamicClient.Resource(resourceType).Namespace(namespace).Patch(ctx, val.GetName(), mergeType.MergePatchType, patch, v1.PatchOptions{}) | ||||||
if err != nil { | ||||||
return errors.New("failed to patch chaosengines: " + err.Error()) | ||||||
} | ||||||
if patched != nil { | ||||||
logrus.Info("Successfully patched ChaosEngine: ", patched.GetName()) | ||||||
} | ||||||
} | ||||||
return nil | ||||||
} | ||||||
|
||||||
func mapStatus(status chaosTypes.EngineStatus) string { | ||||||
switch status { | ||||||
case chaosTypes.EngineStatusInitialized: | ||||||
return "Running" | ||||||
case chaosTypes.EngineStatusCompleted: | ||||||
return "Succeeded" | ||||||
case chaosTypes.EngineStatusStopped: | ||||||
return "Skipped" | ||||||
default: | ||||||
return "Running" | ||||||
} | ||||||
} |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.