diff --git a/cmd/git/git.go b/cmd/git/git.go index f429be9..27bad61 100644 --- a/cmd/git/git.go +++ b/cmd/git/git.go @@ -1862,9 +1862,7 @@ func (j *DSGit) GitEnrichItems(ctx *shared.Ctx, thrN int, items []interface{}, d j.log.WithFields(logrus.Fields{"operation": "GitEnrichItems"}).Infof("%s", string(jsonBytes)) } *docs = []interface{}{} - gMaxUpstreamDtMtx.Lock() - defer gMaxUpstreamDtMtx.Unlock() - err = j.cacheProvider.SetLastSync(j.endpoint, gMaxUpstreamDt) + err = j.setLastSync(ctx) if err != nil { return } @@ -2438,7 +2436,7 @@ func (j *DSGit) ParseNextCommit(ctx *shared.Ctx) (commit map[string]interface{}, // Sync - sync git data source func (j *DSGit) Sync(ctx *shared.Ctx) (err error) { - thrN := 1 //shared.GetThreadsNum(ctx) + thrN := shared.GetThreadsNum(ctx) lastSync := os.Getenv("LAST_SYNC") if lastSync != "" { i, err := strconv.ParseInt(lastSync, 10, 64) @@ -2452,12 +2450,24 @@ func (j *DSGit) Sync(ctx *shared.Ctx) (err error) { j.log.WithFields(logrus.Fields{"operation": "Sync"}).Infof("%s fetching from %v (%d threads)", j.URL, ctx.DateFrom, thrN) } if ctx.DateFrom == nil { - cachedLastSync, er := j.cacheProvider.GetLastSync(j.endpoint) + lastSyncDataB, er := j.cacheProvider.GetLastSyncFile(j.endpoint) if er != nil { err = er return } - ctx.DateFrom = &cachedLastSync + var lastSyncData lastSyncFile + if er = json.Unmarshal(lastSyncDataB, &lastSyncData); er != nil { + var cachedLastSync time.Time + err = json.Unmarshal(lastSyncDataB, &cachedLastSync) + if err != nil { + err = er + return + } + lastSyncData = lastSyncFile{ + LastSync: cachedLastSync, + } + } + ctx.DateFrom = &lastSyncData.LastSync if ctx.DateFrom != nil { j.log.WithFields(logrus.Fields{"operation": "Sync"}).Infof("%s resuming from %v (%d threads)", j.URL, ctx.DateFrom, thrN) } @@ -2737,11 +2747,7 @@ func (j *DSGit) Sync(ctx *shared.Ctx) (err error) { j.handleDataLakeOrphans() } // NOTE: Non-generic ends here - gMaxUpstreamDtMtx.Lock() - defer gMaxUpstreamDtMtx.Unlock() - if !gMaxUpstreamDt.IsZero() { - err = j.cacheProvider.SetLastSync(j.endpoint, gMaxUpstreamDt) - } + err = j.setLastSync(ctx) return } @@ -3026,6 +3032,74 @@ func createHash(content git.Commit) (string, error) { return contentHash, err } +func (j *DSGit) setLastSync(ctx *shared.Ctx) error { + commitsCount, err := j.getCommitsCount(ctx) + if err != nil { + return err + } + + commitID, err := j.getHead(ctx) + if err != nil { + return err + } + + gMaxUpstreamDtMtx.Lock() + defer gMaxUpstreamDtMtx.Unlock() + + lastSyncData := lastSyncFile{ + LastSync: gMaxUpstreamDt, + Target: commitsCount, + Total: len(createdCommits), + Head: commitID, + } + + lastSyncDataB, err := jsoniter.Marshal(lastSyncData) + if err != nil { + return err + } + + if !gMaxUpstreamDt.IsZero() { + err = j.cacheProvider.SetLastSyncFile(j.endpoint, lastSyncDataB) + if err != nil { + return err + } + } + + return nil +} + +func (j *DSGit) getCommitsCount(ctx *shared.Ctx) (int, error) { + count := 0 + cmdLine := []string{"git", "rev-list", "--count", j.DefaultBranch} + sout, serr, err := shared.ExecCommand(ctx, cmdLine, j.GitPath, GitDefaultEnv) + if err != nil { + j.log.WithFields(logrus.Fields{"operation": "gitCommitsCount"}).Errorf("error executing command: %v, error: %v, output: %s, output error: %s", cmdLine, err, sout, serr) + return count, err + } + result := strings.TrimSpace(sout) + count, err = strconv.Atoi(result) + if err != nil { + j.log.WithFields(logrus.Fields{"operation": "gitCommitsCount"}).Errorf("error converting: %v, to int error: %v", result, err) + return count, err + } + return count, nil +} + +func (j *DSGit) getHead(ctx *shared.Ctx) (string, error) { + if ctx.Debug > 0 { + j.log.WithFields(logrus.Fields{"operation": "getHead"}).Debugf("parsing logs from %s", j.GitPath) + } + // git rev-parse HEAD + cmdLine := []string{"git", "rev-parse", "head"} + sout, serr, err := shared.ExecCommand(ctx, cmdLine, j.GitPath, GitDefaultEnv) + if err != nil { + j.log.WithFields(logrus.Fields{"operation": "getHead"}).Errorf("error executing command: %v, error: %v, output: %s, output error: %s", cmdLine, err, sout, serr) + return "", err + } + commitID := strings.TrimSpace(sout) + return commitID, nil +} + // CommitCache single commit cache schema type CommitCache struct { Timestamp string `json:"timestamp"` @@ -3056,3 +3130,10 @@ type clocResult struct { Comment int `json:"comment"` NumberOfFiles int `json:"nFiles"` } + +type lastSyncFile struct { + LastSync time.Time `json:"last_sync"` + Target int `json:"target,omitempty"` + Total int `json:"total,omitempty"` + Head string `json:"head,omitempty"` +} diff --git a/go.mod b/go.mod index d447442..3a8d4e4 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,7 @@ module github.com/LF-Engineering/insights-datasource-git go 1.17 require ( - github.com/LF-Engineering/insights-datasource-shared v1.5.21 + github.com/LF-Engineering/insights-datasource-shared v1.5.26 github.com/LF-Engineering/lfx-event-schema v0.1.37 github.com/aws/aws-lambda-go v1.27.1 github.com/aws/aws-sdk-go v1.42.25 diff --git a/go.sum b/go.sum index d177493..65e0df3 100644 --- a/go.sum +++ b/go.sum @@ -7,8 +7,8 @@ github.com/DataDog/datadog-go/v5 v5.0.2 h1:UFtEe7662/Qojxkw1d6SboAeA0CPI3naKhVAS github.com/DataDog/datadog-go/v5 v5.0.2/go.mod h1:ZI9JFB4ewXbw1sBnF4sxsR2k1H3xjV+PUAOUsHvKpcU= github.com/DataDog/sketches-go v1.2.1 h1:qTBzWLnZ3kM2kw39ymh6rMcnN+5VULwFs++lEYUUsro= github.com/DataDog/sketches-go v1.2.1/go.mod h1:1xYmPLY1So10AwxV6MJV0J53XVH+WL9Ad1KetxVivVI= -github.com/LF-Engineering/insights-datasource-shared v1.5.21 h1:cZHytRoA5pZHsQlpeasV5K2b2jZ66ikylvNvzlBSj9s= -github.com/LF-Engineering/insights-datasource-shared v1.5.21/go.mod h1:9DmFQbC8nnm1C7k+/tDo3Rmqzzx7AzmhPBlFouXaBZ8= +github.com/LF-Engineering/insights-datasource-shared v1.5.26 h1:zA4Vc/gTjcVJPr4uARisSl4MQiF1Br9XT9i71G5Gza0= +github.com/LF-Engineering/insights-datasource-shared v1.5.26/go.mod h1:9DmFQbC8nnm1C7k+/tDo3Rmqzzx7AzmhPBlFouXaBZ8= github.com/LF-Engineering/lfx-event-schema v0.1.14/go.mod h1:CfFIZ4mwzo88umf5+KxDQEzqlVkPG7Vx8eLK2oDfWIs= github.com/LF-Engineering/lfx-event-schema v0.1.37 h1:ny46D2NdCXokvJZ01GJcw2RfQM64ousJjaYsrRj5zzg= github.com/LF-Engineering/lfx-event-schema v0.1.37/go.mod h1:CfFIZ4mwzo88umf5+KxDQEzqlVkPG7Vx8eLK2oDfWIs=