0
0
RabbitmqHow-ToBeginner · 4 min read

How to Implement Request-Reply Pattern in RabbitMQ

To implement request-reply in RabbitMQ, the client sends a message with a reply_to queue and a unique correlation_id. The server processes the request and sends the response back to the reply_to queue using the same correlation_id so the client can match replies to requests.
📐

Syntax

The request-reply pattern in RabbitMQ involves these key parts:

  • Request Queue: Where the client sends the request message.
  • Reply Queue: A temporary queue where the client listens for responses.
  • Correlation ID: A unique identifier to match replies with requests.
  • Message Properties: reply_to and correlation_id are set in the request message.

The server consumes from the request queue, processes the message, and sends the reply to the reply_to queue with the same correlation_id.

java
channel.basicPublish("request_queue", null, new AMQP.BasicProperties.Builder()
    .correlationId(corrId)
    .replyTo(replyQueueName)
    .build(), messageBody.getBytes());

channel.basicConsume(replyQueueName, true, (consumerTag, delivery) -> {
    if (delivery.getProperties().getCorrelationId().equals(corrId)) {
        // process reply
    }
}, consumerTag -> {});
💻

Example

This example shows a simple client sending a request and waiting for a reply, and a server processing the request and replying.

java
import com.rabbitmq.client.*;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class RpcClient {
    private Connection connection;
    private Channel channel;
    private String requestQueueName = "rpc_queue";
    private String replyQueueName;

    public RpcClient(ConnectionFactory factory) throws Exception {
        connection = factory.newConnection();
        channel = connection.createChannel();
        replyQueueName = channel.queueDeclare().getQueue();
    }

    public String call(String message) throws Exception {
        final String corrId = UUID.randomUUID().toString();

        AMQP.BasicProperties props = new AMQP.BasicProperties
                .Builder()
                .correlationId(corrId)
                .replyTo(replyQueueName)
                .build();

        channel.basicPublish("", requestQueueName, props, message.getBytes(StandardCharsets.UTF_8));

        final BlockingQueue<String> response = new ArrayBlockingQueue<>(1);

        String ctag = channel.basicConsume(replyQueueName, true, (consumerTag, delivery) -> {
            if (delivery.getProperties().getCorrelationId().equals(corrId)) {
                response.offer(new String(delivery.getBody(), StandardCharsets.UTF_8));
            }
        }, consumerTag -> {});

        String result = response.take();
        channel.basicCancel(ctag);
        return result;
    }

    public void close() throws Exception {
        connection.close();
    }

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");

        RpcClient rpcClient = new RpcClient(factory);
        System.out.println(" [x] Requesting fib(6)");
        String response = rpcClient.call("6");
        System.out.println(" [.] Got '" + response + "'");
        rpcClient.close();
    }
}

// Server side
public class RpcServer {
    private static int fib(int n) {
        if (n == 0) return 0;
        if (n == 1) return 1;
        return fib(n - 1) + fib(n - 2);
    }

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.queueDeclare("rpc_queue", false, false, false, null);
        channel.basicQos(1);

        System.out.println(" [x] Awaiting RPC requests");

        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            AMQP.BasicProperties props = delivery.getProperties();
            AMQP.BasicProperties replyProps = new AMQP.BasicProperties
                    .Builder()
                    .correlationId(props.getCorrelationId())
                    .build();

            String response = "";

            try {
                String message = new String(delivery.getBody(), "UTF-8");
                int n = Integer.parseInt(message);
                System.out.println(" [.] fib(" + message + ")");
                response += fib(n);
            } catch (RuntimeException e) {
                System.out.println(" [.] " + e.toString());
            } finally {
                channel.basicPublish("", props.getReplyTo(), replyProps, response.getBytes("UTF-8"));
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            }
        };

        channel.basicConsume("rpc_queue", false, deliverCallback, consumerTag -> {});
    }
}
Output
[x] Requesting fib(6) [.] fib(6) [.] Got '8'
⚠️

Common Pitfalls

  • Not setting reply_to or correlation_id: Without these, the client cannot receive or match replies.
  • Using a shared reply queue without correlation IDs: This causes confusion when multiple requests are in flight.
  • Not acknowledging messages on server: Can cause message redelivery and duplicates.
  • Blocking the consumer thread: The server should process requests quickly or use async to avoid blocking.
java
/* Wrong: No correlation_id or reply_to set */
channel.basicPublish("request_queue", null, null, message.getBytes());

/* Right: Set correlation_id and reply_to */
channel.basicPublish("request_queue", null, new AMQP.BasicProperties.Builder()
    .correlationId(corrId)
    .replyTo(replyQueueName)
    .build(), message.getBytes());
📊

Quick Reference

Request-Reply Key Points:

  • Client sends request with reply_to queue and unique correlation_id.
  • Server consumes request, processes it, and sends reply to reply_to queue with same correlation_id.
  • Client listens on reply queue and matches replies by correlation_id.
  • Use temporary exclusive queues for replies to avoid conflicts.
  • Always acknowledge messages on server after processing.

Key Takeaways

Always set both reply_to and correlation_id properties in request messages.
Use a unique correlation_id to match replies to requests correctly.
The server must send replies to the reply_to queue with the same correlation_id.
Use temporary exclusive queues for replies to avoid message mix-ups.
Acknowledge messages on the server to prevent duplicate processing.