diff --git a/appmesh/app_mesh.go b/appmesh/app_mesh.go index 54bd1bd..40835a8 100644 --- a/appmesh/app_mesh.go +++ b/appmesh/app_mesh.go @@ -16,6 +16,7 @@ import ( "github.com/layer5io/meshkit/logger" "github.com/layer5io/meshkit/models" "github.com/layer5io/meshkit/models/oam/core/v1alpha1" + "github.com/layer5io/meshkit/utils/events" "gopkg.in/yaml.v2" ) @@ -25,24 +26,24 @@ type AppMesh struct { } // New initializes AppMesh handler. -func New(c meshkitCfg.Handler, l logger.Handler, kc meshkitCfg.Handler) adapter.Handler { +func New(c meshkitCfg.Handler, l logger.Handler, kc meshkitCfg.Handler, e *events.EventStreamer) adapter.Handler { return &AppMesh{ Adapter: adapter.Adapter{ Config: c, Log: l, KubeconfigHandler: kc, + EventStreamer: e, }, } } // ApplyOperation applies the requested operation on app-mesh -func (appMesh *AppMesh) ApplyOperation(ctx context.Context, opReq adapter.OperationRequest, hchan *chan interface{}) error { +func (appMesh *AppMesh) ApplyOperation(ctx context.Context, opReq adapter.OperationRequest) error { err := appMesh.CreateKubeconfigs(opReq.K8sConfigs) if err != nil { return err } kubeConfigs := opReq.K8sConfigs - appMesh.SetChannel(hchan); operations := make(adapter.Operations) err = appMesh.Config.GetObject(adapter.OperationsKey, &operations) @@ -51,10 +52,10 @@ func (appMesh *AppMesh) ApplyOperation(ctx context.Context, opReq adapter.Operat } e := &meshes.EventsResponse{ - OperationId: opReq.OperationID, - Summary: status.Deploying, - Details: "Operation is not supported", - Component: internalconfig.ServerConfig["type"], + OperationId: opReq.OperationID, + Summary: status.Deploying, + Details: "Operation is not supported", + Component: internalconfig.ServerConfig["type"], ComponentName: internalconfig.ServerConfig["name"], } stat := status.Deploying @@ -65,12 +66,12 @@ func (appMesh *AppMesh) ApplyOperation(ctx context.Context, opReq adapter.Operat version := string(operations[opReq.OperationName].Versions[0]) if stat, err = hh.installAppMesh(opReq.IsDeleteOperation, version, opReq.Namespace, kubeConfigs); err != nil { summary := fmt.Sprintf("Error while %s AWS App mesh", stat) - hh.streamErr(summary, e, err) + hh.streamErr(summary, ee, err) return } ee.Summary = fmt.Sprintf("App mesh %s successfully", stat) ee.Details = fmt.Sprintf("The App mesh is now %s.", stat) - hh.StreamInfo(e) + hh.StreamInfo(ee) }(appMesh, e) case internalconfig.LabelNamespace: @@ -116,12 +117,12 @@ func (appMesh *AppMesh) ApplyOperation(ctx context.Context, opReq adapter.Operat stat, err := hh.installSampleApp(opReq.Namespace, opReq.IsDeleteOperation, operations[opReq.OperationName].Templates, kubeConfigs) if err != nil { summary := fmt.Sprintf("Error while %s %s application", stat, appName) - hh.streamErr(summary, e, err) + hh.streamErr(summary, ee, err) return } ee.Summary = fmt.Sprintf("%s application %s successfully", appName, stat) ee.Details = fmt.Sprintf("The %s application is now %s.", appName, stat) - hh.StreamInfo(e) + hh.StreamInfo(ee) }(appMesh, e) case common.CustomOperation: @@ -129,12 +130,12 @@ func (appMesh *AppMesh) ApplyOperation(ctx context.Context, opReq adapter.Operat stat, err := hh.applyCustomOperation(opReq.Namespace, opReq.CustomBody, opReq.IsDeleteOperation, kubeConfigs) if err != nil { summary := fmt.Sprintf("Error while %s custom operation", stat) - hh.streamErr(summary, e, err) + hh.streamErr(summary, ee, err) return } ee.Summary = fmt.Sprintf("Manifest %s successfully", status.Deployed) ee.Details = "" - hh.StreamInfo(e) + hh.StreamInfo(ee) }(appMesh, e) default: appMesh.streamErr("Invalid operation", e, ErrOpInvalid) @@ -189,8 +190,8 @@ func (appMesh *AppMesh) CreateKubeconfigs(kubeconfigs []string) error { } // ProcessOAM handles the grpc invocation for handling OAM objects -func (appMesh *AppMesh) ProcessOAM(ctx context.Context, oamReq adapter.OAMRequest, hchan *chan interface{}) (string, error) { - appMesh.SetChannel(hchan) +func (appMesh *AppMesh) ProcessOAM(ctx context.Context, oamReq adapter.OAMRequest) (string, error) { + err := appMesh.CreateKubeconfigs(oamReq.K8sConfigs) if err != nil { return "", err @@ -245,11 +246,11 @@ func (appMesh *AppMesh) ProcessOAM(ctx context.Context, oamReq adapter.OAMReques return msg1 + "\n" + msg2, nil } -func(appMesh *AppMesh) streamErr(summary string, e *meshes.EventsResponse, err error) { +func (appMesh *AppMesh) streamErr(summary string, e *meshes.EventsResponse, err error) { e.Summary = summary e.Details = err.Error() e.ErrorCode = errors.GetCode(err) e.ProbableCause = errors.GetCause(err) e.SuggestedRemediation = errors.GetRemedy(err) appMesh.StreamErr(e, err) -} \ No newline at end of file +} diff --git a/appmesh/oam.go b/appmesh/oam.go index 7106441..6026c25 100644 --- a/appmesh/oam.go +++ b/appmesh/oam.go @@ -4,6 +4,9 @@ import ( "fmt" "strings" + "github.com/google/uuid" + "github.com/layer5io/meshery-adapter-library/meshes" + "github.com/layer5io/meshery-app-mesh/internal/config" "github.com/layer5io/meshkit/models/oam/core/v1alpha1" "gopkg.in/yaml.v2" ) @@ -15,19 +18,34 @@ type CompHandler func(*AppMesh, v1alpha1.Component, bool, []string) (string, err func (appMesh *AppMesh) HandleComponents(comps []v1alpha1.Component, isDel bool, kubeconfigs []string) (string, error) { var errs []error var msgs []string - + stat1 := "deploying" + stat2 := "deployed" + if isDel { + stat1 = "removing" + stat2 = "removed" + } compFuncMap := map[string]CompHandler{ "AppMesh": handleComponentAppMesh, } for _, comp := range comps { + ee := &meshes.EventsResponse{ + OperationId: uuid.New().String(), + Component: config.ServerConfig["type"], + ComponentName: config.ServerConfig["name"], + } fnc, ok := compFuncMap[comp.Spec.Type] if !ok { msg, err := handleAppMeshCoreComponent(appMesh, comp, isDel, "", "", kubeconfigs) if err != nil { + ee.Summary = fmt.Sprintf("Error while %s %s", stat1, comp.Spec.Type) + appMesh.streamErr(ee.Summary, ee, err) errs = append(errs, err) continue } + ee.Summary = fmt.Sprintf("%s %s successfully", comp.Spec.Type, stat2) + ee.Details = fmt.Sprintf("The %s is now %s.", comp.Spec.Type, stat2) + appMesh.StreamInfo(ee) msgs = append(msgs, msg) continue @@ -35,9 +53,14 @@ func (appMesh *AppMesh) HandleComponents(comps []v1alpha1.Component, isDel bool, msg, err := fnc(appMesh, comp, isDel, kubeconfigs) if err != nil { + ee.Summary = fmt.Sprintf("Error while %s %s", stat1, comp.Spec.Type) + appMesh.streamErr(ee.Summary, ee, err) errs = append(errs, err) continue } + ee.Summary = fmt.Sprintf("%s %s %s successfully", comp.Name, comp.Spec.Type, stat2) + ee.Details = fmt.Sprintf("The %s %s is now %s.", comp.Name, comp.Spec.Type, stat2) + appMesh.StreamInfo(ee) msgs = append(msgs, msg) } diff --git a/appmesh/oam/register.go b/appmesh/oam/register.go index 06b6acc..96442b4 100644 --- a/appmesh/oam/register.go +++ b/appmesh/oam/register.go @@ -12,6 +12,7 @@ import ( var ( basePath, _ = os.Getwd() + // WorkloadPath contains the path to the workload schemas and definitions directory WorkloadPath = filepath.Join(basePath, "templates", "oam", "workloads") // traitPath = filepath.Join(basePath, "templates", "oam", "traits") pathSets = []schemaDefinitionPathSet{} diff --git a/go.mod b/go.mod index 90c1ffb..dcb630b 100644 --- a/go.mod +++ b/go.mod @@ -10,8 +10,9 @@ replace ( ) require ( - github.com/layer5io/meshery-adapter-library v0.5.9 - github.com/layer5io/meshkit v0.5.32 + github.com/google/uuid v1.3.0 + github.com/layer5io/meshery-adapter-library v0.5.10 + github.com/layer5io/meshkit v0.5.37 github.com/layer5io/service-mesh-performance v0.3.4 gopkg.in/yaml.v2 v2.4.0 k8s.io/apimachinery v0.23.5 @@ -63,7 +64,6 @@ require ( github.com/google/go-cmp v0.5.8 // indirect github.com/google/gofuzz v1.2.0 // indirect github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510 // indirect - github.com/google/uuid v1.3.0 // indirect github.com/googleapis/gnostic v0.5.5 // indirect github.com/gorilla/mux v1.8.0 // indirect github.com/gosuri/uitable v0.0.4 // indirect diff --git a/go.sum b/go.sum index ff7740b..1b23057 100644 --- a/go.sum +++ b/go.sum @@ -852,10 +852,10 @@ github.com/lann/ps v0.0.0-20150810152359-62de8c46ede0/go.mod h1:vmVJ0l/dxyfGW6Fm github.com/layer5io/kuttl v0.4.1-0.20200723152044-916f10574334/go.mod h1:UmrVd7x+bNVKrpmKgTtfRiTKHZeNPcMjQproJ0vGwhE= github.com/layer5io/learn-layer5/smi-conformance v0.0.0-20210317075357-06b4f88b3e34 h1:QaViadDOBCMDUwYx78kfRvHMkzRVnh/GOhm3s2gxoP4= github.com/layer5io/learn-layer5/smi-conformance v0.0.0-20210317075357-06b4f88b3e34/go.mod h1:BQPLwdJt7v7y0fXIejI4whR9zMyX07Wjt5xrbgEmHLw= -github.com/layer5io/meshery-adapter-library v0.5.9 h1:Zp79l4J8kMjML9zAQ4Xu4QiKM5q5HEGcv04Jjg+xWSA= -github.com/layer5io/meshery-adapter-library v0.5.9/go.mod h1:IvURQMnZHa3z0OTcUSPqCHUgTsW2x0/+KjCqpYfMbt0= -github.com/layer5io/meshkit v0.5.32 h1:jIkQ9gKH7TPMWKbVtf6wQ+qv4553UyZ9SV4yKA2D4oo= -github.com/layer5io/meshkit v0.5.32/go.mod h1:dt0uOluDzatK6hbJEDAZbUsm7LJNb4nsXXaGUDtYxD0= +github.com/layer5io/meshery-adapter-library v0.5.10 h1:Qgr6vDx2s10mkhtk7Mnz5I73m/9yf2yyjCkPMeB4jmA= +github.com/layer5io/meshery-adapter-library v0.5.10/go.mod h1:Sg6WNN82uRo2kiFDEMc/LM/AJ/Pu6ZmBZGbFxZuC7zc= +github.com/layer5io/meshkit v0.5.37 h1:EO0wXAI+eqAm+4uKSzFd50rDkr6nqQ17m1j0wmv9hQA= +github.com/layer5io/meshkit v0.5.37/go.mod h1:dt0uOluDzatK6hbJEDAZbUsm7LJNb4nsXXaGUDtYxD0= github.com/layer5io/service-mesh-performance v0.3.2-0.20210122142912-a94e0658b021/go.mod h1:W153amv8aHAeIWxO7b7d7Vibt9RhaEVh4Uh+RG+BumQ= github.com/layer5io/service-mesh-performance v0.3.4 h1:aw/elsx0wkry7SyiQRIj31wW7TPCP4YfhINdNOLXVg8= github.com/layer5io/service-mesh-performance v0.3.4/go.mod h1:W153amv8aHAeIWxO7b7d7Vibt9RhaEVh4Uh+RG+BumQ= diff --git a/main.go b/main.go index 9c2a5d7..eae0bd1 100644 --- a/main.go +++ b/main.go @@ -17,6 +17,7 @@ import ( "github.com/layer5io/meshery-app-mesh/build" "github.com/layer5io/meshery-app-mesh/internal/config" "github.com/layer5io/meshkit/logger" + "github.com/layer5io/meshkit/utils/events" // "github.com/layer5io/meshkit/tracing" "github.com/layer5io/meshery-app-mesh/appmesh/oam" @@ -91,13 +92,13 @@ func main() { // log.Err("Tracing Init Failed", err.Error()) // os.Exit(1) // } - + e := events.NewEventStreamer() // Initialize Handler intance - handler := appmesh.New(cfg, log, kubeconfigHandler) + handler := appmesh.New(cfg, log, kubeconfigHandler, e) handler = adapter.AddLogger(log, handler) service.Handler = handler - service.Channel = make(chan interface{}, 10) + service.EventStreamer = e service.StartedAt = time.Now() service.Version = version service.GitSHA = gitsha