Skip to content
This repository has been archived by the owner on Sep 27, 2023. It is now read-only.

Commit

Permalink
Merge pull request #177 from meshery/noti
Browse files Browse the repository at this point in the history
Add Event streamer struct
  • Loading branch information
Revolyssup committed Aug 31, 2022
2 parents 15d6095 + 12a6719 commit 7400cbe
Show file tree
Hide file tree
Showing 6 changed files with 50 additions and 28 deletions.
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ replace (
)

require (
github.com/layer5io/meshery-adapter-library v0.5.9
github.com/layer5io/meshkit v0.5.32
github.com/layer5io/meshery-adapter-library v0.5.10
github.com/layer5io/meshkit v0.5.37
github.com/layer5io/service-mesh-performance v0.3.4
gopkg.in/yaml.v2 v2.4.0
k8s.io/apimachinery v0.23.5
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -868,8 +868,12 @@ github.com/layer5io/learn-layer5/smi-conformance v0.0.0-20210317075357-06b4f88b3
github.com/layer5io/learn-layer5/smi-conformance v0.0.0-20210317075357-06b4f88b3e34/go.mod h1:BQPLwdJt7v7y0fXIejI4whR9zMyX07Wjt5xrbgEmHLw=
github.com/layer5io/meshery-adapter-library v0.5.9 h1:Zp79l4J8kMjML9zAQ4Xu4QiKM5q5HEGcv04Jjg+xWSA=
github.com/layer5io/meshery-adapter-library v0.5.9/go.mod h1:IvURQMnZHa3z0OTcUSPqCHUgTsW2x0/+KjCqpYfMbt0=
github.com/layer5io/meshery-adapter-library v0.5.10 h1:Qgr6vDx2s10mkhtk7Mnz5I73m/9yf2yyjCkPMeB4jmA=
github.com/layer5io/meshery-adapter-library v0.5.10/go.mod h1:Sg6WNN82uRo2kiFDEMc/LM/AJ/Pu6ZmBZGbFxZuC7zc=
github.com/layer5io/meshkit v0.5.32 h1:jIkQ9gKH7TPMWKbVtf6wQ+qv4553UyZ9SV4yKA2D4oo=
github.com/layer5io/meshkit v0.5.32/go.mod h1:dt0uOluDzatK6hbJEDAZbUsm7LJNb4nsXXaGUDtYxD0=
github.com/layer5io/meshkit v0.5.37 h1:EO0wXAI+eqAm+4uKSzFd50rDkr6nqQ17m1j0wmv9hQA=
github.com/layer5io/meshkit v0.5.37/go.mod h1:dt0uOluDzatK6hbJEDAZbUsm7LJNb4nsXXaGUDtYxD0=
github.com/layer5io/service-mesh-performance v0.3.2-0.20210122142912-a94e0658b021/go.mod h1:W153amv8aHAeIWxO7b7d7Vibt9RhaEVh4Uh+RG+BumQ=
github.com/layer5io/service-mesh-performance v0.3.4 h1:aw/elsx0wkry7SyiQRIj31wW7TPCP4YfhINdNOLXVg8=
github.com/layer5io/service-mesh-performance v0.3.4/go.mod h1:W153amv8aHAeIWxO7b7d7Vibt9RhaEVh4Uh+RG+BumQ=
Expand Down
7 changes: 4 additions & 3 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
configprovider "github.com/layer5io/meshkit/config/provider"
"github.com/layer5io/meshkit/logger"
"github.com/layer5io/meshkit/utils"
"github.com/layer5io/meshkit/utils/events"
)

