diff --git a/src/github/backfill.ts b/src/github/backfill.ts index b7cca72da..04c9a7b26 100644 --- a/src/github/backfill.ts +++ b/src/github/backfill.ts @@ -8,7 +8,7 @@ import { deletePullRequestFiles, countRepoLabels, getInstallation, - getLatestRepoGithubTotalsSnapshot, + listRepoGithubTotalsSnapshotHistory, getRepoSyncSegment, getRepoSyncState, listOpenIssueNumbers, @@ -305,6 +305,9 @@ const ERROR_BACKOFF_MS = 60 * 60 * 1000; const SEGMENT_PAGE_BUDGET: Record = { light: 2, full: 10, resume: 10 }; const PR_DETAIL_BATCH_SIZE: Record = { light: 12, full: 40, resume: 40 }; const CURRENT_OPEN_SCAN_MARKER = "gittensory-current-open-scan-v1"; +const FRESH_TOTALS_SNAPSHOT_MS = 10 * 60 * 1000; +const TOTALS_SNAPSHOT_LOOKBACK = 8; +const repoGithubTotalsRefreshes = new Map>(); export async function backfillRegisteredRepositories( env: Env, @@ -392,7 +395,7 @@ export async function enqueueRepositoryOpenDataBackfill( if (!settings.backfillEnabled) return { ok: true, repoFullName: repo.fullName, status: "skipped", warnings: ["Backfill is disabled for this repository."] }; const token = await tokenForRepo(env, repo); const sourceKind: RepoSyncSegmentRecord["sourceKind"] = repo.installationId && token !== env.GITHUB_PUBLIC_TOKEN ? "installation" : "github"; - const totals = token ? await refreshRepoGithubTotals(env, repo, token, sourceKind).catch(() => undefined) : undefined; + const totals = await repoGithubTotalsForBackfill(env, repo, token, sourceKind); const startedAt = nowIso(); const previous = await getRepoSyncState(env, repo.fullName); await upsertRepoSyncState(env, { @@ -460,7 +463,7 @@ export async function backfillRepositorySegment( ); return segmentJobResult(repo.fullName, options.segment, segment); } - const totals = (token ? await refreshRepoGithubTotals(env, repo, token, sourceKind).catch(() => undefined) : undefined) ?? (await getLatestRepoGithubTotalsSnapshot(env, repo.fullName)); + const totals = await repoGithubTotalsForBackfill(env, repo, token, sourceKind); const result = options.segment === "labels" ? await backfillLabelsSegment(env, repo, token, sourceKind, mode, options.cursor, totals) @@ -483,6 +486,54 @@ export async function backfillRepositorySegment( return segmentJobResult(repo.fullName, options.segment, result.segment); } +async function repoGithubTotalsForBackfill( + env: Env, + repo: RepositoryRecord, + token: string | undefined, + sourceKind: RepoSyncSegmentRecord["sourceKind"], +): Promise { + const { fresh, fallback } = await usableRepoGithubTotalsSnapshot(env, repo.fullName, sourceKind); + if (fresh) return fresh; + return (await refreshRepoGithubTotalsCoalesced(env, repo, token, sourceKind)) ?? fallback; +} + +async function usableRepoGithubTotalsSnapshot( + env: Env, + repoFullName: string, + sourceKind: RepoSyncSegmentRecord["sourceKind"], +): Promise<{ fresh?: RepoGithubTotalsSnapshotRecord; fallback?: RepoGithubTotalsSnapshotRecord }> { + const snapshots = await listRepoGithubTotalsSnapshotHistory(env, repoFullName, { limit: TOTALS_SNAPSHOT_LOOKBACK }); + for (let index = snapshots.length - 1; index >= 0; index -= 1) { + const snapshot = snapshots[index]!; + if (snapshot.sourceKind !== sourceKind) continue; + const fetchedAtMs = Date.parse(snapshot.fetchedAt); + const ageMs = Date.now() - fetchedAtMs; + if (!Number.isFinite(fetchedAtMs) || ageMs < 0) continue; + if (ageMs <= FRESH_TOTALS_SNAPSHOT_MS) return { fresh: snapshot, fallback: snapshot }; + return { fallback: snapshot }; + } + return {}; +} + +async function refreshRepoGithubTotalsCoalesced( + env: Env, + repo: RepositoryRecord, + token: string | undefined, + sourceKind: RepoSyncSegmentRecord["sourceKind"], +): Promise { + if (!token) return undefined; + const key = `${sourceKind}:${repo.fullName}`; + const inFlight = repoGithubTotalsRefreshes.get(key); + if (inFlight) return inFlight; + const refresh = refreshRepoGithubTotals(env, repo, token, sourceKind) + .catch(() => undefined) + .finally(() => { + repoGithubTotalsRefreshes.delete(key); + }); + repoGithubTotalsRefreshes.set(key, refresh); + return refresh; +} + export async function backfillOpenPullRequestDetails( env: Env, options: { repoFullName: string; mode?: BackfillMode; cursor?: number }, @@ -1458,9 +1509,9 @@ function isTerminalSegmentStatus(status: RepoSyncSegmentRecord["status"]): boole } async function refreshRepoSyncStateFromSegments(env: Env, repo: RepositoryRecord, sourceKind: RepoSyncSegmentRecord["sourceKind"]): Promise { - const [previous, totals, metadata, labels, openIssues, openPullRequests, recentMerged, files, reviews, checks] = await Promise.all([ + const [previous, totalsSnapshot, metadata, labels, openIssues, openPullRequests, recentMerged, files, reviews, checks] = await Promise.all([ getRepoSyncState(env, repo.fullName), - getLatestRepoGithubTotalsSnapshot(env, repo.fullName), + usableRepoGithubTotalsSnapshot(env, repo.fullName, sourceKind), getRepoSyncSegment(env, repo.fullName, "metadata"), getRepoSyncSegment(env, repo.fullName, "labels"), getRepoSyncSegment(env, repo.fullName, "open_issues"), @@ -1470,6 +1521,7 @@ async function refreshRepoSyncStateFromSegments(env: Env, repo: RepositoryRecord getRepoSyncSegment(env, repo.fullName, "pull_request_reviews"), getRepoSyncSegment(env, repo.fullName, "check_summaries"), ]); + const totals = totalsSnapshot.fallback; // Include recent_merged_pull_requests so an unfinished merged-history crawl (running / // waiting_rate_limit / error / other non-terminal) is reflected in the repo status instead // of being silently rolled up as `success` and then skipped by the freshness check. diff --git a/test/unit/backfill.test.ts b/test/unit/backfill.test.ts index 5474eb7cb..8d10471c8 100644 --- a/test/unit/backfill.test.ts +++ b/test/unit/backfill.test.ts @@ -14,6 +14,7 @@ import { listRepoLabels, listRepoSyncSegments, listRepoSyncStates, + persistRepoGithubTotalsSnapshot, recordGitHubRateLimitObservation, upsertInstallation, upsertRepoSyncSegment, @@ -51,6 +52,7 @@ import { createTestEnv } from "../helpers/d1"; describe("GitHub backfill", () => { afterEach(() => { + vi.useRealTimers(); clearGitHubResponseCacheForTest(); vi.unstubAllGlobals(); }); @@ -1389,6 +1391,309 @@ describe("GitHub backfill", () => { ); }); + it("reuses a fresh repo totals snapshot when queueing segmented backfills", async () => { + vi.useFakeTimers(); + vi.setSystemTime(new Date("2026-05-25T00:05:00.000Z")); + const sent: import("../../src/types").JobMessage[] = []; + const env = createTestEnv({ + GITHUB_PUBLIC_TOKEN: "public-token", + JOBS: { + async send(message: import("../../src/types").JobMessage) { + sent.push(message); + }, + } as unknown as Queue, + }); + await seedRegisteredRepo(env); + await persistTotalsSnapshot(env, { + fetchedAt: "2026-05-25T00:00:00.000Z", + openIssuesTotal: 3, + openPullRequestsTotal: 2, + mergedPullRequestsTotal: 5, + labelsTotal: 7, + }); + let graphQlFetches = 0; + vi.stubGlobal("fetch", async (input: RequestInfo | URL) => { + if (input.toString() === "https://api.github.com/graphql") { + graphQlFetches += 1; + return githubTotalsResponse({ openIssues: 99, openPullRequests: 99, mergedPullRequests: 99, closedPullRequests: 99, labels: 99 }); + } + return new Response("unexpected", { status: 500 }); + }); + + const result = await enqueueRepositoryOpenDataBackfill(env, { repoFullName: "JSONbored/gittensory", requestedBy: "api", mode: "resume", force: true }); + + expect(result).toMatchObject({ + status: "queued", + totals: { openIssuesTotal: 3, openPullRequestsTotal: 2, mergedPullRequestsTotal: 5, labelsTotal: 7 }, + warnings: [], + }); + expect(graphQlFetches).toBe(0); + expect(await listRepoSyncStates(env)).toMatchObject([{ status: "running", openIssuesCount: 3, openPullRequestsCount: 2 }]); + expect(sent).toHaveLength(4); + }); + + it("refreshes stale queue totals before segment fan-out but keeps stale same-source totals if refresh fails", async () => { + vi.useFakeTimers(); + vi.setSystemTime(new Date("2026-05-25T00:30:00.000Z")); + const cases = [ + { name: "refresh succeeds", graphql: "ok" as const, expectedIssues: 11, expectedWarnings: [] as string[] }, + { name: "refresh fails", graphql: "fail" as const, expectedIssues: 4, expectedWarnings: [] as string[] }, + ]; + for (const scenario of cases) { + const sent: import("../../src/types").JobMessage[] = []; + const env = createTestEnv({ + GITHUB_PUBLIC_TOKEN: "public-token", + JOBS: { + async send(message: import("../../src/types").JobMessage) { + sent.push(message); + }, + } as unknown as Queue, + }); + await seedRegisteredRepo(env); + await persistTotalsSnapshot(env, { fetchedAt: "2026-05-25T00:00:00.000Z", openIssuesTotal: 4, openPullRequestsTotal: 1 }); + let graphQlFetches = 0; + vi.stubGlobal("fetch", async (input: RequestInfo | URL) => { + if (input.toString() === "https://api.github.com/graphql") { + graphQlFetches += 1; + if (scenario.graphql === "fail") return new Response(`graphql unavailable for ${scenario.name}`, { status: 500 }); + return githubTotalsResponse({ openIssues: 11, openPullRequests: 6, mergedPullRequests: 5, closedPullRequests: 2, labels: 3 }); + } + return new Response("unexpected", { status: 500 }); + }); + + const result = await enqueueRepositoryOpenDataBackfill(env, { repoFullName: "JSONbored/gittensory", requestedBy: "api", mode: "resume", force: true }); + + expect(result).toMatchObject({ status: "queued", totals: { openIssuesTotal: scenario.expectedIssues }, warnings: scenario.expectedWarnings }); + expect(graphQlFetches).toBe(1); + expect(await listRepoSyncStates(env)).toMatchObject([{ status: "running", openIssuesCount: scenario.expectedIssues }]); + expect(sent).toHaveLength(4); + } + }); + + it("uses an older valid same-source totals snapshot when the latest row is unusable", async () => { + vi.useFakeTimers(); + vi.setSystemTime(new Date("2026-05-25T00:05:00.000Z")); + const cases = [ + { name: "malformed latest", fetchedAt: "not-a-date", sourceKind: "github" as const }, + { name: "future latest", fetchedAt: "2026-05-25T00:06:00.000Z", sourceKind: "github" as const }, + { name: "source-mismatched latest", fetchedAt: "2026-05-25T00:04:00.000Z", sourceKind: "installation" as const }, + ]; + for (const snapshot of cases) { + const env = createTestEnv({ GITHUB_PUBLIC_TOKEN: "public-token" }); + await seedRegisteredRepo(env); + await persistTotalsSnapshot(env, { fetchedAt: "2026-05-25T00:00:00.000Z", openIssuesTotal: 3, labelsTotal: 1 }); + await persistTotalsSnapshot(env, { fetchedAt: snapshot.fetchedAt, sourceKind: snapshot.sourceKind, openIssuesTotal: 99, labelsTotal: 99 }); + let graphQlFetches = 0; + vi.stubGlobal("fetch", async (input: RequestInfo | URL) => { + const url = input.toString(); + if (url === "https://api.github.com/graphql") { + graphQlFetches += 1; + return new Response(`unexpected graphql for ${snapshot.name}`, { status: 500 }); + } + if (url.includes("/labels?")) return Response.json([{ name: "bug", color: "cc0000", description: "Bug" }]); + return new Response("not found", { status: 404 }); + }); + + const result = await backfillRepositorySegment(env, { repoFullName: "JSONbored/gittensory", segment: "labels", mode: "resume", force: true }); + + expect(result).toMatchObject({ status: "complete", fetchedCount: 1, expectedCount: 1 }); + expect(graphQlFetches).toBe(0); + expect(await listRepoSyncStates(env)).toMatchObject([{ repoFullName: "JSONbored/gittensory", status: "success", openIssuesCount: 3 }]); + } + }); + + it("coalesces concurrent queue totals refreshes for the same repo and source", async () => { + const sent: import("../../src/types").JobMessage[] = []; + const env = createTestEnv({ + GITHUB_PUBLIC_TOKEN: "public-token", + JOBS: { + async send(message: import("../../src/types").JobMessage) { + sent.push(message); + }, + } as unknown as Queue, + }); + await seedRegisteredRepo(env); + let releaseGraphQl!: () => void; + const graphQlGate = new Promise((resolve) => { + releaseGraphQl = resolve; + }); + let graphQlFetches = 0; + vi.stubGlobal("fetch", async (input: RequestInfo | URL) => { + if (input.toString() === "https://api.github.com/graphql") { + graphQlFetches += 1; + await graphQlGate; + return githubTotalsResponse({ openIssues: 8, openPullRequests: 5, mergedPullRequests: 3, closedPullRequests: 2, labels: 1 }); + } + return new Response("unexpected", { status: 500 }); + }); + + const first = enqueueRepositoryOpenDataBackfill(env, { repoFullName: "JSONbored/gittensory", requestedBy: "api", mode: "resume", force: true }); + const second = enqueueRepositoryOpenDataBackfill(env, { repoFullName: "JSONbored/gittensory", requestedBy: "schedule", mode: "resume", force: true }); + await new Promise((resolve) => setTimeout(resolve, 0)); + expect(graphQlFetches).toBe(1); + releaseGraphQl(); + const results = await Promise.all([first, second]); + + expect(results).toEqual([ + expect.objectContaining({ status: "queued", totals: expect.objectContaining({ openIssuesTotal: 8, openPullRequestsTotal: 5 }) }), + expect.objectContaining({ status: "queued", totals: expect.objectContaining({ openIssuesTotal: 8, openPullRequestsTotal: 5 }) }), + ]); + expect(graphQlFetches).toBe(1); + expect(sent).toHaveLength(8); + }); + + it("uses valid stale totals without a token and warns only when no usable snapshot exists", async () => { + vi.useFakeTimers(); + vi.setSystemTime(new Date("2026-05-25T00:30:00.000Z")); + const noSnapshotJobs: import("../../src/types").JobMessage[] = []; + const noSnapshotEnv = createTestEnv({ + JOBS: { + async send(message: import("../../src/types").JobMessage) { + noSnapshotJobs.push(message); + }, + } as unknown as Queue, + }); + await seedRegisteredRepo(noSnapshotEnv); + vi.stubGlobal("fetch", async () => new Response("network should not be used without a token", { status: 500 })); + + const noSnapshot = await enqueueRepositoryOpenDataBackfill(noSnapshotEnv, { repoFullName: "JSONbored/gittensory", requestedBy: "api", mode: "resume", force: true }); + + expect(noSnapshot).toMatchObject({ + status: "queued", + warnings: ["GitHub totals snapshot could not be refreshed before segment queueing."], + }); + expect(noSnapshot).not.toHaveProperty("totals"); + expect(noSnapshotJobs).toHaveLength(4); + + const staleSnapshotJobs: import("../../src/types").JobMessage[] = []; + const staleSnapshotEnv = createTestEnv({ + JOBS: { + async send(message: import("../../src/types").JobMessage) { + staleSnapshotJobs.push(message); + }, + } as unknown as Queue, + }); + await seedRegisteredRepo(staleSnapshotEnv); + await persistTotalsSnapshot(staleSnapshotEnv, { fetchedAt: "2026-05-25T00:00:00.000Z", openIssuesTotal: 6, openPullRequestsTotal: 4 }); + + const staleSnapshot = await enqueueRepositoryOpenDataBackfill(staleSnapshotEnv, { repoFullName: "JSONbored/gittensory", requestedBy: "api", mode: "resume", force: true }); + + expect(staleSnapshot).toMatchObject({ status: "queued", totals: { openIssuesTotal: 6, openPullRequestsTotal: 4 }, warnings: [] }); + expect(staleSnapshotJobs).toHaveLength(4); + }); + + it("reuses a fresh repo totals snapshot for queued segment backfills without repeating GraphQL", async () => { + vi.useFakeTimers(); + vi.setSystemTime(new Date("2026-05-25T00:05:00.000Z")); + const env = createTestEnv({ GITHUB_PUBLIC_TOKEN: "public-token" }); + await seedRegisteredRepo(env); + await persistTotalsSnapshot(env, { fetchedAt: "2026-05-25T00:00:00.000Z", labelsTotal: 1 }); + let graphQlFetches = 0; + vi.stubGlobal("fetch", async (input: RequestInfo | URL) => { + const url = input.toString(); + if (url === "https://api.github.com/graphql") { + graphQlFetches += 1; + return githubTotalsResponse({ openIssues: 0, openPullRequests: 0, mergedPullRequests: 0, closedPullRequests: 0, labels: 99 }); + } + if (url.includes("/labels?")) return Response.json([{ name: "bug", color: "cc0000", description: "Bug" }]); + return new Response("not found", { status: 404 }); + }); + + const result = await backfillRepositorySegment(env, { repoFullName: "JSONbored/gittensory", segment: "labels", mode: "resume", force: true }); + + expect(result).toMatchObject({ status: "complete", fetchedCount: 1, expectedCount: 1 }); + expect(graphQlFetches).toBe(0); + }); + + it("refreshes live totals when a segment snapshot is stale, malformed, future-dated, or from another source", async () => { + vi.useFakeTimers(); + vi.setSystemTime(new Date("2026-05-25T00:05:00.000Z")); + const cases = [ + { name: "stale", fetchedAt: "2026-05-24T23:40:00.000Z", sourceKind: "github" as const }, + { name: "malformed", fetchedAt: "not-a-date", sourceKind: "github" as const }, + { name: "future", fetchedAt: "2026-05-25T00:06:00.000Z", sourceKind: "github" as const }, + { name: "source mismatch", fetchedAt: "2026-05-25T00:00:00.000Z", sourceKind: "installation" as const }, + ]; + for (const snapshot of cases) { + const env = createTestEnv({ GITHUB_PUBLIC_TOKEN: "public-token" }); + await seedRegisteredRepo(env); + await persistTotalsSnapshot(env, { fetchedAt: snapshot.fetchedAt, sourceKind: snapshot.sourceKind, labelsTotal: 1 }); + let graphQlFetches = 0; + vi.stubGlobal("fetch", async (input: RequestInfo | URL) => { + const url = input.toString(); + if (url === "https://api.github.com/graphql") { + graphQlFetches += 1; + return githubTotalsResponse({ openIssues: 0, openPullRequests: 0, mergedPullRequests: 0, closedPullRequests: 0, labels: 2 }); + } + if (url.includes("/labels?")) + return Response.json([ + { name: "bug", color: "cc0000", description: "Bug" }, + { name: "enhancement", color: "00cc00", description: "Enhancement" }, + ]); + return new Response(`not found for ${snapshot.name}`, { status: 404 }); + }); + + const result = await backfillRepositorySegment(env, { repoFullName: "JSONbored/gittensory", segment: "labels", mode: "resume", force: true }); + + expect(result).toMatchObject({ status: "complete", fetchedCount: 2, expectedCount: 2 }); + expect(graphQlFetches).toBe(1); + } + }); + + it("falls back to the latest stale totals snapshot when the live segment refresh fails", async () => { + vi.useFakeTimers(); + vi.setSystemTime(new Date("2026-05-25T00:30:00.000Z")); + const env = createTestEnv({ GITHUB_PUBLIC_TOKEN: "public-token" }); + await seedRegisteredRepo(env); + await persistTotalsSnapshot(env, { fetchedAt: "2026-05-25T00:00:00.000Z", labelsTotal: 1 }); + let graphQlFetches = 0; + vi.stubGlobal("fetch", async (input: RequestInfo | URL) => { + const url = input.toString(); + if (url === "https://api.github.com/graphql") { + graphQlFetches += 1; + return new Response("graphql unavailable", { status: 500 }); + } + if (url.includes("/labels?")) return Response.json([{ name: "bug", color: "cc0000", description: "Bug" }]); + return new Response("not found", { status: 404 }); + }); + + const result = await backfillRepositorySegment(env, { repoFullName: "JSONbored/gittensory", segment: "labels", mode: "resume", force: true }); + + expect(result).toMatchObject({ status: "complete", fetchedCount: 1, expectedCount: 1 }); + expect(graphQlFetches).toBe(1); + }); + + it("does not fall back to invalid or mismatched totals snapshots after a live refresh failure", async () => { + vi.useFakeTimers(); + vi.setSystemTime(new Date("2026-05-25T00:05:00.000Z")); + const cases = [ + { name: "malformed", fetchedAt: "not-a-date", sourceKind: "github" as const }, + { name: "future", fetchedAt: "2026-05-25T00:06:00.000Z", sourceKind: "github" as const }, + { name: "source mismatch", fetchedAt: "2026-05-25T00:00:00.000Z", sourceKind: "installation" as const }, + ]; + for (const snapshot of cases) { + const env = createTestEnv({ GITHUB_PUBLIC_TOKEN: "public-token" }); + await seedRegisteredRepo(env); + await persistTotalsSnapshot(env, { fetchedAt: snapshot.fetchedAt, sourceKind: snapshot.sourceKind, labelsTotal: 99 }); + let graphQlFetches = 0; + vi.stubGlobal("fetch", async (input: RequestInfo | URL) => { + const url = input.toString(); + if (url === "https://api.github.com/graphql") { + graphQlFetches += 1; + return new Response(`graphql unavailable for ${snapshot.name}`, { status: 500 }); + } + if (url.includes("/labels?")) return Response.json([{ name: "bug", color: "cc0000", description: "Bug" }]); + return new Response("not found", { status: 404 }); + }); + + const result = await backfillRepositorySegment(env, { repoFullName: "JSONbored/gittensory", segment: "labels", mode: "resume", force: true }); + + expect(result).toMatchObject({ status: "complete", fetchedCount: 1 }); + expect(result).not.toHaveProperty("expectedCount"); + expect(graphQlFetches).toBe(1); + } + }); + it("drains open issue segments against GitHub totals without counting PR rows from /issues", async () => { const env = createTestEnv({ GITHUB_PUBLIC_TOKEN: "public-token" }); await seedRegisteredRepo(env); @@ -4106,6 +4411,32 @@ async function generatePrivateKeyPem(): Promise { return `-----BEGIN PRIVATE KEY-----\n${base64}\n-----END PRIVATE KEY-----`; } +async function persistTotalsSnapshot( + env: Env, + overrides: { + fetchedAt?: string; + sourceKind?: "github" | "installation"; + openIssuesTotal?: number; + openPullRequestsTotal?: number; + mergedPullRequestsTotal?: number; + closedUnmergedPullRequestsTotal?: number; + labelsTotal?: number; + } = {}, +) { + await persistRepoGithubTotalsSnapshot(env, { + id: crypto.randomUUID(), + repoFullName: "JSONbored/gittensory", + openIssuesTotal: overrides.openIssuesTotal ?? 0, + openPullRequestsTotal: overrides.openPullRequestsTotal ?? 0, + mergedPullRequestsTotal: overrides.mergedPullRequestsTotal ?? 0, + closedUnmergedPullRequestsTotal: overrides.closedUnmergedPullRequestsTotal ?? 0, + labelsTotal: overrides.labelsTotal ?? 0, + sourceKind: overrides.sourceKind ?? "github", + fetchedAt: overrides.fetchedAt ?? "2026-05-25T00:00:00.000Z", + payload: {}, + }); +} + function githubTotalsResponse(counts: { openIssues: number; openPullRequests: number; mergedPullRequests: number; closedPullRequests: number; labels: number }) { return Response.json({ data: {