How to Use the @task.branch Decorator in Airflow for Branching Logic
In Airflow, use the @task.branch decorator to define a Python function that decides which downstream task(s) to run next. The function returns the task ID(s) to execute, enabling dynamic branching in your DAG.
Syntax
The @task.branch decorator is applied to a Python function inside an Airflow DAG. This function must return the task ID or a list of task IDs to run next, enabling conditional branching.
- @task.branch: Marks the function as a branching task.
- Function body: Contains logic to decide which task(s) to run next.
- Return value: A string or list of strings representing downstream task IDs.
```python
from airflow.decorators import task


@task.branch
def choose_branch():
    # Placeholder: replace with your own decision logic
    condition = True
    if condition:
        return 'task_a'
    else:
        return 'task_b'
```
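The branch logic itself is ordinary Python, so one option is to keep the decision in a plain helper and call it from the decorated function. The sketch below assumes a hypothetical helper named `pick_branch_for` and a weekday/weekend rule; neither is part of Airflow, and the task IDs `task_a` and `task_b` are reused from the snippet above.

```python
from datetime import date


def pick_branch_for(run_date: date) -> str:
    """Hypothetical helper: return the task ID to follow for a given date."""
    # date.weekday() returns 0 for Monday through 6 for Sunday.
    if run_date.weekday() < 5:
        return "task_a"  # weekdays
    return "task_b"  # weekends


# Inside a DAG, the decorated function would simply delegate, for example:
#
#     @task.branch
#     def choose_branch():
#         return pick_branch_for(date.today())

print(pick_branch_for(date(2024, 1, 1)))  # a Monday
print(pick_branch_for(date(2024, 1, 6)))  # a Saturday
```

Keeping the decision in a plain function like this means it can be unit tested without an Airflow installation.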
Example
This example shows a DAG with a branching task that chooses between two tasks based on a simple condition.
```python
from datetime import datetime

from airflow.decorators import dag, task


# `schedule` needs Airflow 2.4+; older 2.x releases use `schedule_interval`
@dag(start_date=datetime(2024, 1, 1), schedule=None, catchup=False)
def branching_dag():
    @task.branch
    def choose_branch():
        import random

        # Randomly pick which branch to follow
        if random.choice([True, False]):
            return 'task_a'
        else:
            return 'task_b'

    @task
    def task_a():
        print('Running Task A')

    @task
    def task_b():
        print('Running Task B')

    # Without this trigger rule, task_c would be skipped along with the unchosen branch
    @task(trigger_rule='none_failed_min_one_success')
    def task_c():
        print('Running Task C after branch')

    branch = choose_branch()
    a = task_a()
    b = task_b()
    c = task_c()

    branch >> [a, b] >> c


branching_dag_instance = branching_dag()
```
Output
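When the DAG runs, choose_branch randomly returns 'task_a' or 'task_b'. The chosen task runs and the other is skipped. task_c runs afterwards only if its trigger rule tolerates a skipped upstream task (for example none_failed_min_one_success); with the default all_success rule, task_c is skipped as well.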
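Whether the task after the branch runs depends on its trigger rule. With Airflow's default `all_success` rule, a task with a skipped upstream is itself skipped, while `none_failed_min_one_success` lets it run as long as nothing failed and at least one upstream succeeded. The sketch below is a simplified pure-Python model of those two rules, not Airflow code; the function `join_would_run` and the state strings are illustrative assumptions.

```python
def join_would_run(upstream_states: list[str], trigger_rule: str) -> bool:
    """Simplified model of two trigger rules (illustrative, not Airflow's implementation)."""
    succeeded = upstream_states.count("success")
    failed = sum(state in ("failed", "upstream_failed") for state in upstream_states)
    if trigger_rule == "all_success":
        # Every upstream task must have succeeded.
        return succeeded == len(upstream_states)
    if trigger_rule == "none_failed_min_one_success":
        # Skipped upstream tasks are tolerated; failures are not.
        return failed == 0 and succeeded >= 1
    raise ValueError(f"rule not modelled here: {trigger_rule}")


# The branch picked task_a, so task_b was skipped.
states = ["success", "skipped"]
print(join_would_run(states, "all_success"))  # False
print(join_would_run(states, "none_failed_min_one_success"))  # True
```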
Common Pitfalls
- Returned task IDs should point to tasks directly downstream of the branching task; returning an ID that does not exist in the DAG causes an error.
- Returning multiple task IDs requires returning a list, not a string.
- Omitting the @task.branch decorator means the function will not trigger branching behavior.
- Downstream tasks that are not properly linked after branching can cause unexpected execution.
```python
from airflow.decorators import task


# Wrong: returns a list as string
@task.branch
def wrong_branch():
    return "['task_a', 'task_b']"  # This is a string, not a list


# Right: returns a list of strings
@task.branch
def right_branch():
    return ['task_a', 'task_b']
```
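One way to catch these mistakes before a DAG run is to check the branch function's return value in a unit test. The helper below is a hypothetical sketch in plain Python (`check_branch_result` is not an Airflow API); it assumes you pass in the set of task IDs that sit directly downstream of the branching task.

```python
def check_branch_result(result, downstream_ids: set[str]) -> list[str]:
    """Hypothetical check: normalise a branch return value and validate it."""
    # A single task ID is allowed as a bare string.
    chosen = [result] if isinstance(result, str) else list(result)
    unknown = [task_id for task_id in chosen if task_id not in downstream_ids]
    if unknown:
        raise ValueError(f"unknown downstream task IDs: {unknown}")
    return chosen


downstream = {"task_a", "task_b"}
print(check_branch_result(["task_a", "task_b"], downstream))  # ['task_a', 'task_b']

try:
    # The stringified list is treated as one (non-existent) task ID.
    check_branch_result("['task_a', 'task_b']", downstream)
except ValueError as err:
    print(err)
```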
Quick Reference
@task.branch lets you create conditional paths in your DAG by returning task IDs to run next.
- Use it on a function that returns a string or list of strings.
- Returned task IDs must match downstream task IDs.
- Branching controls which tasks run next dynamically.
Key Takeaways
- Use @task.branch to define a function that returns task IDs for conditional branching.
- Return a string for one task or a list of strings for multiple tasks to run next.
- Ensure returned task IDs exist and are linked downstream in the DAG.
- Without @task.branch, the function will not trigger branching behavior.
- Link downstream tasks correctly after branching, and set a suitable trigger rule on any task that joins the branches, to avoid unexpected skips.