From 3c3ba57a839f7ecc02b922adbf2dd249df240a32 Mon Sep 17 00:00:00 2001
From: khalifapro <81648906+khalifapro@users.noreply.github.com>
Date: Sun, 12 Feb 2023 20:45:17 +0200
Subject: [PATCH] cache all pull request and issue events (#71)

Signed-off-by: Ayman
Co-authored-by: Ayman
---
 cmd/github/github.go | 266 +++++++++++++++++++++++++++----------------
 1 file changed, 167 insertions(+), 99 deletions(-)

diff --git a/cmd/github/github.go b/cmd/github/github.go
index 84ab3d4..485e10e 100644
--- a/cmd/github/github.go
+++ b/cmd/github/github.go
@@ -163,7 +163,8 @@ var (
 	gMaxUpstreamDt time.Time
 	gMaxUpstreamDtMtx = &sync.Mutex{}
 	cachedIssues = make(map[string]ItemCache)
-	rawItems = make(map[int64]github.Issue)
+	rawIssues = make(map[int64]github.Issue)
+	rawPulls = make(map[int64]github.PullRequest)
 	cachedPulls = make(map[string]ItemCache)
 	cachedAssignees = make(map[string][]ItemCache)
 	cachedComments = make(map[string][]ItemCache)
@@ -177,6 +178,8 @@ var (
 	commentReactionsCacheFile = "comment-reactions-cache"
 	reactionsCacheFile = "reactions-cache"
 	reviewersCacheFile = "reviewers-cache"
+	createdPulls = make(map[string]bool)
+	createdIssues = make(map[string]bool)
 )
 
 // Publisher - for streaming data to Kinesis
@@ -1324,13 +1327,6 @@ func (j *DSGitHub) githubIssues(ctx *shared.Ctx, org, repo string, since, until
 		return
 	}
 	for _, issue := range issues {
-		var issID int64
-		if issue.ID != nil {
-			issID = *issue.ID
-		}
-		if issue != nil {
-			rawItems[issID] = *issue
-		}
 		iss := map[string]interface{}{}
 		jm, _ := jsoniter.Marshal(issue)
 		_ = jsoniter.Unmarshal(jm, &iss)
@@ -1814,6 +1810,7 @@ func (j *DSGitHub) githubPull(ctx *shared.Ctx, org, repo string, number int) (pu
 		e        error
 	)
 	pull, response, e = c.PullRequests.Get(j.Context, org, repo, number)
+	if pull != nil && pull.ID != nil { rawPulls[*pull.ID] = *pull }
 	if ctx.Debug > 2 {
 		j.log.WithFields(logrus.Fields{"operation": "githubPull"}).Debugf("GET %s/%s/%d -> {%+v, %+v, %+v}", org, repo, number, pull, response, e)
 	}
@@ -3044,32 +3041,7 @@ func (j *DSGitHub) FetchItemsIssue(ctx *shared.Ctx) (err error) {
 		eschaMtx   *sync.Mutex
 		issProcMtx *sync.Mutex
 	)
-	issuesB, err := j.cacheProvider.GetFileByKey(fmt.Sprintf("%s/%s/%s", j.Org, j.Repo, GitHubIssue), issuesCacheFile)
-	if err != nil {
-		return
-	}
-	reader := csv.NewReader(bytes.NewBuffer(issuesB))
-	records, err := reader.ReadAll()
-	if err != nil {
-		return
-	}
-	for i, record := range records {
-		if i == 0 {
-			continue
-		}
-		orphaned, err := strconv.ParseBool(record[5])
-		if err != nil {
-			orphaned = false
-		}
-		cachedIssues[record[1]] = ItemCache{
-			Timestamp:      record[0],
-			EntityID:       record[1],
-			SourceEntityID: record[2],
-			FileLocation:   record[3],
-			Hash:           record[4],
-			Orphaned:       orphaned,
-		}
-	}
+	j.getCachedIssues()
 	err = j.getAssignees(j.Categories[0])
 	if err != nil {
 		return err
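The hunks above replace inline CSV-cache loading with the getCachedIssues/getCachedPulls helpers defined near the end of this patch, and re-key the cache by content hash instead of entity ID. Below is a minimal, self-contained sketch of that loading pattern, standard library only; loadCache and the sample data are illustrative names, not code from this repository, and only the column layout is taken from the patch.

package main

import (
	"encoding/csv"
	"fmt"
	"strconv"
	"strings"
)

// ItemCache mirrors the cache CSV columns:
// timestamp, entity_id, source_entity_id, file_location, hash, orphaned.
type ItemCache struct {
	Timestamp      string
	EntityID       string
	SourceEntityID string
	FileLocation   string
	Hash           string
	Orphaned       bool
}

// loadCache parses a cache CSV, skipping the header row, and keys the result
// by content hash (column 4) so a recomputed digest is a single map lookup.
func loadCache(data string) map[string]ItemCache {
	out := map[string]ItemCache{}
	records, err := csv.NewReader(strings.NewReader(data)).ReadAll()
	if err != nil {
		return out
	}
	for i, r := range records {
		if i == 0 || len(r) < 6 {
			continue // header row or malformed record
		}
		orphaned, er := strconv.ParseBool(r[5])
		if er != nil {
			orphaned = false
		}
		out[r[4]] = ItemCache{r[0], r[1], r[2], r[3], r[4], orphaned}
	}
	return out
}

func main() {
	data := "timestamp,entity_id,source_entity_id,file_location,hash,orphaned\n" +
		"1676226317,abc,123,insights/github/x,deadbeef,false\n"
	cache := loadCache(data)
	fmt.Println(cache["deadbeef"].EntityID) // abc
}

Keying by hash rather than entity ID means a freshly computed digest can be checked with one map lookup, which is exactly what the new isHashExist helper below does.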
@@ -3295,32 +3267,7 @@ func (j *DSGitHub) FetchItemsPullRequest(ctx *shared.Ctx) (err error) {
 		eschaMtx     *sync.Mutex
 		pullsProcMtx *sync.Mutex
 	)
-	pullsB, err := j.cacheProvider.GetFileByKey(fmt.Sprintf("%s/%s/%s", j.Org, j.Repo, GitHubPullrequest), pullsCacheFile)
-	if err != nil {
-		return
-	}
-	reader := csv.NewReader(bytes.NewBuffer(pullsB))
-	records, err := reader.ReadAll()
-	if err != nil {
-		return
-	}
-	for i, record := range records {
-		if i == 0 {
-			continue
-		}
-		orphaned, er := strconv.ParseBool(record[5])
-		if er != nil {
-			orphaned = false
-		}
-		cachedPulls[record[1]] = ItemCache{
-			Timestamp:      record[0],
-			EntityID:       record[1],
-			SourceEntityID: record[2],
-			FileLocation:   record[3],
-			Hash:           record[4],
-			Orphaned:       orphaned,
-		}
-	}
+	j.getCachedPulls()
 	err = j.getAssignees(j.Categories[0])
 	if err != nil {
 		return err
@@ -5825,18 +5772,25 @@ func (j *DSGitHub) OutputDocs(ctx *shared.Ctx, items []interface{}, docs *[]inte
 			if len(cacheData) > 0 {
 				for _, c := range cacheData {
 					c.FileLocation = path
-					cachedIssues[c.EntityID] = c
+					cachedIssues[c.Hash] = c
 				}
 			}
 		case "closed":
-			updates, _, err := j.preventUpdateIssueDuplication(v, "closed")
+			updates, cacheData, err := j.preventUpdateIssueDuplication(v, "closed")
 			if err != nil {
 				j.log.WithFields(logrus.Fields{"operation": "OutputDocs"}).Errorf("preventUpdateIssueDuplication error: %+v", err)
 				return
 			}
+			path := ""
 			if len(updates) > 0 {
 				ev, _ := v[0].(igh.IssueClosedEvent)
-				_, err = j.Publisher.PushEvents(ev.Event(), insightsStr, GitHubDataSource, issuesStr, envStr, v, endpoint)
+				path, err = j.Publisher.PushEvents(ev.Event(), insightsStr, GitHubDataSource, issuesStr, envStr, v, endpoint)
+			}
+			if len(cacheData) > 0 {
+				for _, c := range cacheData {
+					c.FileLocation = path
+					cachedIssues[c.Hash] = c
+				}
 			}
 		case "assignee_added":
 			ev, _ := v[0].(igh.IssueAssigneeAddedEvent)
@@ -5941,16 +5895,58 @@ func (j *DSGitHub) OutputDocs(ctx *shared.Ctx, items []interface{}, docs *[]inte
 			if len(cacheData) > 0 {
 				for _, c := range cacheData {
 					c.FileLocation = path
-					cachedPulls[c.EntityID] = c
+					cachedPulls[c.Hash] = c
 				}
 			}
 		case "closed":
 			ev, _ := v[0].(igh.PullRequestClosedEvent)
-			_, err = j.Publisher.PushEvents(ev.Event(), insightsStr, GitHubDataSource, pullsStr, envStr, v, endpoint)
+			path := ""
+			path, err = j.Publisher.PushEvents(ev.Event(), insightsStr, GitHubDataSource, pullsStr, envStr, v, endpoint)
+			for _, val := range v {
+				pr := val.(igh.PullRequestClosedEvent).Payload
+				tStamp := fmt.Sprintf("%v", pr.SyncTimestamp.Unix())
+				pr.SyncTimestamp = time.Time{}
+				b, er := jsoniter.Marshal(pr)
+				if er != nil {
+					continue
+				}
+				contentHash := fmt.Sprintf("%x", sha256.Sum256(b))
+				hashExist := isHashExist(cachedPulls, contentHash)
+				if !hashExist {
+					cachedPulls[contentHash] = ItemCache{
+						Timestamp:      tStamp,
+						EntityID:       pr.ID,
+						SourceEntityID: pr.ChangeRequestID,
+						Hash:           contentHash,
+						FileLocation:   path,
+					}
+				}
+			}
 		case "merged":
 			ev, _ := v[0].(igh.PullRequestMergedEvent)
-			_, err = j.Publisher.PushEvents(ev.Event(), insightsStr, GitHubDataSource, pullsStr, envStr, v, endpoint)
+			path := ""
+			path, err = j.Publisher.PushEvents(ev.Event(), insightsStr, GitHubDataSource, pullsStr, envStr, v, endpoint)
+			for _, val := range v {
+				pr := val.(igh.PullRequestMergedEvent).Payload
+				tStamp := fmt.Sprintf("%v", pr.SyncTimestamp.Unix())
+				pr.SyncTimestamp = time.Time{}
+				b, er := jsoniter.Marshal(pr)
+				if er != nil {
+					continue
+				}
+				contentHash := fmt.Sprintf("%x", sha256.Sum256(b))
+				hashExist := isHashExist(cachedPulls, contentHash)
+				if !hashExist {
+					cachedPulls[contentHash] = ItemCache{
+						Timestamp:      tStamp,
+						EntityID:       pr.ID,
+						SourceEntityID: pr.ChangeRequestID,
+						Hash:           contentHash,
+						FileLocation:   path,
+					}
+				}
+			}
 		case "assignee_added":
 			ev, _ := v[0].(igh.PullRequestAssigneeAddedEvent)
 			_, err = j.Publisher.PushEvents(ev.Event(), insightsStr, GitHubDataSource, pullsStr, envStr, v, endpoint)
@@ -7659,7 +7655,7 @@ func (j *DSGitHub) GetModelDataPullRequest(ctx *shared.Ctx, docs []interface{})
 			pullRequest.ClosedBy = closedBY
 		}
 		key := "updated"
-		if isCreated := isParentKeyCreated(cachedPulls, pullRequest.ID); !isCreated {
+		if isCreated := isParentKeyCreated(createdPulls, pullRequest.ID); !isCreated {
 			key = "created"
 			pullRequest.ChangeRequest.SourceTimestamp = createdOn
 		}
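The closed and merged cases above derive a content hash from the payload after zeroing SyncTimestamp, so two syncs of an otherwise unchanged pull request produce identical digests and the second one is skipped. A hedged sketch of that idea, using stdlib encoding/json where the connector uses jsoniter, with payload standing in for the real event payload types:

package main

import (
	"crypto/sha256"
	"encoding/json"
	"fmt"
	"time"
)

// payload stands in for an event payload such as igh.PullRequestClosedEvent's;
// the real type carries many more fields.
type payload struct {
	ID            string    `json:"id"`
	State         string    `json:"state"`
	SyncTimestamp time.Time `json:"sync_timestamp"`
}

// contentHash zeroes the sync timestamp before marshaling so that two syncs
// of an otherwise unchanged item yield the same digest, which is the property
// the closed/merged cases rely on for deduplication.
func contentHash(p payload) (string, error) {
	p.SyncTimestamp = time.Time{} // p is a copy; the caller's value is untouched
	b, err := json.Marshal(p)
	if err != nil {
		return "", err
	}
	return fmt.Sprintf("%x", sha256.Sum256(b)), nil
}

func main() {
	a := payload{ID: "1", State: "closed", SyncTimestamp: time.Now()}
	b := payload{ID: "1", State: "closed", SyncTimestamp: time.Now().Add(time.Hour)}
	ha, _ := contentHash(a)
	hb, _ := contentHash(b)
	fmt.Println(ha == hb) // true: only the sync timestamps differed
}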
@@ -8773,7 +8769,7 @@ func (j *DSGitHub) GetModelDataIssue(ctx *shared.Ctx, docs []interface{}) (data
 			issue.ClosedBy = closedBY
 		}
 		key := "updated"
-		if isCreated := isParentKeyCreated(cachedIssues, issue.ID); !isCreated {
+		if isCreated := isParentKeyCreated(createdIssues, issue.ID); !isCreated {
 			key = "created"
 			issue.Issue.SourceTimestamp = createdOn
 		}
@@ -8899,13 +8895,13 @@ func (j *DSGitHub) cacheCreatedPullrequest(v []interface{}, path string) error {
 	for _, val := range v {
 		pr := val.(igh.PullRequestCreatedEvent).Payload
 		prID, err := strconv.ParseInt(pr.ChangeRequestID, 10, 64)
-		rawPull := rawItems[prID]
+		rawPull := rawPulls[prID]
 		b, err := json.Marshal(rawPull)
 		if err != nil {
 			return err
 		}
 		contentHash := fmt.Sprintf("%x", sha256.Sum256(b))
-		cachedPulls[pr.ID] = ItemCache{
+		cachedPulls[contentHash] = ItemCache{
 			Timestamp:      fmt.Sprintf("%v", pr.SyncTimestamp.Unix()),
 			EntityID:       pr.ID,
 			SourceEntityID: pr.ChangeRequestID,
@@ -8913,6 +8909,7 @@ func (j *DSGitHub) cacheCreatedPullrequest(v []interface{}, path string) error {
 			Hash:           contentHash,
 			Orphaned:       false,
 		}
+		createdPulls[pr.ID] = true
 	}
 	return nil
 }
@@ -8932,22 +8929,22 @@ func (j *DSGitHub) preventUpdatePullrequestDuplication(v []interface{}, event st
 		default:
 			return updates, cacheData, fmt.Errorf("event: %s is not recognized", event)
 		}
-		issID, err := strconv.ParseInt(pr.ChangeRequestID, 10, 64)
-		if err != nil {
-			return updates, cacheData, err
-		}
-		rawIssue := rawItems[issID]
-		b, err := json.Marshal(rawIssue)
-		if err != nil {
-			return updates, cacheData, err
+		tStamp := fmt.Sprintf("%v", pr.SyncTimestamp.Unix())
+		pr.SyncTimestamp = time.Time{}
+		b, er := jsoniter.Marshal(pr)
+		if er != nil {
+			continue
 		}
 		contentHash := fmt.Sprintf("%x", sha256.Sum256(b))
-		pull := cachedPulls[pr.ID]
-
-		if contentHash != pull.Hash {
-			pull.Hash = contentHash
+		hashExist := isHashExist(cachedPulls, contentHash)
+		if !hashExist {
 			updates = append(updates, val)
-			cacheData = append(cacheData, pull)
+			cacheData = append(cacheData, ItemCache{
+				Timestamp:      tStamp,
+				EntityID:       pr.ID,
+				SourceEntityID: pr.ChangeRequestID,
+				Hash:           contentHash,
+			})
 		}
 	}
 	return updates, cacheData, nil
@@ -8960,13 +8957,13 @@ func (j *DSGitHub) cacheCreatedIssues(v []interface{}, path string) error {
 		if err != nil {
 			return err
 		}
-		rawIssue := rawItems[issID]
+		rawIssue := rawIssues[issID]
 		b, err := json.Marshal(rawIssue)
 		if err != nil {
 			return err
 		}
 		contentHash := fmt.Sprintf("%x", sha256.Sum256(b))
-		cachedIssues[issue.ID] = ItemCache{
+		cachedIssues[contentHash] = ItemCache{
 			Timestamp:      fmt.Sprintf("%v", issue.SyncTimestamp.Unix()),
 			EntityID:       issue.ID,
 			SourceEntityID: issue.IssueID,
@@ -8974,6 +8971,7 @@ func (j *DSGitHub) cacheCreatedIssues(v []interface{}, path string) error {
 			Hash:           contentHash,
 			Orphaned:       false,
 		}
+		createdIssues[issue.ID] = true
 	}
 	return nil
 }
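preventUpdatePullrequestDuplication above (and preventUpdateIssueDuplication just below) now return two slices: the items that still need publishing and the cache rows to persist once a file location is known. A self-contained sketch of that two-result dedup shape, with item and a trimmed ItemCache standing in for the connector's types:

package main

import (
	"crypto/sha256"
	"encoding/json"
	"fmt"
)

// item is a stand-in for an event payload; ItemCache is trimmed to the
// fields the dedup path needs.
type item struct {
	ID    string `json:"id"`
	State string `json:"state"`
}

type ItemCache struct {
	EntityID string
	Hash     string
}

// dedupe keeps only items whose content hash is absent from seen, returning
// both the items to publish and the cache rows to persist afterwards — the
// same two-value shape as preventUpdateIssueDuplication.
func dedupe(items []item, seen map[string]ItemCache) (updates []item, cacheData []ItemCache) {
	for _, it := range items {
		b, err := json.Marshal(it)
		if err != nil {
			continue // skip items that fail to marshal, as the patch does
		}
		h := fmt.Sprintf("%x", sha256.Sum256(b))
		if _, ok := seen[h]; ok {
			continue // content unchanged since the last sync
		}
		updates = append(updates, it)
		cacheData = append(cacheData, ItemCache{EntityID: it.ID, Hash: h})
	}
	return updates, cacheData
}

func main() {
	seen := map[string]ItemCache{}
	first, rows := dedupe([]item{{ID: "1", State: "closed"}}, seen)
	for _, r := range rows {
		seen[r.Hash] = r // persist, as OutputDocs does after PushEvents
	}
	second, _ := dedupe([]item{{ID: "1", State: "closed"}}, seen)
	fmt.Println(len(first), len(second)) // 1 0
}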
@@ -8991,22 +8989,22 @@ func (j *DSGitHub) preventUpdateIssueDuplication(v []interface{}, event string)
 		default:
 			return updates, cacheData, fmt.Errorf("event: %s is not recognized", event)
 		}
-		issID, err := strconv.ParseInt(issue.IssueID, 10, 64)
-		if err != nil {
-			return updates, cacheData, err
-		}
-		rawIssue := rawItems[issID]
-		b, err := json.Marshal(rawIssue)
-		if err != nil {
-			return updates, cacheData, err
+		tStamp := fmt.Sprintf("%v", issue.SyncTimestamp.Unix())
+		issue.SyncTimestamp = time.Time{}
+		b, er := jsoniter.Marshal(issue)
+		if er != nil {
+			continue
 		}
 		contentHash := fmt.Sprintf("%x", sha256.Sum256(b))
-		iss := cachedIssues[issue.ID]
-
-		if contentHash != iss.Hash {
-			iss.Hash = contentHash
+		hashExist := isHashExist(cachedIssues, contentHash)
+		if !hashExist {
 			updates = append(updates, val)
-			cacheData = append(cacheData, iss)
+			cacheData = append(cacheData, ItemCache{
+				Timestamp:      tStamp,
+				EntityID:       issue.ID,
+				SourceEntityID: issue.IssueID,
+				Hash:           contentHash,
+			})
 		}
 	}
 	return updates, cacheData, nil
@@ -9018,6 +9016,7 @@ func (j *DSGitHub) getClosedBy(ctx *shared.Ctx, id int, org string) (*insights.C
 	if err != nil {
 		return nil, err
 	}
+	rawIssues[*iss.ID] = *iss
 	if iss.ClosedBy == nil {
 		return nil, nil
 	}
@@ -9138,7 +9137,67 @@ func (j *DSGitHub) getReactions(category string) error {
 	return nil
 }
 
-func isParentKeyCreated(element map[string]ItemCache, id string) bool {
+func (j *DSGitHub) getCachedPulls() {
+	pullsB, err := j.cacheProvider.GetFileByKey(fmt.Sprintf("%s/%s/%s", j.Org, j.Repo, GitHubPullrequest), pullsCacheFile)
+	if err != nil {
+		return
+	}
+	reader := csv.NewReader(bytes.NewBuffer(pullsB))
+	records, err := reader.ReadAll()
+	if err != nil {
+		return
+	}
+	for i, record := range records {
+		if i == 0 {
+			continue
+		}
+		orphaned, er := strconv.ParseBool(record[5])
+		if er != nil {
+			orphaned = false
+		}
+		cachedPulls[record[4]] = ItemCache{
+			Timestamp:      record[0],
+			EntityID:       record[1],
+			SourceEntityID: record[2],
+			FileLocation:   record[3],
+			Hash:           record[4],
+			Orphaned:       orphaned,
+		}
+		createdPulls[record[1]] = true
+	}
+}
+
+func (j *DSGitHub) getCachedIssues() {
+	issuesB, err := j.cacheProvider.GetFileByKey(fmt.Sprintf("%s/%s/%s", j.Org, j.Repo, GitHubIssue), issuesCacheFile)
+	if err != nil {
+		return
+	}
+	reader := csv.NewReader(bytes.NewBuffer(issuesB))
+	records, err := reader.ReadAll()
+	if err != nil {
+		return
+	}
+	for i, record := range records {
+		if i == 0 {
+			continue
+		}
+		orphaned, err := strconv.ParseBool(record[5])
+		if err != nil {
+			orphaned = false
+		}
+		cachedIssues[record[4]] = ItemCache{
+			Timestamp:      record[0],
+			EntityID:       record[1],
+			SourceEntityID: record[2],
+			FileLocation:   record[3],
+			Hash:           record[4],
+			Orphaned:       orphaned,
+		}
+		createdIssues[record[1]] = true
+	}
+}
+
+func isParentKeyCreated(element map[string]bool, id string) bool {
	_, ok := element[id]
 	if ok {
 		return true
@@ -9146,6 +9205,14 @@ func isParentKeyCreated(element map[string]ItemCache, id string) bool {
 	return false
 }
 
+func isHashExist(element map[string]ItemCache, hash string) bool {
+	_, ok := element[hash]
+	if ok {
+		return true
+	}
+	return false
+}
+
 func (j *DSGitHub) updateRemoteCache(cacheFile string, cacheType string) error {
 	records := [][]string{
 		{"timestamp", "entity_id", "source_entity_id", "file_location", "hash", "orphaned"},
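For completeness, the write side: updateRemoteCache (truncated above) starts from the same header row that getCachedPulls/getCachedIssues parse. Below is a sketch of serializing the hash-keyed cache back to CSV; writeCache is a hypothetical helper, and only the column order is taken from the patch:

package main

import (
	"bytes"
	"encoding/csv"
	"fmt"
	"strconv"
)

// ItemCache mirrors the cache CSV columns used throughout the patch.
type ItemCache struct {
	Timestamp      string
	EntityID       string
	SourceEntityID string
	FileLocation   string
	Hash           string
	Orphaned       bool
}

// writeCache serializes cache entries in the same column order the loaders
// expect: timestamp, entity_id, source_entity_id, file_location, hash, orphaned.
func writeCache(entries map[string]ItemCache) ([]byte, error) {
	records := [][]string{
		{"timestamp", "entity_id", "source_entity_id", "file_location", "hash", "orphaned"},
	}
	for _, e := range entries {
		records = append(records, []string{
			e.Timestamp, e.EntityID, e.SourceEntityID, e.FileLocation,
			e.Hash, strconv.FormatBool(e.Orphaned),
		})
	}
	var buf bytes.Buffer
	w := csv.NewWriter(&buf)
	if err := w.WriteAll(records); err != nil { // WriteAll flushes internally
		return nil, err
	}
	return buf.Bytes(), nil
}

func main() {
	b, _ := writeCache(map[string]ItemCache{
		"deadbeef": {Timestamp: "1676226317", EntityID: "abc", SourceEntityID: "123", Hash: "deadbeef"},
	})
	fmt.Print(string(b))
}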