diff --git a/BackEnd/src/events/events.module.ts b/BackEnd/src/events/events.module.ts index 3689769da..9a92e922f 100644 --- a/BackEnd/src/events/events.module.ts +++ b/BackEnd/src/events/events.module.ts @@ -6,6 +6,7 @@ import { EventsService } from './events.service'; import { AuditLogService } from './services/audit-log.service'; import { RetryService } from './services/retry.service'; import { PoisonMessageService } from './services/poison-message.service'; +import { DlqAlertService } from './services/dlq-alert.service'; import { EventStoreService } from './event-store/event-store.service'; import { EventStore } from './entities/event-store.entity'; import { PoisonMessage } from './entities/poison-message.entity'; @@ -43,6 +44,7 @@ import { PayoutListener } from './listeners/payout.listener'; AuditLogService, RetryService, PoisonMessageService, + DlqAlertService, // Event Handlers QuestEventsHandler, SubmissionEventsHandler, @@ -58,6 +60,6 @@ import { PayoutListener } from './listeners/payout.listener'; SubmissionListener, PayoutListener, ], - exports: [EventsService, EventStoreService, PoisonMessageService], + exports: [EventsService, EventStoreService, PoisonMessageService, DlqAlertService], }) export class EventsModule {} diff --git a/BackEnd/src/events/services/dlq-alert.service.ts b/BackEnd/src/events/services/dlq-alert.service.ts new file mode 100644 index 000000000..424676812 --- /dev/null +++ b/BackEnd/src/events/services/dlq-alert.service.ts @@ -0,0 +1,181 @@ +import { Injectable, Logger } from '@nestjs/common'; + +export interface DlqAlertPayload { + /** Original event / job name that failed */ + eventName: string; + /** Last error message */ + error: string; + /** Current retry attempt count */ + retryCount: number; + /** Maximum allowed retries */ + maxRetries: number; + /** PoisonMessage DB record id */ + poisonMessageId: string; + /** Original event payload (may be redacted) */ + payload?: unknown; + /** True when this is the last retry (discard) 
*/ + isFinal: boolean; +} + +/** + * DlqAlertService + * + * Centralises alert dispatch for dead-letter queue failures. + * + * Always writes structured log entries that can be scraped by Grafana / + * CloudWatch / Datadog. The optional Slack and PagerDuty channels are sent by + * `sendSlack` / `sendPagerDuty` when enabled below and the relevant credentials + * are available in your environment; there is no email channel yet. + * + * Optional channels are enabled via environment variables: + * (structured logging is always on and has no toggle) + * DLQ_ALERT_SLACK=true (default: false — set SLACK_WEBHOOK_URL too) + * DLQ_ALERT_PAGERDUTY=true (default: false — set PAGERDUTY_ROUTING_KEY) + */ +@Injectable() +export class DlqAlertService { + private readonly logger = new Logger(DlqAlertService.name); + + /** Dispatch an alert through every enabled channel. */ + async sendAlert(alert: DlqAlertPayload): Promise { + const severity = alert.isFinal ? 'CRITICAL' : 'WARNING'; + + // Always emit a structured log (parseable by log aggregators) + this.logAlert(alert, severity); + + // Conditional channels — expand as needed + if (process.env.DLQ_ALERT_SLACK === 'true') { + await this.sendSlack(alert, severity).catch((err) => + this.logger.error(`Slack alert failed: ${err.message}`), + ); + } + + if (process.env.DLQ_ALERT_PAGERDUTY === 'true') { + await this.sendPagerDuty(alert, severity).catch((err) => + this.logger.error(`PagerDuty alert failed: ${err.message}`), + ); + } + } + + // ─── Private helpers ──────────────────────────────────────────────────────── + + private logAlert(alert: DlqAlertPayload, severity: 'WARNING' | 'CRITICAL'): void { + const message = + `[DLQ ${severity}] event="${alert.eventName}" ` + + `retry=${alert.retryCount}/${alert.maxRetries} ` + + `poisonMessageId=${alert.poisonMessageId} ` + + `isFinal=${alert.isFinal} ` + + `error="${alert.error}"`; + + const structuredContext = { + dlqAlert: true, + severity, + eventName: alert.eventName, + retryCount: alert.retryCount, + maxRetries: alert.maxRetries, 
+ poisonMessageId: alert.poisonMessageId, + isFinal: alert.isFinal, + // Redact full payload in logs to avoid PII leakage + payloadKeys: alert.payload + ? Object.keys(alert.payload as object) + : [], + error: alert.error, + timestamp: new Date().toISOString(), + }; + + if (alert.isFinal) { + this.logger.error(message, structuredContext); + } else { + this.logger.warn(message, structuredContext); + } + } + + /** + * Send a Slack notification via incoming webhook. + * Requires env var: SLACK_WEBHOOK_URL + */ + private async sendSlack( + alert: DlqAlertPayload, + severity: 'WARNING' | 'CRITICAL', + ): Promise { + const webhookUrl = process.env.SLACK_WEBHOOK_URL; + if (!webhookUrl) { + this.logger.warn('SLACK_WEBHOOK_URL not set; skipping Slack alert'); + return; + } + + const emoji = alert.isFinal ? '🔴' : '🟡'; + const title = `${emoji} [DLQ ${severity}] ${alert.eventName}`; + const body = { + text: title, + attachments: [ + { + color: alert.isFinal ? 'danger' : 'warning', + fields: [ + { title: 'Event', value: alert.eventName, short: true }, + { + title: 'Retries', + value: `${alert.retryCount}/${alert.maxRetries}`, + short: true, + }, + { title: 'Is Final', value: String(alert.isFinal), short: true }, + { + title: 'Poison Message ID', + value: alert.poisonMessageId, + short: true, + }, + { title: 'Error', value: alert.error, short: false }, + ], + footer: `StellarEarn DLQ · ${new Date().toISOString()}`, + }, + ], + }; + + // Dynamic import to avoid a hard dependency on `axios` in tests + const { default: axios } = await import('axios'); + await axios.post(webhookUrl, body, { timeout: 5000 }); + this.logger.debug(`Slack alert sent for event "${alert.eventName}"`); + } + + /** + * Trigger a PagerDuty incident via the Events API v2. 
+ * Requires env var: PAGERDUTY_ROUTING_KEY + */ + private async sendPagerDuty( + alert: DlqAlertPayload, + severity: 'WARNING' | 'CRITICAL', + ): Promise { + const routingKey = process.env.PAGERDUTY_ROUTING_KEY; + if (!routingKey) { + this.logger.warn( + 'PAGERDUTY_ROUTING_KEY not set; skipping PagerDuty alert', + ); + return; + } + + const pdSeverity = alert.isFinal ? 'critical' : 'warning'; + const body = { + routing_key: routingKey, + event_action: 'trigger', + payload: { + summary: `[DLQ ${severity}] ${alert.eventName} — ${alert.error}`, + severity: pdSeverity, + source: 'StellarEarn-DLQ', + custom_details: { + eventName: alert.eventName, + retryCount: alert.retryCount, + maxRetries: alert.maxRetries, + poisonMessageId: alert.poisonMessageId, + isFinal: alert.isFinal, + error: alert.error, + }, + }, + }; + + const { default: axios } = await import('axios'); + await axios.post('https://events.pagerduty.com/v2/enqueue', body, { + timeout: 5000, + }); + this.logger.debug(`PagerDuty alert sent for event "${alert.eventName}"`); + } +} diff --git a/BackEnd/src/modules/jobs/jobs.module.ts b/BackEnd/src/modules/jobs/jobs.module.ts index 729094630..8845187de 100644 --- a/BackEnd/src/modules/jobs/jobs.module.ts +++ b/BackEnd/src/modules/jobs/jobs.module.ts @@ -15,6 +15,7 @@ import { AnalyticsProcessor } from './processors/analytics.processor'; import { QuestProcessor } from './processors/quest.processor'; import { QuestStateReconciliationProcessor } from './processors/quest-state-reconciliation.processor'; import { DependencyProcessor } from './processors/dependency.processor'; +import { DeadLetterProcessor } from './processors/dead-letter.processor'; import { JobLog, JobLogRetry, @@ -30,8 +31,11 @@ import { StellarModule } from '../stellar/stellar.module'; import { AnalyticsModule } from '../analytics/analytics.module'; import { DependencyFreshnessService } from '../../common/services/dependency-freshness.service'; import { EventStore } from 
'../../events/entities/event-store.entity'; +import { PoisonMessage } from '../../events/entities/poison-message.entity'; import { User } from '../users/entities/user.entity'; import { EmailModule } from '../email/email.module'; +import { PoisonMessageService } from '../../events/services/poison-message.service'; +import { DlqAlertService } from '../../events/services/dlq-alert.service'; @Module({ imports: [ @@ -45,6 +49,7 @@ import { EmailModule } from '../email/email.module'; Quest, Submission, EventStore, + PoisonMessage, User, ]), EventEmitterModule, @@ -66,6 +71,9 @@ import { EmailModule } from '../email/email.module'; QuestProcessor, QuestStateReconciliationProcessor, DependencyProcessor, + DeadLetterProcessor, + PoisonMessageService, + DlqAlertService, DataExportListener, DependencyFreshnessService, ], @@ -84,6 +92,9 @@ import { EmailModule } from '../email/email.module'; QuestProcessor, QuestStateReconciliationProcessor, DependencyProcessor, + DeadLetterProcessor, + PoisonMessageService, + DlqAlertService, DependencyFreshnessService, ], }) diff --git a/BackEnd/src/modules/jobs/jobs.service.ts b/BackEnd/src/modules/jobs/jobs.service.ts index 8482b61e0..67ffeb57e 100644 --- a/BackEnd/src/modules/jobs/jobs.service.ts +++ b/BackEnd/src/modules/jobs/jobs.service.ts @@ -84,6 +84,22 @@ export class JobsService implements OnModuleInit, OnModuleDestroy { this.dataExportProcessor.processExport.bind(this.dataExportProcessor), ); } + + // Wire the dead-letter processor as the exclusive consumer of the DLQ + if ( + this.deadLetterProcessor && + typeof this.deadLetterProcessor.process === 'function' + ) { + this.createWorker( + QUEUES.DEAD_LETTER, + this.deadLetterProcessor.process.bind(this.deadLetterProcessor), + ); + this.logger.log('Dead-letter queue processor registered'); + } else { + this.logger.warn( + 'DeadLetterProcessor not provided — DLQ jobs will not be consumed', + ); + } } async onModuleDestroy() { diff --git 
a/BackEnd/src/modules/jobs/processors/dead-letter.processor.ts b/BackEnd/src/modules/jobs/processors/dead-letter.processor.ts new file mode 100644 index 000000000..6c5c19da3 --- /dev/null +++ b/BackEnd/src/modules/jobs/processors/dead-letter.processor.ts @@ -0,0 +1,174 @@ +import { Injectable, Logger } from '@nestjs/common'; +import { Job } from 'bullmq'; +import { EventEmitter2 } from '@nestjs/event-emitter'; +import { JobResult } from '../job.types'; +import { PoisonMessageService } from '../../../events/services/poison-message.service'; +import { DlqAlertService } from '../../../events/services/dlq-alert.service'; + +export interface DeadLetterJobData { + /** Kind of failure that put this entry in the DLQ */ + type: 'FAILED_EVENT' | 'FAILED_JOB'; + /** Original event/job name */ + eventName?: string; + /** Original job name when type === 'FAILED_JOB' */ + jobName?: string; + /** Original payload / data */ + eventPayload?: any; + failedJob?: { + id: string | undefined; + name: string; + data: any; + failedReason: string; + }; + /** Human-readable error string */ + error: string; + /** ISO timestamp when the failure occurred */ + failedAt: Date | string; + /** How many times this has been retried already */ + retryCount?: number; + /** Arbitrary extra metadata */ + metadata?: Record; +} + +/** + * DeadLetterProcessor + * + * Consumes jobs from the `dead_letter` BullMQ queue. + * For each job it will: + * 1. Quarantine the message via PoisonMessageService. + * 2. Re-emit the original event if retries remain, marking it resolved on success. + * 3. Fire a DLQ alert via DlqAlertService when a retry fails or retries are exhausted. + * 4. Emit an internal `dlq.processed` event for downstream telemetry. + */ +@Injectable() +export class DeadLetterProcessor { + private readonly logger = new Logger(DeadLetterProcessor.name); + /** Maximum number of automatic retries before the message is discarded. 
*/ + static readonly MAX_AUTO_RETRIES = 3; + + constructor( + private readonly poisonMessageService: PoisonMessageService, + private readonly dlqAlertService: DlqAlertService, + private readonly eventEmitter: EventEmitter2, + ) {} + + /** + * Main entry-point called by JobsService worker for the dead_letter queue. + */ + async process(job: Job): Promise { + const data = job.data; + const eventName = + data.eventName ?? data.failedJob?.name ?? data.jobName ?? 'unknown'; + const payload = data.eventPayload ?? data.failedJob?.data ?? {}; + const error = + data.error ?? data.failedJob?.failedReason ?? 'No error message'; + const retryCount = data.retryCount ?? 0; + + this.logger.warn( + `Processing DLQ job ${job.id} — event: "${eventName}", ` + + `retry: ${retryCount}/${DeadLetterProcessor.MAX_AUTO_RETRIES}, error: ${error}`, + ); + + await job.updateProgress(10); + + // 1. Quarantine / update poison-message record + const poisonMessage = await this.poisonMessageService.quarantine( + eventName, + payload, + error, + { ...data.metadata, dlqJobId: job.id, failedAt: data.failedAt }, + DeadLetterProcessor.MAX_AUTO_RETRIES, + ); + + await job.updateProgress(40); + + // 2. Attempt automatic retry if under the limit + const shouldRetry = retryCount < DeadLetterProcessor.MAX_AUTO_RETRIES; + + if (shouldRetry) { + try { + await this.poisonMessageService.markRetrying(poisonMessage.id); + + this.logger.log( + `Re-emitting event "${eventName}" (attempt ${retryCount + 1})`, + ); + + await this.eventEmitter.emitAsync(eventName, { + ...payload, + _dlq: { + retryCount: retryCount + 1, + originalJobId: job.id, + poisonMessageId: poisonMessage.id, + }, + }); + + await this.poisonMessageService.markResolved(poisonMessage.id); + + this.logger.log( + `Event "${eventName}" successfully re-processed on retry ${retryCount + 1}`, + ); + } catch (retryError) { + const retryErrorMsg = + retryError instanceof Error ? 
retryError.message : String(retryError); + this.logger.error( + `Retry ${retryCount + 1} for event "${eventName}" failed: ${retryErrorMsg}`, + ); + + // Reset back to quarantined so next DLQ job can try again + await this.poisonMessageService.resetToQuarantined(poisonMessage.id); + + // Alert on every retry failure; escalate if this is the final attempt + await this.dlqAlertService.sendAlert({ + eventName, + error: retryErrorMsg, + retryCount: retryCount + 1, + maxRetries: DeadLetterProcessor.MAX_AUTO_RETRIES, + poisonMessageId: poisonMessage.id, + payload, + isFinal: retryCount + 1 >= DeadLetterProcessor.MAX_AUTO_RETRIES, + }); + } + } else { + // Final failure — alert and discard + this.logger.error( + `Event "${eventName}" exhausted all ${DeadLetterProcessor.MAX_AUTO_RETRIES} retries. ` + + `Poison message ${poisonMessage.id} will be discarded.`, + ); + + await this.dlqAlertService.sendAlert({ + eventName, + error, + retryCount, + maxRetries: DeadLetterProcessor.MAX_AUTO_RETRIES, + poisonMessageId: poisonMessage.id, + payload, + isFinal: true, + }); + } + + await job.updateProgress(80); + + // 3. Emit internal telemetry event + this.eventEmitter.emit('dlq.processed', { + jobId: job.id, + eventName, + retryCount, + poisonMessageId: poisonMessage.id, + status: shouldRetry ? 'retried' : 'discarded', + processedAt: new Date(), + }); + + await job.updateProgress(100); + + return { + success: true, + data: { + eventName, + retryCount, + poisonMessageId: poisonMessage.id, + status: shouldRetry ? 
'retried' : 'discarded', + }, + duration: Date.now() - job.timestamp, + }; + } +} diff --git a/BackEnd/test/integration/dlq-processor.spec.ts b/BackEnd/test/integration/dlq-processor.spec.ts new file mode 100644 index 000000000..0f49cf525 --- /dev/null +++ b/BackEnd/test/integration/dlq-processor.spec.ts @@ -0,0 +1,203 @@ +/** + * Integration test: Dead-Letter Queue processor + alert service + * + * Tests the full flow from a job landing in the dead-letter queue, through + * the processor (quarantine → retry → alert), using in-memory fakes so no + * real Redis/Postgres connection is required. + */ +import { Test, TestingModule } from '@nestjs/testing'; +import { EventEmitter2 } from '@nestjs/event-emitter'; +import { getRepositoryToken } from '@nestjs/typeorm'; + +import { DeadLetterProcessor } from 'src/modules/jobs/processors/dead-letter.processor'; +import { DlqAlertService } from 'src/events/services/dlq-alert.service'; +import { PoisonMessageService } from 'src/events/services/poison-message.service'; +import { + PoisonMessage, + PoisonMessageStatus, +} from 'src/events/entities/poison-message.entity'; + +// ─── In-memory PoisonMessage repository ───────────────────────────────────── + +const store = new Map(); +let idCounter = 0; + +const inMemoryPoisonRepo = { + create: (data: Partial): PoisonMessage => + ({ id: `pm-${++idCounter}`, ...data } as PoisonMessage), + save: jest.fn(async (entity: PoisonMessage) => { + store.set(entity.id, entity); + return entity; + }), + findOne: jest.fn(async ({ where }: { where: Partial }) => { + for (const msg of store.values()) { + if ( + msg.eventName === where.eventName && + msg.status === where.status + ) { + return msg; + } + } + return null; + }), + update: jest.fn(async (id: string, partial: Partial) => { + const existing = store.get(id); + if (existing) store.set(id, { ...existing, ...partial }); + return { affected: 1 }; + }), +}; + +const makeJob = ( + eventName: string, + retryCount = 0, + failOnEmit = false, +) => 
({ + id: `job-${Date.now()}`, + timestamp: Date.now(), + data: { + type: 'FAILED_EVENT' as const, + eventName, + eventPayload: { key: 'value' }, + error: 'upstream failure', + failedAt: new Date(), + retryCount, + }, + updateProgress: jest.fn().mockResolvedValue(undefined), + _failOnEmit: failOnEmit, +}); + +// ─── Test suite ────────────────────────────────────────────────────────────── + +describe('DLQ Integration: DeadLetterProcessor + DlqAlertService', () => { + let processor: DeadLetterProcessor; + let alertService: DlqAlertService; + let eventEmitter: EventEmitter2; + let module: TestingModule; + let sendAlertSpy: jest.SpyInstance; + + beforeEach(async () => { + store.clear(); + idCounter = 0; + jest.clearAllMocks(); + + module = await Test.createTestingModule({ + providers: [ + DeadLetterProcessor, + PoisonMessageService, + DlqAlertService, + { + provide: getRepositoryToken(PoisonMessage), + useValue: inMemoryPoisonRepo, + }, + { + provide: EventEmitter2, + useValue: new EventEmitter2({ wildcard: true, delimiter: '.' 
}), + }, + ], + }).compile(); + + processor = module.get(DeadLetterProcessor); + alertService = module.get(DlqAlertService); + eventEmitter = module.get(EventEmitter2); + + sendAlertSpy = jest.spyOn(alertService, 'sendAlert').mockResolvedValue(undefined); + }); + + afterEach(async () => { + await module.close(); + }); + + // ─── Scenario 1: successful first retry ───────────────────────────────── + + it('quarantines the message and marks it resolved on a successful retry', async () => { + const job = makeJob('quest.expired', 0); + + // Register a no-op listener so emitAsync resolves cleanly + eventEmitter.on('quest.expired', () => undefined); + + const result = await processor.process(job as any); + + expect(result.success).toBe(true); + expect(result.data.status).toBe('retried'); + + // The poison message should be in RESOLVED state + const savedMessages = Array.from(store.values()); + expect(savedMessages.length).toBe(1); + expect(savedMessages[0].status).toBe(PoisonMessageStatus.RESOLVED); + expect(sendAlertSpy).not.toHaveBeenCalled(); + }); + + // ─── Scenario 2: retry fails → reset to quarantined + alert ───────────── + + it('resets to quarantined and fires an alert when the retry throws', async () => { + const job = makeJob('quest.expired', 0); + + // Make emitAsync reject to simulate downstream failure + jest.spyOn(eventEmitter, 'emitAsync').mockRejectedValueOnce(new Error('listener error')); + + const result = await processor.process(job as any); + + expect(result.success).toBe(true); + // Status stays retried from processor perspective (the job itself succeeded) + expect(sendAlertSpy).toHaveBeenCalledWith( + expect.objectContaining({ isFinal: false, eventName: 'quest.expired' }), + ); + + // Poison message should have been reset to QUARANTINED + const savedMessages = Array.from(store.values()); + expect(savedMessages[0].status).toBe(PoisonMessageStatus.QUARANTINED); + }); + + // ─── Scenario 3: final retry exhausted → discard + critical alert ──────── + + 
it('sends a CRITICAL alert and discards when all retries are exhausted', async () => { + const job = makeJob('payment.failed', DeadLetterProcessor.MAX_AUTO_RETRIES); + + const result = await processor.process(job as any); + + expect(result.data.status).toBe('discarded'); + expect(sendAlertSpy).toHaveBeenCalledWith( + expect.objectContaining({ isFinal: true, eventName: 'payment.failed' }), + ); + // No re-emit attempt should have been made + const emitSpy = jest.spyOn(eventEmitter, 'emitAsync'); + expect(emitSpy).not.toHaveBeenCalled(); + }); + + // ─── Scenario 4: second occurrence of same event increments retry count ── + + it('increments retryCount for the same event quarantined twice', async () => { + const job1 = makeJob('webhook.failed', 0); + const job2 = makeJob('webhook.failed', 1); + + // Both retries fail + jest.spyOn(eventEmitter, 'emitAsync').mockRejectedValue(new Error('fail')); + + await processor.process(job1 as any); + + // Second job: same event is already quarantined, so findOne returns it + inMemoryPoisonRepo.findOne.mockResolvedValueOnce(Array.from(store.values())[0]); + await processor.process(job2 as any); + + // sendAlert should have been called twice + expect(sendAlertSpy).toHaveBeenCalledTimes(2); + }); + + // ─── Scenario 5: telemetry event is emitted ───────────────────────────── + + it('emits dlq.processed telemetry event after processing', async () => { + const telemetryEvents: any[] = []; + eventEmitter.on('dlq.processed', (payload) => telemetryEvents.push(payload)); + + const job = makeJob('submission.verified', 0); + eventEmitter.on('submission.verified', () => undefined); + + await processor.process(job as any); + + expect(telemetryEvents.length).toBe(1); + expect(telemetryEvents[0]).toMatchObject({ + eventName: 'submission.verified', + status: 'retried', + }); + }); +}); diff --git a/BackEnd/test/jobs/dead-letter.processor.spec.ts b/BackEnd/test/jobs/dead-letter.processor.spec.ts new file mode 100644 index 000000000..703039ded --- 
/dev/null +++ b/BackEnd/test/jobs/dead-letter.processor.spec.ts @@ -0,0 +1,196 @@ +import { Test, TestingModule } from '@nestjs/testing'; +import { EventEmitter2 } from '@nestjs/event-emitter'; +import { DeadLetterProcessor, DeadLetterJobData } from 'src/modules/jobs/processors/dead-letter.processor'; +import { PoisonMessageService } from 'src/events/services/poison-message.service'; +import { DlqAlertService } from 'src/events/services/dlq-alert.service'; +import { PoisonMessageStatus } from 'src/events/entities/poison-message.entity'; +import { Job } from 'bullmq'; + +const makePoisonMessage = (overrides: Partial = {}) => ({ + id: 'pm-123', + eventName: 'test.event', + payload: {}, + metadata: null, + lastError: 'some error', + retryCount: 0, + maxRetries: 3, + status: PoisonMessageStatus.QUARANTINED, + errorHistory: [], + resolvedAt: undefined, + quarantinedAt: new Date(), + updatedAt: new Date(), + ...overrides, +}); + +const makeJob = (data: Partial = {}): Partial> => ({ + id: 'job-001', + timestamp: Date.now(), + data: { + type: 'FAILED_EVENT', + eventName: 'test.event', + eventPayload: { foo: 'bar' }, + error: 'original error', + failedAt: new Date(), + retryCount: 0, + ...data, + } as DeadLetterJobData, + updateProgress: jest.fn().mockResolvedValue(undefined), +}); + +describe('DeadLetterProcessor', () => { + let processor: DeadLetterProcessor; + let poisonMessageService: jest.Mocked; + let dlqAlertService: jest.Mocked; + let eventEmitter: jest.Mocked; + + beforeEach(async () => { + const module: TestingModule = await Test.createTestingModule({ + providers: [ + DeadLetterProcessor, + { + provide: PoisonMessageService, + useValue: { + quarantine: jest.fn().mockResolvedValue(makePoisonMessage()), + markRetrying: jest.fn().mockResolvedValue(undefined), + markResolved: jest.fn().mockResolvedValue(undefined), + resetToQuarantined: jest.fn().mockResolvedValue(undefined), + }, + }, + { + provide: DlqAlertService, + useValue: { + sendAlert: 
jest.fn().mockResolvedValue(undefined), + }, + }, + { + provide: EventEmitter2, + useValue: { + emitAsync: jest.fn().mockResolvedValue([]), + emit: jest.fn().mockReturnValue(true), + }, + }, + ], + }).compile(); + + processor = module.get(DeadLetterProcessor); + poisonMessageService = module.get(PoisonMessageService); + dlqAlertService = module.get(DlqAlertService); + eventEmitter = module.get(EventEmitter2); + }); + + describe('process()', () => { + it('should quarantine the message on every call', async () => { + const job = makeJob(); + await processor.process(job as Job); + expect(poisonMessageService.quarantine).toHaveBeenCalledWith( + 'test.event', + { foo: 'bar' }, + 'original error', + expect.objectContaining({ dlqJobId: 'job-001' }), + DeadLetterProcessor.MAX_AUTO_RETRIES, + ); + }); + + it('should re-emit the event and mark resolved when retries remain', async () => { + const job = makeJob({ retryCount: 0 }); + const result = await processor.process(job as Job); + + expect(poisonMessageService.markRetrying).toHaveBeenCalledWith('pm-123'); + expect(eventEmitter.emitAsync).toHaveBeenCalledWith( + 'test.event', + expect.objectContaining({ + foo: 'bar', + _dlq: expect.objectContaining({ retryCount: 1 }), + }), + ); + expect(poisonMessageService.markResolved).toHaveBeenCalledWith('pm-123'); + expect(result.success).toBe(true); + expect(result.data.status).toBe('retried'); + }); + + it('should NOT re-emit when retry count equals MAX_AUTO_RETRIES', async () => { + const job = makeJob({ retryCount: DeadLetterProcessor.MAX_AUTO_RETRIES }); + const result = await processor.process(job as Job); + + expect(eventEmitter.emitAsync).not.toHaveBeenCalled(); + expect(dlqAlertService.sendAlert).toHaveBeenCalledWith( + expect.objectContaining({ isFinal: true }), + ); + expect(result.data.status).toBe('discarded'); + }); + + it('should send a final alert when retry fails and it is the last attempt', async () => { + eventEmitter.emitAsync.mockRejectedValueOnce(new 
Error('downstream fail')); + const job = makeJob({ retryCount: DeadLetterProcessor.MAX_AUTO_RETRIES - 1 }); + + await processor.process(job as Job); + + expect(poisonMessageService.resetToQuarantined).toHaveBeenCalledWith('pm-123'); + expect(dlqAlertService.sendAlert).toHaveBeenCalledWith( + expect.objectContaining({ isFinal: true }), + ); + }); + + it('should send a non-final alert on intermediate retry failures', async () => { + eventEmitter.emitAsync.mockRejectedValueOnce(new Error('transient fail')); + const job = makeJob({ retryCount: 0 }); + + await processor.process(job as Job); + + expect(dlqAlertService.sendAlert).toHaveBeenCalledWith( + expect.objectContaining({ isFinal: false }), + ); + }); + + it('should emit dlq.processed telemetry event', async () => { + const job = makeJob(); + await processor.process(job as Job); + expect(eventEmitter.emit).toHaveBeenCalledWith( + 'dlq.processed', + expect.objectContaining({ eventName: 'test.event' }), + ); + }); + + it('should handle job with failedJob data structure', async () => { + // Build a job where the error originates solely from failedJob.failedReason + // (no top-level `error` field) so the processor falls back correctly. 
+ const rawJob: Partial> = { + id: 'job-001', + timestamp: Date.now(), + data: { + type: 'FAILED_JOB', + failedAt: new Date(), + retryCount: 0, + failedJob: { + id: 'original-job-99', + name: 'payout:process', + data: { payoutId: 'p-1' }, + failedReason: 'timeout', + }, + } as unknown as DeadLetterJobData, + updateProgress: jest.fn().mockResolvedValue(undefined), + }; + + await processor.process(rawJob as Job); + + expect(poisonMessageService.quarantine).toHaveBeenCalledWith( + 'payout:process', + { payoutId: 'p-1' }, + 'timeout', + expect.any(Object), + DeadLetterProcessor.MAX_AUTO_RETRIES, + ); + }); + + it('should update progress through lifecycle', async () => { + const job = makeJob(); + await processor.process(job as Job); + + const progressCalls = (job.updateProgress as jest.Mock).mock.calls.map( + ([p]: [number]) => p, + ); + expect(progressCalls).toContain(10); + expect(progressCalls).toContain(100); + }); + }); +}); diff --git a/BackEnd/test/jobs/dlq-alert.service.spec.ts b/BackEnd/test/jobs/dlq-alert.service.spec.ts new file mode 100644 index 000000000..8c482a5c5 --- /dev/null +++ b/BackEnd/test/jobs/dlq-alert.service.spec.ts @@ -0,0 +1,146 @@ +import { Test, TestingModule } from '@nestjs/testing'; +import { Logger } from '@nestjs/common'; +import { DlqAlertService, DlqAlertPayload } from 'src/events/services/dlq-alert.service'; + +const makeAlert = (overrides: Partial = {}): DlqAlertPayload => ({ + eventName: 'quest.completed', + error: 'Connection timed out', + retryCount: 1, + maxRetries: 3, + poisonMessageId: 'pm-abc', + payload: { questId: 'q-1' }, + isFinal: false, + ...overrides, +}); + +describe('DlqAlertService', () => { + let service: DlqAlertService; + let loggerWarnSpy: jest.SpyInstance; + let loggerErrorSpy: jest.SpyInstance; + let loggerDebugSpy: jest.SpyInstance; + + beforeEach(async () => { + const module: TestingModule = await Test.createTestingModule({ + providers: [DlqAlertService], + }).compile(); + + service = 
module.get(DlqAlertService); + + // Spy on logger methods via the Logger prototype + loggerWarnSpy = jest.spyOn(Logger.prototype, 'warn').mockImplementation(() => undefined); + loggerErrorSpy = jest.spyOn(Logger.prototype, 'error').mockImplementation(() => undefined); + loggerDebugSpy = jest.spyOn(Logger.prototype, 'debug').mockImplementation(() => undefined); + }); + + afterEach(() => { + jest.restoreAllMocks(); + delete process.env.DLQ_ALERT_SLACK; + delete process.env.DLQ_ALERT_PAGERDUTY; + delete process.env.SLACK_WEBHOOK_URL; + delete process.env.PAGERDUTY_ROUTING_KEY; + }); + + describe('sendAlert()', () => { + it('should emit a WARN log for non-final alerts', async () => { + await service.sendAlert(makeAlert({ isFinal: false })); + expect(loggerWarnSpy).toHaveBeenCalledWith( + expect.stringContaining('[DLQ WARNING]'), + expect.any(Object), + ); + expect(loggerErrorSpy).not.toHaveBeenCalled(); + }); + + it('should emit an ERROR log for final (discard) alerts', async () => { + await service.sendAlert(makeAlert({ isFinal: true })); + expect(loggerErrorSpy).toHaveBeenCalledWith( + expect.stringContaining('[DLQ CRITICAL]'), + expect.any(Object), + ); + }); + + it('should include eventName and poisonMessageId in the log message', async () => { + await service.sendAlert(makeAlert({ eventName: 'payout.failed', poisonMessageId: 'pm-xyz' })); + const logMsg: string = loggerWarnSpy.mock.calls[0][0]; + expect(logMsg).toContain('payout.failed'); + expect(logMsg).toContain('pm-xyz'); + }); + + it('should include retryCount in the structured log context', async () => { + await service.sendAlert(makeAlert({ retryCount: 2 })); + const context = loggerWarnSpy.mock.calls[0][1]; + expect(context.retryCount).toBe(2); + }); + + it('should NOT call sendSlack when DLQ_ALERT_SLACK is not true', async () => { + const slackSpy = jest + .spyOn(service as any, 'sendSlack') + .mockResolvedValue(undefined); + + await service.sendAlert(makeAlert()); + 
expect(slackSpy).not.toHaveBeenCalled(); + }); + + it('should call sendSlack when DLQ_ALERT_SLACK=true', async () => { + process.env.DLQ_ALERT_SLACK = 'true'; + const slackSpy = jest + .spyOn(service as any, 'sendSlack') + .mockResolvedValue(undefined); + + await service.sendAlert(makeAlert()); + expect(slackSpy).toHaveBeenCalled(); + }); + + it('should call sendPagerDuty when DLQ_ALERT_PAGERDUTY=true', async () => { + process.env.DLQ_ALERT_PAGERDUTY = 'true'; + const pdSpy = jest + .spyOn(service as any, 'sendPagerDuty') + .mockResolvedValue(undefined); + + await service.sendAlert(makeAlert()); + expect(pdSpy).toHaveBeenCalled(); + }); + + it('should log a warning (not throw) if Slack errors', async () => { + process.env.DLQ_ALERT_SLACK = 'true'; + jest + .spyOn(service as any, 'sendSlack') + .mockRejectedValue(new Error('network error')); + + // Should not throw + await expect(service.sendAlert(makeAlert())).resolves.toBeUndefined(); + expect(loggerErrorSpy).toHaveBeenCalledWith( + expect.stringContaining('Slack alert failed'), + ); + }); + + it('should log a warning (not throw) if PagerDuty errors', async () => { + process.env.DLQ_ALERT_PAGERDUTY = 'true'; + jest + .spyOn(service as any, 'sendPagerDuty') + .mockRejectedValue(new Error('pd error')); + + await expect(service.sendAlert(makeAlert())).resolves.toBeUndefined(); + expect(loggerErrorSpy).toHaveBeenCalledWith( + expect.stringContaining('PagerDuty alert failed'), + ); + }); + }); + + describe('sendSlack() (private)', () => { + it('should warn and return early when SLACK_WEBHOOK_URL is not set', async () => { + await (service as any).sendSlack(makeAlert(), 'WARNING'); + expect(loggerWarnSpy).toHaveBeenCalledWith( + expect.stringContaining('SLACK_WEBHOOK_URL not set'), + ); + }); + }); + + describe('sendPagerDuty() (private)', () => { + it('should warn and return early when PAGERDUTY_ROUTING_KEY is not set', async () => { + await (service as any).sendPagerDuty(makeAlert(), 'CRITICAL'); + 
expect(loggerWarnSpy).toHaveBeenCalledWith( + expect.stringContaining('PAGERDUTY_ROUTING_KEY not set'), + ); + }); + }); +});