How to Create Parallel Tasks in Airflow: Simple Guide
In Airflow, you create parallel tasks by defining multiple tasks inside a DAG without direct dependencies between them. Use PythonOperator or another operator for each task; Airflow's scheduler then runs them in parallel based on available worker slots.
Syntax
To create parallel tasks, define multiple tasks in a DAG without setting dependencies between them. Each task is an instance of an operator like PythonOperator. Airflow runs tasks in parallel if workers are available.
- dag: The workflow container.
- task_id: Unique name for each task.
- python_callable: The function the task runs.
- No set_upstream or set_downstream calls means the tasks run independently.
```python
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

def task1_func():
    print('Task 1 running')

def task2_func():
    print('Task 2 running')

def task3_func():
    print('Task 3 running')

with DAG('parallel_tasks_dag',
         start_date=datetime(2024, 1, 1),
         schedule_interval='@daily',
         catchup=False) as dag:
    task1 = PythonOperator(task_id='task1', python_callable=task1_func)
    task2 = PythonOperator(task_id='task2', python_callable=task2_func)
    task3 = PythonOperator(task_id='task3', python_callable=task3_func)
    # No dependencies set, so the tasks run in parallel
```
Example
This example shows three tasks defined in a DAG without dependencies. When triggered, Airflow runs all three tasks in parallel if workers are free.
```python
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
import time

def task1_func():
    print('Task 1 started')
    time.sleep(5)
    print('Task 1 finished')

def task2_func():
    print('Task 2 started')
    time.sleep(5)
    print('Task 2 finished')

def task3_func():
    print('Task 3 started')
    time.sleep(5)
    print('Task 3 finished')

with DAG('parallel_tasks_example',
         start_date=datetime(2024, 1, 1),
         schedule_interval='@once',
         catchup=False) as dag:
    task1 = PythonOperator(task_id='task1', python_callable=task1_func)
    task2 = PythonOperator(task_id='task2', python_callable=task2_func)
    task3 = PythonOperator(task_id='task3', python_callable=task3_func)
    # Tasks run in parallel because no dependencies are set
```
Output (the exact ordering may vary between runs, since the tasks run concurrently)
Task 1 started
Task 2 started
Task 3 started
Task 1 finished
Task 2 finished
Task 3 finished
Common Pitfalls
Common mistakes when creating parallel tasks include:
- Setting dependencies that force sequential execution, e.g., task1 >> task2.
- Not having enough workers configured, so tasks queue instead of running in parallel.
- Using depends_on_past=True, which can block parallel runs.
Always check your Airflow worker count and task dependencies to ensure parallelism.
```python
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

def dummy_func():
    print('Running')

with DAG('wrong_parallel',
         start_date=datetime(2024, 1, 1),
         schedule_interval='@once',
         catchup=False) as dag:
    task1 = PythonOperator(task_id='task1', python_callable=dummy_func)
    task2 = PythonOperator(task_id='task2', python_callable=dummy_func)
    task3 = PythonOperator(task_id='task3', python_callable=dummy_func)

    # Wrong: this forces sequential execution
    task1 >> task2 >> task3

    # Correct: leave task1, task2, task3 unchained (no >> or <<)
    # so they run in parallel
```
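The second pitfall, too few workers, is easy to reproduce outside Airflow. This plain-Python sketch (not Airflow code; fake_task is a hypothetical stand-in for a PythonOperator callable) uses a two-slot thread pool for three tasks, so the third task queues until a slot frees up, just as Airflow tasks queue when no worker slot is available:

```python
from concurrent.futures import ThreadPoolExecutor
import time

def fake_task(name):
    # Stands in for a PythonOperator callable; sleeps to simulate work.
    time.sleep(0.5)
    return f'{name} done'

# Two "worker slots" for three tasks: the third task waits in the queue
# until one of the first two finishes.
start = time.monotonic()
with ThreadPoolExecutor(max_workers=2) as pool:
    results = list(pool.map(fake_task, ['task1', 'task2', 'task3']))
elapsed = time.monotonic() - start

print(results)        # ['task1 done', 'task2 done', 'task3 done']
print(elapsed > 0.9)  # True: two batches of ~0.5 s each, not one
```

With max_workers=3, the same three tasks finish in roughly 0.5 s total, which is the effect of giving Airflow enough worker slots.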
Quick Reference
Tips for parallel tasks in Airflow:
- Define multiple tasks without dependencies to run in parallel.
- Ensure your Airflow executor supports parallelism (e.g., LocalExecutor or CeleryExecutor; SequentialExecutor runs only one task at a time).
- Check max_active_tasks and worker slots to avoid bottlenecks.
- Avoid depends_on_past=True if you want parallel runs.
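The concurrency limits above live in airflow.cfg (or the matching AIRFLOW__CORE__* environment variables). A minimal sketch of the relevant [core] settings; the values shown are illustrative defaults, not tuned recommendations:

```ini
[core]
# Maximum task instances running at once across the whole Airflow installation
parallelism = 32
# Maximum task instances allowed to run concurrently within a single DAG
max_active_tasks_per_dag = 16
```

If your three tasks still run one at a time despite having no dependencies, these limits and the executor type are the first things to check.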
Key Takeaways
- Define tasks without dependencies to enable parallel execution in Airflow.
- Airflow runs tasks in parallel based on available worker slots and executor configuration.
- Avoid chaining tasks if you want them to run simultaneously.
- Check Airflow worker count and executor type to support parallelism.
- Disable depends_on_past to prevent blocking parallel runs.