Skip to content

Commit c9603ab

Browse files
committed
feat: support selective queue consumer loading via BACKEND_QUEUE_CONSUMER env var
1 parent 4e6adcd commit c9603ab

File tree

9 files changed

+92
-17
lines changed

9 files changed

+92
-17
lines changed

apps/nestjs-backend/src/features/attachments/attachments-crop.module.ts

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,16 @@
11
import { Module } from '@nestjs/common';
22
import { EventJobModule } from '../../event-emitter/event-job/event-job.module';
3-
import { conditionalQueueProcessorProviders } from '../../utils/queue';
3+
import { conditionalQueueProcessorProviders, QueueConsumerType } from '../../utils/queue';
44
import { ATTACHMENTS_CROP_QUEUE, AttachmentsCropJob } from './attachments-crop.job';
55
import { AttachmentsCropQueueProcessor } from './attachments-crop.processor';
66
import { AttachmentsStorageModule } from './attachments-storage.module';
77

88
@Module({
99
providers: [
10-
...conditionalQueueProcessorProviders(AttachmentsCropQueueProcessor),
10+
...conditionalQueueProcessorProviders({
11+
consumer: QueueConsumerType.ImageCrop,
12+
providers: [AttachmentsCropQueueProcessor],
13+
}),
1114
AttachmentsCropJob,
1215
],
1316
imports: [EventJobModule.registerQueue(ATTACHMENTS_CROP_QUEUE), AttachmentsStorageModule],

apps/nestjs-backend/src/features/base/base-import-processor/base-import-attachments-csv.module.ts

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import { Module } from '@nestjs/common';
22
import { EventJobModule } from '../../../event-emitter/event-job/event-job.module';
3-
import { conditionalQueueProcessorProviders } from '../../../utils/queue';
3+
import { conditionalQueueProcessorProviders, QueueConsumerType } from '../../../utils/queue';
44
import { StorageModule } from '../../attachments/plugins/storage.module';
55
import {
66
BASE_IMPORT_ATTACHMENTS_CSV_QUEUE,
@@ -9,7 +9,10 @@ import {
99
import { BaseImportAttachmentsCsvQueueProcessor } from './base-import-attachments-csv.processor';
1010
@Module({
1111
providers: [
12-
...conditionalQueueProcessorProviders(BaseImportAttachmentsCsvQueueProcessor),
12+
...conditionalQueueProcessorProviders({
13+
consumer: QueueConsumerType.ImportExport,
14+
providers: [BaseImportAttachmentsCsvQueueProcessor],
15+
}),
1316
BaseImportAttachmentsCsvJob,
1417
],
1518
imports: [EventJobModule.registerQueue(BASE_IMPORT_ATTACHMENTS_CSV_QUEUE), StorageModule],

apps/nestjs-backend/src/features/base/base-import-processor/base-import-attachments.module.ts

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import { Module } from '@nestjs/common';
22
import { EventJobModule } from '../../../event-emitter/event-job/event-job.module';
3-
import { conditionalQueueProcessorProviders } from '../../../utils/queue';
3+
import { conditionalQueueProcessorProviders, QueueConsumerType } from '../../../utils/queue';
44
import { StorageModule } from '../../attachments/plugins/storage.module';
55
import { BASE_IMPORT_ATTACHMENTS_CSV_QUEUE } from './base-import-attachments-csv.job';
66
import { BaseImportAttachmentsCsvModule } from './base-import-attachments-csv.module';
@@ -11,7 +11,10 @@ import {
1111
import { BaseImportAttachmentsQueueProcessor } from './base-import-attachments.processor';
1212
@Module({
1313
providers: [
14-
...conditionalQueueProcessorProviders(BaseImportAttachmentsQueueProcessor),
14+
...conditionalQueueProcessorProviders({
15+
consumer: QueueConsumerType.ImportExport,
16+
providers: [BaseImportAttachmentsQueueProcessor],
17+
}),
1518
BaseImportAttachmentsJob,
1619
],
1720
imports: [

apps/nestjs-backend/src/features/base/base-import-processor/base-import-csv.module.ts

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,20 @@
11
import { Module } from '@nestjs/common';
22
import { EventJobModule } from '../../../event-emitter/event-job/event-job.module';
3-
import { conditionalQueueProcessorProviders } from '../../../utils/queue';
3+
import { conditionalQueueProcessorProviders, QueueConsumerType } from '../../../utils/queue';
44
import { StorageModule } from '../../attachments/plugins/storage.module';
55
import { BASE_IMPORT_ATTACHMENTS_CSV_QUEUE } from './base-import-attachments-csv.job';
66
import { BASE_IMPORT_CSV_QUEUE, BaseImportCsvJob } from './base-import-csv.job';
77
import { BaseImportCsvQueueProcessor } from './base-import-csv.processor';
88
import { BaseImportJunctionCsvModule } from './base-import-junction-csv.module';
99

1010
@Module({
11-
providers: [BaseImportCsvJob, ...conditionalQueueProcessorProviders(BaseImportCsvQueueProcessor)],
11+
providers: [
12+
BaseImportCsvJob,
13+
...conditionalQueueProcessorProviders({
14+
consumer: QueueConsumerType.ImportExport,
15+
providers: [BaseImportCsvQueueProcessor],
16+
}),
17+
],
1218
imports: [
1319
EventJobModule.registerQueue(BASE_IMPORT_CSV_QUEUE),
1420
EventJobModule.registerQueue(BASE_IMPORT_ATTACHMENTS_CSV_QUEUE),

apps/nestjs-backend/src/features/base/base-import-processor/base-import-junction-csv.module.ts

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import { Module } from '@nestjs/common';
22
import { EventJobModule } from '../../../event-emitter/event-job/event-job.module';
3-
import { conditionalQueueProcessorProviders } from '../../../utils/queue';
3+
import { conditionalQueueProcessorProviders, QueueConsumerType } from '../../../utils/queue';
44
import { StorageModule } from '../../attachments/plugins/storage.module';
55
import {
66
BASE_IMPORT_JUNCTION_CSV_QUEUE,
@@ -10,7 +10,10 @@ import { BaseImportJunctionCsvQueueProcessor } from './base-import-junction.proc
1010

1111
@Module({
1212
providers: [
13-
...conditionalQueueProcessorProviders(BaseImportJunctionCsvQueueProcessor),
13+
...conditionalQueueProcessorProviders({
14+
consumer: QueueConsumerType.ImportExport,
15+
providers: [BaseImportJunctionCsvQueueProcessor],
16+
}),
1417
BaseImportJunctionCsvJob,
1518
],
1619
imports: [EventJobModule.registerQueue(BASE_IMPORT_JUNCTION_CSV_QUEUE), StorageModule],

apps/nestjs-backend/src/features/import/open-api/import-csv-chunk.module.ts

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ import { Module } from '@nestjs/common';
22
import { EventEmitterModule } from '@nestjs/event-emitter';
33
import { EventJobModule } from '../../../event-emitter/event-job/event-job.module';
44
import { ShareDbModule } from '../../../share-db/share-db.module';
5-
import { conditionalQueueProcessorProviders } from '../../../utils/queue';
5+
import { conditionalQueueProcessorProviders, QueueConsumerType } from '../../../utils/queue';
66
import { StorageModule } from '../../attachments/plugins/storage.module';
77
import { NotificationModule } from '../../notification/notification.module';
88
import { RecordOpenApiModule } from '../../record/open-api/record-open-api.module';
@@ -12,7 +12,10 @@ import { ImportCsvModule } from './import-csv.module';
1212

1313
@Module({
1414
providers: [
15-
...conditionalQueueProcessorProviders(ImportTableCsvChunkQueueProcessor),
15+
...conditionalQueueProcessorProviders({
16+
consumer: QueueConsumerType.ImportExport,
17+
providers: [ImportTableCsvChunkQueueProcessor],
18+
}),
1619
ImportTableCsvChunkJob,
1720
],
1821
imports: [

apps/nestjs-backend/src/features/import/open-api/import-csv.module.ts

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ import { Module } from '@nestjs/common';
22
import { EventEmitterModule } from '@nestjs/event-emitter';
33
import { EventJobModule } from '../../../event-emitter/event-job/event-job.module';
44
import { ShareDbModule } from '../../../share-db/share-db.module';
5-
import { conditionalQueueProcessorProviders } from '../../../utils/queue';
5+
import { conditionalQueueProcessorProviders, QueueConsumerType } from '../../../utils/queue';
66
import { StorageModule } from '../../attachments/plugins/storage.module';
77
import { NotificationModule } from '../../notification/notification.module';
88
import { RecordOpenApiModule } from '../../record/open-api/record-open-api.module';
@@ -11,7 +11,10 @@ import { ImportTableCsvQueueProcessor } from './import-csv.processor';
1111

1212
@Module({
1313
providers: [
14-
...conditionalQueueProcessorProviders(ImportTableCsvQueueProcessor),
14+
...conditionalQueueProcessorProviders({
15+
consumer: QueueConsumerType.ImportExport,
16+
providers: [ImportTableCsvQueueProcessor],
17+
}),
1518
ImportTableCsvJob,
1619
],
1720
imports: [

apps/nestjs-backend/src/features/mail-sender/open-api/mail-sender.merge.module.ts

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,12 @@ import { MailSenderMergeProcessor } from './mail-sender.merge.processor';
1212
EventJobModule.registerQueue(MAIL_SENDER_QUEUE),
1313
SettingOpenApiModule,
1414
],
15-
providers: [...conditionalQueueProcessorProviders(MailSenderMergeProcessor), MailSenderMergeJob],
15+
providers: [
16+
...conditionalQueueProcessorProviders({
17+
providers: [MailSenderMergeProcessor],
18+
}),
19+
MailSenderMergeJob,
20+
],
1621
exports: [MailSenderMergeJob],
1722
})
1823
export class MailSenderMergeModule {}
Lines changed: 48 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,49 @@
1-
export function conditionalQueueProcessorProviders<T>(...providers: T[]): T[] {
2-
return process.env.BACKEND_DISABLE_QUEUE_CONSUMER === 'true' ? [] : providers;
1+
import { Logger } from '@nestjs/common';
2+
3+
export enum QueueConsumerType {
4+
Automation = 'automation',
5+
ImportExport = 'import-export',
6+
ImageCrop = 'image-crop',
7+
Default = 'default',
8+
}
9+
10+
export function conditionalQueueProcessorProviders<T>(
11+
...opts: {
12+
consumer?: QueueConsumerType;
13+
providers: T[];
14+
}[]
15+
): T[] {
16+
if (process.env.BACKEND_DISABLE_QUEUE_CONSUMER === 'true') {
17+
return [];
18+
}
19+
const selectedConsumer = (process.env.BACKEND_QUEUE_CONSUMER?.split(',')?.filter((v) =>
20+
Object.values(QueueConsumerType).includes(v as QueueConsumerType)
21+
) || []) as QueueConsumerType[];
22+
23+
// If selected consumer is provided, return providers for the selected consumer
24+
if (selectedConsumer.length > 0) {
25+
const providers: T[] = [];
26+
for (const opt of opts) {
27+
const consumer = opt.consumer || QueueConsumerType.Default;
28+
if (selectedConsumer.includes(consumer)) {
29+
providers.push(...opt.providers);
30+
}
31+
}
32+
providers.length > 0 &&
33+
Logger.log(
34+
`Queue Consumer Providers (${selectedConsumer.join(', ')}): ${providers.map((p) => (typeof p === 'function' ? p.name : 'unknown')).join(', ')}`
35+
);
36+
return providers;
37+
}
38+
39+
// If no selected consumer is provided, return providers for all consumers
40+
const providers: T[] = [];
41+
for (const opt of opts) {
42+
providers.push(...opt.providers);
43+
}
44+
providers.length > 0 &&
45+
Logger.log(
46+
`Queue Consumer Providers (ALL): ${providers.map((p) => (typeof p === 'function' ? p.name : 'unknown')).join(', ')}`
47+
);
48+
return providers;
349
}

0 commit comments

Comments
 (0)