Skip to content
This repository has been archived by the owner on May 24, 2024. It is now read-only.

Commit

Permalink
Use new last sync file (#80)
Browse files Browse the repository at this point in the history
* use new last sync file

Signed-off-by: Ayman <[email protected]>

* use shared latest version

Signed-off-by: Ayman <[email protected]>

* enable multi thread

Signed-off-by: Ayman <[email protected]>

---------

Signed-off-by: Ayman <[email protected]>
Co-authored-by: Ayman <[email protected]>
  • Loading branch information
khalifapro and enkhalifapro authored Feb 28, 2023
1 parent 15cd98c commit 1ca6446
Show file tree
Hide file tree
Showing 3 changed files with 95 additions and 14 deletions.
103 changes: 92 additions & 11 deletions cmd/git/git.go
Original file line number Diff line number Diff line change
Expand Up @@ -1862,9 +1862,7 @@ func (j *DSGit) GitEnrichItems(ctx *shared.Ctx, thrN int, items []interface{}, d
j.log.WithFields(logrus.Fields{"operation": "GitEnrichItems"}).Infof("%s", string(jsonBytes))
}
*docs = []interface{}{}
gMaxUpstreamDtMtx.Lock()
defer gMaxUpstreamDtMtx.Unlock()
err = j.cacheProvider.SetLastSync(j.endpoint, gMaxUpstreamDt)
err = j.setLastSync(ctx)
if err != nil {
return
}
Expand Down Expand Up @@ -2438,7 +2436,7 @@ func (j *DSGit) ParseNextCommit(ctx *shared.Ctx) (commit map[string]interface{},

// Sync - sync git data source
func (j *DSGit) Sync(ctx *shared.Ctx) (err error) {
thrN := 1 //shared.GetThreadsNum(ctx)
thrN := shared.GetThreadsNum(ctx)
lastSync := os.Getenv("LAST_SYNC")
if lastSync != "" {
i, err := strconv.ParseInt(lastSync, 10, 64)
Expand All @@ -2452,12 +2450,24 @@ func (j *DSGit) Sync(ctx *shared.Ctx) (err error) {
j.log.WithFields(logrus.Fields{"operation": "Sync"}).Infof("%s fetching from %v (%d threads)", j.URL, ctx.DateFrom, thrN)
}
if ctx.DateFrom == nil {
cachedLastSync, er := j.cacheProvider.GetLastSync(j.endpoint)
lastSyncDataB, er := j.cacheProvider.GetLastSyncFile(j.endpoint)
if er != nil {
err = er
return
}
ctx.DateFrom = &cachedLastSync
var lastSyncData lastSyncFile
if er = json.Unmarshal(lastSyncDataB, &lastSyncData); er != nil {
var cachedLastSync time.Time
err = json.Unmarshal(lastSyncDataB, &cachedLastSync)
if err != nil {
err = er
return
}
lastSyncData = lastSyncFile{
LastSync: cachedLastSync,
}
}
ctx.DateFrom = &lastSyncData.LastSync
if ctx.DateFrom != nil {
j.log.WithFields(logrus.Fields{"operation": "Sync"}).Infof("%s resuming from %v (%d threads)", j.URL, ctx.DateFrom, thrN)
}
Expand Down Expand Up @@ -2737,11 +2747,7 @@ func (j *DSGit) Sync(ctx *shared.Ctx) (err error) {
j.handleDataLakeOrphans()
}
// NOTE: Non-generic ends here
gMaxUpstreamDtMtx.Lock()
defer gMaxUpstreamDtMtx.Unlock()
if !gMaxUpstreamDt.IsZero() {
err = j.cacheProvider.SetLastSync(j.endpoint, gMaxUpstreamDt)
}
err = j.setLastSync(ctx)
return
}

Expand Down Expand Up @@ -3026,6 +3032,74 @@ func createHash(content git.Commit) (string, error) {
return contentHash, err
}

func (j *DSGit) setLastSync(ctx *shared.Ctx) error {
commitsCount, err := j.getCommitsCount(ctx)
if err != nil {
return err
}

commitID, err := j.getHead(ctx)
if err != nil {
return err
}

gMaxUpstreamDtMtx.Lock()
defer gMaxUpstreamDtMtx.Unlock()

lastSyncData := lastSyncFile{
LastSync: gMaxUpstreamDt,
Target: commitsCount,
Total: len(createdCommits),
Head: commitID,
}

lastSyncDataB, err := jsoniter.Marshal(lastSyncData)
if err != nil {
return err
}

if !gMaxUpstreamDt.IsZero() {
err = j.cacheProvider.SetLastSyncFile(j.endpoint, lastSyncDataB)
if err != nil {
return err
}
}

return nil
}

func (j *DSGit) getCommitsCount(ctx *shared.Ctx) (int, error) {
count := 0
cmdLine := []string{"git", "rev-list", "--count", j.DefaultBranch}
sout, serr, err := shared.ExecCommand(ctx, cmdLine, j.GitPath, GitDefaultEnv)
if err != nil {
j.log.WithFields(logrus.Fields{"operation": "gitCommitsCount"}).Errorf("error executing command: %v, error: %v, output: %s, output error: %s", cmdLine, err, sout, serr)
return count, err
}
result := strings.TrimSpace(sout)
count, err = strconv.Atoi(result)
if err != nil {
j.log.WithFields(logrus.Fields{"operation": "gitCommitsCount"}).Errorf("error converting: %v, to int error: %v", result, err)
return count, err
}
return count, nil
}

func (j *DSGit) getHead(ctx *shared.Ctx) (string, error) {
if ctx.Debug > 0 {
j.log.WithFields(logrus.Fields{"operation": "getHead"}).Debugf("parsing logs from %s", j.GitPath)
}
// git rev-parse HEAD
cmdLine := []string{"git", "rev-parse", "head"}
sout, serr, err := shared.ExecCommand(ctx, cmdLine, j.GitPath, GitDefaultEnv)
if err != nil {
j.log.WithFields(logrus.Fields{"operation": "getHead"}).Errorf("error executing command: %v, error: %v, output: %s, output error: %s", cmdLine, err, sout, serr)
return "", err
}
commitID := strings.TrimSpace(sout)
return commitID, nil
}

// CommitCache single commit cache schema
type CommitCache struct {
Timestamp string `json:"timestamp"`
Expand Down Expand Up @@ -3056,3 +3130,10 @@ type clocResult struct {
Comment int `json:"comment"`
NumberOfFiles int `json:"nFiles"`
}

type lastSyncFile struct {
LastSync time.Time `json:"last_sync"`
Target int `json:"target,omitempty"`
Total int `json:"total,omitempty"`
Head string `json:"head,omitempty"`
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module github.com/LF-Engineering/insights-datasource-git
go 1.17

require (
github.com/LF-Engineering/insights-datasource-shared v1.5.21
github.com/LF-Engineering/insights-datasource-shared v1.5.26
github.com/LF-Engineering/lfx-event-schema v0.1.37
github.com/aws/aws-lambda-go v1.27.1
github.com/aws/aws-sdk-go v1.42.25
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ github.com/DataDog/datadog-go/v5 v5.0.2 h1:UFtEe7662/Qojxkw1d6SboAeA0CPI3naKhVAS
github.com/DataDog/datadog-go/v5 v5.0.2/go.mod h1:ZI9JFB4ewXbw1sBnF4sxsR2k1H3xjV+PUAOUsHvKpcU=
github.com/DataDog/sketches-go v1.2.1 h1:qTBzWLnZ3kM2kw39ymh6rMcnN+5VULwFs++lEYUUsro=
github.com/DataDog/sketches-go v1.2.1/go.mod h1:1xYmPLY1So10AwxV6MJV0J53XVH+WL9Ad1KetxVivVI=
github.com/LF-Engineering/insights-datasource-shared v1.5.21 h1:cZHytRoA5pZHsQlpeasV5K2b2jZ66ikylvNvzlBSj9s=
github.com/LF-Engineering/insights-datasource-shared v1.5.21/go.mod h1:9DmFQbC8nnm1C7k+/tDo3Rmqzzx7AzmhPBlFouXaBZ8=
github.com/LF-Engineering/insights-datasource-shared v1.5.26 h1:zA4Vc/gTjcVJPr4uARisSl4MQiF1Br9XT9i71G5Gza0=
github.com/LF-Engineering/insights-datasource-shared v1.5.26/go.mod h1:9DmFQbC8nnm1C7k+/tDo3Rmqzzx7AzmhPBlFouXaBZ8=
github.com/LF-Engineering/lfx-event-schema v0.1.14/go.mod h1:CfFIZ4mwzo88umf5+KxDQEzqlVkPG7Vx8eLK2oDfWIs=
github.com/LF-Engineering/lfx-event-schema v0.1.37 h1:ny46D2NdCXokvJZ01GJcw2RfQM64ousJjaYsrRj5zzg=
github.com/LF-Engineering/lfx-event-schema v0.1.37/go.mod h1:CfFIZ4mwzo88umf5+KxDQEzqlVkPG7Vx8eLK2oDfWIs=
Expand Down

0 comments on commit 1ca6446

Please sign in to comment.