How to Use Kafka Consumer in Node.js: Simple Guide
To use a Kafka consumer in Node.js, install the kafkajs library, create a consumer instance with your Kafka broker details, subscribe to a topic, and then run the consumer to listen for messages. The consumer processes messages asynchronously as they arrive.
Syntax
The basic syntax to create a Kafka consumer in Node.js involves importing the Kafka class from kafkajs, creating a consumer with a group ID, connecting it to the Kafka broker, subscribing to a topic, and running a message handler.
- Kafka: Main class to connect to Kafka brokers.
- consumer: Created with a group ID to identify the consumer group.
- subscribe: Registers the consumer to a topic.
- run: Starts the consumer and processes incoming messages.
```javascript
const { Kafka } = require('kafkajs');

const kafka = new Kafka({
  clientId: 'my-app',
  brokers: ['localhost:9092'],
});

const consumer = kafka.consumer({ groupId: 'test-group' });

async function run() {
  await consumer.connect();
  await consumer.subscribe({ topic: 'test-topic', fromBeginning: true });

  await consumer.run({
    eachMessage: async ({ topic, partition, message }) => {
      console.log({
        value: message.value.toString(),
      });
    },
  });
}

run().catch(console.error);
```
Example
This example shows a complete Kafka consumer in Node.js using kafkajs. It connects to a local Kafka broker, subscribes to the test-topic, and logs each message received.
```javascript
const { Kafka } = require('kafkajs');

const kafka = new Kafka({
  clientId: 'example-consumer',
  brokers: ['localhost:9092'],
});

const consumer = kafka.consumer({ groupId: 'example-group' });

async function runConsumer() {
  await consumer.connect();
  await consumer.subscribe({ topic: 'test-topic', fromBeginning: true });

  await consumer.run({
    eachMessage: async ({ topic, partition, message }) => {
      console.log(`Received message: ${message.value.toString()}`);
    },
  });
}

runConsumer().catch(e => console.error(`[consumer error] ${e.message}`, e));
```
Output
Received message: Hello Kafka
Received message: Another message
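The handler in the example calls message.value.toString() directly. In kafkajs, message.value is a Buffer, and it is null for tombstone records, so that call can throw. The sketch below shows one way to guard against this. decodeMessage is a hypothetical helper name, not part of kafkajs, and the assumption that payloads are UTF-8 JSON is for illustration only.

```javascript
// Hypothetical helper (not part of kafkajs): turns a kafkajs message into a
// plain object. Assumes producers send UTF-8 JSON; adjust for other formats.
function decodeMessage(message) {
  const key = message.key ? message.key.toString() : null;

  // Tombstone records carry a null value, so guard before calling toString().
  if (message.value === null || message.value === undefined) {
    return { key, offset: message.offset, value: null, raw: null };
  }

  const raw = message.value.toString('utf8');
  try {
    return { key, offset: message.offset, value: JSON.parse(raw), raw };
  } catch (err) {
    // Not valid JSON: keep the raw text so the caller can decide what to do.
    return { key, offset: message.offset, value: null, raw };
  }
}

// Intended use inside consumer.run (sketch):
//   eachMessage: async ({ message }) => {
//     const { key, value, raw } = decodeMessage(message);
//     console.log(key, value ?? raw);
//   }
```

For plain-text messages such as "Hello Kafka", value is null and raw holds the original string, so the handler can fall back to logging raw.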
Common Pitfalls
Common mistakes when using Kafka consumers in Node.js include:
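- Not awaiting consumer.connect() before subscribing or running.
- Forgetting to handle errors, which can crash the app.
- Not specifying fromBeginning: true if you want to read old messages. The setting only takes effect when the consumer group has no committed offset for the topic.
- Using the same groupId for multiple consumers unintentionally. Consumers in one group split the topic's partitions, so each message is delivered to only one of them.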
Always handle errors and close the consumer gracefully on shutdown.
```javascript
// Assumes `consumer` was created as in the example above.

// Wrong: missing await on connect and no error handling
// consumer.connect();
// consumer.subscribe({ topic: 'test-topic' });
// consumer.run({
//   eachMessage: async ({ message }) => {
//     console.log(message.value.toString());
//   },
// });

// Right: await each step and handle errors
async function start() {
  try {
    await consumer.connect();
    await consumer.subscribe({ topic: 'test-topic', fromBeginning: true });
    await consumer.run({
      eachMessage: async ({ message }) => {
        console.log(message.value.toString());
      },
    });
  } catch (e) {
    console.error('Error in consumer:', e);
  }
}

start();
```
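The advice to close the consumer gracefully on shutdown can be sketched as follows. registerGracefulShutdown is a hypothetical helper name, and the choice of SIGINT and SIGTERM as the signals to listen for is an assumption about how the process is stopped. The only kafkajs call used is consumer.disconnect().

```javascript
// Sketch: disconnect a kafkajs consumer when the process is asked to stop.
// registerGracefulShutdown is a hypothetical helper name; `proc` defaults to
// the real process object and is a parameter only so it can be swapped in tests.
function registerGracefulShutdown(consumer, proc = process, signals = ['SIGINT', 'SIGTERM']) {
  let shuttingDown = false;

  const shutdown = async (signal) => {
    if (shuttingDown) return; // ignore repeated signals
    shuttingDown = true;
    console.log(`Received ${signal}, disconnecting consumer...`);
    try {
      await consumer.disconnect();
    } catch (e) {
      console.error('Error while disconnecting consumer:', e);
      proc.exitCode = 1;
    }
  };

  for (const signal of signals) {
    proc.on(signal, () => shutdown(signal));
  }
  return shutdown;
}

// Intended use, after the consumer has been created:
//   registerGracefulShutdown(consumer);
```

Once the consumer has disconnected and nothing else keeps the event loop busy, Node.js exits on its own, so the sketch does not call process.exit().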
Quick Reference
Here is a quick summary of key Kafka consumer methods in kafkajs:
| Method | Description |
|---|---|
| connect() | Connects the consumer to Kafka brokers. |
| subscribe({ topic, fromBeginning }) | Subscribes to a topic; if fromBeginning is true, reading starts at the earliest offset when the group has no committed offset. |
| run({ eachMessage }) | Starts the consumer and processes messages with the provided handler. |
| disconnect() | Disconnects the consumer cleanly. |
| pause(topics) | Pauses fetching for the given topics, for example pause([{ topic }]). |
| resume(topics) | Resumes fetching for previously paused topics, for example resume([{ topic }]). |
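In kafkajs, pause and resume take an array of { topic } objects rather than being called with no arguments. The sketch below pauses one topic for a fixed delay and then resumes it. pauseTopicFor is a hypothetical helper name, and the fixed delay is an assumption chosen to keep the example short.

```javascript
// Hypothetical helper: pause one topic, then resume it after delayMs.
// Uses consumer.pause / consumer.resume, which take an array of { topic } objects.
function pauseTopicFor(consumer, topic, delayMs) {
  consumer.pause([{ topic }]);
  return new Promise((resolve) => {
    setTimeout(() => {
      consumer.resume([{ topic }]);
      resolve(); // resolves once the topic has been resumed
    }, delayMs);
  });
}

// Intended use inside eachMessage, e.g. when a downstream service is overloaded:
//   pauseTopicFor(consumer, topic, 5000);
```

Pausing affects later fetches, so messages already fetched in the current batch may still reach the handler.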
Key Takeaways
- Use the kafkajs library to create Kafka consumers in Node.js.
- Always await consumer.connect() before subscribing or running the consumer.
- Subscribe with fromBeginning: true to read from the earliest offset when the consumer group has no committed offsets.
- Handle errors to avoid crashes, and disconnect the consumer on shutdown.
- Give consumers distinct groupId values when each one should receive every message; share a groupId to split the load between them.