How to Implement RPC in RabbitMQ: Simple Guide with Example
To implement RPC in RabbitMQ, create a client that sends a request message with a unique correlation_id and a reply_to queue, and a server that listens for requests, processes them, and sends the response back to the reply_to queue with the same correlation_id. This pattern enables synchronous communication over asynchronous messaging.
Syntax
RPC in RabbitMQ involves these key parts:
- Request Queue: Where the client sends the request.
- Reply Queue: A temporary queue where the client listens for the response.
- Correlation ID: A unique identifier to match responses to requests.
- Client: Sends the request with reply_to and correlation_id, then waits for the response.
- Server: Listens on the request queue, processes the request, and sends the response to the reply_to queue with the same correlation_id.
python
import pika

# channel, callback_queue, corr_id, request_body and process_request are
# placeholders; the full example below defines working versions of them.
channel.queue_declare(queue='rpc_queue')

# Client sends request
channel.basic_publish(
    exchange='',
    routing_key='rpc_queue',
    properties=pika.BasicProperties(
        reply_to=callback_queue,
        correlation_id=corr_id
    ),
    body=request_body
)

# Server handler: defined before it is registered with basic_consume
def on_request(ch, method, props, body):
    response = process_request(body)
    ch.basic_publish(
        exchange='',
        routing_key=props.reply_to,
        properties=pika.BasicProperties(correlation_id=props.correlation_id),
        body=response
    )
    ch.basic_ack(delivery_tag=method.delivery_tag)

# Server consumes from 'rpc_queue'
channel.basic_consume(queue='rpc_queue', on_message_callback=on_request)
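To see the message flow without a running broker, the pattern can be imitated with in-memory queues. This is only a sketch under stated assumptions: Python's queue.Queue stands in for RabbitMQ queues, the dictionary keys mirror the reply_to and correlation_id properties, and server_loop, rpc_call, and the upper-casing "work" are hypothetical names invented for illustration, not part of pika or RabbitMQ.

```python
import queue
import threading
import uuid

# Stand-in for the broker's request queue (plays the role of 'rpc_queue').
request_queue = queue.Queue()


def server_loop():
    """Consume requests and send each reply to the queue named in reply_to."""
    while True:
        message = request_queue.get()
        if message is None:  # sentinel: shut the server down
            break
        reply = {
            "correlation_id": message["correlation_id"],  # echoed unchanged
            "body": message["body"].upper(),  # hypothetical processing step
        }
        message["reply_to"].put(reply)


def rpc_call(body, timeout=2.0):
    """Send one request and block until the matching reply arrives."""
    reply_queue = queue.Queue()  # private reply queue for this call
    corr_id = str(uuid.uuid4())
    request_queue.put(
        {"reply_to": reply_queue, "correlation_id": corr_id, "body": body}
    )
    while True:
        # Raises queue.Empty if no reply arrives within the timeout.
        reply = reply_queue.get(timeout=timeout)
        if reply["correlation_id"] == corr_id:  # skip unrelated replies
            return reply["body"]


server = threading.Thread(target=server_loop, daemon=True)
server.start()
result = rpc_call("ping")
request_queue.put(None)  # ask the server to stop
server.join()
print(result)  # PING
```

The client blocks on its own reply queue, which is what makes the call look synchronous even though both sides only ever put and get messages.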
Example
This example shows a simple RPC server and client using Python and RabbitMQ. The client sends a number, and the server returns its Fibonacci value.
python
import threading
import uuid

import pika


class FibonacciRpcClient:
    def __init__(self):
        self.connection = pika.BlockingConnection(
            pika.ConnectionParameters(host='localhost')
        )
        self.channel = self.connection.channel()
        # An empty name asks the broker to generate one; exclusive=True keeps
        # the queue private to this connection.
        result = self.channel.queue_declare(queue='', exclusive=True)
        self.callback_queue = result.method.queue
        self.channel.basic_consume(
            queue=self.callback_queue,
            on_message_callback=self.on_response,
            auto_ack=True
        )
        self.response = None
        self.corr_id = None

    def on_response(self, ch, method, props, body):
        # Accept only the reply that matches the outstanding request.
        if self.corr_id == props.correlation_id:
            self.response = body

    def call(self, n):
        self.response = None
        self.corr_id = str(uuid.uuid4())
        self.channel.basic_publish(
            exchange='',
            routing_key='rpc_queue',
            properties=pika.BasicProperties(
                reply_to=self.callback_queue,
                correlation_id=self.corr_id
            ),
            body=str(n)
        )
        # Pump the connection until on_response stores the reply.
        while self.response is None:
            self.connection.process_data_events()
        return int(self.response)


def fib(n):
    if n == 0:
        return 0
    elif n == 1:
        return 1
    else:
        return fib(n-1) + fib(n-2)


def on_request(ch, method, props, body):
    n = int(body)
    print(f" [.] fib({n})")
    response = fib(n)
    ch.basic_publish(
        exchange='',
        routing_key=props.reply_to,
        properties=pika.BasicProperties(correlation_id=props.correlation_id),
        body=str(response)
    )
    ch.basic_ack(delivery_tag=method.delivery_tag)


# Server side: its own connection and channel
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='rpc_queue')
channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='rpc_queue', on_message_callback=on_request)
print(" [x] Awaiting RPC requests")

# Run the server in a background thread so one script can host both sides.
server_thread = threading.Thread(target=channel.start_consuming)
server_thread.start()

client = FibonacciRpcClient()
print(" [x] Requesting fib(6)")
response = client.call(6)
print(f" [.] Got {response}")

# BlockingConnection is not thread-safe: ask the server thread to stop
# consuming, wait for it to finish, and only then close the connections.
connection.add_callback_threadsafe(channel.stop_consuming)
server_thread.join()
connection.close()
client.connection.close()
Output
[x] Awaiting RPC requests
[x] Requesting fib(6)
[.] fib(6)
[.] Got 8
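The recursive fib in the example takes exponential time, so a large n would keep the single server worker busy and delay every other request. As a hedged alternative, here is a minimal iterative sketch; the name fib_iterative and the negative-input check are additions for illustration and are not part of the original example.

```python
def fib_iterative(n: int) -> int:
    """Return the n-th Fibonacci number in O(n) steps (fib(0) = 0, fib(1) = 1)."""
    if n < 0:
        raise ValueError("n must be non-negative")
    a, b = 0, 1
    for _ in range(n):
        # Slide the (previous, current) window forward by one position.
        a, b = b, a + b
    return a


print(fib_iterative(6))  # 8
```

It returns the same values as the recursive version, so it could replace fib in on_request without changing the client.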
Common Pitfalls
- Not setting reply_to or correlation_id: The client won't know where to get the response or how to match it.
- Not acknowledging messages on server: Can cause message re-delivery and duplicates.
- Blocking the server: Long processing without concurrency can block other requests.
- Not using an exclusive callback queue: Can cause response mix-ups if multiple clients share the same queue.
python
# Wrong: missing reply_to and correlation_id
channel.basic_publish(exchange='', routing_key='rpc_queue', body='data')

# Right: both properties are set
channel.basic_publish(
    exchange='',
    routing_key='rpc_queue',
    properties=pika.BasicProperties(
        reply_to=callback_queue,
        correlation_id=corr_id
    ),
    body='data'
)
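When a client has more than one request in flight, the correlation_id is what lets it pair each reply with the right request and discard replies it never asked for. The sketch below shows one way to keep that bookkeeping in plain Python; PendingReplies and its methods are hypothetical helpers, not part of pika, and in a real client on_response would be called from the message callback with props.correlation_id and body.

```python
import uuid


class PendingReplies:
    """Track outstanding requests by correlation ID (hypothetical helper)."""

    def __init__(self):
        self._pending = {}

    def register(self):
        """Create a fresh correlation ID and mark it as awaiting a reply."""
        corr_id = str(uuid.uuid4())
        self._pending[corr_id] = None
        return corr_id

    def on_response(self, correlation_id, body):
        """Store the body if the ID is one we issued; report whether it matched."""
        if correlation_id not in self._pending:
            return False
        self._pending[correlation_id] = body
        return True

    def take(self, correlation_id):
        """Return and forget the reply if it has arrived, otherwise None."""
        if self._pending.get(correlation_id) is None:
            return None
        return self._pending.pop(correlation_id)


pending = PendingReplies()
first = pending.register()
second = pending.register()

# Replies may arrive in any order, along with a stray one we never requested.
pending.on_response(second, b"13")
stray_matched = pending.on_response("not-ours", b"99")
pending.on_response(first, b"8")

first_reply = pending.take(first)
second_reply = pending.take(second)
print(first_reply, second_reply, stray_matched)
```

Dropping unmatched replies, rather than raising an error, mirrors the check in the example client's on_response.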
Quick Reference
- Use a unique correlation_id to match requests and responses.
- Declare a private, exclusive callback queue for replies.
- Server must acknowledge messages after processing.
- Use basic_qos(prefetch_count=1) on server to avoid overload.
- Keep server processing fast or use concurrency for scalability.
Key Takeaways
RPC in RabbitMQ uses a request queue, reply queue, and correlation ID to enable synchronous calls.
Always set reply_to and correlation_id properties on client requests for proper response routing.
The server must acknowledge messages and send responses to the reply queue with the same correlation ID.
Use exclusive callback queues on the client to avoid response mix-ups.
Avoid blocking the server; use concurrency or quick processing to handle multiple RPC calls efficiently.