Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion BackEnd/src/events/events.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import { EventsService } from './events.service';
import { AuditLogService } from './services/audit-log.service';
import { RetryService } from './services/retry.service';
import { PoisonMessageService } from './services/poison-message.service';
import { DlqAlertService } from './services/dlq-alert.service';
import { EventStoreService } from './event-store/event-store.service';
import { EventStore } from './entities/event-store.entity';
import { PoisonMessage } from './entities/poison-message.entity';
Expand Down Expand Up @@ -43,6 +44,7 @@ import { PayoutListener } from './listeners/payout.listener';
AuditLogService,
RetryService,
PoisonMessageService,
DlqAlertService,
// Event Handlers
QuestEventsHandler,
SubmissionEventsHandler,
Expand All @@ -58,6 +60,6 @@ import { PayoutListener } from './listeners/payout.listener';
SubmissionListener,
PayoutListener,
],
exports: [EventsService, EventStoreService, PoisonMessageService],
exports: [EventsService, EventStoreService, PoisonMessageService, DlqAlertService],
})
export class EventsModule {}
181 changes: 181 additions & 0 deletions BackEnd/src/events/services/dlq-alert.service.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
import { Injectable, Logger } from '@nestjs/common';

export interface DlqAlertPayload {
/** Original event / job name that failed */
eventName: string;
/** Last error message */
error: string;
/** Current retry attempt count */
retryCount: number;
/** Maximum allowed retries */
maxRetries: number;
/** PoisonMessage DB record id */
poisonMessageId: string;
/** Original event payload (may be redacted) */
payload?: unknown;
/** True when this is the last retry (discard) */
isFinal: boolean;
}

