diff --git a/package.json b/package.json index 23e57d910..3bb461ec9 100644 --- a/package.json +++ b/package.json @@ -12,6 +12,9 @@ "migration:test-idempotency": "tsx src/scripts/test-migration-idempotency.ts", "migrations:types": "tsx src/scripts/migrations-types.ts", "docs:export": "tsx ./src/scripts/export-docs.ts", + "jobs:list": "tsx src/scripts/jobs-client.ts list", + "jobs:backup": "tsx src/scripts/jobs-client.ts backup", + "jobs:restore": "tsx src/scripts/jobs-client.ts restore", "pprof:capture": "tsx src/scripts/pprof-client.ts", "test:dummy-data": "tsx -r dotenv/config ./src/test/db/import-dummy-data.ts", "test:unit": "vitest run --config vitest.unit.config.ts", diff --git a/src/http/routes/admin/queue.ts b/src/http/routes/admin/queue.ts index 2af65c583..643393078 100644 --- a/src/http/routes/admin/queue.ts +++ b/src/http/routes/admin/queue.ts @@ -2,6 +2,14 @@ import { MoveJobs, UpgradePgBossV10 } from '@storage/events' import { FastifyInstance, RequestGenericInterface } from 'fastify' import { FromSchema } from 'json-schema-to-ts' import { getConfig } from '../../../config' +import { + backupQueueOverflow, + JOB_OVERFLOW_LIST_LIMIT_DEFAULT, + JOB_OVERFLOW_RESTORE_LIMIT_DEFAULT, + listQueueOverflow, + parseQueueOverflowCsv, + restoreQueueOverflow, +} from '../../../internal/queue/overflow' import apiKey from '../../plugins/apikey' const { pgQueueEnable } = getConfig() @@ -25,10 +33,130 @@ const moveJobsSchema = { }, } as const +const listQueueOverflowSchema = { + description: 'List created pgBoss jobs from the live queue table or overflow backup table.', + querystring: { + type: 'object', + properties: { + source: { + type: 'string', + enum: ['job', 'backup'], + default: 'job', + }, + groupBy: { + type: 'string', + enum: ['summary', 'tenant'], + default: 'summary', + }, + name: { + type: 'string', + minLength: 1, + }, + eventTypes: { + type: 'string', + minLength: 1, + description: 'Comma-separated event types to filter on.', + }, + tenantRefs: { + type: 'string', + minLength: 1, + description: 'Comma-separated tenant refs to filter on.', + }, + limit: { + type: 'integer', + minimum: 1, + default: JOB_OVERFLOW_LIST_LIMIT_DEFAULT, + }, + }, + additionalProperties: false, + }, +} as const + +const backupQueueOverflowSchema = { + description: 'Move created pgBoss jobs into the overflow backup table.', + body: { + type: 'object', + properties: { + name: { + type: 'string', + minLength: 1, + }, + eventTypes: { + type: 'array', + minItems: 1, + items: { + type: 'string', + minLength: 1, + }, + }, + tenantRefs: { + type: 'array', + minItems: 1, + items: { + type: 'string', + minLength: 1, + }, + }, + limit: { + type: 'integer', + minimum: 1, + }, + }, + additionalProperties: false, + }, +} as const + +const restoreQueueOverflowSchema = { + description: 'Restore created pgBoss jobs from the overflow backup table in batches.', + body: { + type: 'object', + properties: { + name: { + type: 'string', + minLength: 1, + }, + eventTypes: { + type: 'array', + minItems: 1, + items: { + type: 'string', + minLength: 1, + }, + }, + tenantRefs: { + type: 'array', + minItems: 1, + items: { + type: 'string', + minLength: 1, + }, + }, + limit: { + type: 'integer', + minimum: 1, + default: JOB_OVERFLOW_RESTORE_LIMIT_DEFAULT, + }, + }, + additionalProperties: false, + }, +} as const + interface MoveJobsRequestInterface extends RequestGenericInterface { Body: FromSchema } +interface ListQueueOverflowRequestInterface extends RequestGenericInterface { + Querystring: FromSchema +} + +interface BackupQueueOverflowRequestInterface extends RequestGenericInterface { + Body: FromSchema +} + +interface RestoreQueueOverflowRequestInterface extends RequestGenericInterface { + Body: FromSchema +} + export default async function routes(fastify: FastifyInstance) { fastify.register(apiKey) @@ -63,4 +191,39 @@ export default async function routes(fastify: FastifyInstance) { return reply.send({ message: 'Move jobs scheduled' }) } ) + + fastify.get( + '/overflow', + { schema: { ...listQueueOverflowSchema, tags: ['queue'] } }, + async (req, reply) => { + const data = await listQueueOverflow({ + source: req.query.source, + groupBy: req.query.groupBy, + name: req.query.name, + eventTypes: parseQueueOverflowCsv(req.query.eventTypes), + tenantRefs: parseQueueOverflowCsv(req.query.tenantRefs), + limit: req.query.limit, + }) + + return reply.send(data) + } + ) + + fastify.post( + '/overflow/backup', + { schema: { ...backupQueueOverflowSchema, tags: ['queue'] } }, + async (req, reply) => { + const data = await backupQueueOverflow(req.body) + return reply.send(data) + } + ) + + fastify.post( + '/overflow/restore', + { schema: { ...restoreQueueOverflowSchema, tags: ['queue'] } }, + async (req, reply) => { + const data = await restoreQueueOverflow(req.body) + return reply.send(data) + } + ) } diff --git a/src/internal/queue/index.ts b/src/internal/queue/index.ts index 209afebc0..2f459c51a 100644 --- a/src/internal/queue/index.ts +++ b/src/internal/queue/index.ts @@ -1,2 +1,3 @@ export * from './event' +export * from './overflow' export * from './queue' diff --git a/src/internal/queue/overflow.test.ts b/src/internal/queue/overflow.test.ts new file mode 100644 index 000000000..6ab343290 --- /dev/null +++ b/src/internal/queue/overflow.test.ts @@ -0,0 +1,63 @@ +import { + buildQueueOverflowWhereClause, + normalizeQueueOverflowFilters, + parseQueueOverflowCsv, +} from './overflow' + +describe('parseQueueOverflowCsv', () => { + it('returns undefined for empty input', () => { + expect(parseQueueOverflowCsv(undefined)).toBeUndefined() + expect(parseQueueOverflowCsv(' , , ')).toBeUndefined() + }) + + it('trims, de-duplicates, and preserves order', () => { + expect( + parseQueueOverflowCsv(' ObjectRemoved:Delete, ObjectCreated:Put,ObjectRemoved:Delete ') + ).toEqual(['ObjectRemoved:Delete', 'ObjectCreated:Put']) + }) +}) + +describe('normalizeQueueOverflowFilters', () => { + it('trims values and removes empty strings', () => { + expect( + normalizeQueueOverflowFilters({ + name: ' webhooks ', + eventTypes: [' ObjectRemoved:Delete ', ''], + tenantRefs: [' tenant-a ', 'tenant-a', ' '], + }) + ).toEqual({ + name: 'webhooks', + eventTypes: ['ObjectRemoved:Delete'], + tenantRefs: ['tenant-a'], + }) + }) +}) + +describe('buildQueueOverflowWhereClause', () => { + it('always scopes queries to created jobs', () => { + expect(buildQueueOverflowWhereClause({})).toEqual({ + sql: 'state = ?', + bindings: ['created'], + }) + }) + + it('adds queue, event-type, and tenant filters in a stable order', () => { + expect( + buildQueueOverflowWhereClause({ + name: ' webhooks ', + eventTypes: ['ObjectRemoved:Delete', ' ObjectCreated:Put '], + tenantRefs: ['tenant-b', 'tenant-a'], + }) + ).toEqual({ + sql: "state = ? AND name = ? AND data->'event'->>'type' IN (?, ?) AND data->'tenant'->>'ref' IN (?, ?)", + bindings: [ + 'created', + 'webhooks', + 'ObjectRemoved:Delete', + 'ObjectCreated:Put', + 'tenant-b', + 'tenant-a', + ], + }) + }) +}) diff --git a/src/internal/queue/overflow.ts b/src/internal/queue/overflow.ts new file mode 100644 index 000000000..ebef0519e --- /dev/null +++ b/src/internal/queue/overflow.ts @@ -0,0 +1,316 @@ +import { multitenantKnex } from '@internal/database' +import { Knex } from 'knex' +import { PG_BOSS_SCHEMA } from './queue' + +const CREATED_STATE = 'created' +const JOB_TABLE = `${PG_BOSS_SCHEMA}.job` +export const JOB_OVERFLOW_BACKUP_TABLE = `${PG_BOSS_SCHEMA}.job_overflow_backup` +export const JOB_OVERFLOW_LIST_LIMIT_DEFAULT = 50 +export const JOB_OVERFLOW_RESTORE_LIMIT_DEFAULT = 50000 + +export type QueueOverflowSource = 'job' | 'backup' +export type QueueOverflowGroupBy = 'summary' | 'tenant' + +export interface QueueOverflowFilters { + eventTypes?: readonly string[] + name?: string + tenantRefs?: readonly string[] +} + +export interface ListQueueOverflowOptions extends QueueOverflowFilters { + groupBy?: QueueOverflowGroupBy + limit?: number + source?: QueueOverflowSource +} + +export interface MoveQueueOverflowOptions extends QueueOverflowFilters { + limit?: number +} + +export interface QueueOverflowSummaryRow { + count: number + eventType: string | null + name: string +} + +export interface QueueOverflowTenantRow { + count: number + tenantRef: string | null +} + +interface QueueOverflowWhereClause { + bindings: unknown[] + sql: string +} + +type KnexLike = Knex | Knex.Transaction + +export function parseQueueOverflowCsv(value: string | undefined) { + if (!value) { + return undefined + } + + const normalized = Array.from( + new Set( + value + .split(',') + .map((item) => item.trim()) + .filter((item) => item.length > 0) + ) + ) + + return normalized.length > 0 ? normalized : undefined +} + +function normalizeOverflowString(value: string | undefined) { + const normalized = value?.trim() + return normalized ? normalized : undefined +} + +function normalizeOverflowStringList(values: readonly string[] | undefined) { + if (!values) { + return undefined + } + + const normalized = Array.from( + new Set(values.map((value) => value.trim()).filter((value) => value.length > 0)) + ) + + return normalized.length > 0 ? normalized : undefined +} + +export function normalizeQueueOverflowFilters(filters: QueueOverflowFilters): QueueOverflowFilters { + return { + name: normalizeOverflowString(filters.name), + eventTypes: normalizeOverflowStringList(filters.eventTypes), + tenantRefs: normalizeOverflowStringList(filters.tenantRefs), + } +} + +export function buildQueueOverflowWhereClause( + filters: QueueOverflowFilters +): QueueOverflowWhereClause { + const normalizedFilters = normalizeQueueOverflowFilters(filters) + const clauses = ['state = ?'] + const bindings: unknown[] = [CREATED_STATE] + + if (normalizedFilters.name) { + clauses.push('name = ?') + bindings.push(normalizedFilters.name) + } + + if (normalizedFilters.eventTypes?.length) { + clauses.push( + `data->'event'->>'type' IN (${normalizedFilters.eventTypes.map(() => '?').join(', ')})` + ) + bindings.push(...normalizedFilters.eventTypes) + } + + if (normalizedFilters.tenantRefs?.length) { + clauses.push( + `data->'tenant'->>'ref' IN (${normalizedFilters.tenantRefs.map(() => '?').join(', ')})` + ) + bindings.push(...normalizedFilters.tenantRefs) + } + + return { + sql: clauses.join(' AND '), + bindings, + } +} + +export async function queueOverflowBackupTableExists(db: KnexLike = multitenantKnex) { + const result = await db.raw('SELECT to_regclass(?) AS table_name', [JOB_OVERFLOW_BACKUP_TABLE]) + return Boolean(result.rows[0]?.table_name) +} + +export async function ensureQueueOverflowBackupTable(db: KnexLike = multitenantKnex) { + const existed = await queueOverflowBackupTableExists(db) + + if (!existed) { + await db.raw( + `CREATE TABLE IF NOT EXISTS ${JOB_OVERFLOW_BACKUP_TABLE} (LIKE ${JOB_TABLE} INCLUDING ALL)` + ) + } + + return { + created: !existed, + } +} + +function resolveQueueOverflowTable(source: QueueOverflowSource) { + return source === 'backup' ? JOB_OVERFLOW_BACKUP_TABLE : JOB_TABLE +} + +function buildMatchingJobIdsQuery( + db: KnexLike, + tableName: string, + filters: QueueOverflowFilters, + limit?: number +) { + const whereClause = buildQueueOverflowWhereClause(filters) + const query = db(tableName) + .select('id') + .whereRaw(whereClause.sql, whereClause.bindings) + .orderBy('id') + + if (limit !== undefined) { + query.limit(limit) + } + + return query +} + +export async function listQueueOverflow( + options: ListQueueOverflowOptions, + db: KnexLike = multitenantKnex +) { + const source = options.source ?? 'job' + const groupBy = options.groupBy ?? 'summary' + const limit = options.limit ?? JOB_OVERFLOW_LIST_LIMIT_DEFAULT + const filters = normalizeQueueOverflowFilters(options) + const backupTableExists = source === 'backup' ? await queueOverflowBackupTableExists(db) : true + + if (!backupTableExists) { + return { + backupTableExists, + data: [] as QueueOverflowSummaryRow[] | QueueOverflowTenantRow[], + filters, + groupBy, + source, + } + } + + const tableName = resolveQueueOverflowTable(source) + const whereClause = buildQueueOverflowWhereClause(filters) + + if (groupBy === 'tenant') { + const rows = (await db(tableName) + .select(db.raw("data->'tenant'->>'ref' AS tenant_ref")) + .count<{ count: string; tenant_ref: string | null }[]>('* AS count') + .whereRaw(whereClause.sql, whereClause.bindings) + .groupByRaw("data->'tenant'->>'ref'") + .orderBy('count', 'desc') + .orderBy('tenant_ref', 'asc') + .limit(limit)) as { count: string; tenant_ref: string | null }[] + + return { + backupTableExists, + data: rows.map((row) => ({ + count: Number(row.count), + tenantRef: row.tenant_ref, + })), + filters, + groupBy, + source, + } + } + + const rows = (await db(tableName) + .select('name') + .select(db.raw("data->'event'->>'type' AS event_type")) + .count<{ count: string; event_type: string | null; name: string }[]>('* AS count') + .whereRaw(whereClause.sql, whereClause.bindings) + .groupBy('name') + .groupByRaw("data->'event'->>'type'") + .orderBy('count', 'desc') + .orderBy('name', 'asc') + .orderBy('event_type', 'asc') + .limit(limit)) as { count: string; event_type: string | null; name: string }[] + + return { + backupTableExists, + data: rows.map((row) => ({ + count: Number(row.count), + eventType: row.event_type, + name: row.name, + })), + filters, + groupBy, + source, + } +} + +async function moveQueueOverflowJobs( + db: KnexLike, + options: MoveQueueOverflowOptions, + sourceTable: string, + targetTable: string +) { + const selectedIdsQuery = buildMatchingJobIdsQuery(db, sourceTable, options, options.limit) + const compiledQuery = selectedIdsQuery.toSQL() + const result = await db.raw( + ` + WITH moved AS ( + DELETE FROM ${sourceTable} + WHERE id IN (${compiledQuery.sql}) + RETURNING * + ), + inserted AS ( + INSERT INTO ${targetTable} + SELECT * FROM moved + RETURNING 1 + ) + SELECT COUNT(*)::bigint AS moved_count FROM inserted + `, + compiledQuery.bindings + ) + + return Number(result.rows[0]?.moved_count ?? 0) +} + +export async function backupQueueOverflow( + options: MoveQueueOverflowOptions, + db: KnexLike = multitenantKnex +) { + return db.transaction(async (tnx) => { + const { created } = await ensureQueueOverflowBackupTable(tnx) + await tnx.raw(`LOCK TABLE ${JOB_TABLE} IN SHARE ROW EXCLUSIVE MODE`) + + const movedCount = await moveQueueOverflowJobs( + tnx, + options, + JOB_TABLE, + JOB_OVERFLOW_BACKUP_TABLE + ) + + return { + backupTableCreated: created, + filters: normalizeQueueOverflowFilters(options), + limit: options.limit ?? null, + movedCount, + } + }) +} + +export async function restoreQueueOverflow( + options: MoveQueueOverflowOptions, + db: KnexLike = multitenantKnex +) { + const backupTableExists = await queueOverflowBackupTableExists(db) + + if (!backupTableExists) { + return { + backupTableExists, + filters: normalizeQueueOverflowFilters(options), + limit: options.limit ?? JOB_OVERFLOW_RESTORE_LIMIT_DEFAULT, + movedCount: 0, + } + } + + return db.transaction(async (tnx) => { + const movedCount = await moveQueueOverflowJobs( + tnx, + { ...options, limit: options.limit ?? JOB_OVERFLOW_RESTORE_LIMIT_DEFAULT }, + JOB_OVERFLOW_BACKUP_TABLE, + JOB_TABLE + ) + + return { + backupTableExists: true, + filters: normalizeQueueOverflowFilters(options), + limit: options.limit ?? JOB_OVERFLOW_RESTORE_LIMIT_DEFAULT, + movedCount, + } + }) +} diff --git a/src/scripts/jobs-client.test.ts b/src/scripts/jobs-client.test.ts new file mode 100644 index 000000000..609c1fbd8 --- /dev/null +++ b/src/scripts/jobs-client.test.ts @@ -0,0 +1,150 @@ +import { afterEach, describe, expect, it, vi } from 'vitest' +import { + buildJobsRequest, + main, + parseJobsConfig, + parseJobsCsv, + resolveJobsAdminUrl, +} from './jobs-client' + +describe('parseJobsCsv', () => { + it('returns undefined for empty input', () => { + expect(parseJobsCsv(undefined)).toBeUndefined() + expect(parseJobsCsv(' , , ')).toBeUndefined() + }) + + it('trims and de-duplicates values', () => { + expect(parseJobsCsv('tenant-a, tenant-b,tenant-a')).toEqual(['tenant-a', 'tenant-b']) + }) +}) + +describe('resolveJobsAdminUrl', () => { + it('preserves base URL path prefixes and query params', () => { + const url = resolveJobsAdminUrl('http://localhost:54321/admin/', '/queue/overflow', { + source: 'backup', + groupBy: 'tenant', + }) + + expect(url.toString()).toBe( + 'http://localhost:54321/admin/queue/overflow?source=backup&groupBy=tenant' + ) + }) +}) + +describe('parseJobsConfig', () => { + it('uses defaults when optional env is omitted', () => { + expect( + parseJobsConfig({ + ADMIN_URL: 'http://localhost:54321/admin', + ADMIN_API_KEY: 'test-key', + }) + ).toEqual({ + adminApiKey: 'test-key', + adminUrl: 'http://localhost:54321/admin', + queueName: undefined, + eventTypes: undefined, + tenantRefs: undefined, + source: 'job', + groupBy: 'summary', + limit: undefined, + }) + }) + + it('rejects invalid JOBS_SOURCE and JOBS_LIMIT values', () => { + expect( + parseJobsConfig({ + ADMIN_URL: 'http://localhost:54321/admin', + ADMIN_API_KEY: 'test-key', + JOBS_SOURCE: 'archive', + }) + ).toBe('JOBS_SOURCE must be either job or backup') + + expect( + parseJobsConfig({ + ADMIN_URL: 'http://localhost:54321/admin', + ADMIN_API_KEY: 'test-key', + JOBS_LIMIT: '0', + }) + ).toBe('JOBS_LIMIT must be a positive integer') + }) +}) + +describe('buildJobsRequest', () => { + const baseConfig = { + adminApiKey: 'test-key', + adminUrl: 'http://localhost:54321/admin', + queueName: 'webhooks', + eventTypes: ['ObjectRemoved:Delete'], + tenantRefs: ['tenant-a'], + source: 'backup' as const, + groupBy: 'tenant' as const, + limit: 123, + } + + it('builds list requests with query params', () => { + const request = buildJobsRequest('list', baseConfig) + + expect(request.method).toBe('GET') + expect(request.body).toBeUndefined() + expect(request.url.toString()).toBe( + 'http://localhost:54321/admin/queue/overflow?source=backup&groupBy=tenant&name=webhooks&eventTypes=ObjectRemoved%3ADelete&tenantRefs=tenant-a&limit=123' + ) + expect((request.headers as Headers).get('ApiKey')).toBe('test-key') + }) + + it('builds backup requests with a JSON body', () => { + const request = buildJobsRequest('backup', baseConfig) + + expect(request.method).toBe('POST') + expect(request.url.toString()).toBe('http://localhost:54321/admin/queue/overflow/backup') + expect(JSON.parse(request.body as string)).toEqual({ + name: 'webhooks', + eventTypes: ['ObjectRemoved:Delete'], + tenantRefs: ['tenant-a'], + limit: 123, + }) + expect((request.headers as Headers).get('ApiKey')).toBe('test-key') + expect((request.headers as Headers).get('Content-Type')).toBe('application/json') + }) +}) + +describe('main', () => { + afterEach(() => { + vi.restoreAllMocks() + process.exitCode = undefined + }) + + it('sets a non-zero exit code for invalid actions', async () => { + vi.spyOn(console, 'error').mockImplementation(() => {}) + + await expect(main({}, ['node', 'jobs-client.ts', 'invalid'])).resolves.toBe(false) + + expect(process.exitCode).toBe(1) + expect(console.error).toHaveBeenCalledWith('Please provide an action: list, backup, or restore') + }) + + it('prints the JSON response for successful requests', async () => { + vi.spyOn(globalThis, 'fetch').mockResolvedValue( + new Response(JSON.stringify({ movedCount: 3 }), { + status: 200, + headers: { + 'Content-Type': 'application/json', + }, + }) + ) + vi.spyOn(console, 'log').mockImplementation(() => {}) + + await expect( + main( + { + ADMIN_URL: 'http://localhost:54321/admin', + ADMIN_API_KEY: 'test-key', + JOBS_QUEUE_NAME: 'webhooks', + }, + ['node', 'jobs-client.ts', 'restore'] + ) + ).resolves.toBe(true) + + expect(console.log).toHaveBeenCalledWith('{\n "movedCount": 3\n}') + }) +}) diff --git a/src/scripts/jobs-client.ts b/src/scripts/jobs-client.ts new file mode 100644 index 000000000..84a137ba8 --- /dev/null +++ b/src/scripts/jobs-client.ts @@ -0,0 +1,212 @@ +type JobsAction = 'backup' | 'list' | 'restore' +type JobsGroupBy = 'summary' | 'tenant' +type JobsSource = 'backup' | 'job' + +interface JobsClientConfig { + adminApiKey: string + adminUrl: string + eventTypes?: string[] + groupBy: JobsGroupBy + limit?: number + queueName?: string + source: JobsSource + tenantRefs?: string[] +} + +const ADMIN_URL = process.env.ADMIN_URL +const ADMIN_API_KEY = process.env.ADMIN_API_KEY +const JOBS_SOURCE = process.env.JOBS_SOURCE +const JOBS_GROUP_BY = process.env.JOBS_GROUP_BY +const JOBS_QUEUE_NAME = process.env.JOBS_QUEUE_NAME +const JOBS_EVENT_TYPES = process.env.JOBS_EVENT_TYPES +const JOBS_TENANT_REFS = process.env.JOBS_TENANT_REFS +const JOBS_LIMIT = process.env.JOBS_LIMIT + +export function parseJobsCsv(value: string | undefined) { + if (!value) { + return undefined + } + + const normalized = Array.from( + new Set( + value + .split(',') + .map((item) => item.trim()) + .filter((item) => item.length > 0) + ) + ) + + return normalized.length > 0 ? normalized : undefined +} + +function parseUnsignedInteger(value: string, errorMessage: string) { + const normalized = value.trim() + + if (!/^\d+$/.test(normalized)) { + throw new Error(errorMessage) + } + + const parsed = Number.parseInt(normalized, 10) + if (!Number.isSafeInteger(parsed) || parsed <= 0) { + throw new Error(errorMessage) + } + + return parsed +} + +export function resolveJobsAdminUrl( + baseUrl: string, + requestPath: string, + query?: Record +) { + const url = new URL(`${baseUrl.replace(/\/+$/, '')}/${requestPath.replace(/^\/+/, '')}`) + + for (const [key, value] of Object.entries(query ?? {})) { + if (value !== undefined) { + url.searchParams.set(key, value) + } + } + + return url +} + +export function parseJobsConfig(env: NodeJS.ProcessEnv): JobsClientConfig | string { + const source = (env.JOBS_SOURCE?.trim() || 'job') as JobsSource + const groupBy = (env.JOBS_GROUP_BY?.trim() || 'summary') as JobsGroupBy + + if (!env.ADMIN_URL) { + return 'Please provide ADMIN_URL' + } + + if (!env.ADMIN_API_KEY) { + return 'Please provide ADMIN_API_KEY' + } + + if (source !== 'job' && source !== 'backup') { + return 'JOBS_SOURCE must be either job or backup' + } + + if (groupBy !== 'summary' && groupBy !== 'tenant') { + return 'JOBS_GROUP_BY must be either summary or tenant' + } + + try { + return { + adminApiKey: env.ADMIN_API_KEY, + adminUrl: env.ADMIN_URL, + queueName: env.JOBS_QUEUE_NAME?.trim() || undefined, + eventTypes: parseJobsCsv(env.JOBS_EVENT_TYPES), + tenantRefs: parseJobsCsv(env.JOBS_TENANT_REFS), + source, + groupBy, + limit: env.JOBS_LIMIT + ? parseUnsignedInteger(env.JOBS_LIMIT, 'JOBS_LIMIT must be a positive integer') + : undefined, + } + } catch (error) { + return error instanceof Error ? error.message : String(error) + } +} + +export function buildJobsRequest(action: JobsAction, config: JobsClientConfig) { + const headers = new Headers({ + ApiKey: config.adminApiKey, + }) + + if (action === 'list') { + return { + method: 'GET', + url: resolveJobsAdminUrl(config.adminUrl, '/queue/overflow', { + source: config.source, + groupBy: config.groupBy, + name: config.queueName, + eventTypes: config.eventTypes?.join(','), + tenantRefs: config.tenantRefs?.join(','), + limit: config.limit?.toString(), + }), + headers, + body: undefined, + } + } + + headers.set('Content-Type', 'application/json') + + return { + method: 'POST', + url: resolveJobsAdminUrl( + config.adminUrl, + action === 'backup' ? '/queue/overflow/backup' : '/queue/overflow/restore' + ), + headers, + body: JSON.stringify({ + name: config.queueName, + eventTypes: config.eventTypes, + tenantRefs: config.tenantRefs, + limit: config.limit, + }), + } +} + +async function assertJsonResponse(response: Response, context: string) { + if (response.ok) { + return response.json() + } + + const body = await response.text() + const details = body ? `: ${body}` : '' + + throw new Error(`${context} failed with ${response.status} ${response.statusText}${details}`) +} + +function fail(message: string) { + process.exitCode = 1 + console.error(message) + return false +} + +export async function main( + env: NodeJS.ProcessEnv = process.env, + argv: string[] = process.argv +): Promise { + const action = argv[2] + + if (action !== 'list' && action !== 'backup' && action !== 'restore') { + return fail('Please provide an action: list, backup, or restore') + } + + const config = parseJobsConfig(env) + if (typeof config === 'string') { + return fail(config) + } + + try { + const request = buildJobsRequest(action, config) + const response = await fetch(request.url, { + method: request.method, + headers: request.headers, + body: request.body, + }) + + const data = await assertJsonResponse(response, `${action.toUpperCase()} ${request.url}`) + console.log(JSON.stringify(data, null, 2)) + return true + } catch (error) { + return fail(error instanceof Error ? error.message : String(error)) + } +} + +if (require.main === module) { + main({ + ADMIN_URL, + ADMIN_API_KEY, + JOBS_SOURCE, + JOBS_GROUP_BY, + JOBS_QUEUE_NAME, + JOBS_EVENT_TYPES, + JOBS_TENANT_REFS, + JOBS_LIMIT, + }).catch((error) => { + process.exitCode = 1 + console.error(error) + }) +} diff --git a/src/test/admin-queue-overflow.test.ts b/src/test/admin-queue-overflow.test.ts new file mode 100644 index 000000000..1a454878f --- /dev/null +++ b/src/test/admin-queue-overflow.test.ts @@ -0,0 +1,346 @@ +import * as migrations from '@internal/database/migrations' +import { randomUUID } from 'crypto' +import type { FastifyInstance } from 'fastify' +import { multitenantKnex } from '../internal/database/multitenant-db' +import { PG_BOSS_SCHEMA } from '../internal/queue/queue' +import { createAdminApp } from './common' + +const pgBossJobTable = `${PG_BOSS_SCHEMA}.job` +const pgBossBackupTable = `${PG_BOSS_SCHEMA}.job_overflow_backup` +const headers = { + apikey: process.env.ADMIN_API_KEYS, +} + +type SeedJobOptions = { + eventType?: string + id?: string + name: string + state?: string + tenantRef?: string +} + +let adminApp: FastifyInstance + +async function seedJob(options: SeedJobOptions) { + const id = options.id ?? randomUUID() + const data: Record = {} + + if (options.eventType) { + data.event = { + type: options.eventType, + } + } + + if (options.tenantRef) { + data.tenant = { + ref: options.tenantRef, + } + } + + await multitenantKnex(pgBossJobTable).insert({ + id, + name: options.name, + state: options.state ?? 'created', + data, + }) + + return id +} + +describe('Admin queue overflow routes', () => { + beforeAll(async () => { + await migrations.runMultitenantMigrations() + await multitenantKnex.raw(`CREATE SCHEMA IF NOT EXISTS ${PG_BOSS_SCHEMA}`) + await multitenantKnex.raw(` + CREATE TABLE IF NOT EXISTS ${pgBossJobTable} ( + id uuid PRIMARY KEY, + name text NOT NULL, + state text NOT NULL, + created_on timestamptz NOT NULL DEFAULT now(), + data jsonb NOT NULL DEFAULT '{}'::jsonb + ) + `) + adminApp = await createAdminApp() + }) + + afterEach(async () => { + await multitenantKnex.raw(`DROP TABLE IF EXISTS ${pgBossBackupTable}`) + await multitenantKnex(pgBossJobTable).delete() + }) + + afterAll(async () => { + await adminApp.close() + await multitenantKnex.destroy() + }) + + it('returns summary counts for created jobs in the live queue table', async () => { + await seedJob({ + id: '00000000-0000-0000-0000-000000000011', + name: 'webhooks', + eventType: 'ObjectRemoved:Delete', + tenantRef: 'tenant-a', + }) + await seedJob({ + id: '00000000-0000-0000-0000-000000000012', + name: 'webhooks', + eventType: 'ObjectRemoved:Delete', + tenantRef: 'tenant-b', + }) + await seedJob({ + id: '00000000-0000-0000-0000-000000000013', + name: 'backup-object', + tenantRef: 'tenant-a', + }) + await seedJob({ + id: '00000000-0000-0000-0000-000000000014', + name: 'webhooks', + state: 'active', + eventType: 'ObjectRemoved:Delete', + tenantRef: 'tenant-c', + }) + + const response = await adminApp.inject({ + method: 'GET', + url: '/queue/overflow?limit=10', + headers, + }) + + expect(response.statusCode).toBe(200) + expect(response.json()).toEqual({ + backupTableExists: true, + source: 'job', + groupBy: 'summary', + filters: {}, + data: [ + { + count: 2, + eventType: 'ObjectRemoved:Delete', + name: 'webhooks', + }, + { + count: 1, + eventType: null, + name: 'backup-object', + }, + ], + }) + }) + + it('returns tenant counts for a filtered queue and event type', async () => { + await seedJob({ + id: '00000000-0000-0000-0000-000000000021', + name: 'webhooks', + eventType: 'ObjectRemoved:Delete', + tenantRef: 'tenant-b', + }) + await seedJob({ + id: '00000000-0000-0000-0000-000000000022', + name: 'webhooks', + eventType: 'ObjectRemoved:Delete', + tenantRef: 'tenant-b', + }) + await seedJob({ + id: '00000000-0000-0000-0000-000000000023', + name: 'webhooks', + eventType: 'ObjectRemoved:Delete', + tenantRef: 'tenant-a', + }) + await seedJob({ + id: '00000000-0000-0000-0000-000000000024', + name: 'backup-object', + eventType: 'ObjectRemoved:Delete', + tenantRef: 'tenant-z', + }) + + const response = await adminApp.inject({ + method: 'GET', + url: '/queue/overflow?groupBy=tenant&name=webhooks&eventTypes=ObjectRemoved:Delete', + headers, + }) + + expect(response.statusCode).toBe(200) + expect(response.json()).toEqual({ + backupTableExists: true, + source: 'job', + groupBy: 'tenant', + filters: { + name: 'webhooks', + eventTypes: ['ObjectRemoved:Delete'], + }, + data: [ + { + count: 2, + tenantRef: 'tenant-b', + }, + { + count: 1, + tenantRef: 'tenant-a', + }, + ], + }) + }) + + it('returns an empty result when the overflow backup table does not exist', async () => { + const response = await adminApp.inject({ + method: 'GET', + url: '/queue/overflow?source=backup', + headers, + }) + + expect(response.statusCode).toBe(200) + expect(response.json()).toEqual({ + backupTableExists: false, + source: 'backup', + groupBy: 'summary', + filters: {}, + data: [], + }) + }) + + it('backs up matching jobs into the overflow table', async () => { + const movedJobId = await seedJob({ + id: '00000000-0000-0000-0000-000000000031', + name: 'webhooks', + eventType: 'ObjectRemoved:Delete', + tenantRef: 'tenant-a', + }) + await seedJob({ + id: '00000000-0000-0000-0000-000000000032', + name: 'webhooks', + eventType: 'ObjectCreated:Put', + tenantRef: 'tenant-a', + }) + await seedJob({ + id: '00000000-0000-0000-0000-000000000033', + name: 'backup-object', + tenantRef: 'tenant-a', + }) + + const response = await adminApp.inject({ + method: 'POST', + url: '/queue/overflow/backup', + headers, + payload: { + name: 'webhooks', + eventTypes: ['ObjectRemoved:Delete'], + }, + }) + + expect(response.statusCode).toBe(200) + expect(response.json()).toEqual({ + backupTableCreated: true, + filters: { + name: 'webhooks', + eventTypes: ['ObjectRemoved:Delete'], + }, + limit: null, + movedCount: 1, + }) + + await expect( + multitenantKnex(pgBossJobTable).where({ id: movedJobId }).first() + ).resolves.toBeFalsy() + await expect( + multitenantKnex(pgBossBackupTable) + .select('id', 'name', 'state') + .where({ id: movedJobId }) + .first() + ).resolves.toEqual({ + id: movedJobId, + name: 'webhooks', + state: 'created', + }) + }) + + it('restores backed up jobs in batches', async () => { + await seedJob({ + id: '00000000-0000-0000-0000-000000000041', + name: 'webhooks', + eventType: 'ObjectRemoved:Delete', + tenantRef: 'tenant-a', + }) + await seedJob({ + id: '00000000-0000-0000-0000-000000000042', + name: 'webhooks', + eventType: 'ObjectRemoved:Delete', + tenantRef: 'tenant-b', + }) + + const backupResponse = await adminApp.inject({ + method: 'POST', + url: '/queue/overflow/backup', + headers, + payload: { + name: 'webhooks', + }, + }) + + expect(backupResponse.statusCode).toBe(200) + + const listResponse = await adminApp.inject({ + method: 'GET', + url: '/queue/overflow?source=backup&name=webhooks', + headers, + }) + + expect(listResponse.statusCode).toBe(200) + expect(listResponse.json()).toEqual({ + backupTableExists: true, + source: 'backup', + groupBy: 'summary', + filters: { + name: 'webhooks', + }, + data: [ + { + count: 2, + eventType: 'ObjectRemoved:Delete', + name: 'webhooks', + }, + ], + }) + + const restoreResponse = await adminApp.inject({ + method: 'POST', + url: '/queue/overflow/restore', + headers, + payload: { + name: 'webhooks', + limit: 1, + }, + }) + + expect(restoreResponse.statusCode).toBe(200) + expect(restoreResponse.json()).toEqual({ + backupTableExists: true, + filters: { + name: 'webhooks', + }, + limit: 1, + movedCount: 1, + }) + + await expect( + multitenantKnex(pgBossJobTable).count<{ count: number }[]>('* AS count') + ).resolves.toEqual([ + { + count: 1, + }, + ]) + await expect( + multitenantKnex(pgBossBackupTable).count<{ count: number }[]>('* AS count') + ).resolves.toEqual([ + { + count: 1, + }, + ]) + await expect( + multitenantKnex(pgBossJobTable) + .select('id') + .where({ id: '00000000-0000-0000-0000-000000000041' }) + .first() + ).resolves.toEqual({ + id: '00000000-0000-0000-0000-000000000041', + }) + }) +})