
How to Create Parallel Tasks in Airflow: Simple Guide

In Airflow, you create parallel tasks by defining multiple tasks inside a DAG without direct dependencies between them. Use PythonOperator (or any other operator) for each task, and Airflow's scheduler runs them in parallel, subject to available worker slots.

Syntax

To create parallel tasks, define multiple tasks in a DAG without setting dependencies between them. Each task is an instance of an operator like PythonOperator. Airflow runs tasks in parallel if workers are available.

  • dag: The workflow container.
  • task_id: Unique name for each task.
  • python_callable: The function the task runs.
  • No dependencies set (no >>, <<, set_upstream, or set_downstream) means the tasks run independently.
```python
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

def task1_func():
    print('Task 1 running')

def task2_func():
    print('Task 2 running')

def task3_func():
    print('Task 3 running')

with DAG('parallel_tasks_dag', start_date=datetime(2024, 1, 1), schedule_interval='@daily', catchup=False) as dag:
    task1 = PythonOperator(task_id='task1', python_callable=task1_func)
    task2 = PythonOperator(task_id='task2', python_callable=task2_func)
    task3 = PythonOperator(task_id='task3', python_callable=task3_func)

# No dependencies set, tasks run in parallel
```

Example

This example shows three tasks defined in a DAG without dependencies. When triggered, Airflow runs all three tasks in parallel if workers are free.

```python
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
import time

def task1_func():
    print('Task 1 started')
    time.sleep(5)
    print('Task 1 finished')

def task2_func():
    print('Task 2 started')
    time.sleep(5)
    print('Task 2 finished')

def task3_func():
    print('Task 3 started')
    time.sleep(5)
    print('Task 3 finished')

with DAG('parallel_tasks_example', start_date=datetime(2024, 1, 1), schedule_interval='@once', catchup=False) as dag:
    task1 = PythonOperator(task_id='task1', python_callable=task1_func)
    task2 = PythonOperator(task_id='task2', python_callable=task2_func)
    task3 = PythonOperator(task_id='task3', python_callable=task3_func)

# Tasks run in parallel because no dependencies are set
```
Output

Task 1 started
Task 2 started
Task 3 started
Task 1 finished
Task 2 finished
Task 3 finished

Because the tasks execute concurrently, the exact interleaving of these lines can vary between runs.

Common Pitfalls

Common mistakes when creating parallel tasks include:

  • Setting dependencies that force sequential execution, e.g., task1 >> task2.
  • Not configuring enough worker slots, so tasks queue instead of running in parallel.
  • Setting depends_on_past=True, which makes each task wait for its own previous run and can block parallel DAG runs.

Always check your Airflow worker count and task dependencies to ensure parallelism.

```python
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

def dummy_func():
    print('Running')

with DAG('wrong_parallel', start_date=datetime(2024, 1, 1), schedule_interval='@once', catchup=False) as dag:
    task1 = PythonOperator(task_id='task1', python_callable=dummy_func)
    task2 = PythonOperator(task_id='task2', python_callable=dummy_func)
    task3 = PythonOperator(task_id='task3', python_callable=dummy_func)

    # Wrong: This forces sequential execution
    task1 >> task2 >> task3

# Correct: Remove dependencies to run in parallel
# task1, task2, task3 defined without >> or << operators
```

Quick Reference

Tips for parallel tasks in Airflow:

  • Define multiple tasks without dependencies to run in parallel.
  • Ensure your Airflow executor supports parallelism (e.g., LocalExecutor or CeleryExecutor; the SequentialExecutor runs only one task at a time).
  • Check max_active_tasks and worker slots to avoid bottlenecks.
  • Avoid depends_on_past=True if you want parallel runs.

Key Takeaways

Define tasks without dependencies to enable parallel execution in Airflow.
Airflow runs tasks in parallel based on available worker slots and executor configuration.
Avoid chaining tasks if you want them to run simultaneously.
Check Airflow worker count and executor type to support parallelism.
Leave depends_on_past at its default (False) to prevent blocking parallel runs.