var (
Expand Down Expand Up @@ -70,9 +71,9 @@ func main() {

service := &grpc.Service{}
_ = cfg.GetObject(adapter.ServerKey, &service)

service.Handler = osm.New(cfg, log, kubeconfigHandler)
service.Channel = make(chan interface{}, 100)
e := events.NewEventStreamer()
service.Handler = osm.New(cfg, log, kubeconfigHandler, e)
service.EventStreamer = e
service.StartedAt = time.Now()
service.Version = version
service.GitSHA = gitsha
Expand Down
23 changes: 20 additions & 3 deletions osm/oam.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ import (
"fmt"
"strings"

"github.com/google/uuid"
"github.com/layer5io/meshery-adapter-library/meshes"
"github.com/layer5io/meshery-osm/internal/config"
"github.com/layer5io/meshkit/models/oam/core/v1alpha1"
"gopkg.in/yaml.v2"
)
Expand All @@ -19,26 +22,40 @@ func (h *Handler) HandleComponents(comps []v1alpha1.Component, isDel bool, kubec
compFuncMap := map[string]CompHandler{
"OSMMesh": handleComponentOSMMesh,
}

stat1 := "deploying"
stat2 := "deployed"
if isDel {
stat1 = "removing"
stat2 = "removed"
}
for _, comp := range comps {
ee := &meshes.EventsResponse{
OperationId: uuid.New().String(),
Component: config.ServerDefaults["type"],
ComponentName: config.ServerDefaults["name"],
}
fnc, ok := compFuncMap[comp.Spec.Type]
if !ok {
msg, err := handleOSMCoreComponent(h, comp, isDel, "", "", kubeconfigs)
if err != nil {
h.streamErr(fmt.Sprintf("failed in %s %s", stat1, comp.Spec.Type), ee, err)
errs = append(errs, err)
continue
}

ee.Summary = fmt.Sprintf("%s: %s %s successfully", comp.Name, strings.TrimSuffix(comp.Spec.Type, ".OSM"), stat2)
ee.Details = fmt.Sprintf("The %s of type %s has been %s successfully", comp.Name, strings.TrimSuffix(comp.Spec.Type, ".OSM"), stat2)
msgs = append(msgs, msg)
continue
}

msg, err := fnc(h, comp, isDel, kubeconfigs)
if err != nil {
h.streamErr(fmt.Sprintf("failed in %s %s", stat1, strings.TrimSuffix(comp.Spec.Type, ".OSM")), ee, err)
errs = append(errs, err)
continue
}

ee.Summary = fmt.Sprintf("%s: %s %s successfully", comp.Name, strings.TrimSuffix(comp.Spec.Type, ".OSM"), stat2)
ee.Details = fmt.Sprintf("The %s of type %s has been %s successfully", comp.Name, strings.TrimSuffix(comp.Spec.Type, ".OSM"), stat2)
msgs = append(msgs, msg)
}

Expand Down
33 changes: 16 additions & 17 deletions osm/operations.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,11 @@ import (
)

// ApplyOperation function contains the operation handlers
func (h *Handler) ApplyOperation(ctx context.Context, request adapter.OperationRequest, hchan *chan interface{}) error {
func (h *Handler) ApplyOperation(ctx context.Context, request adapter.OperationRequest) error {
err := h.CreateKubeconfigs(request.K8sConfigs)
if err != nil {
return err
}
h.SetChannel(hchan)
kubeconfigs := request.K8sConfigs
operations := make(adapter.Operations)
err = h.Config.GetObject(adapter.OperationsKey, &operations)
Expand All @@ -27,10 +26,10 @@ func (h *Handler) ApplyOperation(ctx context.Context, request adapter.OperationR
}

e := &meshes.EventsResponse{
OperationId: request.OperationID,
Summary: status.Deploying,
Details: "Operation is not supported",
Component: internalconfig.ServerDefaults["type"],
OperationId: request.OperationID,
Summary: status.Deploying,
Details: "Operation is not supported",
Component: internalconfig.ServerDefaults["type"],
ComponentName: internalconfig.ServerDefaults["name"],
}

Expand All @@ -42,12 +41,12 @@ func (h *Handler) ApplyOperation(ctx context.Context, request adapter.OperationR
stat, err := hh.installOSM(request.IsDeleteOperation, version, request.Namespace, kubeconfigs)
if err != nil {
summary := fmt.Sprintf("Error while %s Open service mesh", stat)
hh.streamErr(summary, e, err)
hh.streamErr(summary, ee, err)
return
}
ee.Summary = fmt.Sprintf("Open service mesh %s successfully", stat)
ee.Details = fmt.Sprintf("Open service mesh is now %s.", stat)
hh.StreamInfo(e)
hh.StreamInfo(ee)
}(h, e)
case
common.BookInfoOperation,
Expand All @@ -58,13 +57,13 @@ func (h *Handler) ApplyOperation(ctx context.Context, request adapter.OperationR
appName := operations[request.OperationName].AdditionalProperties[common.ServiceName]
stat, err := hh.installSampleApp(request.IsDeleteOperation, request.Namespace, operations[request.OperationName].Templates, kubeconfigs)
if err != nil {
summary := fmt.Sprintf("Error while %s %s application", stat, appName)
hh.streamErr(summary, e, err)
summary := fmt.Sprintf("Error while %s %s application", stat, appName)
hh.streamErr(summary, ee, err)
return
}
ee.Summary = fmt.Sprintf("%s application %s successfully", appName, stat)
ee.Details = fmt.Sprintf("The %s application is now %s.", appName, stat)
hh.StreamInfo(e)
hh.StreamInfo(ee)
}(h, e)
case internalconfig.OSMBookStoreOperation:
go func(hh *Handler, ee *meshes.EventsResponse) {
Expand All @@ -78,12 +77,12 @@ func (h *Handler) ApplyOperation(ctx context.Context, request adapter.OperationR
)
if err != nil {
summary := fmt.Sprintf("Error while %s %s application", stat, appName)
hh.streamErr(summary, e, err)
hh.streamErr(summary, ee, err)
return
}
ee.Summary = fmt.Sprintf("%s application %s successfully", appName, stat)
ee.Details = fmt.Sprintf("The %s application is now %s.", appName, stat)
hh.StreamInfo(e)
hh.StreamInfo(ee)
}(h, e)
case common.SmiConformanceOperation:
go func(hh *Handler, ee *meshes.EventsResponse) {
Expand All @@ -99,21 +98,21 @@ func (h *Handler) ApplyOperation(ctx context.Context, request adapter.OperationR
Annotations: make(map[string]string),
})
if err != nil {
summary := fmt.Sprintf("Error while %s %s test", status.Running, name)
hh.streamErr(summary ,e, err)
summary := fmt.Sprintf("Error while %s %s test", status.Running, name)
hh.streamErr(summary, ee, err)
return
}
ee.Summary = fmt.Sprintf("%s test %s successfully", name, status.Completed)
ee.Details = ""
hh.StreamInfo(e)
hh.StreamInfo(ee)
}(h, e)
default:
h.streamErr("Invalid operation", e, ErrOpInvalid)
}
return nil
}

func(h *Handler) streamErr(summary string, e *meshes.EventsResponse, err error) {
func (h *Handler) streamErr(summary string, e *meshes.EventsResponse, err error) {
e.Summary = summary
e.Details = err.Error()
e.ErrorCode = errors.GetCode(err)
Expand Down
7 changes: 4 additions & 3 deletions osm/osm.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/layer5io/meshkit/logger"
"github.com/layer5io/meshkit/models"
"github.com/layer5io/meshkit/models/oam/core/v1alpha1"
"github.com/layer5io/meshkit/utils/events"
"gopkg.in/yaml.v2"
)

Expand All @@ -32,12 +33,13 @@ type Handler struct {
}

// New initializes a new handler instance
func New(config meshkitCfg.Handler, log logger.Handler, kc meshkitCfg.Handler) adapter.Handler {
func New(config meshkitCfg.Handler, log logger.Handler, kc meshkitCfg.Handler, e *events.EventStreamer) adapter.Handler {
return &Handler{
Adapter: adapter.Adapter{
Config: config,
Log: log,
KubeconfigHandler: kc,
EventStreamer: e,
},
}
}
Expand Down Expand Up @@ -88,13 +90,12 @@ func (h *Handler) CreateKubeconfigs(kubeconfigs []string) error {
}

// ProcessOAM will handles the grpc invocation for handling OAM objects
func (h *Handler) ProcessOAM(ctx context.Context, oamReq adapter.OAMRequest, hchan *chan interface{}) (string, error) {
func (h *Handler) ProcessOAM(ctx context.Context, oamReq adapter.OAMRequest) (string, error) {
err := h.CreateKubeconfigs(oamReq.K8sConfigs)
if err != nil {
return "", err
}
kubeconfigs := oamReq.K8sConfigs
h.SetChannel(hchan)
var comps []v1alpha1.Component
for _, acomp := range oamReq.OamComps {
comp, err := oam.ParseApplicationComponent(acomp)
Expand Down

0 comments on commit 7400cbe

Please sign in to comment.