
Queue consumers (processors) in NestJS

Introduction

Queue consumers (processors) let your app handle background jobs one at a time or several concurrently, without blocking other work. They listen to a queue and process jobs as they arrive.

Use a queue consumer in situations such as:
When you want to send emails without making users wait.
When you need to resize images after upload without slowing down the app.
When processing orders or payments in the background.
When handling notifications that can be delayed.
When you want to retry failed tasks automatically.
Syntax
NestJS
import { Processor, Process } from '@nestjs/bull';
import { Job } from 'bull';

@Processor('queueName')
export class QueueConsumer {
  @Process('jobName')
  async handleJob(job: Job<any>) {
    // Your job processing logic here
  }
}

The @Processor decorator marks the class as a consumer for the queue with the given name.

The @Process decorator marks the method that handles jobs with the given job name.
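To make the name matching concrete, here is a minimal in-memory sketch of how a job name selects its handler. It is a simplified model under stated assumptions, not how Bull or @nestjs/bull is implemented: TinyQueue, TinyJob, and Handler are hypothetical names, and the model runs handlers synchronously, whereas a real queue stores jobs in Redis and processes them asynchronously.

```typescript
// Simplified in-memory model of named-job dispatch.
// TinyQueue is a hypothetical stand-in, NOT Bull's implementation.
type TinyJob<T> = { name: string; data: T };
type Handler<T> = (job: TinyJob<T>) => void;

class TinyQueue<T> {
  private readonly handlers = new Map<string, Handler<T>>();

  // Plays the role of @Process('jobName') on a consumer method.
  process(name: string, handler: Handler<T>): void {
    this.handlers.set(name, handler);
  }

  // Plays the role of queue.add('jobName', data) on the producer side.
  add(name: string, data: T): void {
    const handler = this.handlers.get(name);
    if (!handler) {
      throw new Error(`No handler registered for job name "${name}"`);
    }
    handler({ name, data });
  }
}

const handled: string[] = [];
const emailQueue = new TinyQueue<{ to: string }>();

// Register a handler for jobs named 'sendEmail'.
emailQueue.process('sendEmail', (job) => {
  handled.push(`${job.name} -> ${job.data.to}`);
});

// The job name used here must match the name the handler was registered with.
emailQueue.add('sendEmail', { to: 'user@example.com' });
```

The point of the sketch is that the name passed when a job is added and the name given to the handler must agree. If they differ, the handler you wrote does not run for that job.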

Examples
Processes 'sendEmail' jobs from 'emailQueue'. Logs email details.
NestJS
import { Processor, Process } from '@nestjs/bull';
import { Job } from 'bull';

@Processor('emailQueue')
export class EmailConsumer {
  @Process('sendEmail')
  async sendEmailJob(job: Job<{ to: string; subject: string }>) {
    console.log(`Sending email to ${job.data.to} with subject ${job.data.subject}`);
  }
}
Handles 'resizeImage' jobs and checks if image path is provided before processing.
NestJS
import { Processor, Process } from '@nestjs/bull';
import { Job } from 'bull';

@Processor('imageQueue')
export class ImageConsumer {
  @Process('resizeImage')
  async resizeImageJob(job: Job<{ imagePath: string }>) {
    if (!job.data.imagePath) {
      console.log('No image path provided');
      return;
    }
    console.log(`Resizing image at ${job.data.imagePath}`);
  }
}
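In the example above, returning early when imagePath is missing ends the handler normally, so the queue treats the job as completed. If a missing path should count as a failure, the handler can throw instead: Bull marks a job as failed when its handler throws or rejects, and the attempts and backoff job options control whether it is retried. The sketch below is dependency-free and rests on assumptions: ResizeJob and requireImagePath are hypothetical names, and retryOptions is a plain object that mirrors the shape of those two job options.

```typescript
// Hypothetical minimal job shape; a real handler would receive Job from 'bull'.
type ResizeJob = { data: { imagePath?: string } };

// Throwing (instead of returning) signals failure to the caller.
function requireImagePath(job: ResizeJob): string {
  const { imagePath } = job.data;
  if (!imagePath) {
    throw new Error('No image path provided');
  }
  return imagePath;
}

// Plain object mirroring Bull's `attempts` and `backoff` job options.
// It would be passed as the third argument to queue.add(name, data, options).
const retryOptions = {
  attempts: 3,
  backoff: { type: 'exponential', delay: 1000 }, // delay in milliseconds
};

const validPath = requireImagePath({ data: { imagePath: '/uploads/cat.png' } });
console.log(`Resizing image at ${validPath}`);
```

Retries mainly help with transient failures such as a network timeout. A job whose data is missing a required field will fail the same way on every attempt, so validation errors are usually better logged or reported than retried.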
A consumer for a queue that never receives jobs. The handler is registered, but it does not run while the queue stays empty.
NestJS
import { Processor, Process } from '@nestjs/bull';
import { Job } from 'bull';

@Processor('emptyQueue')
export class EmptyConsumer {
  @Process('noJob')
  async noJobHandler(job: Job) {
    console.log('This will never run because no jobs are added');
  }
}
Sample Program

This NestJS app sets up a queue named 'taskQueue' and a consumer that processes 'printTask' jobs. It adds one job with a message, which the consumer prints.

NestJS
import { Injectable, Module } from '@nestjs/common';
import { NestFactory } from '@nestjs/core';
import { BullModule, InjectQueue, Processor, Process } from '@nestjs/bull';
import { Job, Queue } from 'bull';

@Processor('taskQueue')
export class TaskConsumer {
  @Process('printTask')
  async handlePrintTask(job: Job<{ message: string }>) {
    console.log(`Processing task: ${job.data.message}`);
  }
}

// Declared before AppModule: the @Module decorator below references this class,
// and a class cannot be used before its declaration has been evaluated.
@Injectable()
class JobProducer {
  constructor(@InjectQueue('taskQueue') private readonly taskQueue: Queue) {}

  async addJob() {
    await this.taskQueue.add('printTask', { message: 'Hello from queue!' });
    console.log('Job added to queue');
  }
}

@Module({
  imports: [
    BullModule.forRoot({
      redis: {
        host: 'localhost',
        port: 6379,
      },
    }),
    BullModule.registerQueue({ name: 'taskQueue' }),
  ],
  providers: [TaskConsumer, JobProducer],
})
export class AppModule {}

async function bootstrap() {
  const app = await NestFactory.createApplicationContext(AppModule);
  const producer = app.get(JobProducer);
  await producer.addJob();
  // Give the consumer a moment to process the job before shutting down.
  setTimeout(() => app.close(), 2000);
}

bootstrap();
Important Notes

The queue overhead per job is small and roughly constant; total processing time is dominated by what the handler does (network calls, file I/O, CPU work).

Memory use depends on job data size and number of concurrent jobs.
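As a rough back-of-envelope sketch of that relationship, the payload memory held by handlers at any moment is bounded by the number of jobs running at once times the size of each job's data. estimateInFlightBytes is a hypothetical helper written for illustration, and the estimate deliberately ignores overhead from the queue library, the Redis client, and whatever the handler allocates itself. In @nestjs/bull the number of jobs a handler runs in parallel can be set with the concurrency option of @Process, for example @Process({ name: 'printTask', concurrency: 5 }).

```typescript
// Hypothetical helper: upper bound on payload bytes held by handlers at once.
// Ignores overhead from the queue library, Redis client, and handler locals.
function estimateInFlightBytes(concurrency: number, payloadBytes: number): number {
  if (concurrency < 1 || payloadBytes < 0) {
    throw new RangeError('concurrency must be >= 1 and payloadBytes >= 0');
  }
  return concurrency * payloadBytes;
}

// Approximate size of a sample payload once serialized to JSON.
// String length counts characters, which equals bytes for ASCII-only payloads.
const samplePayload = { message: 'Hello from queue!' };
const samplePayloadBytes = JSON.stringify(samplePayload).length;

// Estimate for five jobs being processed at the same time.
console.log(estimateInFlightBytes(5, samplePayloadBytes));
```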

Common mistake: forgetting to register the queue with BullModule.registerQueue or to add the consumer class to the module's providers.

Use queue consumers when tasks can be done later without blocking user actions.

Summary

Queue consumers listen to queues and process jobs asynchronously.

Use @Processor to mark a consumer class and @Process to mark job handlers.

This helps keep your app responsive by handling tasks in the background.