
Mapped tasks for parallel processing in Apache Airflow - Commands & Configuration

Introduction
Sometimes you need to run the same task many times with different inputs. Mapped tasks let you do this easily in Airflow by running many tasks in parallel without writing each one by hand.
When you want to process a list of files in parallel instead of one by one, to save time.
When you have many similar jobs like sending emails to multiple users and want to run them all at once.
When you want to run the same data transformation on many datasets in parallel.
When you want to split a big job into smaller parts that can run independently.
When you want to avoid writing repetitive code for tasks that only differ by input values.
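As a sketch of the last two use cases, a bigger job can be split into chunks, with each chunk handled by its own mapped task instance. The DAG below is illustrative (the dag_id, chunk size, and task names are made up) and assumes Airflow 2.4 or newer:

```python
from datetime import datetime

from airflow.decorators import dag, task

@dag(dag_id='split_big_job', start_date=datetime(2024, 6, 1), schedule=None, catchup=False)
def split_big_job():
    @task
    def make_chunks():
        # Split one big list of record IDs into chunks of 100.
        record_ids = list(range(1000))
        return [record_ids[i:i + 100] for i in range(0, len(record_ids), 100)]

    @task
    def process_chunk(chunk):
        # Each mapped task instance processes one chunk independently.
        print(f"Processing {len(chunk)} records")

    # expand() also accepts the output of an upstream task, so the
    # number of mapped instances is decided at runtime.
    process_chunk.expand(chunk=make_chunks())

split_big_job()
```

Because the chunk list comes from an upstream task, Airflow only decides how many mapped instances to create when the DAG actually runs.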
Config File - mapped_tasks_dag.py
from datetime import datetime

from airflow import DAG
from airflow.decorators import task

with DAG(dag_id='mapped_tasks_example', start_date=datetime(2024, 6, 1), schedule=None, catchup=False) as dag:

    @task
    def process_item(item):
        print(f"Processing item: {item}")
        return f"Processed {item}"

    items = ['apple', 'banana', 'cherry']

    results = process_item.expand(item=items)

This DAG defines a simple task, process_item, that takes one input called item. Calling process_item.expand(item=items) creates one mapped task instance per element of ['apple', 'banana', 'cherry'], and Airflow runs those instances in parallel. The same task code therefore runs three times with different inputs.
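Arguments that stay the same for every mapped copy can be pinned with partial() before calling expand(). A short fragment for the same DAG file (the task and argument names are illustrative):

```python
@task
def notify(user, subject):
    print(f"Sending '{subject}' to {user}")

# subject is fixed once; user varies per mapped task instance.
notify.partial(subject='Weekly report').expand(user=['ada', 'grace', 'alan'])
```

This keeps the mapped list focused on the values that actually change between instances.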

Commands
This command lists all the DAGs Airflow knows about, so you can check if your DAG is recognized.
Terminal
airflow dags list
Expected Output
dag_id               | filepath
---------------------|---------------------------------------------
mapped_tasks_example | /usr/local/airflow/dags/mapped_tasks_dag.py
This command starts running the DAG named 'mapped_tasks_example' immediately.
Terminal
airflow dags trigger mapped_tasks_example
Expected Output
Created <DagRun mapped_tasks_example @ 2024-06-01T12:00:00+00:00: manual__2024-06-01T12:00:00+00:00, externally triggered: True>
This command shows all tasks in the DAG, including the mapped task.
Terminal
airflow tasks list mapped_tasks_example
Expected Output
process_item
This command test-runs the 'process_item' task locally for the date 2024-06-01, without recording the run in the database.
Terminal
airflow tasks test mapped_tasks_example process_item 2024-06-01
Expected Output
[2024-06-01 12:00:00,000] {taskinstance.py:XXXX} INFO - Processing item: apple
[2024-06-01 12:00:00,001] {taskinstance.py:XXXX} INFO - Task succeeded
Key Concept

Mapped tasks let you run the same task many times in parallel with different inputs using the expand() method.
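The return values of the mapped instances can also be gathered by a downstream task, which receives them as one sequence. A fragment sketching this pattern (the collect task name is illustrative):

```python
@task
def process_item(item):
    return f"Processed {item}"

@task
def collect(results):
    # results holds the outputs of all mapped instances.
    print(list(results))

collect(process_item.expand(item=['apple', 'banana', 'cherry']))
```

This gives a simple map-then-reduce shape: many parallel instances feed one downstream task.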

Common Mistakes
Trying to pass a non-iterable value to expand()
expand() expects a list (or dictionary, or the output of an upstream task) so it can create one task instance per element; passing a bare value causes an error.
Always pass a list or iterable to expand(), even if it has only one item.
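For example, a single string is not accepted on its own and must be wrapped in a list (a sketch, reusing the process_item task from the DAG above):

```python
# Wrong: a bare value is not a valid mapping source and causes an error.
# process_item.expand(item='apple')

# Right: a one-element list runs exactly one mapped instance.
process_item.expand(item=['apple'])
```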
Not enabling the DAG file in the Airflow DAGs folder
If the DAG file is not in the correct folder or has syntax errors, Airflow won't detect it.
Place the DAG file in the Airflow DAGs folder and check for syntax errors before triggering.
Expecting mapped tasks to run sequentially
Mapped tasks run in parallel by design, so expecting sequential output can cause confusion.
Design your workflow assuming mapped tasks run independently and in parallel.
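If fully parallel execution would overload a downstream system, the number of simultaneously running mapped instances can be capped per task with the max_active_tis_per_dag task argument. A sketch (the API-calling task and endpoints are made up):

```python
@task(max_active_tis_per_dag=2)
def call_api(endpoint):
    # At most two mapped instances of this task run at the same time.
    print(f"Calling {endpoint}")

call_api.expand(endpoint=['/users', '/orders', '/items', '/events'])
```

The instances still run independently; only their concurrency is limited.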
Summary
Define a task function with @task decorator that takes one input parameter.
Use the expand() method with a list of inputs to run the task multiple times in parallel.
Trigger the DAG and verify tasks run for each input item.