0
0
RabbitmqHow-ToBeginner · 4 min read

How to Implement RPC in RabbitMQ: Simple Guide with Example

To implement RPC in RabbitMQ, create a client that sends a request message with a unique correlation_id and a reply_to queue, and a server that listens for requests, processes them, and sends back the response to the reply_to queue with the same correlation_id. This pattern enables synchronous communication over asynchronous messaging.
📐

Syntax

RPC in RabbitMQ involves these key parts:

  • Request Queue: Where the client sends the request.
  • Reply Queue: A temporary queue where the client listens for the response.
  • Correlation ID: A unique identifier to match responses to requests.
  • Client: Sends request with reply_to and correlation_id, then waits for the response.
  • Server: Listens on request queue, processes request, and sends response to reply_to queue with the same correlation_id.
python
channel.queue_declare(queue='rpc_queue')

# Client sends request
channel.basic_publish(
    exchange='',
    routing_key='rpc_queue',
    properties=pika.BasicProperties(
        reply_to=callback_queue,
        correlation_id=corr_id
    ),
    body=request_body
)

# Server consumes from 'rpc_queue'
channel.basic_consume(queue='rpc_queue', on_message_callback=on_request)

def on_request(ch, method, props, body):
    response = process_request(body)
    ch.basic_publish(
        exchange='',
        routing_key=props.reply_to,
        properties=pika.BasicProperties(correlation_id=props.correlation_id),
        body=response
    )
    ch.basic_ack(delivery_tag=method.delivery_tag)
💻

Example

This example shows a simple RPC server and client using Python and RabbitMQ. The client sends a number, and the server returns its Fibonacci value.

python
import pika
import uuid

class FibonacciRpcClient:
    def __init__(self):
        self.connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
        self.channel = self.connection.channel()

        result = self.channel.queue_declare(queue='', exclusive=True)
        self.callback_queue = result.method.queue

        self.channel.basic_consume(
            queue=self.callback_queue,
            on_message_callback=self.on_response,
            auto_ack=True
        )

        self.response = None
        self.corr_id = None

    def on_response(self, ch, method, props, body):
        if self.corr_id == props.correlation_id:
            self.response = body

    def call(self, n):
        self.response = None
        self.corr_id = str(uuid.uuid4())
        self.channel.basic_publish(
            exchange='',
            routing_key='rpc_queue',
            properties=pika.BasicProperties(
                reply_to=self.callback_queue,
                correlation_id=self.corr_id
            ),
            body=str(n)
        )
        while self.response is None:
            self.connection.process_data_events()
        return int(self.response)


def fib(n):
    if n == 0:
        return 0
    elif n == 1:
        return 1
    else:
        return fib(n-1) + fib(n-2)


def on_request(ch, method, props, body):
    n = int(body)
    print(f" [.] fib({n})")
    response = fib(n)

    ch.basic_publish(
        exchange='',
        routing_key=props.reply_to,
        properties=pika.BasicProperties(correlation_id=props.correlation_id),
        body=str(response)
    )
    ch.basic_ack(delivery_tag=method.delivery_tag)


connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='rpc_queue')
channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='rpc_queue', on_message_callback=on_request)

print(" [x] Awaiting RPC requests")

import threading

def start_server():
    channel.start_consuming()

server_thread = threading.Thread(target=start_server)
server_thread.start()

client = FibonacciRpcClient()
print(" [x] Requesting fib(6)")
response = client.call(6)
print(f" [.] Got {response}")

connection.close()
Output
[x] Awaiting RPC requests [x] Requesting fib(6) [.] fib(6) [.] Got 8
⚠️

Common Pitfalls

  • Not setting reply_to or correlation_id: The client won't know where to get the response or how to match it.
  • Not acknowledging messages on server: Can cause message re-delivery and duplicates.
  • Blocking the server: Long processing without concurrency can block other requests.
  • Not using exclusive callback queue: Can cause response mix-ups if multiple clients share the same queue.
python
Wrong:
channel.basic_publish(exchange='', routing_key='rpc_queue', body='data')
# Missing reply_to and correlation_id

Right:
channel.basic_publish(
    exchange='',
    routing_key='rpc_queue',
    properties=pika.BasicProperties(
        reply_to=callback_queue,
        correlation_id=corr_id
    ),
    body='data'
)
📊

Quick Reference

  • Use a unique correlation_id to match requests and responses.
  • Declare a private, exclusive callback queue for replies.
  • Server must acknowledge messages after processing.
  • Use basic_qos(prefetch_count=1) on server to avoid overload.
  • Keep server processing fast or use concurrency for scalability.

Key Takeaways

RPC in RabbitMQ uses a request queue, reply queue, and correlation ID to enable synchronous calls.
Always set reply_to and correlation_id properties on client requests for proper response routing.
The server must acknowledge messages and send responses to the reply queue with the same correlation ID.
Use exclusive callback queues on the client to avoid response mix-ups.
Avoid blocking the server; use concurrency or quick processing to handle multiple RPC calls efficiently.