0
0
KafkaHow-ToBeginner · 4 min read

How to Use Kafka Consumer in Node.js: Simple Guide

To use a Kafka consumer in Node.js, install the kafkajs library, create a consumer instance with your Kafka broker details, subscribe to a topic, and then run the consumer to listen for messages. The consumer processes messages asynchronously as they arrive.
📐

Syntax

The basic syntax to create a Kafka consumer in Node.js involves importing the Kafka class from kafkajs, creating a consumer with a group ID, connecting it to the Kafka broker, subscribing to a topic, and running a message handler.

  • Kafka: Main class to connect to Kafka brokers.
  • consumer: Created with a group ID to identify the consumer group.
  • subscribe: Registers the consumer to a topic.
  • run: Starts the consumer and processes incoming messages.
javascript
const { Kafka } = require('kafkajs');

const kafka = new Kafka({
  clientId: 'my-app',
  brokers: ['localhost:9092']
});

const consumer = kafka.consumer({ groupId: 'test-group' });

async function run() {
  await consumer.connect();
  await consumer.subscribe({ topic: 'test-topic', fromBeginning: true });

  await consumer.run({
    eachMessage: async ({ topic, partition, message }) => {
      console.log({
        value: message.value.toString(),
      });
    },
  });
}

run().catch(console.error);
💻

Example

This example shows a complete Kafka consumer in Node.js using kafkajs. It connects to a local Kafka broker, subscribes to the test-topic, and logs each message received.

javascript
const { Kafka } = require('kafkajs');

const kafka = new Kafka({
  clientId: 'example-consumer',
  brokers: ['localhost:9092']
});

const consumer = kafka.consumer({ groupId: 'example-group' });

async function runConsumer() {
  await consumer.connect();
  await consumer.subscribe({ topic: 'test-topic', fromBeginning: true });

  await consumer.run({
    eachMessage: async ({ topic, partition, message }) => {
      console.log(`Received message: ${message.value.toString()}`);
    },
  });
}

runConsumer().catch(e => console.error(`[consumer error] ${e.message}`, e));
Output
Received message: Hello Kafka Received message: Another message
⚠️

Common Pitfalls

Common mistakes when using Kafka consumers in Node.js include:

  • Not awaiting consumer.connect() before subscribing or running.
  • Forgetting to handle errors, which can crash the app.
  • Not specifying fromBeginning: true if you want to read old messages.
  • Using the same groupId for multiple consumers unintentionally, causing message sharing.

Always handle errors and close the consumer gracefully on shutdown.

javascript
/* Wrong: Missing await on connect and no error handling */
consumer.connect();
consumer.subscribe({ topic: 'test-topic' });
consumer.run({
  eachMessage: async ({ message }) => {
    console.log(message.value.toString());
  }
});

/* Right: Await connect and add error handling */
async function start() {
  try {
    await consumer.connect();
    await consumer.subscribe({ topic: 'test-topic', fromBeginning: true });
    await consumer.run({
      eachMessage: async ({ message }) => {
        console.log(message.value.toString());
      },
    });
  } catch (e) {
    console.error('Error in consumer:', e);
  }
}
start();
📊

Quick Reference

Here is a quick summary of key Kafka consumer methods in kafkajs:

MethodDescription
connect()Connects the consumer to Kafka brokers.
subscribe({ topic, fromBeginning })Subscribes to a topic; fromBeginning reads old messages if true.
run({ eachMessage })Starts the consumer and processes messages with the provided handler.
disconnect()Disconnects the consumer cleanly.
pause()Pauses message consumption.
resume()Resumes message consumption.

Key Takeaways

Use the kafkajs library to create Kafka consumers in Node.js easily.
Always await consumer.connect() before subscribing or running the consumer.
Subscribe to topics with fromBeginning: true to read all messages from the start.
Handle errors properly to avoid crashes and ensure graceful shutdown.
Use unique groupId values to control message distribution among consumers.