How to Implement a Work Queue with RabbitMQ: A Simple Guide
To implement a work queue in RabbitMQ, declare a durable queue and have multiple workers consume messages from it. Use basic_publish to send tasks and basic_consume with manual acknowledgments to process tasks and confirm their completion.

Syntax
Here is the basic syntax to create a work queue in RabbitMQ:
- channel.queue_declare(queue, durable=True): Declares a durable queue that survives RabbitMQ restarts.
- channel.basic_publish(exchange='', routing_key=queue, body=message, properties=pika.BasicProperties(delivery_mode=2)): Sends a persistent message to the queue.
- channel.basic_consume(queue, on_message_callback, auto_ack=False): Starts consuming messages with manual acknowledgment.
- channel.basic_ack(delivery_tag=method.delivery_tag): Acknowledges a message so it is removed from the queue.
```python
channel.queue_declare(queue='task_queue', durable=True)

channel.basic_publish(
    exchange='',
    routing_key='task_queue',
    body=message,
    properties=pika.BasicProperties(delivery_mode=2)  # make message persistent
)

channel.basic_consume(
    queue='task_queue',
    on_message_callback=callback_function,
    auto_ack=False
)

# channel.basic_ack(delivery_tag=method.delivery_tag)
# Call this inside the callback function after the task completes.
```
Example
This example shows a simple producer sending tasks and a worker consuming them with manual acknowledgments to implement a reliable work queue.
```python
import pika
import time

# Producer code
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='task_queue', durable=True)

for i in range(5):
    message = f'Task {i+1}'
    channel.basic_publish(
        exchange='',
        routing_key='task_queue',
        body=message,
        properties=pika.BasicProperties(delivery_mode=2)  # make message persistent
    )
    print(f"Sent {message}")

connection.close()

# Worker code
def callback(ch, method, properties, body):
    print(f"Received {body.decode()}")
    time.sleep(1)  # simulate work
    print("Done")
    ch.basic_ack(delivery_tag=method.delivery_tag)

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='task_queue', durable=True)
channel.basic_qos(prefetch_count=1)  # fair dispatch
channel.basic_consume(queue='task_queue', on_message_callback=callback, auto_ack=False)
print('Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
```
Output
```
Sent Task 1
Sent Task 2
Sent Task 3
Sent Task 4
Sent Task 5
Waiting for messages. To exit press CTRL+C
Received Task 1
Done
Received Task 2
Done
Received Task 3
Done
Received Task 4
Done
Received Task 5
Done
```
Common Pitfalls
- Not making queues and messages durable: messages are lost if RabbitMQ restarts.
- Using auto_ack=True: messages are removed from the queue immediately, so they are lost if the worker crashes before processing them.
- Not setting prefetch_count=1: can cause uneven task distribution among workers.
- Forgetting to acknowledge messages: messages remain unacknowledged and are redelivered repeatedly.
```python
# Wrong:
channel.queue_declare(queue='task_queue')  # not durable
channel.basic_publish(exchange='', routing_key='task_queue', body=message)  # not persistent
channel.basic_consume(queue='task_queue', on_message_callback=callback, auto_ack=True)  # auto ack

# Right:
channel.queue_declare(queue='task_queue', durable=True)
channel.basic_qos(prefetch_count=1)  # fair dispatch
channel.basic_publish(exchange='', routing_key='task_queue', body=message,
                      properties=pika.BasicProperties(delivery_mode=2))  # persistent
channel.basic_consume(queue='task_queue', on_message_callback=callback, auto_ack=False)
# channel.basic_ack(delivery_tag=method.delivery_tag)
# Call this inside the callback function after the task completes.
```
Quick Reference
- Queue durability: queue_declare(durable=True)
- Message persistence: BasicProperties(delivery_mode=2)
- Manual ack: auto_ack=False and basic_ack()
- Fair dispatch: basic_qos(prefetch_count=1)
Key Takeaways
- Declare queues as durable and publish messages as persistent to avoid data loss.
- Use manual acknowledgments to ensure tasks are processed reliably.
- Set prefetch_count=1 for fair task distribution among workers.
- Avoid auto_ack=True to prevent losing unprocessed messages.
- Always acknowledge messages after successful processing.
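The takeaways above assume every task succeeds. In practice a worker should also handle failures explicitly, for example by negatively acknowledging a message with basic_nack so RabbitMQ can discard it or route it to a dead-letter exchange. Below is a minimal sketch of such a callback; the process_task argument is a hypothetical task handler, and the stub classes stand in for pika's channel and method objects so the logic can be illustrated without a running broker.

```python
def make_callback(process_task):
    """Build an on_message_callback that acks on success and nacks on failure."""
    def callback(ch, method, properties, body):
        try:
            process_task(body.decode())
            # Success: acknowledge so the message is removed from the queue.
            ch.basic_ack(delivery_tag=method.delivery_tag)
        except Exception:
            # Failure: requeue=False discards the message (or sends it to a
            # dead-letter exchange if one is configured on the queue).
            ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)
    return callback

# Stubs for illustration only; with a real broker, pika supplies these objects.
class StubChannel:
    def __init__(self):
        self.acked, self.nacked = [], []
    def basic_ack(self, delivery_tag):
        self.acked.append(delivery_tag)
    def basic_nack(self, delivery_tag, requeue):
        self.nacked.append(delivery_tag)

class StubMethod:
    def __init__(self, tag):
        self.delivery_tag = tag

ch = StubChannel()
make_callback(lambda task: None)(ch, StubMethod(1), None, b'Task 1')   # succeeds
make_callback(lambda task: 1 / 0)(ch, StubMethod(2), None, b'Task 2')  # fails
print(ch.acked, ch.nacked)  # → [1] [2]
```

With a real connection, you would pass make_callback(process_task) as the on_message_callback argument of basic_consume, keeping auto_ack=False as shown earlier.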