
Dynamic task generation with loops in Apache Airflow - Commands & Configuration

Introduction
Sometimes you need to create many similar tasks in a workflow without writing each one by hand. Dynamic task generation with loops helps you create multiple tasks automatically using a loop, saving time and reducing errors.
When you want to process multiple files with the same steps but different file names.
When you need to run the same task for different dates or time periods.
When you want to create tasks for multiple users or customers dynamically.
When your workflow needs to scale by adding tasks based on input data size.
When you want to avoid repeating similar code for each task in your pipeline.
Config File - my_dag.py
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

def process_item(item):
    # Runs inside each dynamically created task
    print(f"Processing {item}")

def create_dag():
    with DAG(
        dag_id='dynamic_loop_dag',
        start_date=datetime(2024, 1, 1),
        schedule_interval='@daily',  # renamed to schedule= in Airflow 2.4+
        catchup=False,
    ) as dag:
        items = ['file1.csv', 'file2.csv', 'file3.csv']
        tasks = []
        for item in items:
            # Derive a unique, readable task_id from each item name
            task = PythonOperator(
                task_id=f'process_{item.replace(".", "_")}',
                python_callable=process_item,
                op_args=[item],  # pass the loop variable to the callable
            )
            tasks.append(task)
        return dag

dag = create_dag()

This DAG file defines a workflow that runs daily starting from January 1, 2024.

The process_item function prints the item being processed.

Inside create_dag, a list of items is defined.

A loop creates a PythonOperator task for each item, giving each a unique task_id.
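The task-ID derivation in that loop can be checked in plain Python, independent of Airflow, using the same items list as the DAG file above:

```python
# Derive one unique, readable task_id per item, as the DAG's loop does
items = ['file1.csv', 'file2.csv', 'file3.csv']
task_ids = [f"process_{item.replace('.', '_')}" for item in items]
print(task_ids)  # ['process_file1_csv', 'process_file2_csv', 'process_file3_csv']
```

These are exactly the IDs that `airflow tasks list dynamic_loop_dag` reports later.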

Each task is automatically registered with the DAG by the context manager, and the finished DAG is returned.
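The tasks list collected in the loop also makes it easy to wire dependencies, for example forcing the tasks to run one after another with Airflow's >> operator. A minimal stand-alone sketch of the pattern (the Task class here is a hypothetical stand-in for PythonOperator, so the chaining logic can be seen without an Airflow install):

```python
class Task:
    """Hypothetical stand-in for an Airflow operator, for illustration only."""
    def __init__(self, task_id):
        self.task_id = task_id
        self.downstream = []

    def __rshift__(self, other):
        # Airflow's '>>' marks 'other' as running after 'self'
        self.downstream.append(other)
        return other

# Build tasks in a loop, then chain each one to the next
tasks = [Task(f'process_file{i}_csv') for i in range(1, 4)]
for upstream, downstream in zip(tasks, tasks[1:]):
    upstream >> downstream

print(tasks[0].downstream[0].task_id)  # process_file2_csv
```

In the real DAG file, the same `for upstream, downstream in zip(tasks, tasks[1:])` loop placed after the task-creation loop would serialize the dynamically created tasks; leave it out if they should run in parallel.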

Commands
List all available DAGs to verify that the new DAG is recognized by Airflow.
Terminal
airflow dags list
Expected Output
dynamic_loop_dag
example_bash_operator
example_python_operator
Trigger the dynamic_loop_dag manually to start the workflow and run all dynamically created tasks.
Terminal
airflow dags trigger dynamic_loop_dag
Expected Output
Created <DagRun dynamic_loop_dag @ 2024-06-01T12:00:00+00:00: manual__2024-06-01T12:00:00+00:00, externally triggered: True>
List all tasks in the dynamic_loop_dag to see the dynamically generated task IDs.
Terminal
airflow tasks list dynamic_loop_dag
Expected Output
process_file1_csv
process_file2_csv
process_file3_csv
Test-run the process_file2_csv task for a given date to verify that it runs correctly and prints the expected output.
Terminal
airflow tasks test dynamic_loop_dag process_file2_csv 2024-06-01
Expected Output
[2024-06-01 12:00:00,000] {python.py:114} INFO - Processing file2.csv
[2024-06-01 12:00:00,100] {taskinstance.py:1234} INFO - Task exited with return code 0
Key Concept

If you remember nothing else from this pattern, remember: use a loop inside your DAG definition to create multiple similar tasks dynamically with unique IDs.

Common Mistakes
Using the same task_id for all tasks inside the loop.
Airflow requires unique task IDs; duplicate IDs cause errors and prevent the DAG from loading.
Generate unique task IDs by including the loop variable, like the item name, in the task_id.
Defining tasks outside the DAG context manager.
Tasks must be defined inside the DAG context to be properly registered with the DAG.
Always create tasks inside the 'with DAG(...) as dag:' block.
Not passing the loop variable as an argument to the task's callable.
All tasks would run the same code without knowing which item to process, causing incorrect behavior.
Pass the loop variable as an argument using op_args or op_kwargs to each task.
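The third mistake is a general Python pitfall: closures created in a loop see the loop variable's final value, not the value at creation time. op_args and op_kwargs avoid this by binding the value when the operator is constructed. A plain-Python illustration:

```python
items = ['file1.csv', 'file2.csv', 'file3.csv']

# Broken: every lambda closes over the same 'item' variable,
# so all of them see its final value after the loop ends
broken = [lambda: f"Processing {item}" for item in items]
print([fn() for fn in broken])   # every entry shows file3.csv

# Fixed: bind the current value as a default argument, which is
# what passing op_args=[item] to PythonOperator effectively does
fixed = [lambda item=item: f"Processing {item}" for item in items]
print([fn() for fn in fixed])    # one entry per file
```

This is why the DAG file passes `op_args=[item]` rather than wrapping `process_item` in a closure.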
Summary
Define a list of items to process inside your DAG.
Use a loop to create a task for each item with a unique task_id.
Trigger the DAG and verify tasks run as expected with airflow CLI commands.