Skip to content

Commit

Permalink
Consider high risk evaluation result from other unrelated prs
Browse files Browse the repository at this point in the history
  • Loading branch information
xueqzhan committed Feb 5, 2025
1 parent 7ac645c commit 5e6cb1b
Show file tree
Hide file tree
Showing 5 changed files with 123 additions and 15 deletions.
22 changes: 21 additions & 1 deletion cmd/sippy-daemon/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,13 @@ import (
"net/http"
"time"

"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus/promhttp"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
"github.com/spf13/pflag"

"github.com/openshift/sippy/pkg/bigquery"
"github.com/openshift/sippy/pkg/dataloader/prowloader/gcs"
"github.com/openshift/sippy/pkg/dataloader/prowloader/github"
"github.com/openshift/sippy/pkg/flags"
Expand All @@ -20,6 +22,8 @@ import (
var logLevel = "info"

type SippyDaemonFlags struct {
BigQueryFlags *flags.BigQueryFlags
CacheFlags *flags.CacheFlags
DBFlags *flags.PostgresFlags
GoogleCloudFlags *flags.GoogleCloudFlags

Expand All @@ -29,13 +33,17 @@ type SippyDaemonFlags struct {

func NewSippyDaemonFlags() *SippyDaemonFlags {
return &SippyDaemonFlags{
BigQueryFlags: flags.NewBigQueryFlags(),
CacheFlags: flags.NewCacheFlags(),
DBFlags: flags.NewPostgresDatabaseFlags(),
GithubCommenterFlags: flags.NewGithubCommenterFlags(),
GoogleCloudFlags: flags.NewGoogleCloudFlags(),
}
}

func (f *SippyDaemonFlags) BindFlags(fs *pflag.FlagSet) {
f.BigQueryFlags.BindFlags(fs)
f.CacheFlags.BindFlags(fs)
f.DBFlags.BindFlags(fs)
f.GithubCommenterFlags.BindFlags(fs)
f.GoogleCloudFlags.BindFlags(fs)
Expand Down Expand Up @@ -68,6 +76,18 @@ func NewSippyDaemonCommand() *cobra.Command {
return nil
}

cacheClient, err := f.CacheFlags.GetCacheClient()
if err != nil {
return errors.WithMessage(err, "couldn't get cache client")
}

var bigQueryClient *bigquery.Client
bigQueryClient, err = f.BigQueryFlags.GetBigQueryClient(context.Background(),
cacheClient, f.GoogleCloudFlags.ServiceAccountCredentialFile)
if err != nil {
return errors.WithMessage(err, "couldn't get bigquery client")
}

gcsClient, err := gcs.NewGCSClient(context.TODO(),
f.GoogleCloudFlags.ServiceAccountCredentialFile,
f.GoogleCloudFlags.OAuthClientCredentialFile,
Expand All @@ -83,7 +103,7 @@ func NewSippyDaemonCommand() *cobra.Command {
// could lower to 3 seconds if we need, most writes likely won't have to delete
processes = append(processes, sippyserver.NewWorkProcessor(dbc,
gcsClient.Bucket(f.GoogleCloudFlags.StorageBucket),
10, 5*time.Minute, 5*time.Second, ghCommenter, f.GithubCommenterFlags.CommentProcessingDryRun))
10, bigQueryClient, 5*time.Minute, 5*time.Second, ghCommenter, f.GithubCommenterFlags.CommentProcessingDryRun))
}

daemonServer := sippyserver.NewDaemonServer(processes)
Expand Down
86 changes: 77 additions & 9 deletions pkg/api/job_runs.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package api

import (
"context"
"errors"
"fmt"
"net/http"
Expand All @@ -9,8 +10,12 @@ import (
"strings"
"time"

bqlib "cloud.google.com/go/bigquery"
"google.golang.org/api/iterator"

"github.com/hashicorp/go-version"
apitype "github.com/openshift/sippy/pkg/apis/api"
"github.com/openshift/sippy/pkg/bigquery"
"github.com/openshift/sippy/pkg/db"
"github.com/openshift/sippy/pkg/db/models"
"github.com/openshift/sippy/pkg/db/query"
Expand Down Expand Up @@ -106,6 +111,7 @@ func FetchJobRun(dbc *db.DB, jobRunID int64, logger *log.Entry) (*models.ProwJob
// Load the ProwJobRun, ProwJob, and failed tests:
// TODO: we may want to expand to analyzing flakes here in the future
res := dbc.DB.Joins("ProwJob").
Preload("PullRequests").
Preload("Tests", "status = 12").
Preload("Tests.Test").
Preload("Tests.Suite").First(jobRun, jobRunID)
Expand Down Expand Up @@ -160,7 +166,7 @@ func findReleaseMatchJobNames(dbc *db.DB, jobRun *models.ProwJobRun, compareRele
}

if len(jobs) > 0 {
logger.Infof("Found %d matches with: %s", len(jobs), name)
logger.Infof("Found %d matches with: %s\njobs %+v", len(jobs), name, jobs)

// the first hit we get
// compare the variants
Expand All @@ -182,6 +188,7 @@ func findReleaseMatchJobNames(dbc *db.DB, jobRun *models.ProwJobRun, compareRele
}

gosort.Strings(job.Variants)
log.Infof("variants %+v\njob variants %+v", variants, job.Variants)
if stringSlicesEqual(variants, job.Variants) {

jobIDs, err := query.ProwJobRunIDs(dbc, job.ID)
Expand Down Expand Up @@ -223,7 +230,7 @@ func joinSegments(segments []string, start int, separator string) string {

// JobRunRiskAnalysis checks the test failures and linked bugs for a job run, and reports back an estimated
// risk level for each failed test, and the job run overall.
func JobRunRiskAnalysis(dbc *db.DB, jobRun *models.ProwJobRun, jobRunTestCount int, logger *log.Entry) (apitype.ProwJobRunRiskAnalysis, error) {
func JobRunRiskAnalysis(dbc *db.DB, bqc *bigquery.Client, jobRun *models.ProwJobRun, jobRunTestCount int, logger *log.Entry, compareOtherPRs bool) (apitype.ProwJobRunRiskAnalysis, error) {

// If this job is a Presubmit, compare to test results from master, not presubmits, which may perform
// worse due to dev code that hasn't merged. We do not presently track presubmits on branches other than
Expand Down Expand Up @@ -279,6 +286,7 @@ func JobRunRiskAnalysis(dbc *db.DB, jobRun *models.ProwJobRun, jobRunTestCount i
logger.WithError(err).Errorf("Failed to find matching jobIds for: %s", jobRun.ProwJob.Name)
}
}
logger.Infof("Found %d unfilered matching jobs %d runs for: %s\njobs %+v", len(jobNames), totalJobRuns, jobRun.ProwJob.Name, jobNames)

if totalJobRuns < 20 {
// go back to the prior release and get more jobIds to compare against
Expand Down Expand Up @@ -329,8 +337,8 @@ func JobRunRiskAnalysis(dbc *db.DB, jobRun *models.ProwJobRun, jobRunTestCount i
}
}

return runJobRunAnalysis(jobRun, compareRelease, jobRunTestCount, historicalCount, neverStableJob, jobNames, logger.WithField("func", "runJobRunAnalysis"),
jobNamesTestResultFunc(dbc), variantsTestResultFunc(dbc))
return runJobRunAnalysis(bqc, jobRun, compareRelease, jobRunTestCount, historicalCount, neverStableJob, jobNames, logger.WithField("func", "runJobRunAnalysis"),
jobNamesTestResultFunc(dbc), variantsTestResultFunc(dbc), compareOtherPRs)
}

// testResultsByJobNameFunc is used for injecting db responses in unit tests.
Expand Down Expand Up @@ -411,8 +419,8 @@ func variantsTestResultFunc(dbc *db.DB) testResultsByVariantsFunc {
}
}

func runJobRunAnalysis(jobRun *models.ProwJobRun, compareRelease string, jobRunTestCount int, historicalRunTestCount int, neverStableJob bool, jobNames []string, logger *log.Entry,
testResultsJobNameFunc testResultsByJobNameFunc, testResultsVariantsFunc testResultsByVariantsFunc) (apitype.ProwJobRunRiskAnalysis, error) {
func runJobRunAnalysis(bqc *bigquery.Client, jobRun *models.ProwJobRun, compareRelease string, jobRunTestCount int, historicalRunTestCount int, neverStableJob bool, jobNames []string, logger *log.Entry,
testResultsJobNameFunc testResultsByJobNameFunc, testResultsVariantsFunc testResultsByVariantsFunc, compareOtherPRs bool) (apitype.ProwJobRunRiskAnalysis, error) {

logger.Info("loaded prow job run for analysis")
logger.Infof("this job run has %d failed tests", len(jobRun.Tests))
Expand Down Expand Up @@ -469,7 +477,7 @@ func runJobRunAnalysis(jobRun *models.ProwJobRun, compareRelease string, jobRunT
}

loggerFields := logger.WithFields(log.Fields{"name": ft.Test.Name})
analysis, err := runTestRunAnalysis(ft, jobRun, compareRelease, loggerFields, testResultsJobNameFunc, jobNames, testResultsVariantsFunc, neverStableJob)
analysis, err := runTestRunAnalysis(bqc, ft, jobRun, compareRelease, loggerFields, testResultsJobNameFunc, jobNames, testResultsVariantsFunc, neverStableJob, compareOtherPRs)
if err != nil {
continue // ignore runs where analysis failed
}
Expand All @@ -488,7 +496,7 @@ func runJobRunAnalysis(jobRun *models.ProwJobRun, compareRelease string, jobRunT

// For a failed test, query its pass rates by NURPs, find a matching variant combo, and
// see how often we've passed in the last week.
func runTestRunAnalysis(failedTest models.ProwJobRunTest, jobRun *models.ProwJobRun, compareRelease string, logger *log.Entry, testResultsJobNameFunc testResultsByJobNameFunc, jobNames []string, testResultsVariantsFunc testResultsByVariantsFunc, neverStableJob bool) (apitype.ProwJobRunTestRiskAnalysis, error) {
func runTestRunAnalysis(bqc *bigquery.Client, failedTest models.ProwJobRunTest, jobRun *models.ProwJobRun, compareRelease string, logger *log.Entry, testResultsJobNameFunc testResultsByJobNameFunc, jobNames []string, testResultsVariantsFunc testResultsByVariantsFunc, neverStableJob, compareOtherPRs bool) (apitype.ProwJobRunTestRiskAnalysis, error) {

logger.Debug("failed test")

Expand Down Expand Up @@ -539,7 +547,18 @@ func runTestRunAnalysis(failedTest models.ProwJobRunTest, jobRun *models.ProwJob
// Watch out for tests that ran in previous period, but not current, no sense comparing to 0 runs:
if (testResultsVariants != nil && testResultsVariants.CurrentRuns > 0) || (testResultsJobNames != nil && testResultsJobNames.CurrentRuns > 0) {
// select the 'best' test result
analysis.Risk = selectRiskAnalysisResult(testResultsJobNames, testResultsVariants, jobNames, compareRelease)
risk := selectRiskAnalysisResult(testResultsJobNames, testResultsVariants, jobNames, compareRelease)
if compareOtherPRs && risk.Level.Level >= apitype.FailureRiskLevelHigh.Level && len(jobRun.PullRequests) > 0 && isHighRiskInOtherPRs(bqc, failedTest, jobRun) {
// If the same test/job has high risk in other PRs, we override the risk level
analysis.Risk = apitype.TestFailureRisk{
Level: apitype.FailureRiskLevelNone,
Reasons: []string{
"High risk was identified in other PRs first",
},
}
} else {
analysis.Risk = risk
}
} else {
analysis.Risk = apitype.TestFailureRisk{
Level: apitype.FailureRiskLevelUnknown,
Expand All @@ -552,6 +571,55 @@ func runTestRunAnalysis(failedTest models.ProwJobRunTest, jobRun *models.ProwJob
return analysis, nil
}

func isHighRiskInOtherPRs(bqc *bigquery.Client, failedTest models.ProwJobRunTest, jobRun *models.ProwJobRun) bool {
pr := jobRun.PullRequests[0]
endTime := jobRun.Timestamp.Add(jobRun.Duration)
if jobRun.Timestamp.IsZero() {
endTime = time.Now()
}
log.Infof("Evaluating if test '%s' is high risk in other PRs for job %s", failedTest.Test.Name, jobRun.ProwJob.Name)
_, jobSuffix, found := strings.Cut(jobRun.ProwJob.Name, "pull-ci-"+pr.Org+"-"+pr.Repo)
if !found {
return false
}
queryStr := `SELECT COUNT(*) FROM ` +
fmt.Sprintf("%s.%s.%s", "openshift-ci-data-analysis", "ci_data_autodl", "risk_analysis_test_results") +
fmt.Sprintf(" INNER JOIN %s.%s.%s jobs", "openshift-gce-devel", "ci_analysis_us", "jobs") +
` ON JobRunName=jobs.prowjob_build_id` +
fmt.Sprintf(" WHERE PartitionTime BETWEEN TIMESTAMP('%s') AND TIMESTAMP('%s') AND", endTime.Add(-12*time.Hour).Format(time.RFC3339), endTime.Add(3*time.Hour).Format(time.RFC3339)) +
` RiskLevel>=100 AND` +
fmt.Sprintf(" TestName='%s' AND", failedTest.Test.Name) +
fmt.Sprintf(" (org!='%s' OR repo!='%s' OR pr_number!='%d') AND", pr.Org, pr.Repo, pr.Number) +
fmt.Sprintf(" prowjob_job_name LIKE '%%%s'", jobSuffix)
q := bqc.BQ.Query(queryStr)

it, err := q.Read(context.TODO())
if err != nil {
log.WithError(err).Error("Failed querying high risk items from bigquery")
return false
}

var rowCount int64
for {
var values []bqlib.Value
err := it.Next(&values)
if err == iterator.Done {
break
}
if err != nil {
log.WithError(err).Error("error parsing number of high risk items from bigquery")
return false
}
rowCount = values[0].(int64)
if rowCount > 0 {
log.Infof("High risk items found in other PRs for job %s test '%s'", jobRun.ProwJob.Name, failedTest.Test.Name)
return true
}
}

return false
}

func selectRiskAnalysisResult(testResultsJobNames, testResultsVariants *apitype.Test, jobNames []string, compareRelease string) apitype.TestFailureRisk {

var variantRisk, jobRisk apitype.TestFailureRisk
Expand Down
2 changes: 1 addition & 1 deletion pkg/api/job_runs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ func TestRunJobAnalysis(t *testing.T) {
}
}

result, err := runJobRunAnalysis(fakeProwJobRun, "4.12", 5, 5, false, tc.jobNames, log.WithField("jobRunID", "test"), testResultsJobNamesLookupFunc, testResultsVariantsLookupFunc)
result, err := runJobRunAnalysis(nil, fakeProwJobRun, "4.12", 5, 5, false, tc.jobNames, log.WithField("jobRunID", "test"), testResultsJobNamesLookupFunc, testResultsVariantsLookupFunc, false)

require.NoError(t, err)
assert.Equal(t, len(tc.expectedTestRisks), len(result.Tests))
Expand Down
26 changes: 23 additions & 3 deletions pkg/sippyserver/pr_commenting_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
jobQueries "github.com/openshift/sippy/pkg/api"
"github.com/openshift/sippy/pkg/apis/api"
"github.com/openshift/sippy/pkg/apis/prow"
"github.com/openshift/sippy/pkg/bigquery"
"github.com/openshift/sippy/pkg/dataloader/prowloader/gcs"
"github.com/openshift/sippy/pkg/db"
"github.com/openshift/sippy/pkg/db/models"
Expand Down Expand Up @@ -58,6 +59,11 @@ var (
Help: "Tracks the call made to query db and add items to the pending work queue",
Buckets: prometheus.LinearBuckets(0, 500, 10),
}, []string{"type"})

riskAnalysisPRTestRiskMetric = promauto.NewGaugeVec(prometheus.GaugeOpts{
Name: "sippy_risk_analysis_pr_test_risk",
Help: "Tracks the risk level of high risk PRs",
}, []string{"org", "repo", "pr", "job", "jobID", "test"})
)

type RiskAnalysisEntry struct {
Expand All @@ -72,8 +78,9 @@ type RiskAnalysisEntry struct {
// commentUpdaterRate: the minimum duration between adding a comment before we begin work on adding the next
// ghCommenter: the commenting implmentation
// dryRunOnly: default is true to prevent unintended commenting when running locally or in a test deployment
func NewWorkProcessor(dbc *db.DB, gcsBucket *storage.BucketHandle, commentAnalysisWorkers int, commentAnalysisRate, commentUpdaterRate time.Duration, ghCommenter *commenter.GitHubCommenter, dryRunOnly bool) *WorkProcessor {
func NewWorkProcessor(dbc *db.DB, gcsBucket *storage.BucketHandle, commentAnalysisWorkers int, bigQueryClient *bigquery.Client, commentAnalysisRate, commentUpdaterRate time.Duration, ghCommenter *commenter.GitHubCommenter, dryRunOnly bool) *WorkProcessor {
wp := &WorkProcessor{dbc: dbc, gcsBucket: gcsBucket, ghCommenter: ghCommenter,
bigQueryClient: bigQueryClient,
commentAnalysisRate: commentAnalysisRate,
commentUpdaterRate: commentUpdaterRate,
commentAnalysisWorkers: commentAnalysisWorkers,
Expand All @@ -89,6 +96,7 @@ type WorkProcessor struct {
dbc *db.DB
gcsBucket *storage.BucketHandle
ghCommenter *commenter.GitHubCommenter
bigQueryClient *bigquery.Client
dryRunOnly bool
}

Expand All @@ -111,6 +119,7 @@ type CommentWorker struct {
type AnalysisWorker struct {
dbc *db.DB
gcsBucket *storage.BucketHandle
bigQueryClient *bigquery.Client
riskAnalysisLocator *regexp.Regexp
pendingAnalysis chan models.PullRequestComment
pendingComments chan PendingComment
Expand Down Expand Up @@ -168,7 +177,7 @@ func (wp *WorkProcessor) Run(ctx context.Context) {
defer close(pendingWork)

for i := 0; i < wp.commentAnalysisWorkers; i++ {
analysisWorker := AnalysisWorker{riskAnalysisLocator: gcs.GetDefaultRiskAnalysisSummaryFile(), dbc: wp.dbc, gcsBucket: wp.gcsBucket, pendingAnalysis: pendingWork, pendingComments: pendingComments}
analysisWorker := AnalysisWorker{riskAnalysisLocator: gcs.GetDefaultRiskAnalysisSummaryFile(), dbc: wp.dbc, gcsBucket: wp.gcsBucket, bigQueryClient: wp.bigQueryClient, pendingAnalysis: pendingWork, pendingComments: pendingComments}
go analysisWorker.Run()
}

Expand Down Expand Up @@ -481,6 +490,7 @@ func (aw *AnalysisWorker) processRiskAnalysisComment(prPendingComment models.Pul
sortedAnalysis := make(RiskAnalysisEntryList, 0)
for k, v := range analysis {
sortedAnalysis = append(sortedAnalysis, RiskAnalysisEntry{k, v})
setRiskAnalysisHighRiskMetrics(prPendingComment.Org, prPendingComment.Repo, strconv.Itoa(prPendingComment.PullNumber), k, v.URL, v)
}
sort.Sort(sortedAnalysis)

Expand All @@ -502,6 +512,16 @@ func (aw *AnalysisWorker) processRiskAnalysisComment(prPendingComment models.Pul
log.Debugf("Comment added to pendingComments: %s/%s/%s", pendingComment.org, pendingComment.repo, pendingComment.sha)
}

func setRiskAnalysisHighRiskMetrics(org, repo, number, jobName, jobID string, summary RiskAnalysisSummary) {
for _, testSummary := range summary.TestRiskAnalysis {
if summary.RiskLevel == api.FailureRiskLevelHigh {
riskAnalysisPRTestRiskMetric.WithLabelValues(org, repo, number, jobName, jobID, testSummary.Name).Set(float64(testSummary.Risk.Level.Level))
} else {
riskAnalysisPRTestRiskMetric.DeleteLabelValues(org, repo, number, jobName, jobID, testSummary.Name)
}
}
}

func buildComment(sortedAnalysis RiskAnalysisEntryList, sha string) string {
var sb strings.Builder
sb.WriteString(fmt.Sprintf("Job Failure Risk Analysis for sha: %s\n\n| Job Name | Failure Risk |\n|:---|:---|\n", sha))
Expand Down Expand Up @@ -820,7 +840,7 @@ func (aw *AnalysisWorker) getRiskSummary(jobRunID, jobRunIDPath string, priorRis
} else {

// get the riskanalysis directly from jobqueries
ra, err := jobQueries.JobRunRiskAnalysis(aw.dbc, jobRun, jobRunTestCount, logger.WithField("func", "JobRunRiskAnalysis"))
ra, err := jobQueries.JobRunRiskAnalysis(aw.dbc, aw.bigQueryClient, jobRun, jobRunTestCount, logger.WithField("func", "JobRunRiskAnalysis"), true)

if err != nil {
logger.WithError(err).Errorf("Error querying risk analysis for: %s", jobRunIDPath)
Expand Down
2 changes: 1 addition & 1 deletion pkg/sippyserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1025,7 +1025,7 @@ func (s *Server) jsonJobRunRiskAnalysis(w http.ResponseWriter, req *http.Request
}

logger.Infof("job run = %+v", *jobRun)
result, err := api.JobRunRiskAnalysis(s.db, jobRun, jobRunTestCount, logger.WithField("func", "JobRunRiskAnalysis"))
result, err := api.JobRunRiskAnalysis(s.db, s.bigQueryClient, jobRun, jobRunTestCount, logger.WithField("func", "JobRunRiskAnalysis"), false)
if err != nil {
failureResponse(w, http.StatusBadRequest, err.Error())
}
Expand Down

0 comments on commit 5e6cb1b

Please sign in to comment.