Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 57 additions & 5 deletions src/github/backfill.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import {
deletePullRequestFiles,
countRepoLabels,
getInstallation,
getLatestRepoGithubTotalsSnapshot,
listRepoGithubTotalsSnapshotHistory,
getRepoSyncSegment,
getRepoSyncState,
listOpenIssueNumbers,
Expand Down Expand Up @@ -305,6 +305,9 @@ const ERROR_BACKOFF_MS = 60 * 60 * 1000;
const SEGMENT_PAGE_BUDGET: Record<BackfillMode, number> = { light: 2, full: 10, resume: 10 };
const PR_DETAIL_BATCH_SIZE: Record<BackfillMode, number> = { light: 12, full: 40, resume: 40 };
const CURRENT_OPEN_SCAN_MARKER = "gittensory-current-open-scan-v1";
const FRESH_TOTALS_SNAPSHOT_MS = 10 * 60 * 1000;
const TOTALS_SNAPSHOT_LOOKBACK = 8;
const repoGithubTotalsRefreshes = new Map<string, Promise<RepoGithubTotalsSnapshotRecord | undefined>>();

export async function backfillRegisteredRepositories(
env: Env,
Expand Down Expand Up @@ -392,7 +395,7 @@ export async function enqueueRepositoryOpenDataBackfill(
if (!settings.backfillEnabled) return { ok: true, repoFullName: repo.fullName, status: "skipped", warnings: ["Backfill is disabled for this repository."] };
const token = await tokenForRepo(env, repo);
const sourceKind: RepoSyncSegmentRecord["sourceKind"] = repo.installationId && token !== env.GITHUB_PUBLIC_TOKEN ? "installation" : "github";
const totals = token ? await refreshRepoGithubTotals(env, repo, token, sourceKind).catch(() => undefined) : undefined;
const totals = await repoGithubTotalsForBackfill(env, repo, token, sourceKind);
const startedAt = nowIso();
const previous = await getRepoSyncState(env, repo.fullName);
await upsertRepoSyncState(env, {
Expand Down Expand Up @@ -460,7 +463,7 @@ export async function backfillRepositorySegment(
);
return segmentJobResult(repo.fullName, options.segment, segment);
}
const totals = (token ? await refreshRepoGithubTotals(env, repo, token, sourceKind).catch(() => undefined) : undefined) ?? (await getLatestRepoGithubTotalsSnapshot(env, repo.fullName));
const totals = await repoGithubTotalsForBackfill(env, repo, token, sourceKind);
const result =
options.segment === "labels"
? await backfillLabelsSegment(env, repo, token, sourceKind, mode, options.cursor, totals)
Expand All @@ -483,6 +486,54 @@ export async function backfillRepositorySegment(
return segmentJobResult(repo.fullName, options.segment, result.segment);
}

async function repoGithubTotalsForBackfill(
env: Env,
repo: RepositoryRecord,
token: string | undefined,
sourceKind: RepoSyncSegmentRecord["sourceKind"],
): Promise<RepoGithubTotalsSnapshotRecord | null | undefined> {
const { fresh, fallback } = await usableRepoGithubTotalsSnapshot(env, repo.fullName, sourceKind);
if (fresh) return fresh;
return (await refreshRepoGithubTotalsCoalesced(env, repo, token, sourceKind)) ?? fallback;
}

async function usableRepoGithubTotalsSnapshot(
env: Env,
repoFullName: string,
sourceKind: RepoSyncSegmentRecord["sourceKind"],
): Promise<{ fresh?: RepoGithubTotalsSnapshotRecord; fallback?: RepoGithubTotalsSnapshotRecord }> {
const snapshots = await listRepoGithubTotalsSnapshotHistory(env, repoFullName, { limit: TOTALS_SNAPSHOT_LOOKBACK });
for (let index = snapshots.length - 1; index >= 0; index -= 1) {
const snapshot = snapshots[index]!;
if (snapshot.sourceKind !== sourceKind) continue;
const fetchedAtMs = Date.parse(snapshot.fetchedAt);
const ageMs = Date.now() - fetchedAtMs;
if (!Number.isFinite(fetchedAtMs) || ageMs < 0) continue;
if (ageMs <= FRESH_TOTALS_SNAPSHOT_MS) return { fresh: snapshot, fallback: snapshot };
return { fallback: snapshot };
}
return {};
}

async function refreshRepoGithubTotalsCoalesced(
env: Env,
repo: RepositoryRecord,
token: string | undefined,
sourceKind: RepoSyncSegmentRecord["sourceKind"],
): Promise<RepoGithubTotalsSnapshotRecord | undefined> {
if (!token) return undefined;
const key = `${sourceKind}:${repo.fullName}`;
const inFlight = repoGithubTotalsRefreshes.get(key);
if (inFlight) return inFlight;
const refresh = refreshRepoGithubTotals(env, repo, token, sourceKind)
.catch(() => undefined)
.finally(() => {
repoGithubTotalsRefreshes.delete(key);
});
repoGithubTotalsRefreshes.set(key, refresh);
return refresh;
}

export async function backfillOpenPullRequestDetails(
env: Env,
options: { repoFullName: string; mode?: BackfillMode; cursor?: number },
Expand Down Expand Up @@ -1458,9 +1509,9 @@ function isTerminalSegmentStatus(status: RepoSyncSegmentRecord["status"]): boole
}

async function refreshRepoSyncStateFromSegments(env: Env, repo: RepositoryRecord, sourceKind: RepoSyncSegmentRecord["sourceKind"]): Promise<void> {
const [previous, totals, metadata, labels, openIssues, openPullRequests, recentMerged, files, reviews, checks] = await Promise.all([
const [previous, totalsSnapshot, metadata, labels, openIssues, openPullRequests, recentMerged, files, reviews, checks] = await Promise.all([
getRepoSyncState(env, repo.fullName),
getLatestRepoGithubTotalsSnapshot(env, repo.fullName),
usableRepoGithubTotalsSnapshot(env, repo.fullName, sourceKind),
getRepoSyncSegment(env, repo.fullName, "metadata"),
getRepoSyncSegment(env, repo.fullName, "labels"),
getRepoSyncSegment(env, repo.fullName, "open_issues"),
Expand All @@ -1470,6 +1521,7 @@ async function refreshRepoSyncStateFromSegments(env: Env, repo: RepositoryRecord
getRepoSyncSegment(env, repo.fullName, "pull_request_reviews"),
getRepoSyncSegment(env, repo.fullName, "check_summaries"),
]);
const totals = totalsSnapshot.fallback;
// Include recent_merged_pull_requests so an unfinished merged-history crawl (running /
// waiting_rate_limit / error / other non-terminal) is reflected in the repo status instead
// of being silently rolled up as `success` and then skipped by the freshness check.
Expand Down
Loading
Loading