From f255c9c8937fe4ccd8dfe1c4cfb36e920053d700 Mon Sep 17 00:00:00 2001 From: JSONbored <49853598+JSONbored@users.noreply.github.com> Date: Tue, 30 Jun 2026 22:12:46 -0700 Subject: [PATCH 1/5] feat(selfhost): prepare Postgres cutover tooling --- .env.example | 8 + .../docs.self-hosting-backup-scaling.tsx | 17 +- docker-compose.yml | 9 +- package.json | 1 + scripts/export-grafana-reporting-db.sh | 184 ++++++++++ .../migrate-selfhost-sqlite-to-postgres.ts | 316 ++++++++++++++++++ test/unit/selfhost-grafana-reporting.test.ts | 87 ++++- 7 files changed, 615 insertions(+), 7 deletions(-) create mode 100644 scripts/migrate-selfhost-sqlite-to-postgres.ts diff --git a/.env.example b/.env.example index 748b79164..2bd326443 100644 --- a/.env.example +++ b/.env.example @@ -156,8 +156,15 @@ GITTENSORY_REVIEW_DRAFT=false # # instance can't be driven through App creation by a random visitor. # PORT=8787 # DATABASE_PATH=/data/gittensory.sqlite # SQLite file on the mounted data volume; all migrations auto-apply +# POSTGRES_PASSWORD=change-this-long-random-value # used by the --profile postgres / --profile pgbouncer services # DATABASE_URL= # set to postgres://user:pw@host:5432/db to use Postgres instead of # # SQLite (shared DB → multi-instance). Overrides DATABASE_PATH. +# # Compose examples: +# # postgres://gittensory:@postgres:5432/gittensory +# # postgres://gittensory:@pgbouncer:5432/gittensory +# PGVECTOR_ENABLED=false # set true only when using the Postgres pgvector table for RAG. +# # Leave false when QDRANT_URL is set; Qdrant remains the preferred +# # dedicated vector store for review context at scale. REDIS_URL=redis://redis:6379 # REQUIRED for the self-host review runtime. The default compose stack # # starts Redis automatically; override for an external Redis. # GITTENSORY_IMAGE=ghcr.io/jsonbored/gittensory-selfhost:latest # image used by scripts/deploy-selfhost-image.sh; @@ -176,6 +183,7 @@ REDIS_URL=redis://redis:6379 # REQUIRED for the self-host review # QDRANT_DIM=768 # vector dimension of the collection (768 = nomic-embed-text:latest; # # 1024 = bge-m3/mxbai-embed-large). Must match AI_EMBED_MODEL; # # recreate the Qdrant collection when changing this after startup. +# GITTENSORY_REPORTING_SOURCE_DATABASE_URL= # optional Postgres reporting reader URL. Defaults to DATABASE_URL. # MIGRATIONS_DIR=/app/migrations # CRON_INTERVAL_MS=120000 # maintain/sweep + sync cadence (default ~2 min) diff --git a/apps/gittensory-ui/src/routes/docs.self-hosting-backup-scaling.tsx b/apps/gittensory-ui/src/routes/docs.self-hosting-backup-scaling.tsx index 1f7ae6d12..d1798083f 100644 --- a/apps/gittensory-ui/src/routes/docs.self-hosting-backup-scaling.tsx +++ b/apps/gittensory-ui/src/routes/docs.self-hosting-backup-scaling.tsx @@ -74,9 +74,22 @@ LITESTREAM_REGION=us-east-1`} /> @postgres:5432/gittensory + code={`POSTGRES_PASSWORD= +DATABASE_URL=postgres://gittensory:@pgbouncer:5432/gittensory REDIS_URL=redis://redis:6379 -PGVECTOR_ENABLED=true`} +QDRANT_URL=http://qdrant:6333`} + /> + + +

One-time SQLite to Postgres copy

+

+ Existing SQLite installs can copy state into a fresh Postgres database with the bundled + migrator. It dry-runs by default and only commits when --execute is present. +

+

Restore checks

