Skip to content
This repository has been archived by the owner on May 24, 2024. It is now read-only.

Commit

Permalink
Merge pull request #13 from LF-Engineering/clean-up
Browse files Browse the repository at this point in the history
Add job logging functionalities
  • Loading branch information
khalifapro authored Jan 19, 2022
2 parents 96c4ff0 + a972c5e commit 63a9507
Show file tree
Hide file tree
Showing 8 changed files with 402 additions and 6 deletions.
2 changes: 1 addition & 1 deletion datasourceStatus/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
"fmt"
"time"

"github.com/LF-Engineering/dev-analytics-libraries/uuid"
"github.com/LF-Engineering/insights-datasource-shared/uuid"
jsoniter "github.com/json-iterator/go"
)

Expand Down
3 changes: 0 additions & 3 deletions elastic/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -477,9 +477,6 @@ func (p *ClientProvider) Search(index string, query map[string]interface{}) ([]b
p.client.Search.WithBody(&buf),
)
if err != nil {
if strings.Contains(err.Error(), "server is not Elasticsearch") {
fmt.Println("esssssss", p.params)
}
return nil, err
}

Expand Down
19 changes: 19 additions & 0 deletions elasticgap/dto.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package elasticgap

// ElasticResponse ...
type ElasticResponse struct {
Took int
Errors bool
Items []ElasticResponseItem
}

// ElasticResponseItem ...
type ElasticResponseItem struct {
Index ESResponseIndex
}

// ESResponseIndex ...
type ESResponseIndex struct {
ID string `json:"_id"`
Status int
}
92 changes: 92 additions & 0 deletions elasticgap/handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
package elasticgap

import (
b64 "encoding/base64"
"fmt"

"github.com/LF-Engineering/insights-datasource-shared/elastic"
jsoniter "github.com/json-iterator/go"
)

// HTTPClient used in connecting to remote http server
type HTTPClient interface {
Request(url string, method string, header map[string]string, body []byte, params map[string]string) (statusCode int, resBody []byte, err error)
}

// Auth0Client ...
type Auth0Client interface {
GetToken() (string, error)
}

// GapHandler ...
type GapHandler struct {
gapURL string
httpClient HTTPClient
auth0Client Auth0Client
}

// NewGapHandler ...
func NewGapHandler(gapURL string, httpClient HTTPClient, auth0Client Auth0Client) *GapHandler {
return &GapHandler{
gapURL: gapURL,
httpClient: httpClient,
auth0Client: auth0Client,
}
}

// Send unsaved data to data-gap handler
func (g *GapHandler) Send(data []elastic.BulkData) error {
token, err := g.auth0Client.GetToken()
if err != nil {
return err
}

byteData, err := jsoniter.Marshal(data)
if err != nil {
return err
}

dataEnc := b64.StdEncoding.EncodeToString(byteData)
gapBody := map[string]map[string]string{"index": {"content": dataEnc}}
bData, err := jsoniter.Marshal(gapBody)
if err != nil {
return err
}

header := make(map[string]string)
header["Authorization"] = fmt.Sprintf("Bearer %s", token)

if g.gapURL != "" {
_, _, err = g.httpClient.Request(g.gapURL, "POST", header, bData, nil)
if err != nil {
return err
}
}

return nil
}

// HandleFailedData ...
func (g *GapHandler) HandleFailedData(data []elastic.BulkData, byteResponse []byte) (failedIndexes []elastic.BulkData, err error) {
var esRes ElasticResponse
err = jsoniter.Unmarshal(byteResponse, &esRes)
if err != nil {
return failedIndexes, err
}

// loop throw elastic response to get failed indexes
for _, item := range esRes.Items {
if item.Index.Status != 200 {
var singleBulk elastic.BulkData
// loop throw real data to get failed ones
for _, el := range data {
if el.ID == item.Index.ID {
singleBulk = el
break
}
}
failedIndexes = append(failedIndexes, singleBulk)
}
}
return failedIndexes, nil
}
3 changes: 1 addition & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,14 @@ module github.com/LF-Engineering/insights-datasource-shared
go 1.15

require (
github.com/LF-Engineering/dev-analytics-libraries v1.1.28
github.com/avast/retry-go v3.0.0+incompatible
github.com/aws/aws-sdk-go v1.42.24
github.com/aws/aws-sdk-go-v2 v1.11.2
github.com/aws/aws-sdk-go-v2/config v1.6.0
github.com/aws/aws-sdk-go-v2/service/firehose v1.4.2
github.com/aws/aws-sdk-go-v2/service/ssm v1.17.1
github.com/dgrijalva/jwt-go v3.2.0+incompatible
github.com/elastic/go-elasticsearch/v8 v8.0.0-20211220171217-6cdebcf1b94e
github.com/elastic/go-elasticsearch/v8 v8.0.0-20201229214741-2366c2514674
github.com/google/uuid v1.3.0
github.com/json-iterator/go v1.1.11
github.com/pkg/errors v0.9.1
Expand Down
1 change: 1 addition & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ github.com/dgrijalva/jwt-go v3.2.0+incompatible h1:7qlOGliEKZXTDg6OTjfoBKDXWrumC
github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ=
github.com/elastic/elastic-transport-go/v8 v8.0.0-20211216131617-bbee439d559c h1:onA2RpIyeCPvYAj1LFYiiMTrSpqVINWMfYFRS7lofJs=
github.com/elastic/elastic-transport-go/v8 v8.0.0-20211216131617-bbee439d559c/go.mod h1:87Tcz8IVNe6rVSLdBux1o/PEItLtyabHU3naC7IoqKI=
github.com/elastic/go-elasticsearch/v8 v8.0.0-20201229214741-2366c2514674 h1:heH4w5l/KFP4Ry9Xp4+jbRx0Wn+TJD7+HlyoMJE4LvQ=
github.com/elastic/go-elasticsearch/v8 v8.0.0-20201229214741-2366c2514674/go.mod h1:xe9a/L2aeOgFKKgrO3ibQTnMdpAeL0GC+5/HpGScSa4=
github.com/elastic/go-elasticsearch/v8 v8.0.0-20211220171217-6cdebcf1b94e h1:UzG3rbPYrsey+mmAtyHR3RoL4T4jTBvMF5bCp+RSwPI=
github.com/elastic/go-elasticsearch/v8 v8.0.0-20211220171217-6cdebcf1b94e/go.mod h1:Usvydt+x0dv9a1TzEUaovqbJor8rmOHy5dSmPeMAE2k=
Expand Down
31 changes: 31 additions & 0 deletions ingestjob/dto.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package ingestjob

import "time"

// Log ...
type Log struct {
Connector string `json:"connector"`
Configuration []map[string]string `json:"configuration"`
Status string `json:"status"`
CreatedAt time.Time `json:"created_at"`
UpdatedAt time.Time `json:"updated_at"`
Message string `json:"message"`
From *time.Time `json:"from,omitempty"`
To *time.Time `json:"to,omitempty"`
}

// TopHits result
type TopHits struct {
Hits Hits `json:"hits"`
}

// Hits result
type Hits struct {
Hits []NestedHits `json:"hits"`
}

// NestedHits is the actual hit data
type NestedHits struct {
ID string `json:"_id"`
Source Log `json:"_source"`
}
Loading

0 comments on commit 63a9507

Please sign in to comment.