Skip to content

Commit 4e6adcd

Browse files
committed
feat: separate queue jobs from consumers for configurable processing
1 parent 032b69e commit 4e6adcd

31 files changed

+401
-278
lines changed
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
import { InjectQueue } from '@nestjs/bullmq';
2+
import { Injectable } from '@nestjs/common';
3+
import { Queue } from 'bullmq';
4+
5+
export interface IRecordImageJob {
6+
bucket: string;
7+
token: string;
8+
path: string;
9+
mimetype: string;
10+
height?: number | null;
11+
}
12+
13+
export const ATTACHMENTS_CROP_QUEUE = 'attachments-crop-queue';
14+
15+
@Injectable()
16+
export class AttachmentsCropJob {
17+
constructor(@InjectQueue(ATTACHMENTS_CROP_QUEUE) public readonly queue: Queue<IRecordImageJob>) {}
18+
19+
addAttachmentCropImage(data: IRecordImageJob) {
20+
return this.queue.add('attachment_crop_image', data);
21+
}
22+
}
Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,16 @@
11
import { Module } from '@nestjs/common';
22
import { EventJobModule } from '../../event-emitter/event-job/event-job.module';
3-
import {
4-
ATTACHMENTS_CROP_QUEUE,
5-
AttachmentsCropQueueProcessor,
6-
} from './attachments-crop.processor';
3+
import { conditionalQueueProcessorProviders } from '../../utils/queue';
4+
import { ATTACHMENTS_CROP_QUEUE, AttachmentsCropJob } from './attachments-crop.job';
5+
import { AttachmentsCropQueueProcessor } from './attachments-crop.processor';
76
import { AttachmentsStorageModule } from './attachments-storage.module';
87

98
@Module({
10-
providers: [AttachmentsCropQueueProcessor],
9+
providers: [
10+
...conditionalQueueProcessorProviders(AttachmentsCropQueueProcessor),
11+
AttachmentsCropJob,
12+
],
1113
imports: [EventJobModule.registerQueue(ATTACHMENTS_CROP_QUEUE), AttachmentsStorageModule],
12-
exports: [AttachmentsCropQueueProcessor],
14+
exports: [AttachmentsCropJob],
1315
})
1416
export class AttachmentsCropModule {}

apps/nestjs-backend/src/features/attachments/attachments-crop.processor.ts

Lines changed: 4 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,12 @@
1-
import { InjectQueue, Processor, WorkerHost } from '@nestjs/bullmq';
1+
import { Processor, WorkerHost } from '@nestjs/bullmq';
22
import { Injectable, Logger } from '@nestjs/common';
33
import { PrismaService } from '@teable/db-main-prisma';
4-
import { Queue } from 'bullmq';
54
import type { Job } from 'bullmq';
65
import { EventEmitterService } from '../../event-emitter/event-emitter.service';
76
import { Events } from '../../event-emitter/events';
87
import { AttachmentsStorageService } from '../attachments/attachments-storage.service';
9-
10-
interface IRecordImageJob {
11-
bucket: string;
12-
token: string;
13-
path: string;
14-
mimetype: string;
15-
height?: number | null;
16-
}
17-
18-
export const ATTACHMENTS_CROP_QUEUE = 'attachments-crop-queue';
8+
import type { IRecordImageJob } from './attachments-crop.job';
9+
import { ATTACHMENTS_CROP_QUEUE } from './attachments-crop.job';
1910

