How to Use the @task.branch Decorator in Airflow for Branching Logic

In Airflow, the @task.branch decorator turns a Python function into a branching task. The function returns the task ID, or a list of task IDs, of the directly downstream task(s) that should run next, and the remaining directly downstream tasks are skipped. This lets a DAG choose its path at run time.

Syntax

The @task.branch decorator is applied to a Python function inside an Airflow DAG. This function must return the task ID or a list of task IDs to run next, enabling conditional branching.

  • @task.branch: Marks the function as a branching task.
  • Function body: Contains logic to decide which task(s) to run next.
  • Return value: A string or list of strings representing downstream task IDs.
python
from airflow.decorators import task

@task.branch
def choose_branch():
    # Placeholder: replace with your own branching logic
    condition = True
    if condition:
        return 'task_a'
    else:
        return 'task_b'
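The `condition` in the snippet above can be any ordinary Python expression. As a minimal sketch, assume the decision depends on the day of the week. The helper name `branch_for_day` and the weekday rule are illustrative assumptions, not part of Airflow; `task_a` and `task_b` are the task IDs used in this article. Keeping the decision in a plain function also makes it easy to test without a scheduler.

```python
from datetime import date


def branch_for_day(run_date: date) -> str:
    """Return the ID of the task to follow for the given date.

    Hypothetical rule for illustration: weekdays go to 'task_a',
    weekends go to 'task_b'.
    """
    # date.weekday() returns 0 for Monday through 6 for Sunday
    is_weekend = run_date.weekday() >= 5
    return "task_b" if is_weekend else "task_a"


print(branch_for_day(date(2024, 1, 3)))  # a Wednesday -> task_a
print(branch_for_day(date(2024, 1, 6)))  # a Saturday -> task_b
```

A function decorated with @task.branch could then simply return the result of `branch_for_day` for a date of its choosing, such as the run's logical date.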

Example

This example shows a DAG with a branching task that chooses between two tasks based on a simple condition.

python
from datetime import datetime

from airflow.decorators import dag, task


# `schedule` requires Airflow 2.4+; older 2.x releases use `schedule_interval`
@dag(start_date=datetime(2024, 1, 1), schedule=None, catchup=False)
def branching_dag():

    @task.branch
    def choose_branch():
        import random
        # Return the task_id of the branch to follow
        if random.choice([True, False]):
            return 'task_a'
        else:
            return 'task_b'

    @task
    def task_a():
        print('Running Task A')

    @task
    def task_b():
        print('Running Task B')

    # The unchosen branch is skipped; with the default trigger rule
    # (all_success) task_c would be skipped as well
    @task(trigger_rule='none_failed_min_one_success')
    def task_c():
        print('Running Task C after branch')

    branch = choose_branch()
    a = task_a()
    b = task_b()
    c = task_c()

    branch >> [a, b] >> c

branching_dag_instance = branching_dag()
Output
When the DAG runs, choose_branch randomly returns 'task_a' or 'task_b'. The chosen task runs, the other is marked as skipped, and task_c then runs because its trigger rule tolerates the skipped upstream task.
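One detail matters in any branch-then-join layout like this one. Tasks the branch does not choose are marked as skipped, and a join task using Airflow's default trigger rule (all_success) is skipped too when any of its upstream tasks is skipped. Setting trigger_rule='none_failed_min_one_success' on the join task is the usual remedy. The sketch below is a toy model in plain Python of just these two rules. The function `join_runs` is a hypothetical helper for illustration, not Airflow's scheduler code.

```python
from typing import List


def join_runs(upstream_states: List[str], trigger_rule: str) -> bool:
    """Toy model: does a join task run, given its upstream task states?

    Only the two trigger rules discussed here are modelled.
    """
    if trigger_rule == "all_success":
        # Every upstream task must have succeeded
        return all(state == "success" for state in upstream_states)
    if trigger_rule == "none_failed_min_one_success":
        # No upstream failure, and at least one upstream success
        none_failed = all(
            state not in ("failed", "upstream_failed") for state in upstream_states
        )
        return none_failed and "success" in upstream_states
    raise ValueError(f"Rule not modelled in this sketch: {trigger_rule}")


# The branch picked task_a, so task_b was skipped
states = ["success", "skipped"]
print(join_runs(states, "all_success"))                  # False
print(join_runs(states, "none_failed_min_one_success"))  # True
```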

Common Pitfalls

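  • Return only the IDs of tasks directly downstream of the branching task; an ID that does not exist in the DAG raises an error at run time.
  • To follow several branches, return a list of task IDs. A string that merely looks like a list is treated as a single task ID.
  • A plain @task function that returns a task ID does not branch. Without @task.branch, the return value is only pushed to XCom and every downstream task runs.
  • A task that joins the branches is skipped under the default all_success trigger rule, because at least one of its upstream tasks is skipped. Give it a rule such as none_failed_min_one_success.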
python
from airflow.decorators import task

# Wrong: returns a string that only looks like a list
@task.branch
def wrong_branch():
    return "['task_a', 'task_b']"  # This is a string, not a list

# Right: returns a list of strings
@task.branch
def right_branch():
    return ['task_a', 'task_b']
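To see why the stringified list fails, it helps to look at how a return value maps onto task IDs. The sketch below is a hypothetical helper named `normalize_branch_result`, written only for illustration. It is not Airflow's API, and Airflow performs its own checks at run time. It treats a string as one task ID and a list as several, then rejects any ID that is not in an assumed set of downstream task IDs.

```python
from typing import List, Set, Union


def normalize_branch_result(
    result: Union[str, List[str]], downstream_ids: Set[str]
) -> List[str]:
    """Return the result as a list of task IDs, or raise if any ID is unknown."""
    # A string is always a single task ID, whatever it looks like
    task_ids = [result] if isinstance(result, str) else list(result)
    unknown = [task_id for task_id in task_ids if task_id not in downstream_ids]
    if unknown:
        raise ValueError(f"Unknown downstream task IDs: {unknown}")
    return task_ids


downstream = {"task_a", "task_b"}  # assumed IDs, matching this article
print(normalize_branch_result(["task_a", "task_b"], downstream))  # ['task_a', 'task_b']

try:
    # The whole string is taken as one task ID, which does not exist
    normalize_branch_result("['task_a', 'task_b']", downstream)
except ValueError as err:
    print(err)
```

Calling a helper like this inside a unit test for the branching function is one way to catch a mistyped task ID before the DAG is deployed.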

Quick Reference

@task.branch lets you create conditional paths in your DAG by returning task IDs to run next.

  • Use it on a function that returns a string or list of strings.
  • Returned task IDs must match the IDs of tasks directly downstream of the branching task.
  • Directly downstream tasks whose IDs are not returned are skipped for that run.

Key Takeaways

  • Use @task.branch to define a function that returns task IDs for conditional branching.
  • Return a string to follow one task, or a list of strings to follow several.
  • Make sure every returned task ID exists and is directly downstream of the branching task.
  • Without @task.branch, the function runs as an ordinary task and no branching takes place.
  • If the branches rejoin, set a trigger rule such as none_failed_min_one_success on the joining task so that it is not skipped.