Skip to content

Synchorize pr state periodically #276

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
May 5, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ serde_json = "1"
toml = "0.8"

# GitHub
octocrab = { version = "0.44", features = ["timeout"] }
octocrab = { version = "0.44", features = ["timeout", "stream"] }

# Async
futures = "0.3"
Expand Down
7 changes: 7 additions & 0 deletions src/bin/bors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ const CANCEL_TIMED_OUT_BUILDS_INTERVAL: Duration = Duration::from_secs(60 * 5);
/// How often should the bot reload the mergeability status of PRs?
const MERGEABILITY_STATUS_INTERVAL: Duration = Duration::from_secs(60 * 10);

/// How often should the bot synchronize PR state.
const PR_STATE_PERIODIC_REFRESH: Duration = Duration::from_secs(60 * 10);

#[derive(clap::Parser)]
struct Opts {
/// Github App ID.
Expand Down Expand Up @@ -141,6 +144,7 @@ fn try_main(opts: Opts) -> anyhow::Result<()> {
let mut permissions_refresh = make_interval(PERMISSIONS_REFRESH_INTERVAL);
let mut cancel_builds_refresh = make_interval(CANCEL_TIMED_OUT_BUILDS_INTERVAL);
let mut mergeability_status_refresh = make_interval(MERGEABILITY_STATUS_INTERVAL);
let mut prs_interval = make_interval(PR_STATE_PERIODIC_REFRESH);
loop {
tokio::select! {
_ = config_refresh.tick() => {
Expand All @@ -155,6 +159,9 @@ fn try_main(opts: Opts) -> anyhow::Result<()> {
_ = mergeability_status_refresh.tick() => {
refresh_tx.send(BorsGlobalEvent::RefreshPullRequestMergeability).await?;
}
_ = prs_interval.tick() => {
refresh_tx.send(BorsGlobalEvent::RefreshPullRequestState).await?;
}
}
}
};
Expand Down
2 changes: 2 additions & 0 deletions src/bors/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ pub enum BorsGlobalEvent {
CancelTimedOutBuilds,
/// Refresh mergeability status of PRs that have unknown mergeability status.
RefreshPullRequestMergeability,
/// Periodic event that serves for synchronizing PR state.
RefreshPullRequestState,
}

#[derive(Debug)]
Expand Down
13 changes: 13 additions & 0 deletions src/bors/handlers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use pr_events::{
handle_pull_request_merged, handle_pull_request_opened, handle_pull_request_ready_for_review,
handle_pull_request_reopened, handle_push_to_branch, handle_push_to_pull_request,
};
use refresh::sync_pull_requests_state;
use review::{command_delegate, command_set_priority, command_set_rollup, command_undelegate};
use tracing::Instrument;

Expand Down Expand Up @@ -283,6 +284,18 @@ pub async fn handle_bors_global_event(
#[cfg(test)]
crate::bors::WAIT_FOR_MERGEABILITY_STATUS_REFRESH.mark();
}
BorsGlobalEvent::RefreshPullRequestState => {
let span = tracing::info_span!("Refresh");
for_each_repo(&ctx, |repo| {
let subspan = tracing::info_span!("Repo", repo = repo.repository().to_string());
sync_pull_requests_state(repo, Arc::clone(&db)).instrument(subspan)
})
.instrument(span)
.await?;

#[cfg(test)]
crate::bors::WAIT_FOR_PR_STATUS_REFRESH.mark();
}
}
Ok(())
}
Expand Down
150 changes: 150 additions & 0 deletions src/bors/handlers/refresh.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@ use std::time::Duration;

use anyhow::Context;
use chrono::{DateTime, Utc};
use std::collections::BTreeMap;

use crate::bors::Comment;
use crate::bors::PullRequestStatus;
use crate::bors::RepositoryState;
use crate::bors::handlers::trybuild::cancel_build_workflows;
use crate::bors::mergeable_queue::MergeableQueueSender;
Expand Down Expand Up @@ -97,6 +99,70 @@ pub async fn reload_repository_config(repo: Arc<RepositoryState>) -> anyhow::Res
Ok(())
}

