Queue consumers (processors) in NestJS
Queue consumers (processors) let your app handle tasks in the background, one at a time or several at once, without blocking other work. A consumer listens to a queue and processes jobs as they arrive.
    import { Processor, Process } from '@nestjs/bull';
    import { Job } from 'bull';

    @Processor('queueName')
    export class QueueConsumer {
      @Process('jobName')
      async handleJob(job: Job<any>) {
        // Your job processing logic here
      }
    }

The @Processor decorator marks the class as a consumer for the named queue.
The @Process decorator marks the method that handles jobs with the given name.
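
The name matching described above can be pictured with a small in-memory model. This is only a sketch of the idea, not how Bull or @nestjs/bull is implemented (the real library stores jobs in Redis and runs handlers asynchronously). `MiniQueue`, `process`, `add`, and `drain` are hypothetical names, and the handlers are synchronous to keep the sketch short.

```typescript
// Minimal in-memory sketch of name-based job routing.
// MiniQueue and its methods are hypothetical, not part of Bull or @nestjs/bull.
type Handler<T> = (data: T) => string;

interface QueuedJob<T> {
  name: string;
  data: T;
}

class MiniQueue<T> {
  private handlers = new Map<string, Handler<T>>();
  private waiting: QueuedJob<T>[] = [];

  // Plays the role of @Process('jobName'): bind a handler to a job name.
  process(name: string, handler: Handler<T>): void {
    this.handlers.set(name, handler);
  }

  // Producer side: enqueue a named job with its payload.
  add(name: string, data: T): void {
    this.waiting.push({ name, data });
  }

  // Consumer side: take jobs in FIFO order and route each one by name.
  drain(): string[] {
    const results: string[] = [];
    while (this.waiting.length > 0) {
      const job = this.waiting.shift()!;
      const handler = this.handlers.get(job.name);
      results.push(handler ? handler(job.data) : `no handler for ${job.name}`);
    }
    return results;
  }
}

const emailQueue = new MiniQueue<{ to: string }>();
emailQueue.process('sendEmail', (data) => `email sent to ${data.to}`);
emailQueue.add('sendEmail', { to: 'a@example.com' });
emailQueue.add('sendSms', { to: 'b@example.com' });

const routed = emailQueue.drain();
console.log(routed);
```

Only the job whose name matches a registered handler is processed. The `sendSms` job has no handler, so the sketch reports it instead of running anything.
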
Example: a consumer that handles 'sendEmail' jobs from 'emailQueue':

    import { Processor, Process } from '@nestjs/bull';
    import { Job } from 'bull';

    @Processor('emailQueue')
    export class EmailConsumer {
      @Process('sendEmail')
      async sendEmailJob(job: Job<{ to: string; subject: string }>) {
        // job.data holds the payload passed in when the job was added
        console.log(`Sending email to ${job.data.to} with subject ${job.data.subject}`);
      }
    }

Example: a consumer that checks the job data before doing any work:

    import { Processor, Process } from '@nestjs/bull';
    import { Job } from 'bull';

    @Processor('imageQueue')
    export class ImageConsumer {
      @Process('resizeImage')
      async resizeImageJob(job: Job<{ imagePath: string }>) {
        // Guard against jobs that were added without a path
        if (!job.data.imagePath) {
          console.log('No image path provided');
          return;
        }
        console.log(`Resizing image at ${job.data.imagePath}`);
      }
    }

Example: a consumer whose handler never runs, because no 'noJob' jobs are ever added to 'emptyQueue':

    import { Processor, Process } from '@nestjs/bull';
    import { Job } from 'bull';

    @Processor('emptyQueue')
    export class EmptyConsumer {
      @Process('noJob')
      async noJobHandler(job: Job) {
        console.log('This will never run because no jobs are added');
      }
    }

This NestJS app sets up a queue named 'taskQueue' and a consumer that processes 'printTask' jobs. It adds one job with a message, which the consumer prints.
    import { Injectable, Module } from '@nestjs/common';
    import { NestFactory } from '@nestjs/core';
    import { BullModule, InjectQueue, Processor, Process } from '@nestjs/bull';
    import { Job, Queue } from 'bull';

    // Consumer: handles 'printTask' jobs from 'taskQueue'.
    @Processor('taskQueue')
    export class TaskConsumer {
      @Process('printTask')
      async handlePrintTask(job: Job<{ message: string }>) {
        console.log(`Processing task: ${job.data.message}`);
      }
    }

    // Producer: declared before AppModule because the @Module decorator
    // reads the providers array as soon as AppModule is defined.
    @Injectable()
    class JobProducer {
      constructor(@InjectQueue('taskQueue') private taskQueue: Queue) {}

      async addJob() {
        await this.taskQueue.add('printTask', { message: 'Hello from queue!' });
        console.log('Job added to queue');
      }
    }

    @Module({
      imports: [
        // Requires a Redis server reachable at localhost:6379.
        BullModule.forRoot({ redis: { host: 'localhost', port: 6379 } }),
        BullModule.registerQueue({ name: 'taskQueue' }),
      ],
      providers: [TaskConsumer, JobProducer],
    })
    export class AppModule {}

    async function bootstrap() {
      const app = await NestFactory.createApplicationContext(AppModule);
      const producer = app.get(JobProducer);
      await producer.addJob();
      // Give the consumer time to pick up the job before shutting down.
      setTimeout(() => app.close(), 2000);
    }

    bootstrap();

The queue's own overhead per job (fetching it and dispatching it to the handler) is roughly constant, so total processing time is dominated by the work the handler does.
Memory use depends on job data size and number of concurrent jobs.
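Common mistake: forgetting to register the queue with BullModule.registerQueue() or to add the consumer class to the module's providers. A consumer that is not registered never receives jobs.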
Use queue consumers when tasks can be done later without blocking user actions.
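
The note above about memory and concurrent jobs can be made concrete with a small model of a concurrency cap: only a fixed number of handlers run at once, so only that many job payloads are being worked on at any moment. This is a sketch under assumptions, not Bull's scheduler; `runWithConcurrency`, `payloads`, and `sleep` are hypothetical names. In @nestjs/bull the cap is typically set through the `concurrency` option of `@Process`; check the package documentation for the exact form.

```typescript
// Sketch of a concurrency cap: at most `concurrency` handlers run at once.
// runWithConcurrency is a hypothetical helper, not a Bull API.
async function runWithConcurrency<T>(
  jobs: T[],
  concurrency: number,
  handler: (job: T) => Promise<void>,
): Promise<number> {
  let next = 0;     // index of the next job to pick up
  let inFlight = 0; // handlers currently running
  let peak = 0;     // highest number of simultaneous handlers observed

  // Each worker pulls jobs until none are left.
  async function worker(): Promise<void> {
    while (next < jobs.length) {
      const job = jobs[next++];
      inFlight++;
      peak = Math.max(peak, inFlight);
      try {
        await handler(job);
      } finally {
        inFlight--;
      }
    }
  }

  const workerCount = Math.min(concurrency, jobs.length);
  await Promise.all(Array.from({ length: workerCount }, () => worker()));
  return peak;
}

const payloads = Array.from({ length: 10 }, (_, i) => ({ id: i }));
const sleep = (ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms));

// Ten jobs, but never more than three being handled at the same time.
const peakPromise = runWithConcurrency(payloads, 3, async () => {
  await sleep(5);
});
peakPromise.then((peak) => console.log(`peak concurrent jobs: ${peak}`));
```

Raising the cap increases throughput for slow, I/O-bound handlers, at the cost of holding more job data in memory at the same time.
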
Queue consumers listen to queues and process jobs asynchronously.
Use @Processor to mark a consumer class and @Process to mark job handlers.
This helps keep your app responsive by handling tasks in the background.
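
To illustrate the responsiveness point, here is a tiny model in which the request path only enqueues the slow task and returns, while a separate step does the work afterwards. It is a sketch, not NestJS code: `handleSignup`, `backgroundJobs`, and `runBackgroundJobs` are hypothetical names, and a plain array stands in for the Redis-backed queue.

```typescript
// Sketch: the request path only enqueues; the work happens later in the background.
// handleSignup, backgroundJobs and runBackgroundJobs are hypothetical names.
const events: string[] = [];
const backgroundJobs: Array<() => void> = [];

function handleSignup(email: string): string {
  // Enqueue the slow task instead of running it inline.
  backgroundJobs.push(() => {
    events.push(`welcome email sent to ${email}`);
  });
  events.push(`response returned for ${email}`);
  return 'accepted';
}

// Stand-in for the consumer: runs whatever has been queued so far.
function runBackgroundJobs(): void {
  while (backgroundJobs.length > 0) {
    const job = backgroundJobs.shift()!;
    job();
  }
}

const signupStatus = handleSignup('a@example.com');
runBackgroundJobs();
console.log(signupStatus, events);
```

The response is recorded before the email is sent, which is the ordering a queue consumer gives you: the caller is not kept waiting for the slow step.
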