/**
* DlqAlertService
*
* Centralises alert dispatch for dead-letter queue failures.
*
* Currently writes structured log entries that can be scraped by Grafana /
* CloudWatch / Datadog. Wire up the optional `sendSlack`, `sendPagerDuty`,
* or `sendEmail` methods (currently no-ops) once the relevant credentials
* are available in your environment.
*
* All channels can be enabled/disabled via environment variables:
* DLQ_ALERT_LOG=true (default: true)
* DLQ_ALERT_SLACK=true (default: false — set SLACK_WEBHOOK_URL too)
* DLQ_ALERT_PAGERDUTY=true (default: false — set PAGERDUTY_ROUTING_KEY)
*/
@Injectable()
export class DlqAlertService {
private readonly logger = new Logger(DlqAlertService.name);

/** Dispatch an alert through every enabled channel. */
async sendAlert(alert: DlqAlertPayload): Promise<void> {
const severity = alert.isFinal ? 'CRITICAL' : 'WARNING';

// Always emit a structured log (parseable by log aggregators)
this.logAlert(alert, severity);

// Conditional channels — expand as needed
if (process.env.DLQ_ALERT_SLACK === 'true') {
await this.sendSlack(alert, severity).catch((err) =>
this.logger.error(`Slack alert failed: ${err.message}`),
);
}

if (process.env.DLQ_ALERT_PAGERDUTY === 'true') {
await this.sendPagerDuty(alert, severity).catch((err) =>
this.logger.error(`PagerDuty alert failed: ${err.message}`),
);
}
}

// ─── Private helpers ────────────────────────────────────────────────────────

private logAlert(alert: DlqAlertPayload, severity: 'WARNING' | 'CRITICAL'): void {
const message =
`[DLQ ${severity}] event="${alert.eventName}" ` +
`retry=${alert.retryCount}/${alert.maxRetries} ` +
`poisonMessageId=${alert.poisonMessageId} ` +
`isFinal=${alert.isFinal} ` +
`error="${alert.error}"`;

const structuredContext = {
dlqAlert: true,
severity,
eventName: alert.eventName,
retryCount: alert.retryCount,
maxRetries: alert.maxRetries,
poisonMessageId: alert.poisonMessageId,
isFinal: alert.isFinal,
// Redact full payload in logs to avoid PII leakage
payloadKeys: alert.payload
? Object.keys(alert.payload as object)
: [],
error: alert.error,
timestamp: new Date().toISOString(),
};

if (alert.isFinal) {
this.logger.error(message, structuredContext);
} else {
this.logger.warn(message, structuredContext);
}
}

/**
* Send a Slack notification via incoming webhook.
* Requires env var: SLACK_WEBHOOK_URL
*/
private async sendSlack(
alert: DlqAlertPayload,
severity: 'WARNING' | 'CRITICAL',
): Promise<void> {
const webhookUrl = process.env.SLACK_WEBHOOK_URL;
if (!webhookUrl) {
this.logger.warn('SLACK_WEBHOOK_URL not set; skipping Slack alert');
return;
}

const emoji = alert.isFinal ? '🔴' : '🟡';
const title = `${emoji} [DLQ ${severity}] ${alert.eventName}`;
const body = {
text: title,
attachments: [
{
color: alert.isFinal ? 'danger' : 'warning',
fields: [
{ title: 'Event', value: alert.eventName, short: true },
{
title: 'Retries',
value: `${alert.retryCount}/${alert.maxRetries}`,
short: true,
},
{ title: 'Is Final', value: String(alert.isFinal), short: true },
{
title: 'Poison Message ID',
value: alert.poisonMessageId,
short: true,
},
{ title: 'Error', value: alert.error, short: false },
],
footer: `StellarEarn DLQ · ${new Date().toISOString()}`,
},
],
};

// Dynamic import to avoid a hard dependency on `axios` in tests
const { default: axios } = await import('axios');
await axios.post(webhookUrl, body, { timeout: 5000 });
this.logger.debug(`Slack alert sent for event "${alert.eventName}"`);
}

/**
* Trigger a PagerDuty incident via the Events API v2.
* Requires env var: PAGERDUTY_ROUTING_KEY
*/
private async sendPagerDuty(
alert: DlqAlertPayload,
severity: 'WARNING' | 'CRITICAL',
): Promise<void> {
const routingKey = process.env.PAGERDUTY_ROUTING_KEY;
if (!routingKey) {
this.logger.warn(
'PAGERDUTY_ROUTING_KEY not set; skipping PagerDuty alert',
);
return;
}

const pdSeverity = alert.isFinal ? 'critical' : 'warning';
const body = {
routing_key: routingKey,
event_action: 'trigger',
payload: {
summary: `[DLQ ${severity}] ${alert.eventName} — ${alert.error}`,
severity: pdSeverity,
source: 'StellarEarn-DLQ',
custom_details: {
eventName: alert.eventName,
retryCount: alert.retryCount,
maxRetries: alert.maxRetries,
poisonMessageId: alert.poisonMessageId,
isFinal: alert.isFinal,
error: alert.error,
},
},
};

const { default: axios } = await import('axios');
await axios.post('https://events.pagerduty.com/v2/enqueue', body, {
timeout: 5000,
});
this.logger.debug(`PagerDuty alert sent for event "${alert.eventName}"`);
}
}
11 changes: 11 additions & 0 deletions BackEnd/src/modules/jobs/jobs.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import { AnalyticsProcessor } from './processors/analytics.processor';
import { QuestProcessor } from './processors/quest.processor';
import { QuestStateReconciliationProcessor } from './processors/quest-state-reconciliation.processor';
import { DependencyProcessor } from './processors/dependency.processor';
import { DeadLetterProcessor } from './processors/dead-letter.processor';
import {
JobLog,
JobLogRetry,
Expand All @@ -30,8 +31,11 @@ import { StellarModule } from '../stellar/stellar.module';
import { AnalyticsModule } from '../analytics/analytics.module';
import { DependencyFreshnessService } from '../../common/services/dependency-freshness.service';
import { EventStore } from '../../events/entities/event-store.entity';
import { PoisonMessage } from '../../events/entities/poison-message.entity';
import { User } from '../users/entities/user.entity';
import { EmailModule } from '../email/email.module';
import { PoisonMessageService } from '../../events/services/poison-message.service';
import { DlqAlertService } from '../../events/services/dlq-alert.service';

@Module({
imports: [
Expand All @@ -45,6 +49,7 @@ import { EmailModule } from '../email/email.module';
Quest,
Submission,
EventStore,
PoisonMessage,
User,
]),
EventEmitterModule,
Expand All @@ -66,6 +71,9 @@ import { EmailModule } from '../email/email.module';
QuestProcessor,
QuestStateReconciliationProcessor,
DependencyProcessor,
DeadLetterProcessor,
PoisonMessageService,
DlqAlertService,
DataExportListener,
DependencyFreshnessService,
],
Expand All @@ -84,6 +92,9 @@ import { EmailModule } from '../email/email.module';
QuestProcessor,
QuestStateReconciliationProcessor,
DependencyProcessor,
DeadLetterProcessor,
PoisonMessageService,
DlqAlertService,
DependencyFreshnessService,
],
})
Expand Down
16 changes: 16 additions & 0 deletions BackEnd/src/modules/jobs/jobs.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,22 @@ export class JobsService implements OnModuleInit, OnModuleDestroy {
this.dataExportProcessor.processExport.bind(this.dataExportProcessor),
);
}

// Wire the dead-letter processor as the exclusive consumer of the DLQ
if (
this.deadLetterProcessor &&
typeof this.deadLetterProcessor.process === 'function'
) {
this.createWorker(
QUEUES.DEAD_LETTER,
this.deadLetterProcessor.process.bind(this.deadLetterProcessor),
);
this.logger.log('Dead-letter queue processor registered');
} else {
this.logger.warn(
'DeadLetterProcessor not provided — DLQ jobs will not be consumed',
);
}
}

async onModuleDestroy() {
Expand Down
Loading
Loading