pub async fn sync_pull_requests_state(
repo: Arc<RepositoryState>,
db: Arc<PgDbClient>,
) -> anyhow::Result<()> {
let repo = repo.as_ref();
let db = db.as_ref();
let repo_name = repo.repository();
// load open/draft prs from github
let nonclosed_gh_prs = repo.client.fetch_nonclosed_pull_requests().await?;
// load open/draft prs from db
let nonclosed_db_prs = db.get_nonclosed_pull_requests(repo_name).await?;

let nonclosed_gh_prs_num = nonclosed_gh_prs
.into_iter()
.map(|pr| (pr.number, pr))
.collect::<BTreeMap<_, _>>();

let nonclosed_db_prs_num = nonclosed_db_prs
.into_iter()
.map(|pr| (pr.number, pr))
.collect::<BTreeMap<_, _>>();

for (pr_num, gh_pr) in &nonclosed_gh_prs_num {
let db_pr = nonclosed_db_prs_num.get(pr_num);
if let Some(db_pr) = db_pr {
if db_pr.pr_status != gh_pr.status {
// PR status changed in GitHub
tracing::debug!(
"PR {} status changed from {:?} to {:?}",
pr_num,
db_pr.pr_status,
gh_pr.status
);
db.set_pr_status(repo_name, *pr_num, gh_pr.status).await?;
}
} else {
// Nonclosed PRs in GitHub that are either not in the DB or marked as closed
tracing::debug!("PR {} not found in open PRs in DB, upserting it", pr_num);
db.upsert_pull_request(
repo_name,
gh_pr.number,
&gh_pr.base.name,
gh_pr.mergeable_state.clone().into(),
&gh_pr.status,
)
.await?;
}
}
// PRs that are closed in GitHub but not in the DB. In theory PR could also be merged
// but bors does the merging so it should not happen.
for pr_num in nonclosed_db_prs_num.keys() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is a potential race condition here, where if the PR just got merged on GH in between the GH API and DB calls, we would mark it as closed even though it was merged. But let's not deal with that now.

Copy link
Contributor Author

@geetanshjuneja geetanshjuneja May 5, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the PR just got merged b/w GH API and DB calls, then GH would send a webhook which will be processed after it changing the status to merged.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was thinking something like this:
Bors merges a PR. Before setting its state to merged, this refresh handler runs and loads the PR as open from GH. Then bors sets the local state as merged, but it gets overwritten by this refresh handler. This is possible because the refresh handler can run concurrently with command handlers.

That being said, this is incredibly unlikely to happen, and even if we did, the state should be correctly synchronized during the next refresh, or during the next comment sent on the PR, so it should be fine.

if !nonclosed_gh_prs_num.contains_key(pr_num) {
tracing::debug!(
"PR {} not found in open/draft prs in GitHub, closing it in DB",
pr_num
);
db.set_pr_status(repo_name, *pr_num, PullRequestStatus::Closed)
.await?;
}
}

Ok(())
}

#[cfg(not(test))]
fn now() -> DateTime<Utc> {
Utc::now()
Expand All @@ -119,6 +185,7 @@ fn elapsed_time(date: DateTime<Utc>) -> Duration {

#[cfg(test)]
mod tests {
use crate::bors::PullRequestStatus;
use crate::bors::handlers::WAIT_FOR_WORKFLOW_STARTED;
use crate::bors::handlers::refresh::MOCK_TIME;
use crate::database::{MergeableState, OctocrabMergeableState};
Expand All @@ -139,6 +206,15 @@ mod tests {
.await;
}

#[sqlx::test]
async fn refresh_pr_state(pool: sqlx::PgPool) {
run_test(pool, |tester| async move {
tester.refresh_prs().await;
Ok(tester)
})
.await;
}

fn gh_state_with_long_timeout() -> GitHubState {
GitHubState::default().with_default_config(
r#"
Expand Down Expand Up @@ -246,6 +322,80 @@ timeout = 3600
.await;
}

#[sqlx::test]
async fn refresh_new_pr(pool: sqlx::PgPool) {
run_test(pool, |mut tester| async move {
let pr = tester
.with_blocked_webhooks(async |tester| {
tester.open_pr(default_repo_name(), false).await
})
.await?;
tester.refresh_prs().await;
assert_eq!(
tester
.db()
.get_pull_request(&default_repo_name(), pr.number)
.await?
.unwrap()
.pr_status,
PullRequestStatus::Open
);
Ok(tester)
})
.await;
}

#[sqlx::test]
async fn refresh_pr_with_status_closed(pool: sqlx::PgPool) {
run_test(pool, |mut tester| async move {
let pr = tester.open_pr(default_repo_name(), false).await?;
tester
.with_blocked_webhooks(async |tester| {
tester.close_pr(default_repo_name(), pr.number.0).await
})
.await?;
tester.refresh_prs().await;
assert_eq!(
tester
.db()
.get_pull_request(&default_repo_name(), pr.number)
.await?
.unwrap()
.pr_status,
PullRequestStatus::Closed
);
Ok(tester)
})
.await;
}

#[sqlx::test]
async fn refresh_pr_with_status_draft(pool: sqlx::PgPool) {
run_test(pool, |mut tester| async move {
let pr = tester.open_pr(default_repo_name(), false).await?;
tester
.with_blocked_webhooks(async |tester| {
tester
.convert_to_draft(default_repo_name(), pr.number.0)
.await
})
.await?;

tester.refresh_prs().await;
assert_eq!(
tester
.db()
.get_pull_request(&default_repo_name(), pr.number)
.await?
.unwrap()
.pr_status,
PullRequestStatus::Draft
);
Ok(tester)
})
.await;
}

async fn with_mocked_time<Fut: Future<Output = ()>>(in_future: Duration, future: Fut) {
// It is important to use this function only with a single threaded runtime,
// otherwise the `MOCK_TIME` variable might get mixed up between different threads.
Expand Down
Loading