From bf460434c54b2c5c0a0d611a9b717dae4c692fdc Mon Sep 17 00:00:00 2001 From: Ayman Date: Tue, 4 Jan 2022 12:39:24 +0200 Subject: [PATCH 01/11] clean up Signed-off-by: Ayman --- datasourceStatus/client.go | 2 +- go.mod | 3 +-- go.sum | 1 + 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/datasourceStatus/client.go b/datasourceStatus/client.go index 20bae17..b46e739 100644 --- a/datasourceStatus/client.go +++ b/datasourceStatus/client.go @@ -5,7 +5,7 @@ import ( "fmt" "time" - "github.com/LF-Engineering/dev-analytics-libraries/uuid" + "github.com/LF-Engineering/insights-datasource-shared/uuid" jsoniter "github.com/json-iterator/go" ) diff --git a/go.mod b/go.mod index b4fc95e..712e769 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,6 @@ module github.com/LF-Engineering/insights-datasource-shared go 1.15 require ( - github.com/LF-Engineering/dev-analytics-libraries v1.1.28 github.com/avast/retry-go v3.0.0+incompatible github.com/aws/aws-sdk-go v1.42.24 github.com/aws/aws-sdk-go-v2 v1.11.2 @@ -11,7 +10,7 @@ require ( github.com/aws/aws-sdk-go-v2/service/firehose v1.4.2 github.com/aws/aws-sdk-go-v2/service/ssm v1.17.1 github.com/dgrijalva/jwt-go v3.2.0+incompatible - github.com/elastic/go-elasticsearch/v8 v8.0.0-20211220171217-6cdebcf1b94e + github.com/elastic/go-elasticsearch/v8 v8.0.0-20201229214741-2366c2514674 github.com/google/uuid v1.3.0 github.com/json-iterator/go v1.1.11 github.com/pkg/errors v0.9.1 diff --git a/go.sum b/go.sum index fb3d290..c46ff80 100644 --- a/go.sum +++ b/go.sum @@ -94,6 +94,7 @@ github.com/dgrijalva/jwt-go v3.2.0+incompatible h1:7qlOGliEKZXTDg6OTjfoBKDXWrumC github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ= github.com/elastic/elastic-transport-go/v8 v8.0.0-20211216131617-bbee439d559c h1:onA2RpIyeCPvYAj1LFYiiMTrSpqVINWMfYFRS7lofJs= github.com/elastic/elastic-transport-go/v8 v8.0.0-20211216131617-bbee439d559c/go.mod h1:87Tcz8IVNe6rVSLdBux1o/PEItLtyabHU3naC7IoqKI= +github.com/elastic/go-elasticsearch/v8 v8.0.0-20201229214741-2366c2514674 h1:heH4w5l/KFP4Ry9Xp4+jbRx0Wn+TJD7+HlyoMJE4LvQ= github.com/elastic/go-elasticsearch/v8 v8.0.0-20201229214741-2366c2514674/go.mod h1:xe9a/L2aeOgFKKgrO3ibQTnMdpAeL0GC+5/HpGScSa4= github.com/elastic/go-elasticsearch/v8 v8.0.0-20211220171217-6cdebcf1b94e h1:UzG3rbPYrsey+mmAtyHR3RoL4T4jTBvMF5bCp+RSwPI= github.com/elastic/go-elasticsearch/v8 v8.0.0-20211220171217-6cdebcf1b94e/go.mod h1:Usvydt+x0dv9a1TzEUaovqbJor8rmOHy5dSmPeMAE2k= From 33874826315a0b3d3aed94472c8927bfda80f87c Mon Sep 17 00:00:00 2001 From: Ayman Date: Tue, 4 Jan 2022 16:10:45 +0200 Subject: [PATCH 02/11] add elastic logging functionality Signed-off-by: Ayman --- elastic/client.go | 3 -- logging/client.go | 132 ++++++++++++++++++++++++++++++++++++++++++++++ logging/dto.go | 27 ++++++++++ 3 files changed, 159 insertions(+), 3 deletions(-) create mode 100644 logging/client.go create mode 100644 logging/dto.go diff --git a/elastic/client.go b/elastic/client.go index 43511da..8d85806 100644 --- a/elastic/client.go +++ b/elastic/client.go @@ -477,9 +477,6 @@ func (p *ClientProvider) Search(index string, query map[string]interface{}) ([]b p.client.Search.WithBody(&buf), ) if err != nil { - if strings.Contains(err.Error(), "server is not Elasticsearch") { - fmt.Println("esssssss", p.params) - } return nil, err } diff --git a/logging/client.go b/logging/client.go new file mode 100644 index 0000000..f2788e0 --- /dev/null +++ b/logging/client.go @@ -0,0 +1,132 @@ +package logging + +import ( + "fmt" + "time" + + "github.com/LF-Engineering/insights-datasource-shared/uuid" + jsoniter "github.com/json-iterator/go" +) + +const ( + logIndex = "insights-task-logging" + inProgress = "inprogress" + failed = "failed" + done = "done" +) + +// ESLogProvider used in connecting to ES logging server +type ESLogProvider interface { + CreateDocument(index, documentID string, body []byte) ([]byte, error) + Get(index string, query map[string]interface{}, result interface{}) error + UpdateDocument(index string, id string, body interface{}) ([]byte, error) +} + +// LogProvider ... +type LogProvider struct { + esClient ESLogProvider + environment string +} + +// NewLogProvider ... +func NewLogProvider(esClient ESLogProvider, environment string) (*LogProvider, error) { + logProvider := &LogProvider{ + esClient: esClient, + environment: environment, + } + + return logProvider, nil +} + +// StoreLog ... +func (s *LogProvider) StoreLog(log Log) error { + if log.Datasource == "" || log.Endpoint == "" || log.CreatedAt.IsZero() { + return fmt.Errorf("error: log datasource, endpoint and created at are all required") + } + if log.Status != inProgress && log.Status != failed && log.Status != done { + return fmt.Errorf("error: log status must be one of [%s, %s, %s ]", inProgress, failed, done) + } + + date := log.CreatedAt.Format(time.RFC3339) + docID, err := uuid.Generate(log.Datasource, log.Endpoint, log.Status, date) + if err != nil { + return err + } + + b, err := jsoniter.Marshal(log) + if err != nil { + return err + } + + index := fmt.Sprintf("%s-%s", logIndex, s.environment) + query := map[string]interface{}{ + "query": map[string]interface{}{ + "term": map[string]interface{}{ + "_id": map[string]string{ + "value": docID}, + }, + }, + } + + var res TopHits + err = s.esClient.Get(fmt.Sprintf("%s-%s", logIndex, s.environment), query, &res) + if err != nil || len(res.Hits.Hits) == 0 { + _, err := s.esClient.CreateDocument(index, docID, b) + return err + } + + return s.updateDocument(log, index, docID) +} + +// PullLogs ... +func (s *LogProvider) PullLogs(datasource string) ([]Log, error) { + must := make([]map[string]interface{}, 0) + must = append(must, map[string]interface{}{ + "term": map[string]interface{}{ + "datasource": map[string]string{ + "value": datasource}, + }, + }) + must = append(must, map[string]interface{}{ + "term": map[string]interface{}{ + "status": map[string]string{ + "value": inProgress}, + }, + }) + query := map[string]interface{}{ + "query": map[string]interface{}{ + "bool": map[string]interface{}{ + //"must": must, + }, + }, + } + + var res TopHits + logs := make([]Log, 0) + err := s.esClient.Get(fmt.Sprintf("%s-%s", logIndex, s.environment), query, &res) + if err != nil { + return logs, err + } + + for _, l := range res.Hits.Hits { + logs = append(logs, l.Source) + } + + return logs, nil +} + +func (s *LogProvider) updateDocument(log Log, index string, docID string) error { + doc := map[string]interface{}{ + "datasource": log.Datasource, + "endpoint": log.Endpoint, + "created_at": log.CreatedAt, + "status": log.Status, + } + + _, err := s.esClient.UpdateDocument(index, docID, doc) + if err != nil { + fmt.Println(index) + return err + } + return nil +} diff --git a/logging/dto.go b/logging/dto.go new file mode 100644 index 0000000..193ce40 --- /dev/null +++ b/logging/dto.go @@ -0,0 +1,27 @@ +package logging + +import "time" + +// Log ... +type Log struct { + Datasource string `json:"datasource"` + Endpoint string `json:"endpoint"` + Status string `json:"status"` + CreatedAt time.Time `json:"created_at"` +} + +// TopHits result +type TopHits struct { + Hits Hits `json:"hits"` +} + +// Hits result +type Hits struct { + Hits []NestedHits `json:"hits"` +} + +// NestedHits is the actual hit data +type NestedHits struct { + ID string `json:"_id"` + Source Log `json:"_source"` +} From bcb28db08ea62a55ecef78debae94e9a12157a34 Mon Sep 17 00:00:00 2001 From: Ayman Date: Tue, 4 Jan 2022 17:31:37 +0200 Subject: [PATCH 03/11] add elastic logging functionality Signed-off-by: Ayman --- logging/client.go | 4 ++-- logging/dto.go | 10 ++++++---- 2 files changed, 8 insertions(+), 6 deletions(-) diff --git a/logging/client.go b/logging/client.go index f2788e0..8a0fbea 100644 --- a/logging/client.go +++ b/logging/client.go @@ -48,7 +48,7 @@ func (s *LogProvider) StoreLog(log Log) error { } date := log.CreatedAt.Format(time.RFC3339) - docID, err := uuid.Generate(log.Datasource, log.Endpoint, log.Status, date) + docID, err := uuid.Generate(log.Datasource, log.Endpoint, date) if err != nil { return err } @@ -96,7 +96,7 @@ func (s *LogProvider) PullLogs(datasource string) ([]Log, error) { query := map[string]interface{}{ "query": map[string]interface{}{ "bool": map[string]interface{}{ - //"must": must, + "must": must, }, }, } diff --git a/logging/dto.go b/logging/dto.go index 193ce40..63ffc79 100644 --- a/logging/dto.go +++ b/logging/dto.go @@ -4,10 +4,12 @@ import "time" // Log ... type Log struct { - Datasource string `json:"datasource"` - Endpoint string `json:"endpoint"` - Status string `json:"status"` - CreatedAt time.Time `json:"created_at"` + Datasource string `json:"datasource"` + Endpoint string `json:"endpoint"` + Status string `json:"status"` + CreatedAt time.Time `json:"created_at"` + ProjectSlug string `json:"project_slug"` + Message string `json:"message"` } // TopHits result From 312ff1ec3eccaf54ed8586e1ec17ab555dc2a5c2 Mon Sep 17 00:00:00 2001 From: Ayman Date: Tue, 4 Jan 2022 22:12:24 +0200 Subject: [PATCH 04/11] clean up Signed-off-by: Ayman --- {logging => ingestjob}/dto.go | 2 +- logging/client.go => ingestjob/logger.go | 60 +++++++++++++++++++----- 2 files changed, 48 insertions(+), 14 deletions(-) rename {logging => ingestjob}/dto.go (96%) rename logging/client.go => ingestjob/logger.go (64%) diff --git a/logging/dto.go b/ingestjob/dto.go similarity index 96% rename from logging/dto.go rename to ingestjob/dto.go index 63ffc79..aa3b04b 100644 --- a/logging/dto.go +++ b/ingestjob/dto.go @@ -1,4 +1,4 @@ -package logging +package ingestjob import "time" diff --git a/logging/client.go b/ingestjob/logger.go similarity index 64% rename from logging/client.go rename to ingestjob/logger.go index 8a0fbea..f23cdf8 100644 --- a/logging/client.go +++ b/ingestjob/logger.go @@ -1,4 +1,4 @@ -package logging +package ingestjob import ( "fmt" @@ -9,7 +9,7 @@ import ( ) const ( - logIndex = "insights-task-logging" + logIndex = "insights-job-logging" inProgress = "inprogress" failed = "failed" done = "done" @@ -20,17 +20,18 @@ type ESLogProvider interface { CreateDocument(index, documentID string, body []byte) ([]byte, error) Get(index string, query map[string]interface{}, result interface{}) error UpdateDocument(index string, id string, body interface{}) ([]byte, error) + Count(index string, query map[string]interface{}) (int, error) } -// LogProvider ... -type LogProvider struct { +// Logger ... +type Logger struct { esClient ESLogProvider environment string } -// NewLogProvider ... -func NewLogProvider(esClient ESLogProvider, environment string) (*LogProvider, error) { - logProvider := &LogProvider{ +// NewLogger ... +func NewLogger(esClient ESLogProvider, environment string) (*Logger, error) { + logProvider := &Logger{ esClient: esClient, environment: environment, } @@ -38,8 +39,8 @@ func NewLogProvider(esClient ESLogProvider, environment string) (*LogProvider, e return logProvider, nil } -// StoreLog ... -func (s *LogProvider) StoreLog(log Log) error { +// Write ... +func (s *Logger) Write(log Log) error { if log.Datasource == "" || log.Endpoint == "" || log.CreatedAt.IsZero() { return fmt.Errorf("error: log datasource, endpoint and created at are all required") } @@ -78,8 +79,12 @@ func (s *LogProvider) StoreLog(log Log) error { return s.updateDocument(log, index, docID) } -// PullLogs ... -func (s *LogProvider) PullLogs(datasource string) ([]Log, error) { +// Read ... +func (s *Logger) Read(datasource string, status string) ([]Log, error) { + if status != inProgress && status != failed && status != done { + return []Log{}, fmt.Errorf("error: log status must be one of [%s, %s, %s ]", inProgress, failed, done) + } + must := make([]map[string]interface{}, 0) must = append(must, map[string]interface{}{ "term": map[string]interface{}{ @@ -90,7 +95,7 @@ func (s *LogProvider) PullLogs(datasource string) ([]Log, error) { must = append(must, map[string]interface{}{ "term": map[string]interface{}{ "status": map[string]string{ - "value": inProgress}, + "value": status}, }, }) query := map[string]interface{}{ @@ -115,7 +120,36 @@ func (s *LogProvider) PullLogs(datasource string) ([]Log, error) { return logs, nil } -func (s *LogProvider) updateDocument(log Log, index string, docID string) error { +func (s *Logger) Count(datasource string, status string) (int, error) { + if status != inProgress && status != failed && status != done { + return 0, fmt.Errorf("error: log status must be one of [%s, %s, %s ]", inProgress, failed, done) + } + + must := make([]map[string]interface{}, 0) + must = append(must, map[string]interface{}{ + "term": map[string]interface{}{ + "datasource": map[string]string{ + "value": datasource}, + }, + }) + must = append(must, map[string]interface{}{ + "term": map[string]interface{}{ + "status": map[string]string{ + "value": status}, + }, + }) + query := map[string]interface{}{ + "query": map[string]interface{}{ + "bool": map[string]interface{}{ + "must": must, + }, + }, + } + + return s.esClient.Count(fmt.Sprintf("%s-%s", logIndex, s.environment), query) +} + +func (s *Logger) updateDocument(log Log, index string, docID string) error { doc := map[string]interface{}{ "datasource": log.Datasource, "endpoint": log.Endpoint, From 0e4ab5a12db2bae4671cd2a6ce58f7a5b5f0183f Mon Sep 17 00:00:00 2001 From: Ayman Date: Tue, 4 Jan 2022 22:45:11 +0200 Subject: [PATCH 05/11] clean up Signed-off-by: Ayman --- ingestjob/logger.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ingestjob/logger.go b/ingestjob/logger.go index f23cdf8..5cde095 100644 --- a/ingestjob/logger.go +++ b/ingestjob/logger.go @@ -40,7 +40,7 @@ func NewLogger(esClient ESLogProvider, environment string) (*Logger, error) { } // Write ... -func (s *Logger) Write(log Log) error { +func (s *Logger) Write(log *Log) error { if log.Datasource == "" || log.Endpoint == "" || log.CreatedAt.IsZero() { return fmt.Errorf("error: log datasource, endpoint and created at are all required") } @@ -76,7 +76,7 @@ func (s *Logger) Write(log Log) error { return err } - return s.updateDocument(log, index, docID) + return s.updateDocument(*log, index, docID) } // Read ... From 716194606cc644e2d1b34d02dc9bcdcb0e60d4d9 Mon Sep 17 00:00:00 2001 From: Ayman Date: Wed, 5 Jan 2022 13:18:37 +0200 Subject: [PATCH 06/11] add elastic gap Signed-off-by: Ayman --- elasticgap/dto.go | 19 +++++++++ elasticgap/handler.go | 92 +++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 111 insertions(+) create mode 100644 elasticgap/dto.go create mode 100644 elasticgap/handler.go diff --git a/elasticgap/dto.go b/elasticgap/dto.go new file mode 100644 index 0000000..d4191b0 --- /dev/null +++ b/elasticgap/dto.go @@ -0,0 +1,19 @@ +package elasticgap + +// ElasticResponse ... +type ElasticResponse struct { + Took int + Errors bool + Items []ElasticResponseItem +} + +// ElasticResponseItem ... +type ElasticResponseItem struct { + Index ESResponseIndex +} + +// ESResponseIndex ... +type ESResponseIndex struct { + ID string `json:"_id"` + Status int +} diff --git a/elasticgap/handler.go b/elasticgap/handler.go new file mode 100644 index 0000000..4906702 --- /dev/null +++ b/elasticgap/handler.go @@ -0,0 +1,92 @@ +package elasticgap + +import ( + b64 "encoding/base64" + "fmt" + + "github.com/LF-Engineering/insights-datasource-shared/elastic" + jsoniter "github.com/json-iterator/go" +) + +// HTTPClient used in connecting to remote http server +type HTTPClient interface { + Request(url string, method string, header map[string]string, body []byte, params map[string]string) (statusCode int, resBody []byte, err error) +} + +// Auth0Client ... +type Auth0Client interface { + GetToken() (string, error) +} + +// GapHandler ... +type GapHandler struct { + gapURL string + httpClient HTTPClient + auth0Client Auth0Client +} + +// NewGapHandler ... +func NewGapHandler(gapURL string, httpClient HTTPClient, auth0Client Auth0Client) *GapHandler { + return &GapHandler{ + gapURL: gapURL, + httpClient: httpClient, + auth0Client: auth0Client, + } +} + +// Send unsaved data to data-gap handler +func (g *GapHandler) Send(data []elastic.BulkData) error { + token, err := g.auth0Client.GetToken() + if err != nil { + return err + } + + byteData, err := jsoniter.Marshal(data) + if err != nil { + return err + } + + dataEnc := b64.StdEncoding.EncodeToString(byteData) + gapBody := map[string]map[string]string{"index": {"content": dataEnc}} + bData, err := jsoniter.Marshal(gapBody) + if err != nil { + return err + } + + header := make(map[string]string) + header["Authorization"] = fmt.Sprintf("Bearer %s", token) + + if g.gapURL != "" { + _, _, err = g.httpClient.Request(g.gapURL, "POST", header, bData, nil) + if err != nil { + return err + } + } + + return nil +} + +// HandleFailedData ... +func (g *GapHandler) HandleFailedData(data []elastic.BulkData, byteResponse []byte) (failedIndexes []elastic.BulkData, err error) { + var esRes ElasticResponse + err = jsoniter.Unmarshal(byteResponse, &esRes) + if err != nil { + return failedIndexes, err + } + + // loop throw elastic response to get failed indexes + for _, item := range esRes.Items { + if item.Index.Status != 200 { + var singleBulk elastic.BulkData + // loop throw real data to get failed ones + for _, el := range data { + if el.ID == item.Index.ID { + singleBulk = el + break + } + } + failedIndexes = append(failedIndexes, singleBulk) + } + } + return failedIndexes, nil +} From 91fdc894121c2d13b3c085f37f13b6f2ccf14bb2 Mon Sep 17 00:00:00 2001 From: Ayman Date: Tue, 11 Jan 2022 17:53:33 +0200 Subject: [PATCH 07/11] update log schema Signed-off-by: Ayman --- ingestjob/dto.go | 13 +++++++------ ingestjob/logger.go | 32 ++++++++++++++++++-------------- 2 files changed, 25 insertions(+), 20 deletions(-) diff --git a/ingestjob/dto.go b/ingestjob/dto.go index aa3b04b..7d330ba 100644 --- a/ingestjob/dto.go +++ b/ingestjob/dto.go @@ -4,12 +4,13 @@ import "time" // Log ... type Log struct { - Datasource string `json:"datasource"` - Endpoint string `json:"endpoint"` - Status string `json:"status"` - CreatedAt time.Time `json:"created_at"` - ProjectSlug string `json:"project_slug"` - Message string `json:"message"` + Connector string `json:"connector"` + Configuration []map[string]string `json:"configuration"` + Status string `json:"status"` + CreatedAt time.Time `json:"created_at"` + UpdatedAt time.Time `json:"updated_at"` + ProjectSlug string `json:"project_slug"` + Message string `json:"message"` } // TopHits result diff --git a/ingestjob/logger.go b/ingestjob/logger.go index 5cde095..a23e5a9 100644 --- a/ingestjob/logger.go +++ b/ingestjob/logger.go @@ -1,6 +1,7 @@ package ingestjob import ( + "encoding/json" "fmt" "time" @@ -41,15 +42,19 @@ func NewLogger(esClient ESLogProvider, environment string) (*Logger, error) { // Write ... func (s *Logger) Write(log *Log) error { - if log.Datasource == "" || log.Endpoint == "" || log.CreatedAt.IsZero() { - return fmt.Errorf("error: log datasource, endpoint and created at are all required") + if log.Connector == "" || len(log.Configuration) == 0 || log.CreatedAt.IsZero() { + return fmt.Errorf("error: log connector, configuration and created at are all required") } if log.Status != inProgress && log.Status != failed && log.Status != done { return fmt.Errorf("error: log status must be one of [%s, %s, %s ]", inProgress, failed, done) } date := log.CreatedAt.Format(time.RFC3339) - docID, err := uuid.Generate(log.Datasource, log.Endpoint, date) + configs, err := json.Marshal(log.Configuration) + if err != nil { + return err + } + docID, err := uuid.Generate(log.Connector, string(configs), date) if err != nil { return err } @@ -80,7 +85,7 @@ func (s *Logger) Write(log *Log) error { } // Read ... -func (s *Logger) Read(datasource string, status string) ([]Log, error) { +func (s *Logger) Read(connector string, status string) ([]Log, error) { if status != inProgress && status != failed && status != done { return []Log{}, fmt.Errorf("error: log status must be one of [%s, %s, %s ]", inProgress, failed, done) } @@ -88,8 +93,8 @@ func (s *Logger) Read(datasource string, status string) ([]Log, error) { must := make([]map[string]interface{}, 0) must = append(must, map[string]interface{}{ "term": map[string]interface{}{ - "datasource": map[string]string{ - "value": datasource}, + "connector": map[string]string{ + "value": connector}, }, }) must = append(must, map[string]interface{}{ @@ -120,7 +125,7 @@ func (s *Logger) Read(datasource string, status string) ([]Log, error) { return logs, nil } -func (s *Logger) Count(datasource string, status string) (int, error) { +func (s *Logger) Count(connector string, status string) (int, error) { if status != inProgress && status != failed && status != done { return 0, fmt.Errorf("error: log status must be one of [%s, %s, %s ]", inProgress, failed, done) } @@ -128,8 +133,8 @@ func (s *Logger) Count(datasource string, status string) (int, error) { must := make([]map[string]interface{}, 0) must = append(must, map[string]interface{}{ "term": map[string]interface{}{ - "datasource": map[string]string{ - "value": datasource}, + "connector": map[string]string{ + "value": connector}, }, }) must = append(must, map[string]interface{}{ @@ -151,15 +156,14 @@ func (s *Logger) Count(datasource string, status string) (int, error) { func (s *Logger) updateDocument(log Log, index string, docID string) error { doc := map[string]interface{}{ - "datasource": log.Datasource, - "endpoint": log.Endpoint, - "created_at": log.CreatedAt, - "status": log.Status, + "connector": log.Connector, + "configuration": log.Configuration, + "updated_at": time.Now().UTC(), + "status": log.Status, } _, err := s.esClient.UpdateDocument(index, docID, doc) if err != nil { - fmt.Println(index) return err } return nil From 44d94dac9aea5c2b3bcfa3b35574e031d63d843a Mon Sep 17 00:00:00 2001 From: Ayman Date: Thu, 13 Jan 2022 18:58:43 +0200 Subject: [PATCH 08/11] clean up Signed-off-by: Ayman --- ingestjob/logger.go | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/ingestjob/logger.go b/ingestjob/logger.go index a23e5a9..8fa037e 100644 --- a/ingestjob/logger.go +++ b/ingestjob/logger.go @@ -11,9 +11,9 @@ import ( const ( logIndex = "insights-job-logging" - inProgress = "inprogress" - failed = "failed" - done = "done" + InProgress = "inprogress" + Failed = "failed" + Done = "done" ) // ESLogProvider used in connecting to ES logging server @@ -45,8 +45,8 @@ func (s *Logger) Write(log *Log) error { if log.Connector == "" || len(log.Configuration) == 0 || log.CreatedAt.IsZero() { return fmt.Errorf("error: log connector, configuration and created at are all required") } - if log.Status != inProgress && log.Status != failed && log.Status != done { - return fmt.Errorf("error: log status must be one of [%s, %s, %s ]", inProgress, failed, done) + if log.Status != InProgress && log.Status != Failed && log.Status != Done { + return fmt.Errorf("error: log status must be one of [%s, %s, %s ]", InProgress, Failed, Done) } date := log.CreatedAt.Format(time.RFC3339) @@ -86,8 +86,8 @@ func (s *Logger) Write(log *Log) error { // Read ... func (s *Logger) Read(connector string, status string) ([]Log, error) { - if status != inProgress && status != failed && status != done { - return []Log{}, fmt.Errorf("error: log status must be one of [%s, %s, %s ]", inProgress, failed, done) + if status != InProgress && status != Failed && status != Done { + return []Log{}, fmt.Errorf("error: log status must be one of [%s, %s, %s ]", InProgress, Failed, Done) } must := make([]map[string]interface{}, 0) @@ -126,8 +126,8 @@ func (s *Logger) Read(connector string, status string) ([]Log, error) { } func (s *Logger) Count(connector string, status string) (int, error) { - if status != inProgress && status != failed && status != done { - return 0, fmt.Errorf("error: log status must be one of [%s, %s, %s ]", inProgress, failed, done) + if status != InProgress && status != Failed && status != Done { + return 0, fmt.Errorf("error: log status must be one of [%s, %s, %s ]", InProgress, Failed, Done) } must := make([]map[string]interface{}, 0) From b00cb03ae83a9929f847ef5fbd51be8378645b43 Mon Sep 17 00:00:00 2001 From: Ayman Date: Tue, 18 Jan 2022 16:46:21 +0200 Subject: [PATCH 09/11] add filter functionality Signed-off-by: Ayman --- ingestjob/dto.go | 3 +- ingestjob/logger.go | 95 ++++++++++++++++++++++++++++++++++++++++++--- 2 files changed, 91 insertions(+), 7 deletions(-) diff --git a/ingestjob/dto.go b/ingestjob/dto.go index 7d330ba..b6c1095 100644 --- a/ingestjob/dto.go +++ b/ingestjob/dto.go @@ -9,8 +9,9 @@ type Log struct { Status string `json:"status"` CreatedAt time.Time `json:"created_at"` UpdatedAt time.Time `json:"updated_at"` - ProjectSlug string `json:"project_slug"` Message string `json:"message"` + From *time.Time `json:"from,omitempty"` + To *time.Time `json:"to,omitempty"` } // TopHits result diff --git a/ingestjob/logger.go b/ingestjob/logger.go index 8fa037e..fc71867 100644 --- a/ingestjob/logger.go +++ b/ingestjob/logger.go @@ -3,6 +3,7 @@ package ingestjob import ( "encoding/json" "fmt" + "strings" "time" "github.com/LF-Engineering/insights-datasource-shared/uuid" @@ -10,7 +11,7 @@ import ( ) const ( - logIndex = "insights-job-logging" + logIndex = "insights-connector" InProgress = "inprogress" Failed = "failed" Done = "done" @@ -59,12 +60,15 @@ func (s *Logger) Write(log *Log) error { return err } + if log.UpdatedAt.IsZero() { + log.UpdatedAt = log.CreatedAt + } b, err := jsoniter.Marshal(log) if err != nil { return err } - index := fmt.Sprintf("%s-%s", logIndex, s.environment) + index := fmt.Sprintf("%s-%s-log-%s", logIndex, log.Connector, s.environment) query := map[string]interface{}{ "query": map[string]interface{}{ "term": map[string]interface{}{ @@ -75,7 +79,7 @@ func (s *Logger) Write(log *Log) error { } var res TopHits - err = s.esClient.Get(fmt.Sprintf("%s-%s", logIndex, s.environment), query, &res) + err = s.esClient.Get(fmt.Sprintf("%s-%s-log-%s", logIndex, log.Connector, s.environment), query, &res) if err != nil || len(res.Hits.Hits) == 0 { _, err := s.esClient.CreateDocument(index, docID, b) return err @@ -113,7 +117,7 @@ func (s *Logger) Read(connector string, status string) ([]Log, error) { var res TopHits logs := make([]Log, 0) - err := s.esClient.Get(fmt.Sprintf("%s-%s", logIndex, s.environment), query, &res) + err := s.esClient.Get(fmt.Sprintf("%s-%s-log-%s", logIndex, connector, s.environment), query, &res) if err != nil { return logs, err } @@ -150,8 +154,7 @@ func (s *Logger) Count(connector string, status string) (int, error) { }, }, } - - return s.esClient.Count(fmt.Sprintf("%s-%s", logIndex, s.environment), query) + return s.esClient.Count(fmt.Sprintf("%s-%s-log-%s", logIndex, connector, s.environment), query) } func (s *Logger) updateDocument(log Log, index string, docID string) error { @@ -168,3 +171,83 @@ func (s *Logger) updateDocument(log Log, index string, docID string) error { } return nil } + +func (s *Logger) Filter(log *Log) ([]Log, error) { + if log.Status != InProgress && log.Status != Failed && log.Status != Done { + return []Log{}, fmt.Errorf("error: log status must be one of [%s, %s, %s ]", InProgress, Failed, Done) + } + + must := CreateMustTerms(log) + query := map[string]interface{}{ + "query": map[string]interface{}{ + "bool": map[string]interface{}{ + "must": must, + }, + }, + } + + var res TopHits + logs := make([]Log, 0) + err := s.esClient.Get(fmt.Sprintf("%s-%s-log-%s", logIndex, log.Connector, s.environment), query, &res) + if err != nil { + return logs, err + } + + for _, l := range res.Hits.Hits { + logs = append(logs, l.Source) + } + + return logs, nil +} + +func CreateMustTerms(log *Log) []map[string]interface{} { + must := make([]map[string]interface{}, 0) + if log.Connector != "" { + must = append(must, map[string]interface{}{ + "term": map[string]interface{}{ + "connector": map[string]string{ + "value": log.Connector}, + }, + }) + } + + if log.Status != "" { + must = append(must, map[string]interface{}{ + "term": map[string]interface{}{ + "status": map[string]string{ + "value": log.Status}, + }, + }) + } + + if len(log.Configuration) != 0 { + for _, conf := range log.Configuration { + for k, v := range conf { + val := strings.ReplaceAll(v, "/", "\\/") + must = append(must, map[string]interface{}{ + "query_string": map[string]interface{}{ + "default_field": fmt.Sprintf("configuration.%s", k), + "query": val, + }, + }) + } + } + } + + if log.From != nil { + from := log.From.Format(time.RFC3339) + to := "now/d" + if log.To != nil { + to = log.To.Format(time.RFC3339) + } + must = append(must, map[string]interface{}{ + "range": map[string]interface{}{ + "created_at": map[string]string{ + "gte": from, + "lte": to}, + }, + }) + } + + return must +} From e50446022e8c4aa6411716fd6945d6e8fc4c9f9c Mon Sep 17 00:00:00 2001 From: Ayman Date: Tue, 18 Jan 2022 16:56:33 +0200 Subject: [PATCH 10/11] add generate id func Signed-off-by: Ayman --- ingestjob/logger.go | 20 ++++++++++++++------ 1 file changed, 14 insertions(+), 6 deletions(-) diff --git a/ingestjob/logger.go b/ingestjob/logger.go index fc71867..ea28730 100644 --- a/ingestjob/logger.go +++ b/ingestjob/logger.go @@ -50,12 +50,7 @@ func (s *Logger) Write(log *Log) error { return fmt.Errorf("error: log status must be one of [%s, %s, %s ]", InProgress, Failed, Done) } - date := log.CreatedAt.Format(time.RFC3339) - configs, err := json.Marshal(log.Configuration) - if err != nil { - return err - } - docID, err := uuid.Generate(log.Connector, string(configs), date) + docID, err := generateID(log) if err != nil { return err } @@ -251,3 +246,16 @@ func CreateMustTerms(log *Log) []map[string]interface{} { return must } + +func generateID(log *Log) (string, error) { + date := log.CreatedAt.Format(time.RFC3339) + configs, err := json.Marshal(log.Configuration) + if err != nil { + return "", err + } + docID, err := uuid.Generate(log.Connector, string(configs), date) + if err != nil { + return "", err + } + return docID, nil +} From a972c5e09a99174c6ab0f4beb67e9e96c87c64b4 Mon Sep 17 00:00:00 2001 From: Ayman Date: Tue, 18 Jan 2022 17:05:58 +0200 Subject: [PATCH 11/11] clean up Signed-off-by: Ayman --- ingestjob/logger.go | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/ingestjob/logger.go b/ingestjob/logger.go index ea28730..3466362 100644 --- a/ingestjob/logger.go +++ b/ingestjob/logger.go @@ -167,7 +167,11 @@ func (s *Logger) updateDocument(log Log, index string, docID string) error { return nil } +// Filter connector logs based on status, configuration and creation date func (s *Logger) Filter(log *Log) ([]Log, error) { + if log.Connector == "" { + return []Log{}, fmt.Errorf("error: log connector is required") + } if log.Status != InProgress && log.Status != Failed && log.Status != Done { return []Log{}, fmt.Errorf("error: log status must be one of [%s, %s, %s ]", InProgress, Failed, Done) } @@ -197,14 +201,6 @@ func (s *Logger) Filter(log *Log) ([]Log, error) { func CreateMustTerms(log *Log) []map[string]interface{} { must := make([]map[string]interface{}, 0) - if log.Connector != "" { - must = append(must, map[string]interface{}{ - "term": map[string]interface{}{ - "connector": map[string]string{ - "value": log.Connector}, - }, - }) - } if log.Status != "" { must = append(must, map[string]interface{}{