diff --git a/docker-compose.yml b/docker-compose.yml index 592183e0f..487b560ba 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -331,9 +331,12 @@ services: - ./scripts/export-grafana-reporting-db.sh:/export-grafana-reporting-db.sh:ro environment: # Default SQLite app DB path maps app /data/gittensory.sqlite to exporter /appdb/gittensory.sqlite. - # If you override DATABASE_PATH, set this to the matching /appdb/ path. DATABASE_URL/Postgres - # deployments export an empty dashboard-safe DB so Grafana can still start until a SQL exporter is added. + # If you override DATABASE_PATH, set this to the matching /appdb/ path. When DATABASE_URL points at + # Postgres, the exporter reads Postgres with psql and still writes the same redacted SQLite reporting DB + # for Grafana. Set GITTENSORY_REPORTING_SOURCE_DATABASE_URL only if the reporting reader uses a different URL. GITTENSORY_REPORTING_SOURCE_DB: "${GITTENSORY_REPORTING_SOURCE_DB:-/appdb/gittensory.sqlite}" + GITTENSORY_REPORTING_SOURCE_DATABASE_URL: "${GITTENSORY_REPORTING_SOURCE_DATABASE_URL:-}" + DATABASE_URL: "${DATABASE_URL:-}" GITTENSORY_REPORTING_DIR: /reporting GITTENSORY_REPORTING_DB: "${GITTENSORY_REPORTING_DB:-/reporting/gittensory-reporting.sqlite}" GRAFANA_REPORTING_EXPORT_INTERVAL_SECONDS: "${GRAFANA_REPORTING_EXPORT_INTERVAL_SECONDS:-30}" @@ -341,7 +344,7 @@ services: - /bin/sh - -c - >- - apk add --no-cache sqlite >/dev/null 2>&1 && + apk add --no-cache sqlite postgresql16-client >/dev/null 2>&1 && while true; do sh /export-grafana-reporting-db.sh || echo '[reporting] export failed'; interval="$${GRAFANA_REPORTING_EXPORT_INTERVAL_SECONDS:-30}"; diff --git a/package.json b/package.json index ddb5e8bd4..547f50e6d 100644 --- a/package.json +++ b/package.json @@ -14,6 +14,7 @@ "dev": "wrangler dev", "deploy": "wrangler deploy", "deploy:api": "wrangler d1 migrations apply gittensory --remote && wrangler deploy", + "selfhost:postgres:migrate": "tsx scripts/migrate-selfhost-sqlite-to-postgres.ts", "cf-typegen": "wrangler types && perl -pi -e 's/[[:blank:]]+$//' worker-configuration.d.ts", "db:migrate:local": "wrangler d1 migrations apply gittensory --local", "db:migrate:remote": "wrangler d1 migrations apply gittensory --remote", diff --git a/scripts/export-grafana-reporting-db.sh b/scripts/export-grafana-reporting-db.sh index c5907cb0e..869febe40 100644 --- a/scripts/export-grafana-reporting-db.sh +++ b/scripts/export-grafana-reporting-db.sh @@ -2,6 +2,7 @@ set -eu APP_DB="${GITTENSORY_REPORTING_SOURCE_DB:-/appdb/gittensory.sqlite}" +PG_DB="${GITTENSORY_REPORTING_SOURCE_DATABASE_URL:-${DATABASE_URL:-}}" OUT_DIR="${GITTENSORY_REPORTING_DIR:-/reporting}" OUT_DB="${GITTENSORY_REPORTING_DB:-$OUT_DIR/gittensory-reporting.sqlite}" TMP_DB="${OUT_DB}.tmp" @@ -18,6 +19,49 @@ source_table_exists() { sqlite3 "$APP_DB" "SELECT 1 FROM sqlite_master WHERE type='table' AND name='$1' LIMIT 1" | grep -q 1 } +pg_enabled() { + case "$PG_DB" in + postgres://*|postgresql://*) return 0 ;; + *) return 1 ;; + esac +} + +pg_scalar() { + psql "$PG_DB" -X -q -t -A -v ON_ERROR_STOP=1 -c "$1" +} + +pg_table_exists() { + value="$(pg_scalar "SELECT 1 FROM information_schema.tables WHERE table_schema = 'public' AND table_name = '$1' LIMIT 1")" || { + echo "reporting export failed: could not inspect Postgres table $1" >&2 + exit 1 + } + [ "$value" = "1" ] +} + +pg_column_exists() { + value="$(pg_scalar "SELECT 1 FROM information_schema.columns WHERE table_schema = 'public' AND table_name = '$1' AND column_name = '$2' LIMIT 1")" || { + echo "reporting export failed: could not inspect Postgres column $1.$2" >&2 + exit 1 + } + [ "$value" = "1" ] +} + +pg_copy_csv() { + query="$1" + out="$2" + psql "$PG_DB" -X -q -v ON_ERROR_STOP=1 -c "\\copy ($query) TO STDOUT WITH CSV" >"$out" +} + +sqlite_import_csv() { + csv="$1" + table="$2" + [ -s "$csv" ] || return 0 + sqlite3 "$TMP_DB" </dev/null 2>&1; then + rm -f "$TMP_DB" "$TMP_DB-wal" "$TMP_DB-shm" + echo "reporting export failed: DATABASE_URL is Postgres but psql is not installed" >&2 + exit 1 + fi + + if ! pg_table_exists "pull_requests" && + ! pg_table_exists "advisories" && + ! pg_table_exists "review_targets" && + ! pg_table_exists "ai_usage_events"; then + if [ -s "$OUT_DB" ]; then + rm -f "$TMP_DB" "$TMP_DB-wal" "$TMP_DB-shm" + echo "reporting export skipped: no reporting source tables in Postgres; preserving last good $OUT_DB" >&2 + exit 1 + fi + fi + + if pg_table_exists "pull_requests" && pg_table_exists "advisories"; then + PR_CSV="$(mktemp)" + pg_copy_csv " +WITH latest_advisories AS ( + SELECT + repo_full_name, + pull_number, + conclusion, + updated_at, + ROW_NUMBER() OVER ( + PARTITION BY repo_full_name, pull_number + ORDER BY updated_at DESC, id DESC + ) AS rn + FROM advisories + WHERE pull_number IS NOT NULL +), +current_pull_requests AS ( + SELECT + p.repo_full_name AS repo, + p.number AS number, + p.author_login AS submitter, + CASE + WHEN lower(p.state) = 'closed' AND p.merged_at IS NOT NULL THEN 'merged' + WHEN lower(p.state) = 'closed' THEN 'closed' + WHEN a.conclusion IN ('failure', 'action_required') THEN 'manual' + WHEN a.conclusion IS NOT NULL THEN 'commented' + ELSE 'manual' + END AS status, + CASE a.conclusion + WHEN 'success' THEN 'merge' + WHEN 'failure' THEN 'close' + WHEN 'action_required' THEN 'manual' + WHEN 'neutral' THEN 'comment' + WHEN 'skipped' THEN 'ignore' + ELSE NULL + END AS verdict, + p.title AS title, + p.created_at AS created_at, + CASE + WHEN a.updated_at IS NOT NULL AND a.updated_at > p.updated_at THEN a.updated_at + ELSE p.updated_at + END AS updated_at + FROM pull_requests p + LEFT JOIN latest_advisories a + ON a.repo_full_name = p.repo_full_name + AND a.pull_number = p.number + AND a.rn = 1 +) +SELECT + repo, + number, + submitter, + status, + verdict, + title, + created_at, + updated_at +FROM current_pull_requests +" "$PR_CSV" + sqlite_import_csv "$PR_CSV" "review_targets" + rm -f "$PR_CSV" + fi + + if pg_table_exists "review_targets"; then + LEGACY_CSV="$(mktemp)" + if pg_table_exists "pull_requests"; then + LEGACY_FILTER="AND NOT EXISTS (SELECT 1 FROM pull_requests p WHERE p.repo_full_name = t.repo AND p.number = t.number)" + else + LEGACY_FILTER="" + fi + pg_copy_csv " +SELECT + t.repo, + t.number, + t.submitter, + t.status, + t.verdict, + t.title, + t.created_at, + t.updated_at +FROM review_targets t +WHERE t.kind = 'pull_request' + $LEGACY_FILTER +" "$LEGACY_CSV" + sqlite_import_csv "$LEGACY_CSV" "review_targets" + rm -f "$LEGACY_CSV" + fi + + if pg_table_exists "ai_usage_events"; then + AI_CSV="$(mktemp)" + if pg_column_exists "ai_usage_events" "estimated_neurons"; then + ESTIMATED_NEURONS_EXPR="COALESCE(estimated_neurons, 0)" + else + ESTIMATED_NEURONS_EXPR="0" + fi + pg_copy_csv " +SELECT + feature, + model, + status, + $ESTIMATED_NEURONS_EXPR AS estimated_neurons, + detail, + json_build_object( + 'repoFullName', metadata_json::jsonb ->> 'repoFullName', + 'pullNumber', metadata_json::jsonb ->> 'pullNumber' + )::text AS metadata_json, + created_at +FROM ai_usage_events +WHERE feature = 'ai_review_pr' +" "$AI_CSV" + sqlite_import_csv "$AI_CSV" "ai_usage_events" + rm -f "$AI_CSV" + fi + + sqlite3 "$TMP_DB" "PRAGMA quick_check;" | grep -qx "ok" + mv "$TMP_DB" "$OUT_DB" + rm -f "$TMP_DB-wal" "$TMP_DB-shm" + + echo "reporting export complete: $OUT_DB" + exit 0 +fi + if [ ! -s "$APP_DB" ]; then if [ -s "$OUT_DB" ]; then rm -f "$TMP_DB" "$TMP_DB-wal" "$TMP_DB-shm" diff --git a/scripts/migrate-selfhost-sqlite-to-postgres.ts b/scripts/migrate-selfhost-sqlite-to-postgres.ts new file mode 100644 index 000000000..f1722bd18 --- /dev/null +++ b/scripts/migrate-selfhost-sqlite-to-postgres.ts @@ -0,0 +1,316 @@ +#!/usr/bin/env tsx +import { existsSync } from "node:fs"; +import { DatabaseSync } from "node:sqlite"; +import pg, { type PoolClient } from "pg"; +import { createPgAdapter } from "../src/selfhost/pg-adapter"; +import { createPgQueue } from "../src/selfhost/pg-queue"; +import { initPgVectorize } from "../src/selfhost/pg-vectorize"; +import { runSelfHostMigrations } from "../src/selfhost/migrate"; + +interface Options { + sqlitePath: string; + postgresUrl: string; + migrationsDir: string; + execute: boolean; + allowNonEmpty: boolean; + includeVectors: boolean; + batchSize: number; +} + +interface CopyResult { + table: string; + rows: number; +} + +interface SkipResult { + table: string; + reason: string; +} + +const INTERNAL_SQLITE_TABLES = new Set(["d1_migrations", "_cf_KV", "__drizzle_migrations", "_selfhost_migrations"]); +const TABLES_ALLOWED_AFTER_SCHEMA_INIT = new Set(["global_agent_controls", "global_contributor_blacklist"]); + +function usage(): string { + return `Usage: npm run selfhost:postgres:migrate -- --sqlite --postgres-url [--execute] + +Copies a self-host SQLite database into an empty Postgres backend. The default is a transactionally +rolled-back dry run. Pass --execute to commit the copy. + +Options: + --sqlite SQLite source file. Defaults to DATABASE_PATH or /data/gittensory.sqlite. + --postgres-url Postgres target URL. Defaults to DATABASE_URL. + --migrations-dir Migration directory. Defaults to migrations. + --execute Commit the copy. Omit for a rollback dry run. + --allow-non-empty Allow non-empty target app tables. Off by default. + --include-vectors Also copy _selfhost_vectors into pgvector. + --batch-size Rows per INSERT batch. Defaults to 250.`; +} + +function parseArgs(argv: string[]): Options { + const opts: Options = { + sqlitePath: process.env.DATABASE_PATH ?? "/data/gittensory.sqlite", + postgresUrl: process.env.DATABASE_URL ?? "", + migrationsDir: process.env.MIGRATIONS_DIR ?? "migrations", + execute: false, + allowNonEmpty: false, + includeVectors: false, + batchSize: 250, + }; + + for (let i = 0; i < argv.length; i += 1) { + const arg = argv[i] as string; + const next = () => { + const value = argv[i + 1]; + if (!value) throw new Error(`Missing value for ${arg}`); + i += 1; + return value; + }; + switch (arg) { + case "--sqlite": + opts.sqlitePath = next(); + break; + case "--postgres-url": + opts.postgresUrl = next(); + break; + case "--migrations-dir": + opts.migrationsDir = next(); + break; + case "--execute": + opts.execute = true; + break; + case "--allow-non-empty": + opts.allowNonEmpty = true; + break; + case "--include-vectors": + opts.includeVectors = true; + break; + case "--batch-size": { + const parsed = Number.parseInt(next(), 10); + if (!Number.isFinite(parsed) || parsed < 1) throw new Error("--batch-size must be a positive integer"); + opts.batchSize = parsed; + break; + } + case "--help": + case "-h": + console.log(usage()); + process.exit(0); + default: + throw new Error(`Unknown argument: ${arg}`); + } + } + + if (!opts.postgresUrl || !/^postgres(?:ql)?:\/\//i.test(opts.postgresUrl)) { + throw new Error("--postgres-url or DATABASE_URL must be a postgres:// URL"); + } + if (!existsSync(opts.sqlitePath)) throw new Error(`SQLite source does not exist: ${opts.sqlitePath}`); + if (!existsSync(opts.migrationsDir)) throw new Error(`Migrations directory does not exist: ${opts.migrationsDir}`); + return opts; +} + +function quoteIdent(name: string): string { + if (!/^[A-Za-z_][A-Za-z0-9_]*$/.test(name)) throw new Error(`Unsupported identifier: ${name}`); + return `"${name}"`; +} + +function sqliteTables(db: DatabaseSync): string[] { + return db + .prepare("SELECT name FROM sqlite_master WHERE type='table' ORDER BY name") + .all() + .map((row) => String((row as { name: unknown }).name)) + .filter((table) => !table.startsWith("sqlite_") && !INTERNAL_SQLITE_TABLES.has(table)); +} + +function sqliteColumns(db: DatabaseSync, table: string): string[] { + return db + .prepare(`PRAGMA table_info(${quoteIdent(table)})`) + .all() + .map((row) => String((row as { name: unknown }).name)); +} + +function sqliteCount(db: DatabaseSync, table: string): number { + const row = db.prepare(`SELECT COUNT(*) AS count FROM ${quoteIdent(table)}`).get() as { count: number }; + return Number(row.count); +} + +function sqliteRows(db: DatabaseSync, table: string, columns: string[], limit: number, offset: number): Record[] { + const projection = columns.map(quoteIdent).join(", "); + return db.prepare(`SELECT ${projection} FROM ${quoteIdent(table)} LIMIT ? OFFSET ?`).all(limit, offset) as Record[]; +} + +async function pgTables(client: PoolClient): Promise> { + const res = await client.query<{ table_name: string }>( + "SELECT table_name FROM information_schema.tables WHERE table_schema = 'public'", + ); + return new Set(res.rows.map((row) => row.table_name)); +} + +async function pgColumns(client: PoolClient, table: string): Promise { + const res = await client.query<{ column_name: string }>( + "SELECT column_name FROM information_schema.columns WHERE table_schema = 'public' AND table_name = $1 ORDER BY ordinal_position", + [table], + ); + return res.rows.map((row) => row.column_name); +} + +async function pgPrimaryKey(client: PoolClient, table: string): Promise { + const res = await client.query<{ column_name: string }>( + ` + SELECT a.attname AS column_name + FROM pg_index i + JOIN LATERAL unnest(i.indkey) WITH ORDINALITY AS k(attnum, ord) ON true + JOIN pg_attribute a ON a.attrelid = i.indrelid AND a.attnum = k.attnum + WHERE i.indrelid = $1::regclass + AND i.indisprimary + ORDER BY k.ord + `, + [table], + ); + return res.rows.map((row) => row.column_name); +} + +async function pgCount(client: PoolClient, table: string): Promise { + const res = await client.query<{ count: string }>(`SELECT COUNT(*)::text AS count FROM ${quoteIdent(table)}`); + return Number(res.rows[0]?.count ?? 0); +} + +function valuePlaceholder(index: number, table: string, column: string): string { + const base = `$${index}`; + if (table === "_selfhost_vectors" && column === "embedding") return `${base}::vector`; + if (table === "_selfhost_vectors" && column === "metadata") return `${base}::jsonb`; + return base; +} + +function insertSql(table: string, columns: string[], primaryKey: string[], rowCount: number): string { + const columnSql = columns.map(quoteIdent).join(", "); + const valuesSql = Array.from({ length: rowCount }, (_, rowIndex) => { + const placeholders = columns.map((column, columnIndex) => valuePlaceholder(rowIndex * columns.length + columnIndex + 1, table, column)); + return `(${placeholders.join(", ")})`; + }).join(", "); + const conflictColumns = primaryKey.filter((column) => columns.includes(column)); + if (conflictColumns.length === 0) return `INSERT INTO ${quoteIdent(table)} (${columnSql}) VALUES ${valuesSql}`; + const updateColumns = columns.filter((column) => !conflictColumns.includes(column)); + const conflictTarget = conflictColumns.map(quoteIdent).join(", "); + if (updateColumns.length === 0) { + return `INSERT INTO ${quoteIdent(table)} (${columnSql}) VALUES ${valuesSql} ON CONFLICT (${conflictTarget}) DO NOTHING`; + } + const updates = updateColumns.map((column) => `${quoteIdent(column)} = EXCLUDED.${quoteIdent(column)}`).join(", "); + return `INSERT INTO ${quoteIdent(table)} (${columnSql}) VALUES ${valuesSql} ON CONFLICT (${conflictTarget}) DO UPDATE SET ${updates}`; +} + +async function copyTable(db: DatabaseSync, client: PoolClient, table: string, columns: string[], primaryKey: string[], batchSize: number): Promise { + const total = sqliteCount(db, table); + for (let offset = 0; offset < total; offset += batchSize) { + const rows = sqliteRows(db, table, columns, batchSize, offset); + if (rows.length === 0) continue; + const values = rows.flatMap((row) => columns.map((column) => row[column] ?? null)); + await client.query(insertSql(table, columns, primaryKey, rows.length), values); + } + return total; +} + +async function copyAll(opts: Options, db: DatabaseSync, client: PoolClient): Promise<{ copied: CopyResult[]; skipped: SkipResult[] }> { + const copied: CopyResult[] = []; + const skipped: SkipResult[] = []; + let targetTables = await pgTables(client); + const sourceTables = sqliteTables(db); + + for (const table of sourceTables) { + if (table === "_selfhost_vectors" && !opts.includeVectors) { + skipped.push({ table, reason: "vectors are externalized by Qdrant or can be rebuilt; pass --include-vectors for pgvector" }); + continue; + } + if (!targetTables.has(table)) throw new Error(`Target Postgres schema is missing source table: ${table}`); + const targetCount = await pgCount(client, table); + if (targetCount > 0 && !opts.allowNonEmpty && !TABLES_ALLOWED_AFTER_SCHEMA_INIT.has(table)) { + throw new Error(`Target table ${table} already contains ${targetCount} row(s); rerun with --allow-non-empty only if this is intentional`); + } + const sourceColumns = sqliteColumns(db, table); + const targetColumns = await pgColumns(client, table); + const commonColumns = sourceColumns.filter((column) => targetColumns.includes(column)); + if (commonColumns.length === 0) { + skipped.push({ table, reason: "no common columns" }); + continue; + } + const primaryKey = await pgPrimaryKey(client, table); + const rows = await copyTable(db, client, table, commonColumns, primaryKey, opts.batchSize); + copied.push({ table, rows }); + } + + if (targetTables.has("_selfhost_jobs")) { + await client.query( + "SELECT setval(pg_get_serial_sequence('_selfhost_jobs', 'id'), COALESCE((SELECT MAX(id) FROM _selfhost_jobs), 1), (SELECT COUNT(*) > 0 FROM _selfhost_jobs))", + ); + } + + // Re-run queue init after copying so migrated processing rows are recovered and derived job metadata is current. + const queue = createPgQueue(client as unknown as pg.Pool, async () => undefined); + await queue.init(); + await queue.stop(); + targetTables = await pgTables(client); + + for (const result of copied) { + if (!targetTables.has(result.table)) continue; + const targetCount = await pgCount(client, result.table); + if (result.table === "_selfhost_job_stats") { + if (targetCount < result.rows) { + throw new Error(`Validation failed for ${result.table}: copied ${result.rows} row(s), target has ${targetCount}`); + } + continue; + } + if (targetCount !== result.rows) { + throw new Error(`Validation failed for ${result.table}: copied ${result.rows} row(s), target has ${targetCount}`); + } + } + return { copied, skipped }; +} + +async function main(): Promise { + const opts = parseArgs(process.argv.slice(2)); + const sqlite = new DatabaseSync(opts.sqlitePath, { readOnly: true }); + sqlite.exec("PRAGMA query_only = ON; PRAGMA busy_timeout = 5000;"); + + const pool = new pg.Pool({ connectionString: opts.postgresUrl, max: 1 }); + const client = await pool.connect(); + let finished = false; + try { + await client.query("BEGIN"); + const db = createPgAdapter(client as unknown as pg.Pool); + const migrationsApplied = await runSelfHostMigrations(db, opts.migrationsDir); + const queue = createPgQueue(client as unknown as pg.Pool, async () => undefined); + await queue.init(); + await queue.stop(); + if (opts.includeVectors) await initPgVectorize(client as unknown as pg.Pool); + + const { copied, skipped } = await copyAll(opts, sqlite, client); + if (opts.execute) { + await client.query("COMMIT"); + finished = true; + } else { + await client.query("ROLLBACK"); + finished = true; + } + + console.log( + JSON.stringify( + { + mode: opts.execute ? "executed" : "dry_run_rolled_back", + migrationsApplied, + copied, + skipped, + }, + null, + 2, + ), + ); + } finally { + if (!finished) await client.query("ROLLBACK").catch(() => undefined); + client.release(); + await pool.end(); + sqlite.close(); + } +} + +main().catch((error) => { + console.error(error instanceof Error ? error.message : String(error)); + process.exit(1); +}); diff --git a/test/unit/selfhost-grafana-reporting.test.ts b/test/unit/selfhost-grafana-reporting.test.ts index 13df7b3d9..20aab80fc 100644 --- a/test/unit/selfhost-grafana-reporting.test.ts +++ b/test/unit/selfhost-grafana-reporting.test.ts @@ -1,5 +1,5 @@ import { execFileSync } from "node:child_process"; -import { mkdtempSync, rmSync } from "node:fs"; +import { chmodSync, existsSync, mkdirSync, mkdtempSync, rmSync, writeFileSync } from "node:fs"; import { tmpdir } from "node:os"; import { join } from "node:path"; import { afterEach, describe, expect, it } from "vitest"; @@ -28,7 +28,7 @@ function sqlite(db: string, sql: string): string { return execFileSync("sqlite3", [db, sql], { encoding: "utf8" }).trim(); } -function runExporter(root: string, sourceDb: string, outDb: string): void { +function runExporter(root: string, sourceDb: string, outDb: string, env: Record = {}): void { execFileSync("sh", ["scripts/export-grafana-reporting-db.sh"], { cwd: process.cwd(), env: { @@ -36,11 +36,59 @@ function runExporter(root: string, sourceDb: string, outDb: string): void { GITTENSORY_REPORTING_SOURCE_DB: sourceDb, GITTENSORY_REPORTING_DIR: root, GITTENSORY_REPORTING_DB: outDb, + ...env, }, stdio: "pipe", }); } +function fakePsql(root: string): string { + const bin = join(root, "bin"); + mkdirSync(bin); + const psql = join(bin, "psql"); + writeFileSync( + psql, + `#!/bin/sh +args="$*" +case "$args" in + *"information_schema.tables"*"pull_requests"*|*"information_schema.tables"*"advisories"*|*"information_schema.tables"*"review_targets"*|*"information_schema.tables"*"ai_usage_events"*) + printf '1\\n' + ;; + *"information_schema.columns"*"ai_usage_events"*"estimated_neurons"*) + printf '1\\n' + ;; + *"FROM current_pull_requests"*) + printf '"JSONbored/gittensory",1690,JSONbored,commented,comment,"fresh advisory PR",2026-06-28T21:00:00Z,2026-06-28T21:40:00Z\\n' + printf '"JSONbored/gittensory",1691,tmimmanuel,merged,merge,"fresh merged PR",2026-06-28T21:30:00Z,2026-06-28T21:47:40Z\\n' + ;; + *"FROM review_targets t"*) + printf '"JSONbored/gittensory",1049,bohdansolovie,closed,close,"historical PR",2026-06-22T17:28:56Z,2026-06-22T17:28:56Z\\n' + ;; + *"FROM ai_usage_events"*) + printf 'ai_review_pr,codex:gpt-5.5,ok,42,done,"{""repoFullName"" : ""JSONbored/gittensory"", ""pullNumber"" : 1678}",2026-06-28T00:00:00Z\\n' + ;; +esac +`, + ); + chmodSync(psql, 0o755); + return bin; +} + +function failingPsql(root: string): string { + const bin = join(root, "broken-bin"); + mkdirSync(bin); + const psql = join(bin, "psql"); + writeFileSync( + psql, + `#!/bin/sh +echo 'connection refused' >&2 +exit 7 +`, + ); + chmodSync(psql, 0o755); + return bin; +} + (sqliteCliAvailable ? describe : describe.skip)("Grafana reporting exporter", () => { it("prefers current pull request rows while preserving non-overlapping legacy review history", () => { const root = tmpRoot(); @@ -181,4 +229,39 @@ function runExporter(root: string, sourceDb: string, outDb: string): void { expect(sqlite(outDb, "PRAGMA quick_check;")).toBe("ok"); expect(sqlite(outDb, "SELECT estimated_neurons FROM ai_usage_events;")).toBe("0"); }); + + it("exports the same redacted dashboard snapshot from Postgres", () => { + const root = tmpRoot(); + const outDb = join(root, "reporting.sqlite"); + const bin = fakePsql(root); + + runExporter(root, join(root, "unused.sqlite"), outDb, { + DATABASE_URL: "postgres://gittensory:pw@postgres:5432/gittensory", + PATH: `${bin}:${process.env.PATH ?? ""}`, + }); + + expect(sqlite(outDb, "PRAGMA quick_check;")).toBe("ok"); + expect(sqlite(outDb, "SELECT count(*) FROM review_targets;")).toBe("3"); + expect(sqlite(outDb, "SELECT submitter || '|' || status || '|' || verdict FROM review_targets WHERE repo='JSONbored/gittensory' AND number=1690;")).toBe( + "JSONbored|commented|comment", + ); + expect(sqlite(outDb, "SELECT title FROM review_targets WHERE repo='JSONbored/gittensory' AND number=1049;")).toBe("historical PR"); + expect(sqlite(outDb, "SELECT estimated_neurons FROM ai_usage_events;")).toBe("42"); + expect(sqlite(outDb, "SELECT json_extract(metadata_json, '$.repoFullName') FROM ai_usage_events;")).toBe("JSONbored/gittensory"); + expect(sqlite(outDb, "SELECT json_extract(metadata_json, '$.private') IS NULL FROM ai_usage_events;")).toBe("1"); + }); + + it("fails closed when Postgres metadata cannot be inspected", () => { + const root = tmpRoot(); + const outDb = join(root, "reporting.sqlite"); + const bin = failingPsql(root); + + expect(() => + runExporter(root, join(root, "unused.sqlite"), outDb, { + DATABASE_URL: "postgres://gittensory:pw@postgres:5432/gittensory", + PATH: `${bin}:${process.env.PATH ?? ""}`, + }), + ).toThrow(); + expect(existsSync(outDb)).toBe(false); + }); }); From 452f4504f3f6c726c8022d98fb376bdff454d614 Mon Sep 17 00:00:00 2001 From: JSONbored <49853598+JSONbored@users.noreply.github.com> Date: Tue, 30 Jun 2026 22:27:12 -0700 Subject: [PATCH 2/5] fix(selfhost): harden Postgres cutover tooling --- scripts/export-grafana-reporting-db.sh | 2 +- .../migrate-selfhost-sqlite-to-postgres.ts | 63 ++++++++++++++++--- test/unit/selfhost-grafana-reporting.test.ts | 4 ++ 3 files changed, 61 insertions(+), 8 deletions(-) diff --git a/scripts/export-grafana-reporting-db.sh b/scripts/export-grafana-reporting-db.sh index 869febe40..7ba0a3c98 100644 --- a/scripts/export-grafana-reporting-db.sh +++ b/scripts/export-grafana-reporting-db.sh @@ -49,7 +49,7 @@ pg_column_exists() { pg_copy_csv() { query="$1" out="$2" - psql "$PG_DB" -X -q -v ON_ERROR_STOP=1 -c "\\copy ($query) TO STDOUT WITH CSV" >"$out" + psql "$PG_DB" -X -q -v ON_ERROR_STOP=1 -c "COPY ($query) TO STDOUT WITH CSV" >"$out" } sqlite_import_csv() { diff --git a/scripts/migrate-selfhost-sqlite-to-postgres.ts b/scripts/migrate-selfhost-sqlite-to-postgres.ts index f1722bd18..a1a62744a 100644 --- a/scripts/migrate-selfhost-sqlite-to-postgres.ts +++ b/scripts/migrate-selfhost-sqlite-to-postgres.ts @@ -20,6 +20,8 @@ interface Options { interface CopyResult { table: string; rows: number; + targetRowsBefore: number; + keyColumns: string[]; } interface SkipResult { @@ -208,6 +210,41 @@ async function copyTable(db: DatabaseSync, client: PoolClient, table: string, co return total; } +async function countTargetRowsMatchingSourceKeys( + db: DatabaseSync, + client: PoolClient, + table: string, + keyColumns: string[], + batchSize: number, +): Promise { + const total = sqliteCount(db, table); + let matched = 0; + for (let offset = 0; offset < total; offset += batchSize) { + const rows = sqliteRows(db, table, keyColumns, batchSize, offset); + if (rows.length === 0) continue; + const values: unknown[] = []; + for (const row of rows) { + for (const column of keyColumns) { + const value = row[column] ?? null; + if (value === null) throw new Error(`Validation failed for ${table}: source primary key ${column} is null`); + values.push(value); + } + } + const condition = + keyColumns.length === 1 + ? `${quoteIdent(keyColumns[0] as string)} IN (${values.map((_, index) => `$${index + 1}`).join(", ")})` + : `(${keyColumns.map(quoteIdent).join(", ")}) IN (${rows + .map((_, rowIndex) => { + const base = rowIndex * keyColumns.length; + return `(${keyColumns.map((__, columnIndex) => `$${base + columnIndex + 1}`).join(", ")})`; + }) + .join(", ")})`; + const res = await client.query<{ count: string }>(`SELECT COUNT(*)::text AS count FROM ${quoteIdent(table)} WHERE ${condition}`, values); + matched += Number(res.rows[0]?.count ?? 0); + } + return matched; +} + async function copyAll(opts: Options, db: DatabaseSync, client: PoolClient): Promise<{ copied: CopyResult[]; skipped: SkipResult[] }> { const copied: CopyResult[] = []; const skipped: SkipResult[] = []; @@ -220,9 +257,9 @@ async function copyAll(opts: Options, db: DatabaseSync, client: PoolClient): Pro continue; } if (!targetTables.has(table)) throw new Error(`Target Postgres schema is missing source table: ${table}`); - const targetCount = await pgCount(client, table); - if (targetCount > 0 && !opts.allowNonEmpty && !TABLES_ALLOWED_AFTER_SCHEMA_INIT.has(table)) { - throw new Error(`Target table ${table} already contains ${targetCount} row(s); rerun with --allow-non-empty only if this is intentional`); + const targetRowsBefore = await pgCount(client, table); + if (targetRowsBefore > 0 && !opts.allowNonEmpty && !TABLES_ALLOWED_AFTER_SCHEMA_INIT.has(table)) { + throw new Error(`Target table ${table} already contains ${targetRowsBefore} row(s); rerun with --allow-non-empty only if this is intentional`); } const sourceColumns = sqliteColumns(db, table); const targetColumns = await pgColumns(client, table); @@ -232,8 +269,9 @@ async function copyAll(opts: Options, db: DatabaseSync, client: PoolClient): Pro continue; } const primaryKey = await pgPrimaryKey(client, table); + const keyColumns = primaryKey.filter((column) => commonColumns.includes(column)); const rows = await copyTable(db, client, table, commonColumns, primaryKey, opts.batchSize); - copied.push({ table, rows }); + copied.push({ table, rows, targetRowsBefore, keyColumns }); } if (targetTables.has("_selfhost_jobs")) { @@ -251,9 +289,20 @@ async function copyAll(opts: Options, db: DatabaseSync, client: PoolClient): Pro for (const result of copied) { if (!targetTables.has(result.table)) continue; const targetCount = await pgCount(client, result.table); - if (result.table === "_selfhost_job_stats") { - if (targetCount < result.rows) { - throw new Error(`Validation failed for ${result.table}: copied ${result.rows} row(s), target has ${targetCount}`); + if (result.table === "_selfhost_job_stats" || result.targetRowsBefore > 0) { + if (targetCount < result.targetRowsBefore) { + throw new Error(`Validation failed for ${result.table}: expected to preserve at least ${result.targetRowsBefore} existing row(s), target has ${targetCount}`); + } + if (result.rows > 0 && result.keyColumns.length > 0) { + const matched = await countTargetRowsMatchingSourceKeys(db, client, result.table, result.keyColumns, opts.batchSize); + if (matched !== result.rows) { + throw new Error(`Validation failed for ${result.table}: copied ${result.rows} source row(s), target has ${matched} matching source key(s)`); + } + continue; + } + const minimumExpectedRows = result.targetRowsBefore + result.rows; + if (targetCount < minimumExpectedRows) { + throw new Error(`Validation failed for ${result.table}: expected at least ${minimumExpectedRows} row(s), target has ${targetCount}`); } continue; } diff --git a/test/unit/selfhost-grafana-reporting.test.ts b/test/unit/selfhost-grafana-reporting.test.ts index 20aab80fc..7f63c443d 100644 --- a/test/unit/selfhost-grafana-reporting.test.ts +++ b/test/unit/selfhost-grafana-reporting.test.ts @@ -51,6 +51,10 @@ function fakePsql(root: string): string { `#!/bin/sh args="$*" case "$args" in + *\\\\copy*) + echo 'unexpected psql meta-command copy' >&2 + exit 9 + ;; *"information_schema.tables"*"pull_requests"*|*"information_schema.tables"*"advisories"*|*"information_schema.tables"*"review_targets"*|*"information_schema.tables"*"ai_usage_events"*) printf '1\\n' ;; From ca8794cea1bb5b9db62518e0facf250df6d55b9b Mon Sep 17 00:00:00 2001 From: JSONbored <49853598+JSONbored@users.noreply.github.com> Date: Tue, 30 Jun 2026 22:37:08 -0700 Subject: [PATCH 3/5] fix(selfhost): preserve Postgres migration resume semantics --- .../migrate-selfhost-sqlite-to-postgres.ts | 84 ++++++++++++------- 1 file changed, 53 insertions(+), 31 deletions(-) diff --git a/scripts/migrate-selfhost-sqlite-to-postgres.ts b/scripts/migrate-selfhost-sqlite-to-postgres.ts index a1a62744a..3a644d19a 100644 --- a/scripts/migrate-selfhost-sqlite-to-postgres.ts +++ b/scripts/migrate-selfhost-sqlite-to-postgres.ts @@ -22,6 +22,7 @@ interface CopyResult { rows: number; targetRowsBefore: number; keyColumns: string[]; + commonColumns: string[]; } interface SkipResult { @@ -175,6 +176,37 @@ async function pgCount(client: PoolClient, table: string): Promise { return Number(res.rows[0]?.count ?? 0); } +async function resetPgSequences(client: PoolClient, tables: Set): Promise { + const tableNames = [...tables]; + if (tableNames.length === 0) return; + const res = await client.query<{ table_name: string; column_name: string; sequence_name: string }>( + ` + SELECT + c.table_name, + c.column_name, + pg_get_serial_sequence(format('%I.%I', c.table_schema, c.table_name), c.column_name) AS sequence_name + FROM information_schema.columns c + WHERE c.table_schema = 'public' + AND c.table_name = ANY($1::text[]) + AND pg_get_serial_sequence(format('%I.%I', c.table_schema, c.table_name), c.column_name) IS NOT NULL + ORDER BY c.table_name, c.ordinal_position + `, + [tableNames], + ); + for (const row of res.rows) { + await client.query( + ` + SELECT setval( + $1::regclass, + COALESCE((SELECT MAX(${quoteIdent(row.column_name)}) FROM ${quoteIdent(row.table_name)}), 1), + (SELECT COUNT(${quoteIdent(row.column_name)}) > 0 FROM ${quoteIdent(row.table_name)}) + ) + `, + [row.sequence_name], + ); + } +} + function valuePlaceholder(index: number, table: string, column: string): string { const base = `$${index}`; if (table === "_selfhost_vectors" && column === "embedding") return `${base}::vector`; @@ -210,35 +242,29 @@ async function copyTable(db: DatabaseSync, client: PoolClient, table: string, co return total; } -async function countTargetRowsMatchingSourceKeys( +async function countTargetRowsMatchingSourceRows( db: DatabaseSync, client: PoolClient, table: string, - keyColumns: string[], + columns: string[], batchSize: number, ): Promise { const total = sqliteCount(db, table); let matched = 0; for (let offset = 0; offset < total; offset += batchSize) { - const rows = sqliteRows(db, table, keyColumns, batchSize, offset); + const rows = sqliteRows(db, table, columns, batchSize, offset); if (rows.length === 0) continue; const values: unknown[] = []; for (const row of rows) { - for (const column of keyColumns) { - const value = row[column] ?? null; - if (value === null) throw new Error(`Validation failed for ${table}: source primary key ${column} is null`); - values.push(value); - } + for (const column of columns) values.push(row[column] ?? null); } - const condition = - keyColumns.length === 1 - ? `${quoteIdent(keyColumns[0] as string)} IN (${values.map((_, index) => `$${index + 1}`).join(", ")})` - : `(${keyColumns.map(quoteIdent).join(", ")}) IN (${rows - .map((_, rowIndex) => { - const base = rowIndex * keyColumns.length; - return `(${keyColumns.map((__, columnIndex) => `$${base + columnIndex + 1}`).join(", ")})`; - }) - .join(", ")})`; + const condition = rows + .map((_, rowIndex) => { + const base = rowIndex * columns.length; + const predicates = columns.map((column, columnIndex) => `${quoteIdent(column)} IS NOT DISTINCT FROM $${base + columnIndex + 1}`); + return `(${predicates.join(" AND ")})`; + }) + .join(" OR "); const res = await client.query<{ count: string }>(`SELECT COUNT(*)::text AS count FROM ${quoteIdent(table)} WHERE ${condition}`, values); matched += Number(res.rows[0]?.count ?? 0); } @@ -271,19 +297,16 @@ async function copyAll(opts: Options, db: DatabaseSync, client: PoolClient): Pro const primaryKey = await pgPrimaryKey(client, table); const keyColumns = primaryKey.filter((column) => commonColumns.includes(column)); const rows = await copyTable(db, client, table, commonColumns, primaryKey, opts.batchSize); - copied.push({ table, rows, targetRowsBefore, keyColumns }); + copied.push({ table, rows, targetRowsBefore, keyColumns, commonColumns }); } - if (targetTables.has("_selfhost_jobs")) { - await client.query( - "SELECT setval(pg_get_serial_sequence('_selfhost_jobs', 'id'), COALESCE((SELECT MAX(id) FROM _selfhost_jobs), 1), (SELECT COUNT(*) > 0 FROM _selfhost_jobs))", - ); - } + await resetPgSequences(client, new Set(copied.map((result) => result.table))); // Re-run queue init after copying so migrated processing rows are recovered and derived job metadata is current. const queue = createPgQueue(client as unknown as pg.Pool, async () => undefined); await queue.init(); await queue.stop(); + await resetPgSequences(client, new Set(copied.map((result) => result.table))); targetTables = await pgTables(client); for (const result of copied) { @@ -293,17 +316,16 @@ async function copyAll(opts: Options, db: DatabaseSync, client: PoolClient): Pro if (targetCount < result.targetRowsBefore) { throw new Error(`Validation failed for ${result.table}: expected to preserve at least ${result.targetRowsBefore} existing row(s), target has ${targetCount}`); } - if (result.rows > 0 && result.keyColumns.length > 0) { - const matched = await countTargetRowsMatchingSourceKeys(db, client, result.table, result.keyColumns, opts.batchSize); - if (matched !== result.rows) { - throw new Error(`Validation failed for ${result.table}: copied ${result.rows} source row(s), target has ${matched} matching source key(s)`); + const validationColumns = result.keyColumns.length > 0 ? result.keyColumns : result.commonColumns; + if (result.rows > 0 && validationColumns.length > 0) { + const matched = await countTargetRowsMatchingSourceRows(db, client, result.table, validationColumns, opts.batchSize); + const validMatchCount = result.keyColumns.length > 0 ? matched === result.rows : matched >= result.rows; + if (!validMatchCount) { + const unit = result.keyColumns.length > 0 ? "source key(s)" : "source row(s)"; + throw new Error(`Validation failed for ${result.table}: copied ${result.rows} source row(s), target has ${matched} matching ${unit}`); } continue; } - const minimumExpectedRows = result.targetRowsBefore + result.rows; - if (targetCount < minimumExpectedRows) { - throw new Error(`Validation failed for ${result.table}: expected at least ${minimumExpectedRows} row(s), target has ${targetCount}`); - } continue; } if (targetCount !== result.rows) { From fc4325e82027a9da682dd912afa66efab0b0f68b Mon Sep 17 00:00:00 2001 From: JSONbored <49853598+JSONbored@users.noreply.github.com> Date: Tue, 30 Jun 2026 22:48:27 -0700 Subject: [PATCH 4/5] fix(selfhost): fail closed on Postgres migration conflicts --- .../migrate-selfhost-sqlite-to-postgres.ts | 96 +++++++++++++++++-- 1 file changed, 86 insertions(+), 10 deletions(-) diff --git a/scripts/migrate-selfhost-sqlite-to-postgres.ts b/scripts/migrate-selfhost-sqlite-to-postgres.ts index 3a644d19a..31cb79847 100644 --- a/scripts/migrate-selfhost-sqlite-to-postgres.ts +++ b/scripts/migrate-selfhost-sqlite-to-postgres.ts @@ -44,7 +44,7 @@ Options: --postgres-url Postgres target URL. Defaults to DATABASE_URL. --migrations-dir Migration directory. Defaults to migrations. --execute Commit the copy. Omit for a rollback dry run. - --allow-non-empty Allow non-empty target app tables. Off by default. + --allow-non-empty Allow non-empty target tables only when overlapping primary keys are identical. --include-vectors Also copy _selfhost_vectors into pgvector. --batch-size Rows per INSERT batch. Defaults to 250.`; } @@ -155,6 +155,34 @@ async function pgColumns(client: PoolClient, table: string): Promise { return res.rows.map((row) => row.column_name); } +async function prunePgSchemaInitSeed(client: PoolClient, table: string, columns: string[]): Promise { + if (table === "global_agent_controls") { + const lastFanoutPredicate = columns.includes("last_regate_fanout_at") ? "AND last_regate_fanout_at IS NULL" : ""; + const res = await client.query( + ` + DELETE FROM global_agent_controls + WHERE id = 'singleton' + AND frozen = 0 + AND updated_by IS NULL + ${lastFanoutPredicate} + `, + ); + return res.rowCount ?? 0; + } + if (table === "global_contributor_blacklist") { + const res = await client.query( + ` + DELETE FROM global_contributor_blacklist + WHERE id = 'singleton' + AND contributor_blacklist_json = '[]' + AND updated_by IS NULL + `, + ); + return res.rowCount ?? 0; + } + return 0; +} + async function pgPrimaryKey(client: PoolClient, table: string): Promise { const res = await client.query<{ column_name: string }>( ` @@ -222,13 +250,9 @@ function insertSql(table: string, columns: string[], primaryKey: string[], rowCo }).join(", "); const conflictColumns = primaryKey.filter((column) => columns.includes(column)); if (conflictColumns.length === 0) return `INSERT INTO ${quoteIdent(table)} (${columnSql}) VALUES ${valuesSql}`; - const updateColumns = columns.filter((column) => !conflictColumns.includes(column)); const conflictTarget = conflictColumns.map(quoteIdent).join(", "); - if (updateColumns.length === 0) { - return `INSERT INTO ${quoteIdent(table)} (${columnSql}) VALUES ${valuesSql} ON CONFLICT (${conflictTarget}) DO NOTHING`; - } - const updates = updateColumns.map((column) => `${quoteIdent(column)} = EXCLUDED.${quoteIdent(column)}`).join(", "); - return `INSERT INTO ${quoteIdent(table)} (${columnSql}) VALUES ${valuesSql} ON CONFLICT (${conflictTarget}) DO UPDATE SET ${updates}`; + // Conflict compatibility is checked before copy; never overwrite an existing target row implicitly. + return `INSERT INTO ${quoteIdent(table)} (${columnSql}) VALUES ${valuesSql} ON CONFLICT (${conflictTarget}) DO NOTHING`; } async function copyTable(db: DatabaseSync, client: PoolClient, table: string, columns: string[], primaryKey: string[], batchSize: number): Promise { @@ -271,6 +295,38 @@ async function countTargetRowsMatchingSourceRows( return matched; } +async function countConflictingTargetRowsForSourceKeys( + db: DatabaseSync, + client: PoolClient, + table: string, + keyColumns: string[], + compareColumns: string[], + batchSize: number, +): Promise { + const total = sqliteCount(db, table); + let conflicts = 0; + for (let offset = 0; offset < total; offset += batchSize) { + const rows = sqliteRows(db, table, compareColumns, batchSize, offset); + if (rows.length === 0) continue; + const values: unknown[] = []; + const condition = rows + .map((row) => { + const parameterByColumn = new Map(); + for (const column of compareColumns) { + values.push(row[column] ?? null); + parameterByColumn.set(column, values.length); + } + const keyPredicates = keyColumns.map((column) => `${quoteIdent(column)} IS NOT DISTINCT FROM $${parameterByColumn.get(column)}`); + const differencePredicates = compareColumns.map((column) => `${quoteIdent(column)} IS DISTINCT FROM $${parameterByColumn.get(column)}`); + return `((${keyPredicates.join(" AND ")}) AND (${differencePredicates.join(" OR ")}))`; + }) + .join(" OR "); + const res = await client.query<{ count: string }>(`SELECT COUNT(*)::text AS count FROM ${quoteIdent(table)} WHERE ${condition}`, values); + conflicts += Number(res.rows[0]?.count ?? 0); + } + return conflicts; +} + async function copyAll(opts: Options, db: DatabaseSync, client: PoolClient): Promise<{ copied: CopyResult[]; skipped: SkipResult[] }> { const copied: CopyResult[] = []; const skipped: SkipResult[] = []; @@ -283,12 +339,15 @@ async function copyAll(opts: Options, db: DatabaseSync, client: PoolClient): Pro continue; } if (!targetTables.has(table)) throw new Error(`Target Postgres schema is missing source table: ${table}`); + const sourceColumns = sqliteColumns(db, table); + const targetColumns = await pgColumns(client, table); + if (TABLES_ALLOWED_AFTER_SCHEMA_INIT.has(table)) { + await prunePgSchemaInitSeed(client, table, targetColumns); + } const targetRowsBefore = await pgCount(client, table); - if (targetRowsBefore > 0 && !opts.allowNonEmpty && !TABLES_ALLOWED_AFTER_SCHEMA_INIT.has(table)) { + if (targetRowsBefore > 0 && !opts.allowNonEmpty) { throw new Error(`Target table ${table} already contains ${targetRowsBefore} row(s); rerun with --allow-non-empty only if this is intentional`); } - const sourceColumns = sqliteColumns(db, table); - const targetColumns = await pgColumns(client, table); const commonColumns = sourceColumns.filter((column) => targetColumns.includes(column)); if (commonColumns.length === 0) { skipped.push({ table, reason: "no common columns" }); @@ -296,6 +355,12 @@ async function copyAll(opts: Options, db: DatabaseSync, client: PoolClient): Pro } const primaryKey = await pgPrimaryKey(client, table); const keyColumns = primaryKey.filter((column) => commonColumns.includes(column)); + if (targetRowsBefore > 0 && keyColumns.length > 0) { + const conflicts = await countConflictingTargetRowsForSourceKeys(db, client, table, keyColumns, commonColumns, opts.batchSize); + if (conflicts > 0) { + throw new Error(`Target table ${table} already contains ${conflicts} conflicting row(s); --allow-non-empty only permits identical overlapping primary keys`); + } + } const rows = await copyTable(db, client, table, commonColumns, primaryKey, opts.batchSize); copied.push({ table, rows, targetRowsBefore, keyColumns, commonColumns }); } @@ -343,7 +408,11 @@ async function main(): Promise { const pool = new pg.Pool({ connectionString: opts.postgresUrl, max: 1 }); const client = await pool.connect(); let finished = false; + let sqliteTransactionOpen = false; try { + // Pin one source snapshot across COUNT + paged reads so a live writer cannot create a mixed copy. + sqlite.exec("BEGIN DEFERRED TRANSACTION;"); + sqliteTransactionOpen = true; await client.query("BEGIN"); const db = createPgAdapter(client as unknown as pg.Pool); const migrationsApplied = await runSelfHostMigrations(db, opts.migrationsDir); @@ -377,6 +446,13 @@ async function main(): Promise { if (!finished) await client.query("ROLLBACK").catch(() => undefined); client.release(); await pool.end(); + if (sqliteTransactionOpen) { + try { + sqlite.exec("ROLLBACK"); + } catch { + // Read-only snapshot cleanup is best-effort; Postgres rollback above is the safety boundary. + } + } sqlite.close(); } } From 95d02f88b3e348a23e3586a313b3a1fd350eebdb Mon Sep 17 00:00:00 2001 From: JSONbored <49853598+JSONbored@users.noreply.github.com> Date: Tue, 30 Jun 2026 23:03:38 -0700 Subject: [PATCH 5/5] fix(selfhost): harden Postgres cutover cleanup --- scripts/export-grafana-reporting-db.sh | 27 +++++++--- .../migrate-selfhost-sqlite-to-postgres.ts | 3 ++ test/unit/selfhost-grafana-reporting.test.ts | 50 ++++++++++++++++++- 3 files changed, 72 insertions(+), 8 deletions(-) diff --git a/scripts/export-grafana-reporting-db.sh b/scripts/export-grafana-reporting-db.sh index 7ba0a3c98..6f6f007c5 100644 --- a/scripts/export-grafana-reporting-db.sh +++ b/scripts/export-grafana-reporting-db.sh @@ -6,11 +6,25 @@ PG_DB="${GITTENSORY_REPORTING_SOURCE_DATABASE_URL:-${DATABASE_URL:-}}" OUT_DIR="${GITTENSORY_REPORTING_DIR:-/reporting}" OUT_DB="${GITTENSORY_REPORTING_DB:-$OUT_DIR/gittensory-reporting.sqlite}" TMP_DB="${OUT_DB}.tmp" +CSV_TMP_DIR="$(mktemp -d)" + +cleanup() { + rm -rf "$CSV_TMP_DIR" +} +trap cleanup EXIT HUP INT TERM sql_string() { printf "%s" "$1" | sed "s/'/''/g" } +sqlite_dot_string() { + printf "%s" "$1" | sed 's/\\/\\\\/g; s/"/\\"/g; s/^/"/; s/$/"/' +} + +csv_temp_file() { + mktemp "$CSV_TMP_DIR/$1.XXXXXX.csv" +} + source_column_exists() { sqlite3 "$APP_DB" "SELECT 1 FROM pragma_table_info('$1') WHERE name = '$2' LIMIT 1" | grep -q 1 } @@ -56,9 +70,11 @@ sqlite_import_csv() { csv="$1" table="$2" [ -s "$csv" ] || return 0 + csv_arg="$(sqlite_dot_string "$csv")" + table_arg="$(sqlite_dot_string "$table")" sqlite3 "$TMP_DB" < commonColumns.includes(column)); + if (targetRowsBefore > 0 && keyColumns.length === 0) { + throw new Error(`Target table ${table} already contains ${targetRowsBefore} row(s) but has no comparable copied primary key; --allow-non-empty cannot safely merge it`); + } if (targetRowsBefore > 0 && keyColumns.length > 0) { const conflicts = await countConflictingTargetRowsForSourceKeys(db, client, table, keyColumns, commonColumns, opts.batchSize); if (conflicts > 0) { diff --git a/test/unit/selfhost-grafana-reporting.test.ts b/test/unit/selfhost-grafana-reporting.test.ts index 7f63c443d..fe55bc770 100644 --- a/test/unit/selfhost-grafana-reporting.test.ts +++ b/test/unit/selfhost-grafana-reporting.test.ts @@ -1,5 +1,5 @@ import { execFileSync } from "node:child_process"; -import { chmodSync, existsSync, mkdirSync, mkdtempSync, rmSync, writeFileSync } from "node:fs"; +import { chmodSync, existsSync, mkdirSync, mkdtempSync, readdirSync, rmSync, writeFileSync } from "node:fs"; import { tmpdir } from "node:os"; import { join } from "node:path"; import { afterEach, describe, expect, it } from "vitest"; @@ -93,6 +93,32 @@ exit 7 return bin; } +function failingCopyPsql(root: string): string { + const bin = join(root, "copy-fail-bin"); + mkdirSync(bin); + const psql = join(bin, "psql"); + writeFileSync( + psql, + `#!/bin/sh +args="$*" +case "$args" in + *"information_schema.tables"*"pull_requests"*|*"information_schema.tables"*"advisories"*|*"information_schema.tables"*"review_targets"*|*"information_schema.tables"*"ai_usage_events"*) + printf '1\\n' + ;; + *"information_schema.columns"*"ai_usage_events"*"estimated_neurons"*) + printf '1\\n' + ;; + *"FROM current_pull_requests"*) + echo 'copy failed' >&2 + exit 7 + ;; +esac +`, + ); + chmodSync(psql, 0o755); + return bin; +} + (sqliteCliAvailable ? describe : describe.skip)("Grafana reporting exporter", () => { it("prefers current pull request rows while preserving non-overlapping legacy review history", () => { const root = tmpRoot(); @@ -238,10 +264,13 @@ exit 7 const root = tmpRoot(); const outDb = join(root, "reporting.sqlite"); const bin = fakePsql(root); + const csvTmp = join(root, "csv temp"); + mkdirSync(csvTmp); runExporter(root, join(root, "unused.sqlite"), outDb, { DATABASE_URL: "postgres://gittensory:pw@postgres:5432/gittensory", PATH: `${bin}:${process.env.PATH ?? ""}`, + TMPDIR: csvTmp, }); expect(sqlite(outDb, "PRAGMA quick_check;")).toBe("ok"); @@ -253,6 +282,7 @@ exit 7 expect(sqlite(outDb, "SELECT estimated_neurons FROM ai_usage_events;")).toBe("42"); expect(sqlite(outDb, "SELECT json_extract(metadata_json, '$.repoFullName') FROM ai_usage_events;")).toBe("JSONbored/gittensory"); expect(sqlite(outDb, "SELECT json_extract(metadata_json, '$.private') IS NULL FROM ai_usage_events;")).toBe("1"); + expect(readdirSync(csvTmp)).toEqual([]); }); it("fails closed when Postgres metadata cannot be inspected", () => { @@ -268,4 +298,22 @@ exit 7 ).toThrow(); expect(existsSync(outDb)).toBe(false); }); + + it("cleans transient Postgres CSV files when COPY fails", () => { + const root = tmpRoot(); + const outDb = join(root, "reporting.sqlite"); + const bin = failingCopyPsql(root); + const csvTmp = join(root, "csv temp"); + mkdirSync(csvTmp); + + expect(() => + runExporter(root, join(root, "unused.sqlite"), outDb, { + DATABASE_URL: "postgres://gittensory:pw@postgres:5432/gittensory", + PATH: `${bin}:${process.env.PATH ?? ""}`, + TMPDIR: csvTmp, + }), + ).toThrow(); + expect(existsSync(outDb)).toBe(false); + expect(readdirSync(csvTmp)).toEqual([]); + }); });