Skip to content
This repository has been archived by the owner on May 24, 2024. It is now read-only.

Commit

Permalink
Merge pull request #17 from LF-Engineering/remove-duplicated-events
Browse files Browse the repository at this point in the history
Prevent duplicated update events
  • Loading branch information
khalifapro authored Aug 29, 2022
2 parents 459ac79 + 4c8ffd1 commit 66e617e
Showing 1 changed file with 102 additions and 12 deletions.
114 changes: 102 additions & 12 deletions cmd/confluence/confluence.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package main

import (
"crypto/sha256"
"encoding/base64"
"encoding/json"
"flag"
"fmt"
"os"
Expand Down Expand Up @@ -47,6 +49,7 @@ const (
ConfluenceAddHistoryCreatedByRole = false
// ConfluenceAddHistoryLastUpdatedByRole - should we add contributor for history->lastUpdatedBy page version edit?
ConfluenceAddHistoryLastUpdatedByRole = false
contentHashField = "contentHash"
)

var (
Expand Down Expand Up @@ -1287,7 +1290,7 @@ func (j *DSConfluence) GetModelData(ctx *shared.Ctx, docs []interface{}) (data m
cacheID := fmt.Sprintf("content-%s", confluenceContentID)
isCreated, err := j.cacheProvider.IsKeyCreated(j.endpoint, cacheID)
if err != nil {
j.log.WithFields(logrus.Fields{"operation": "GetModelDataPullRequest"}).Errorf("error getting cache for endpoint %s. error: %+v", j.endpoint, err)
j.log.WithFields(logrus.Fields{"operation": "GetModelData"}).Errorf("error getting cache for endpoint %s. error: %+v", j.endpoint, err)
return data, err
}
key := "updated"
Expand Down Expand Up @@ -1337,26 +1340,37 @@ func (j *DSConfluence) ConfluenceEnrichItems(ctx *shared.Ctx, thrN int, items []
case "created":
ev, _ := v[0].(insightsConf.ContentCreatedEvent)
err = j.Publisher.PushEvents(ev.Event(), insightsStr, ConfluenceDataSource, contentsStr, envStr, v)
for _, val := range v {
id := fmt.Sprintf("%s-%s", "content", val.(insightsConf.ContentCreatedEvent).Payload.ID)
d = append(d, map[string]interface{}{
"id": id,
"data": "",
})
cacheData, err := j.cachedCreatedContent(v)
if err != nil {
j.log.WithFields(logrus.Fields{"operation": "ConfluenceEnrichItems"}).Errorf("cachedCreatedContent error: %+v", err)
return
}
d = append(d, cacheData...)
case "updated":
ev, _ := v[0].(insightsConf.ContentUpdatedEvent)
err = j.Publisher.PushEvents(ev.Event(), insightsStr, ConfluenceDataSource, contentsStr, envStr, v)
updates, cacheData, err := j.preventUpdateDuplication(v)
if err != nil {
j.log.WithFields(logrus.Fields{"operation": "ConfluenceEnrichItems"}).Errorf("preventUpdateDuplication error: %+v", err)
return
}
if len(cacheData) > 0 {
d = append(d, cacheData...)
}
if len(updates) > 0 {
ev, _ := updates[0].(insightsConf.ContentUpdatedEvent)
err = j.Publisher.PushEvents(ev.Event(), insightsStr, ConfluenceDataSource, contentsStr, envStr, updates)
}
default:
err = fmt.Errorf("unknown confluence event type '%s'", k)
}
if err != nil {
break
}
}
err = j.cacheProvider.Create(j.endpoint, d)
if err != nil {
j.log.WithFields(logrus.Fields{"operation": "ConfluenceEnrichItems"}).Errorf("error creating cache for endpoint %s. Error: %+v", j.endpoint, err)
if len(d) > 0 {
err = j.cacheProvider.Create(j.endpoint, d)
if err != nil {
j.log.WithFields(logrus.Fields{"operation": "ConfluenceEnrichItems"}).Errorf("error creating cache for endpoint %s. Error: %+v", j.endpoint, err)
}
}
} else {
jsonBytes, err = jsoniter.Marshal(data)
Expand Down Expand Up @@ -1529,3 +1543,79 @@ func (j *DSConfluence) AddCacheProvider() {
j.cacheProvider = *cacheProvider
j.endpoint = strings.ReplaceAll(strings.TrimPrefix(strings.TrimPrefix(j.URL, "https://"), "http://"), "/", "-")
}

// cachedCreatedContent builds one cache entry per content-created event in v.
//
// Every element of v must be an insightsConf.ContentCreatedEvent; any other
// type panics on the type assertion. For each event a subset of the payload
// fields is copied into an insightsConf.Content, JSON-encoded and hashed with
// SHA-256. The resulting entry has the shape
//
//	{"id": "content-<payload ID>", "data": {contentHashField: "<hex digest>"}}
//
// If encoding fails, the entries built so far are returned together with the
// encoding error.
func (j *DSConfluence) cachedCreatedContent(v []interface{}) ([]map[string]interface{}, error) {
	entries := make([]map[string]interface{}, 0)
	for _, item := range v {
		payload := item.(insightsConf.ContentCreatedEvent).Payload
		entryID := fmt.Sprintf("content-%s", payload.ID)

		// Only these fields take part in the hash; anything else on the
		// payload is deliberately left out of the snapshot.
		snapshot := insightsConf.Content{
			ID:           payload.ID,
			EndpointID:   payload.EndpointID,
			Space:        payload.Space,
			ServerURL:    payload.ServerURL,
			ContentID:    payload.ContentID,
			ContentURL:   payload.ContentURL,
			Type:         payload.Type,
			Title:        payload.Title,
			Body:         payload.Body,
			Contributors: payload.Contributors,
			Children:     payload.Children,
		}

		encoded, err := json.Marshal(snapshot)
		if err != nil {
			return entries, err
		}

		digest := sha256.Sum256(encoded)
		hashData := map[string]interface{}{
			contentHashField: fmt.Sprintf("%x", digest),
		}
		entries = append(entries, map[string]interface{}{
			"id":   entryID,
			"data": hashData,
		})
	}
	return entries, nil
}

// preventUpdateDuplication filters a batch of content-updated events down to
// those whose content differs from the hash recorded in the cache.
//
// For each element of v a subset of the payload fields is copied into an
// insightsConf.Content, JSON-encoded and hashed with SHA-256. The digest is
// compared with the contentHashField value stored under the cache key
// "content-<payload ID>" for j.endpoint.
//
// Returns:
//   - the events whose hash differs from the cached one (to be published),
//   - the matching cache entries, shaped as
//     {"id": "content-<payload ID>", "data": {contentHashField: "<hex digest>"}},
//   - an error if an element is not an insightsConf.ContentUpdatedEvent, if
//     the content cannot be encoded, or if the cache entry cannot be read.
//
// On error both slices are nil, so a failure is never mistaken for "nothing
// changed" and the caller's error branch is actually taken.
func (j *DSConfluence) preventUpdateDuplication(v []interface{}) ([]interface{}, []map[string]interface{}, error) {
	updatedVals := make([]interface{}, 0, len(v))
	cacheData := make([]map[string]interface{}, 0, len(v))
	for _, val := range v {
		event, ok := val.(insightsConf.ContentUpdatedEvent)
		if !ok {
			return nil, nil, fmt.Errorf("unexpected update event type %T", val)
		}
		content := event.Payload

		// Only these fields take part in the hash; anything else on the
		// payload is deliberately left out of the comparison.
		c := insightsConf.Content{
			ID:           content.ID,
			EndpointID:   content.EndpointID,
			Space:        content.Space,
			ServerURL:    content.ServerURL,
			ContentID:    content.ContentID,
			ContentURL:   content.ContentURL,
			Type:         content.Type,
			Title:        content.Title,
			Body:         content.Body,
			Contributors: content.Contributors,
			Children:     content.Children,
		}
		b, err := json.Marshal(c)
		if err != nil {
			return nil, nil, fmt.Errorf("marshaling content %v: %w", content.ID, err)
		}
		contentHash := fmt.Sprintf("%x", sha256.Sum256(b))

		cacheID := fmt.Sprintf("content-%s", content.ID)
		byt, err := j.cacheProvider.GetFileByKey(j.endpoint, cacheID)
		if err != nil {
			return nil, nil, fmt.Errorf("reading cache entry %q: %w", cacheID, err)
		}

		// A cache entry that cannot be decoded carries no usable hash. Treat
		// it as "no recorded hash" so the event is published and the entry is
		// rewritten with a fresh hash, instead of failing the whole batch.
		cachedHash := make(map[string]interface{})
		if err := json.Unmarshal(byt, &cachedHash); err != nil {
			cachedHash = nil
		}

		// Unchanged content: the stored value is a string equal to the new hash.
		if prev, isString := cachedHash[contentHashField].(string); isString && prev == contentHash {
			continue
		}

		updatedVals = append(updatedVals, val)
		cacheData = append(cacheData, map[string]interface{}{
			"id": cacheID,
			"data": map[string]interface{}{
				contentHashField: contentHash,
			},
		})
	}
	return updatedVals, cacheData, nil
}

0 comments on commit 66e617e

Please sign in to comment.