How to Use Redis with Celery in Flask for Background Tasks
To use Redis with Celery in a Flask app, configure Celery with Redis as the broker URL and initialize it with your Flask app context. This setup lets Celery handle background tasks using Redis as a message queue.
Syntax
Here is the basic syntax to configure Celery with Redis in a Flask app:
- Import Celery and Flask.
- Create a Flask app instance.
- Initialize Celery with the Flask app's config.
- Set the broker URL to Redis (e.g., redis://localhost:6379/0).
- Define tasks using Celery's @celery.task decorator.
```python
from flask import Flask
from celery import Celery

def make_celery(app):
    celery = Celery(
        app.import_name,
        broker=app.config['CELERY_BROKER_URL'],
        backend=app.config['CELERY_RESULT_BACKEND']
    )
    celery.conf.update(app.config)

    class ContextTask(celery.Task):
        def __call__(self, *args, **kwargs):
            with app.app_context():
                return self.run(*args, **kwargs)

    celery.Task = ContextTask
    return celery

app = Flask(__name__)
app.config.update(
    CELERY_BROKER_URL='redis://localhost:6379/0',
    CELERY_RESULT_BACKEND='redis://localhost:6379/0'
)
celery = make_celery(app)
```
Example
This example shows a Flask app with a Celery task that adds two numbers asynchronously using Redis as the broker and result backend.
```python
from flask import Flask, request, jsonify
from celery import Celery

def make_celery(app):
    celery = Celery(
        app.import_name,
        broker=app.config['CELERY_BROKER_URL'],
        backend=app.config['CELERY_RESULT_BACKEND']
    )
    celery.conf.update(app.config)

    class ContextTask(celery.Task):
        def __call__(self, *args, **kwargs):
            with app.app_context():
                return self.run(*args, **kwargs)

    celery.Task = ContextTask
    return celery

app = Flask(__name__)
app.config.update(
    CELERY_BROKER_URL='redis://localhost:6379/0',
    CELERY_RESULT_BACKEND='redis://localhost:6379/0'
)
celery = make_celery(app)

@celery.task()
def add(x, y):
    return x + y

@app.route('/add', methods=['POST'])
def call_add():
    data = request.get_json()
    x = data.get('x')
    y = data.get('y')
    task = add.delay(x, y)
    return jsonify({'task_id': task.id}), 202

@app.route('/result/<task_id>')
def get_result(task_id):
    task = add.AsyncResult(task_id)
    if task.state == 'PENDING':
        response = {'state': task.state, 'result': None}
    elif task.state != 'FAILURE':
        response = {'state': task.state, 'result': task.result}
    else:
        response = {'state': task.state, 'result': str(task.info)}
    return jsonify(response)

if __name__ == '__main__':
    app.run(debug=True)
```
Output
1. POST /add with JSON {"x": 4, "y": 5} returns {"task_id": "some-task-id"} with status 202
2. GET /result/some-task-id returns {"state": "SUCCESS", "result": 9}
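The branching in the /result route reduces to a small pure function from a Celery task state to the JSON payload shown above. A sketch (the helper name result_response and its parameters are hypothetical; state, result, and info stand in for the AsyncResult attributes of the same names):

```python
def result_response(state, result=None, info=None):
    """Mirror the /result route's state handling as a plain function."""
    if state == 'PENDING':
        # Task not started yet, or the ID is unknown to the backend
        return {'state': state, 'result': None}
    elif state != 'FAILURE':
        # e.g. SUCCESS: the result field carries the return value
        return {'state': state, 'result': result}
    else:
        # FAILURE: info carries the raised exception
        return {'state': state, 'result': str(info)}

print(result_response('SUCCESS', result=9))  # → {'state': 'SUCCESS', 'result': 9}
```

Note that an unknown task ID also reports PENDING, which is why the route returns a null result for that state rather than an error.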
Common Pitfalls
Common mistakes when using Redis with Celery in Flask include:
- Not running a Redis server locally or on the configured URL.
- Forgetting to start the Celery worker process with the correct app context.
- Not configuring the Flask app context inside Celery tasks, causing errors accessing Flask extensions.
- Using different Redis URLs for broker and backend inconsistently.
Always ensure Redis is running and start Celery workers with celery -A your_module.celery worker --loglevel=info.
```python
# Wrong: missing Flask context in the task
@celery.task()
def add_wrong(x, y):
    # This may fail if the task accesses the Flask app context or extensions
    return x + y

# Right: use ContextTask to wrap tasks
class ContextTask(celery.Task):
    def __call__(self, *args, **kwargs):
        with app.app_context():
            return self.run(*args, **kwargs)

celery.Task = ContextTask
```
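Stripped of Celery, the ContextTask trick is just a callable subclass whose __call__ enters a context manager before delegating to run(). A minimal stdlib sketch of the pattern (FakeApp, Task, and make_context_task are hypothetical stand-ins for Flask and celery.Task, used only to show the mechanics):

```python
import contextlib

class FakeApp:
    """Stand-in for a Flask app: tracks whether its context is active."""
    def __init__(self):
        self.context_active = False

    @contextlib.contextmanager
    def app_context(self):
        self.context_active = True
        try:
            yield
        finally:
            self.context_active = False

class Task:
    """Stand-in for celery.Task: calling a task invokes run()."""
    def run(self, *args, **kwargs):
        raise NotImplementedError

    def __call__(self, *args, **kwargs):
        return self.run(*args, **kwargs)

def make_context_task(app):
    # Same shape as the Celery version: wrap run() in app.app_context()
    class ContextTask(Task):
        def __call__(self, *args, **kwargs):
            with app.app_context():
                return self.run(*args, **kwargs)
    return ContextTask

app = FakeApp()

class Add(make_context_task(app)):
    def run(self, x, y):
        # Inside run() the app context is active, as a Flask extension would require
        assert app.context_active
        return x + y

print(Add()(4, 5))  # → 9
```

The assertion inside run() is exactly what fails for the "wrong" task above: without the wrapper, run() executes with no app context, so anything touching Flask extensions breaks.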
Quick Reference
- Redis URL format: redis://hostname:port/db_number (default port 6379, db 0)
- Start Redis server: run redis-server in your terminal.
- Start Celery worker: celery -A your_module.celery worker --loglevel=info
- Flask app config keys: CELERY_BROKER_URL and CELERY_RESULT_BACKEND
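The redis:// URL follows ordinary URL syntax, so its pieces can be sanity-checked with the standard library (a sketch using urllib.parse; Celery and redis-py do their own parsing internally):

```python
from urllib.parse import urlsplit

url = urlsplit('redis://localhost:6379/0')
print(url.scheme)            # → redis
print(url.hostname)          # → localhost
print(url.port)              # → 6379
print(url.path.lstrip('/'))  # → 0  (the database number)
```

Checking the URL this way catches typos like a missing scheme or a stray trailing slash before they surface as confusing broker connection errors.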
Key Takeaways
- Configure Celery with Redis URLs in Flask app config for broker and backend.
- Wrap Celery tasks to run within the Flask app context so they can access Flask features safely.
- Ensure the Redis server is running and the Celery worker is started properly.
- Use Celery's delay() method to call tasks asynchronously from Flask routes.
- Check task results via AsyncResult using the task ID returned from delay().