How to Implement Request-Reply Pattern in RabbitMQ
To implement
request-reply in RabbitMQ, the client sends a message with a reply_to queue and a unique correlation_id. The server processes the request and sends the response back to the reply_to queue using the same correlation_id so the client can match replies to requests.Syntax
The request-reply pattern in RabbitMQ involves these key parts:
- Request Queue: Where the client sends the request message.
- Reply Queue: A temporary queue where the client listens for responses.
- Correlation ID: A unique identifier to match replies with requests.
- Message Properties:
reply_toandcorrelation_idare set in the request message.
The server consumes from the request queue, processes the message, and sends the reply to the reply_to queue with the same correlation_id.
java
channel.basicPublish("request_queue", null, new AMQP.BasicProperties.Builder() .correlationId(corrId) .replyTo(replyQueueName) .build(), messageBody.getBytes()); channel.basicConsume(replyQueueName, true, (consumerTag, delivery) -> { if (delivery.getProperties().getCorrelationId().equals(corrId)) { // process reply } }, consumerTag -> {});
Example
This example shows a simple client sending a request and waiting for a reply, and a server processing the request and replying.
java
import com.rabbitmq.client.*; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.UUID; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; public class RpcClient { private Connection connection; private Channel channel; private String requestQueueName = "rpc_queue"; private String replyQueueName; public RpcClient(ConnectionFactory factory) throws Exception { connection = factory.newConnection(); channel = connection.createChannel(); replyQueueName = channel.queueDeclare().getQueue(); } public String call(String message) throws Exception { final String corrId = UUID.randomUUID().toString(); AMQP.BasicProperties props = new AMQP.BasicProperties .Builder() .correlationId(corrId) .replyTo(replyQueueName) .build(); channel.basicPublish("", requestQueueName, props, message.getBytes(StandardCharsets.UTF_8)); final BlockingQueue<String> response = new ArrayBlockingQueue<>(1); String ctag = channel.basicConsume(replyQueueName, true, (consumerTag, delivery) -> { if (delivery.getProperties().getCorrelationId().equals(corrId)) { response.offer(new String(delivery.getBody(), StandardCharsets.UTF_8)); } }, consumerTag -> {}); String result = response.take(); channel.basicCancel(ctag); return result; } public void close() throws Exception { connection.close(); } public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); RpcClient rpcClient = new RpcClient(factory); System.out.println(" [x] Requesting fib(6)"); String response = rpcClient.call("6"); System.out.println(" [.] Got '" + response + "'"); rpcClient.close(); } } // Server side public class RpcServer { private static int fib(int n) { if (n == 0) return 0; if (n == 1) return 1; return fib(n - 1) + fib(n - 2); } public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare("rpc_queue", false, false, false, null); channel.basicQos(1); System.out.println(" [x] Awaiting RPC requests"); DeliverCallback deliverCallback = (consumerTag, delivery) -> { AMQP.BasicProperties props = delivery.getProperties(); AMQP.BasicProperties replyProps = new AMQP.BasicProperties .Builder() .correlationId(props.getCorrelationId()) .build(); String response = ""; try { String message = new String(delivery.getBody(), "UTF-8"); int n = Integer.parseInt(message); System.out.println(" [.] fib(" + message + ")"); response += fib(n); } catch (RuntimeException e) { System.out.println(" [.] " + e.toString()); } finally { channel.basicPublish("", props.getReplyTo(), replyProps, response.getBytes("UTF-8")); channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } }; channel.basicConsume("rpc_queue", false, deliverCallback, consumerTag -> {}); } }
Output
[x] Requesting fib(6)
[.] fib(6)
[.] Got '8'
Common Pitfalls
- Not setting
reply_toorcorrelation_id: Without these, the client cannot receive or match replies. - Using a shared reply queue without correlation IDs: This causes confusion when multiple requests are in flight.
- Not acknowledging messages on server: Can cause message redelivery and duplicates.
- Blocking the consumer thread: The server should process requests quickly or use async to avoid blocking.
java
/* Wrong: No correlation_id or reply_to set */ channel.basicPublish("request_queue", null, null, message.getBytes()); /* Right: Set correlation_id and reply_to */ channel.basicPublish("request_queue", null, new AMQP.BasicProperties.Builder() .correlationId(corrId) .replyTo(replyQueueName) .build(), message.getBytes());
Quick Reference
Request-Reply Key Points:
- Client sends request with
reply_toqueue and uniquecorrelation_id. - Server consumes request, processes it, and sends reply to
reply_toqueue with samecorrelation_id. - Client listens on reply queue and matches replies by
correlation_id. - Use temporary exclusive queues for replies to avoid conflicts.
- Always acknowledge messages on server after processing.
Key Takeaways
Always set both reply_to and correlation_id properties in request messages.
Use a unique correlation_id to match replies to requests correctly.
The server must send replies to the reply_to queue with the same correlation_id.
Use temporary exclusive queues for replies to avoid message mix-ups.
Acknowledge messages on the server to prevent duplicate processing.