2011
@Injectable()
2112
@Processor(ATTACHMENTS_CROP_QUEUE)
@@ -25,8 +16,7 @@ export class AttachmentsCropQueueProcessor extends WorkerHost {
2516
constructor(
2617
private readonly prismaService: PrismaService,
2718
private readonly attachmentsStorageService: AttachmentsStorageService,
28-
private readonly eventEmitterService: EventEmitterService,
29-
@InjectQueue(ATTACHMENTS_CROP_QUEUE) public readonly queue: Queue<IRecordImageJob>
19+
private readonly eventEmitterService: EventEmitterService
3020
) {
3121
super();
3222
}

apps/nestjs-backend/src/features/attachments/attachments.service.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ import { ThresholdConfig, IThresholdConfig } from '../../configs/threshold.confi
2727
import type { IClsStore } from '../../types/cls';
2828
import { FileUtils } from '../../utils';
2929
import { second } from '../../utils/second';
30-
import { AttachmentsCropQueueProcessor } from './attachments-crop.processor';
30+
import { AttachmentsCropJob } from './attachments-crop.job';
3131
import { AttachmentsStorageService } from './attachments-storage.service';
3232
import StorageAdapter from './plugins/adapter';
3333
import type { LocalStorage } from './plugins/local';
@@ -42,7 +42,7 @@ export class AttachmentsService {
4242
private readonly cls: ClsService<IClsStore>,
4343
private readonly cacheService: CacheService,
4444
private readonly attachmentsStorageService: AttachmentsStorageService,
45-
private readonly attachmentsCropQueueProcessor: AttachmentsCropQueueProcessor,
45+
private readonly attachmentsCropJob: AttachmentsCropJob,
4646
@StorageConfig() readonly storageConfig: IStorageConfig,
4747
@ThresholdConfig() readonly thresholdConfig: IThresholdConfig,
4848
@InjectStorageAdapter() readonly storageAdapter: StorageAdapter
@@ -170,7 +170,7 @@ export class AttachmentsService {
170170
path: true,
171171
},
172172
});
173-
await this.attachmentsCropQueueProcessor.queue.add('attachment_crop_image', {
173+
await this.attachmentsCropJob.addAttachmentCropImage({
174174
token: attachment.token,
175175
path: attachment.path,
176176
mimetype: attachment.mimetype,
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
import { InjectQueue } from '@nestjs/bullmq';
2+
import { Injectable } from '@nestjs/common';
3+
import { Queue } from 'bullmq';
4+
5+
export interface IBaseImportAttachmentsCsvJob {
6+
path: string;
7+
userId: string;
8+
}
9+
10+
export const BASE_IMPORT_ATTACHMENTS_CSV_QUEUE = 'base-import-attachments-csv-queue';
11+
12+
@Injectable()
13+
export class BaseImportAttachmentsCsvJob {
14+
constructor(
15+
@InjectQueue(BASE_IMPORT_ATTACHMENTS_CSV_QUEUE)
16+
public readonly queue: Queue<IBaseImportAttachmentsCsvJob>
17+
) {}
18+
}
Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,18 @@
11
import { Module } from '@nestjs/common';
22
import { EventJobModule } from '../../../event-emitter/event-job/event-job.module';
3+
import { conditionalQueueProcessorProviders } from '../../../utils/queue';
34
import { StorageModule } from '../../attachments/plugins/storage.module';
45
import {
5-
BaseImportAttachmentsCsvQueueProcessor,
66
BASE_IMPORT_ATTACHMENTS_CSV_QUEUE,
7-
} from './base-import-attachments-csv.processor';
8-
7+
BaseImportAttachmentsCsvJob,
8+
} from './base-import-attachments-csv.job';
9+
import { BaseImportAttachmentsCsvQueueProcessor } from './base-import-attachments-csv.processor';
910
@Module({
10-
providers: [BaseImportAttachmentsCsvQueueProcessor],
11+
providers: [
12+
...conditionalQueueProcessorProviders(BaseImportAttachmentsCsvQueueProcessor),
13+
BaseImportAttachmentsCsvJob,
14+
],
1115
imports: [EventJobModule.registerQueue(BASE_IMPORT_ATTACHMENTS_CSV_QUEUE), StorageModule],
12-
exports: [BaseImportAttachmentsCsvQueueProcessor],
16+
exports: [BaseImportAttachmentsCsvJob],
1317
})
1418
export class BaseImportAttachmentsCsvModule {}

apps/nestjs-backend/src/features/base/base-import-processor/base-import-attachments-csv.processor.ts

Lines changed: 4 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,16 @@
1-
import { InjectQueue, Processor, WorkerHost } from '@nestjs/bullmq';
1+
import { Processor, WorkerHost } from '@nestjs/bullmq';
22
import { Injectable, Logger } from '@nestjs/common';
33
import type { Attachments } from '@teable/db-main-prisma';
44
import { PrismaService } from '@teable/db-main-prisma';
55
import { UploadType } from '@teable/openapi';
66
import type { Job } from 'bullmq';
7-
import { Queue } from 'bullmq';
87
import * as csvParser from 'csv-parser';
98
import * as unzipper from 'unzipper';
109
import StorageAdapter from '../../attachments/plugins/adapter';
1110
import { InjectStorageAdapter } from '../../attachments/plugins/storage';
1211
import { BatchProcessor } from '../BatchProcessor.class';
13-
14-
interface IBaseImportAttachmentsCsvJob {
15-
path: string;
16-
userId: string;
17-
}
18-
19-
export const BASE_IMPORT_ATTACHMENTS_CSV_QUEUE = 'base-import-attachments-csv-queue';
12+
import type { IBaseImportAttachmentsCsvJob } from './base-import-attachments-csv.job';
13+
import { BASE_IMPORT_ATTACHMENTS_CSV_QUEUE } from './base-import-attachments-csv.job';
2014

2115
@Injectable()
2216
@Processor(BASE_IMPORT_ATTACHMENTS_CSV_QUEUE)
@@ -27,9 +21,7 @@ export class BaseImportAttachmentsCsvQueueProcessor extends WorkerHost {
2721

2822
constructor(
2923
private readonly prismaService: PrismaService,
30-
@InjectStorageAdapter() private readonly storageAdapter: StorageAdapter,
31-
@InjectQueue(BASE_IMPORT_ATTACHMENTS_CSV_QUEUE)
32-
public readonly queue: Queue<IBaseImportAttachmentsCsvJob>
24+
@InjectStorageAdapter() private readonly storageAdapter: StorageAdapter
3325
) {
3426
super();
3527
}
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
import { InjectQueue } from '@nestjs/bullmq';
2+
import { Injectable } from '@nestjs/common';
3+
import { Queue } from 'bullmq';
4+
5+
export interface IBaseImportJob {
6+
path: string;
7+
userId: string;
8+
}
9+
10+
export const BASE_IMPORT_ATTACHMENTS_QUEUE = 'base-import-attachments-queue';
11+
12+
@Injectable()
13+
export class BaseImportAttachmentsJob {
14+
constructor(
15+
@InjectQueue(BASE_IMPORT_ATTACHMENTS_QUEUE) public readonly queue: Queue<IBaseImportJob>
16+
) {}
17+
}
Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,23 +1,25 @@
11
import { Module } from '@nestjs/common';
22
import { EventJobModule } from '../../../event-emitter/event-job/event-job.module';
3+
import { conditionalQueueProcessorProviders } from '../../../utils/queue';
34
import { StorageModule } from '../../attachments/plugins/storage.module';
5+
import { BASE_IMPORT_ATTACHMENTS_CSV_QUEUE } from './base-import-attachments-csv.job';
46
import { BaseImportAttachmentsCsvModule } from './base-import-attachments-csv.module';
5-
import {
6-
BaseImportAttachmentsCsvQueueProcessor,
7-
BASE_IMPORT_ATTACHMENTS_CSV_QUEUE,
8-
} from './base-import-attachments-csv.processor';
97
import {
108
BASE_IMPORT_ATTACHMENTS_QUEUE,
11-
BaseImportAttachmentsQueueProcessor,
12-
} from './base-import-attachments.processor';
9+
BaseImportAttachmentsJob,
10+
} from './base-import-attachments.job';
11+
import { BaseImportAttachmentsQueueProcessor } from './base-import-attachments.processor';
1312
@Module({
14-
providers: [BaseImportAttachmentsQueueProcessor, BaseImportAttachmentsCsvQueueProcessor],
13+
providers: [
14+
...conditionalQueueProcessorProviders(BaseImportAttachmentsQueueProcessor),
15+
BaseImportAttachmentsJob,
16+
],
1517
imports: [
1618
EventJobModule.registerQueue(BASE_IMPORT_ATTACHMENTS_QUEUE),
1719
EventJobModule.registerQueue(BASE_IMPORT_ATTACHMENTS_CSV_QUEUE),
1820
StorageModule,
1921
BaseImportAttachmentsCsvModule,
2022
],
21-
exports: [BaseImportAttachmentsQueueProcessor, BaseImportAttachmentsCsvQueueProcessor],
23+
exports: [BaseImportAttachmentsJob],
2224
})
2325
export class BaseImportAttachmentsModule {}

apps/nestjs-backend/src/features/base/base-import-processor/base-import-attachments.processor.ts

Lines changed: 9 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,19 @@
11
/* eslint-disable sonarjs/no-duplicate-string */
22
import { PassThrough } from 'stream';
3-
import { InjectQueue, OnWorkerEvent, Processor, WorkerHost } from '@nestjs/bullmq';
3+
import { OnWorkerEvent, Processor, WorkerHost } from '@nestjs/bullmq';
44
import { Injectable, Logger } from '@nestjs/common';
55
import { PrismaService } from '@teable/db-main-prisma';
66
import { UploadType } from '@teable/openapi';
7-
import { Queue, Job } from 'bullmq';
7+
import { Job } from 'bullmq';
88
import * as unzipper from 'unzipper';
99
import StorageAdapter from '../../attachments/plugins/adapter';
1010
import { InjectStorageAdapter } from '../../attachments/plugins/storage';
1111
import {
1212
BASE_IMPORT_ATTACHMENTS_CSV_QUEUE,
13-
BaseImportAttachmentsCsvQueueProcessor,
14-
} from './base-import-attachments-csv.processor';
15-
16-
interface IBaseImportJob {
17-
path: string;
18-
userId: string;
19-
}
20-
21-
export const BASE_IMPORT_ATTACHMENTS_QUEUE = 'base-import-attachments-queue';
13+
BaseImportAttachmentsCsvJob,
14+
} from './base-import-attachments-csv.job';
15+
import type { IBaseImportJob } from './base-import-attachments.job';
16+
import { BASE_IMPORT_ATTACHMENTS_QUEUE } from './base-import-attachments.job';
2217

2318
@Injectable()
2419
@Processor(BASE_IMPORT_ATTACHMENTS_QUEUE)
@@ -28,9 +23,8 @@ export class BaseImportAttachmentsQueueProcessor extends WorkerHost {
2823

2924
constructor(
3025
private readonly prismaService: PrismaService,
31-
private readonly baseImportAttachmentsCsvQueueProcessor: BaseImportAttachmentsCsvQueueProcessor,
32-
@InjectStorageAdapter() private readonly storageAdapter: StorageAdapter,
33-
@InjectQueue(BASE_IMPORT_ATTACHMENTS_QUEUE) public readonly queue: Queue<IBaseImportJob>
26+
private readonly baseImportAttachmentsCsvJob: BaseImportAttachmentsCsvJob,
27+
@InjectStorageAdapter() private readonly storageAdapter: StorageAdapter
3428
) {
3529
super();
3630
}
@@ -234,7 +228,7 @@ export class BaseImportAttachmentsQueueProcessor extends WorkerHost {
234228
@OnWorkerEvent('completed')
235229
async onCompleted(job: Job) {
236230
const { path, userId } = job.data;
237-
this.baseImportAttachmentsCsvQueueProcessor.queue.add(
231+
this.baseImportAttachmentsCsvJob.queue.add(
238232
BASE_IMPORT_ATTACHMENTS_CSV_QUEUE,
239233
{
240234
path,

0 commit comments

Comments
 (0)