
How to Implement a Work Queue with RabbitMQ: Simple Guide

To implement a work queue in RabbitMQ, create a durable queue and have multiple workers consume messages from it. Use basic_publish to send tasks and basic_consume with manual acknowledgments to process and confirm task completion.

Syntax

Here is the basic syntax for a work queue, using the Python pika client:

  • channel.queue_declare(queue, durable=True): Declares a durable queue, so the queue definition survives a RabbitMQ restart.
  • channel.basic_publish(exchange='', routing_key=queue, body=message, properties=pika.BasicProperties(delivery_mode=2)): Publishes a persistent message to the queue through the default exchange.
  • channel.basic_consume(queue, on_message_callback, auto_ack=False): Registers a consumer callback with manual acknowledgment enabled.
  • channel.basic_ack(delivery_tag=method.delivery_tag): Confirms that a message has been processed so RabbitMQ can remove it from the queue.
python
channel.queue_declare(queue='task_queue', durable=True)
channel.basic_publish(
    exchange='',
    routing_key='task_queue',
    body=message,
    properties=pika.BasicProperties(delivery_mode=2)  # make message persistent
)
channel.basic_consume(
    queue='task_queue',
    on_message_callback=callback_function,
    auto_ack=False
)
# channel.basic_ack(delivery_tag=method.delivery_tag)  # This should be called inside the callback function
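
The commented-out basic_ack call above belongs inside the callback. The following is a minimal sketch of such a callback, not a definitive implementation. It assumes a hypothetical process_task function stands in for the real work, and that a failed task should be rejected without requeueing, which is only one possible policy.

```python
def process_task(body: bytes) -> str:
    """Hypothetical stand-in for real work: decode and upper-case the payload."""
    text = body.decode("utf-8")
    if not text:
        raise ValueError("empty task")
    return text.upper()


def callback_function(ch, method, properties, body):
    """Process one delivery, then explicitly confirm or reject it."""
    try:
        result = process_task(body)
    except Exception as exc:
        # Assumption: reject without requeueing so a permanently bad message
        # does not loop forever. Without a dead-letter exchange configured,
        # the broker discards the rejected message.
        print(f"Failed: {exc}")
        ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)
        return
    print(f"Done: {result}")
    # Acknowledge only after the work has finished successfully.
    ch.basic_ack(delivery_tag=method.delivery_tag)
```

Passing this function as on_message_callback with auto_ack=False means every delivery ends in exactly one basic_ack or basic_nack call.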

Example

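This example shows a simple producer sending tasks and a worker consuming them with manual acknowledgments to implement a reliable work queue. Both halves appear in one listing for brevity; in practice the producer and each worker run as separate processes.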

python
import pika
import time

# Producer code
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='task_queue', durable=True)

for i in range(5):
    message = f'Task {i+1}'
    channel.basic_publish(
        exchange='',
        routing_key='task_queue',
        body=message,
        properties=pika.BasicProperties(delivery_mode=2)  # make message persistent
    )
    print(f"Sent {message}")
connection.close()

# Worker code

def callback(ch, method, properties, body):
    print(f"Received {body.decode()}")
    time.sleep(1)  # simulate work
    print("Done")
    ch.basic_ack(delivery_tag=method.delivery_tag)

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='task_queue', durable=True)
channel.basic_qos(prefetch_count=1)  # fair dispatch
channel.basic_consume(queue='task_queue', on_message_callback=callback, auto_ack=False)
print('Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
Output
Sent Task 1
Sent Task 2
Sent Task 3
Sent Task 4
Sent Task 5
Waiting for messages. To exit press CTRL+C
Received Task 1
Done
Received Task 2
Done
Received Task 3
Done
Received Task 4
Done
Received Task 5
Done

Common Pitfalls

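  • Not making queues and messages durable: Queued tasks are lost if RabbitMQ restarts.
  • Using auto_ack=True: RabbitMQ treats a message as handled the moment it is delivered, so the task is lost if the worker crashes before finishing it.
  • Not setting prefetch_count=1: RabbitMQ hands out messages round-robin regardless of how busy each worker is, so one worker can be overloaded while another sits idle.
  • Forgetting to acknowledge messages: Messages stay unacknowledged and are redelivered only after the worker's channel or connection closes. With prefetch_count=1, the worker also stops receiving new tasks.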
python
# Wrong:
channel.queue_declare(queue='task_queue')  # not durable
channel.basic_publish(exchange='', routing_key='task_queue', body=message)  # not persistent
channel.basic_consume(queue='task_queue', on_message_callback=callback, auto_ack=True)  # auto ack

# Right:
channel.queue_declare(queue='task_queue', durable=True)
channel.basic_publish(exchange='', routing_key='task_queue', body=message, properties=pika.BasicProperties(delivery_mode=2))
channel.basic_consume(queue='task_queue', on_message_callback=callback, auto_ack=False)
# channel.basic_ack(delivery_tag=method.delivery_tag)  # This should be called inside the callback function
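
To see why prefetch_count matters, here is a toy simulation that runs without a broker. It is not RabbitMQ code. It assumes two workers and task durations in seconds. The default behavior is modeled as plain round-robin assignment, and prefetch_count=1 as giving the next task to whichever worker becomes free first.

```python
import heapq


def round_robin(durations, workers=2):
    """Assign task i to worker i % workers, regardless of how busy it is."""
    busy = [0] * workers
    for i, d in enumerate(durations):
        busy[i % workers] += d
    return busy


def fair_dispatch(durations, workers=2):
    """Hand each task to the worker that becomes free first."""
    # Heap of (time at which the worker becomes free, worker index).
    free_at = [(0, w) for w in range(workers)]
    heapq.heapify(free_at)
    busy = [0] * workers
    for d in durations:
        t, w = heapq.heappop(free_at)
        busy[w] += d
        heapq.heappush(free_at, (t + d, w))
    return busy


tasks = [5, 1, 5, 1, 5, 1]   # alternating heavy and light tasks
print(round_robin(tasks))    # [15, 3]
print(fair_dispatch(tasks))  # [11, 7]
```

In this model, round-robin leaves one worker with 15 seconds of work and the other with 3, while the fair variant splits the same tasks 11 to 7.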

Quick Reference

  • Queue durability: queue_declare(durable=True)
  • Message persistence: BasicProperties(delivery_mode=2)
  • Manual ack: auto_ack=False and basic_ack()
  • Fair dispatch: basic_qos(prefetch_count=1)
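
The worker-side settings from this list can be bundled into one helper. This is a sketch only: the name configure_worker is made up for illustration, and channel is assumed to be an already open pika channel.

```python
def configure_worker(channel, queue_name, on_message):
    """Apply the durable-queue, fair-dispatch and manual-ack settings."""
    # Durable queue: the queue definition survives a broker restart.
    channel.queue_declare(queue=queue_name, durable=True)
    # Fair dispatch: at most one unacknowledged message per worker.
    channel.basic_qos(prefetch_count=1)
    # Manual ack: on_message is responsible for calling basic_ack itself.
    channel.basic_consume(
        queue=queue_name,
        on_message_callback=on_message,
        auto_ack=False,
    )
```

With a real connection, a call such as configure_worker(channel, 'task_queue', callback) would be followed by channel.start_consuming(). Message persistence is set on the producer side and is not covered by this helper.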

Key Takeaways

  • Declare queues as durable and publish messages as persistent to avoid data loss.
  • Use manual acknowledgments to ensure tasks are processed reliably.
  • Set prefetch_count=1 for fair task distribution among workers.
  • Avoid auto_ack=True to prevent losing unprocessed messages.
  • Always acknowledge messages after successful processing.