
Why branching handles conditional logic in Apache Airflow - Why It Works

Introduction
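Sometimes a workflow needs to make a choice based on a condition. Branching in Airflow lets one task decide which downstream path runs next. The tasks on the other paths are skipped, so each task runs only when it should. Branching is useful in situations such as these: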
When you want to run different tasks depending on the day of the week.
When you need to skip some steps if a file is missing.
When you want to run a task only if a previous task succeeded.
When you want to send notifications only if errors occur.
When you want to run different data processing pipelines based on input values.
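As an illustration of the day-of-the-week case, a branch callable can inspect the run's logical date and return the task ID to follow. This is a minimal sketch that assumes Airflow 2.2 or later, where the task context provides logical_date and Airflow passes context values to callable parameters with matching names. The task IDs weekday_pipeline and weekend_pipeline are hypothetical placeholders. The decision logic is plain Python, so it can be tried without Airflow:

```python
from datetime import date


def choose_branch_by_weekday(logical_date: date) -> str:
    """Return the task_id to follow, based on the run's logical date.

    'weekday_pipeline' and 'weekend_pipeline' are hypothetical task IDs;
    in a real DAG they must match tasks directly downstream of the branch task.
    """
    # date.weekday(): Monday == 0 ... Saturday == 5, Sunday == 6
    if logical_date.weekday() >= 5:
        return 'weekend_pipeline'
    return 'weekday_pipeline'


if __name__ == '__main__':
    # 2024-06-01 was a Saturday, 2024-06-03 a Monday.
    print(choose_branch_by_weekday(date(2024, 6, 1)))
    print(choose_branch_by_weekday(date(2024, 6, 3)))
```

Because Airflow's logical_date is a datetime (a subclass of date), the same weekday() call works on the value Airflow passes in.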
Config File - branching_dag.py
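# Written for Airflow 2.4+ (EmptyOperator, the schedule argument and the
# none_failed_min_one_success trigger rule are all available there).
from datetime import datetime

from airflow import DAG
from airflow.operators.empty import EmptyOperator  # replaces the deprecated DummyOperator
from airflow.operators.python import BranchPythonOperator, PythonOperator


def choose_branch(**kwargs):
    # Read the requested branch from the DAG run configuration.
    # conf can be empty when the run was triggered without --conf.
    conf = kwargs['dag_run'].conf or {}
    condition = conf.get('run_branch', 'branch_a')
    # Return the task_id to follow; the other downstream task is skipped.
    if condition == 'branch_a':
        return 'task_branch_a'
    return 'task_branch_b'


def task_a():
    print('Running branch A task')


def task_b():
    print('Running branch B task')


with DAG(
    dag_id='branching_example',
    start_date=datetime(2024, 6, 1),  # fixed date instead of the deprecated days_ago()
    schedule=None,                    # no schedule: runs only when triggered
    catchup=False,
) as dag:
    branching = BranchPythonOperator(
        task_id='branching',
        python_callable=choose_branch,
    )

    task_branch_a = PythonOperator(
        task_id='task_branch_a',
        python_callable=task_a,
    )

    task_branch_b = PythonOperator(
        task_id='task_branch_b',
        python_callable=task_b,
    )

    # One upstream branch is always skipped, so the default all_success rule
    # would skip join too. This rule runs join when nothing failed and at
    # least one upstream task succeeded.
    join = EmptyOperator(
        task_id='join',
        trigger_rule='none_failed_min_one_success',
    )

    branching >> [task_branch_a, task_branch_b] >> join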

This DAG shows how branching works in Airflow.

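choose_branch decides which branch to run based on the run_branch value in the DAG run's configuration, defaulting to branch A.

BranchPythonOperator calls this function and follows the task ID it returns; the other task directly downstream of it is skipped.

task_branch_a and task_branch_b are the two possible paths.

The join task is an empty placeholder. Its trigger rule lets it run after whichever branch executed, even though the other branch was skipped; with the default all_success rule it would be skipped as well.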
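The follow-or-skip behaviour described above can be modelled in a few lines of plain Python. This is a simplified sketch, not Airflow's implementation: split_branches is a hypothetical helper that takes the branch task's direct downstream task IDs and the callable's return value (a single task ID or a list of them, both of which a branch callable may return) and reports which tasks are followed and which are skipped:

```python
from typing import Iterable, List, Set, Tuple, Union


def split_branches(
    downstream_ids: Iterable[str], chosen: Union[str, List[str]]
) -> Tuple[Set[str], Set[str]]:
    """Simplified model of a branch task: return (followed, skipped) task IDs."""
    downstream = set(downstream_ids)
    # A branch callable may return one task_id or a list of task_ids.
    chosen_ids = {chosen} if isinstance(chosen, str) else set(chosen)
    # Simplification: treat any ID outside the direct downstream set as an error.
    unknown = chosen_ids - downstream
    if unknown:
        raise ValueError(f'not a direct downstream task: {sorted(unknown)}')
    return chosen_ids, downstream - chosen_ids


if __name__ == '__main__':
    followed, skipped = split_branches(['task_branch_a', 'task_branch_b'], 'task_branch_a')
    print('followed:', sorted(followed), 'skipped:', sorted(skipped))
```

In the example DAG, returning 'task_branch_a' follows that task and skips task_branch_b; returning a list would follow several branches at once.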

Commands
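This command triggers a new run of the DAG and passes a configuration parameter telling it to follow branch A. New DAGs are paused by default, so unpause it first (airflow dags unpause branching_example) or the run will stay queued.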
Terminal
airflow dags trigger branching_example --conf '{"run_branch": "branch_a"}'
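Expected Output
The CLI reports the newly created DAG run, including a run ID of the form manual__<timestamp> (for example manual__2024-06-01T00:00:00+00:00) and its initial state, typically queued. The exact output format depends on the Airflow version.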
--conf: passes a JSON object to the DAG run; choose_branch reads it through dag_run.conf.
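When runs are triggered from a script rather than typed by hand, building the --conf payload with json.dumps avoids JSON quoting mistakes. The helper build_trigger_command is hypothetical; it only assembles the same airflow dags trigger command shown above as an argument list, assuming the airflow CLI is on PATH when you actually execute it:

```python
import json
import shlex
from typing import List


def build_trigger_command(dag_id: str, conf: dict) -> List[str]:
    """Build the argument list for `airflow dags trigger` with a JSON --conf payload."""
    # json.dumps always emits valid JSON (double-quoted keys and strings).
    return ['airflow', 'dags', 'trigger', dag_id, '--conf', json.dumps(conf)]


if __name__ == '__main__':
    cmd = build_trigger_command('branching_example', {'run_branch': 'branch_b'})
    # Show a shell-safe rendering; pass `cmd` to subprocess.run(cmd, check=True) to execute it.
    print(shlex.join(cmd))
```

Passing the list to subprocess.run instead of a single shell string means the JSON never goes through shell quoting at all.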
Lists all tasks in the DAG so you can see the branching and other tasks.
Terminal
airflow tasks list branching_example
Expected Output
branching
join
task_branch_a
task_branch_b
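Runs the branching task alone for the given date, without checking dependencies or recording its state, to see which branch it chooses. No configuration is passed, so choose_branch falls back to branch A.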
Terminal
airflow tasks test branching_example branching 2024-06-01
Expected Output (excerpt; timestamps and log prefixes vary by Airflow version)
INFO - Done. Returned value was: task_branch_a
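The decision logic can also be checked without Airflow at all. This sketch copies the logic of choose_branch (with a guard for runs that have no configuration) and uses types.SimpleNamespace as a stand-in for Airflow's DagRun; only its conf attribute is read, which is an assumption that holds for this particular function:

```python
from types import SimpleNamespace


def choose_branch(**kwargs):
    # Same decision logic as choose_branch in branching_dag.py.
    conf = kwargs['dag_run'].conf or {}
    condition = conf.get('run_branch', 'branch_a')
    if condition == 'branch_a':
        return 'task_branch_a'
    return 'task_branch_b'


if __name__ == '__main__':
    # SimpleNamespace(conf=...) mimics the only DagRun attribute the function uses.
    for conf in (None, {}, {'run_branch': 'branch_a'}, {'run_branch': 'branch_b'}):
        print(conf, '->', choose_branch(dag_run=SimpleNamespace(conf=conf)))
```

Testing the callable this way is faster than airflow tasks test and can run in an ordinary unit-test suite.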
Runs the branch A task alone to verify it works when chosen.
Terminal
airflow tasks test branching_example task_branch_a 2024-06-01
Expected Output (excerpt; timestamps and log prefixes vary by Airflow version)
Running branch A task
INFO - Done. Returned value was: None
Key Concept

Branching lets your workflow choose different paths based on conditions, so only the right tasks run.

Common Mistakes
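Returning a task ID from the branching function that does not exist in the DAG.
Airflow cannot match the returned ID to a task it can follow; in recent Airflow versions the branching task fails with an error listing the invalid IDs.
Always return a task_id (or a list of task_ids) that is defined in the DAG and sits directly downstream of the branching task.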
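One way to make an unknown ID impossible is to map configuration values to task IDs through a dictionary with a default. This is a minimal sketch: BRANCH_TASKS and DEFAULT_TASK are hypothetical names, and the mapping must be kept in sync with the task IDs actually defined in the DAG:

```python
from types import SimpleNamespace

# Hypothetical mapping from run_branch values to task IDs defined in the DAG.
BRANCH_TASKS = {
    'branch_a': 'task_branch_a',
    'branch_b': 'task_branch_b',
}
DEFAULT_TASK = 'task_branch_a'


def choose_branch(**kwargs):
    conf = kwargs['dag_run'].conf or {}
    # dict.get falls back to DEFAULT_TASK, so an unexpected value
    # can never produce a task ID that is missing from the DAG.
    return BRANCH_TASKS.get(conf.get('run_branch'), DEFAULT_TASK)


if __name__ == '__main__':
    # SimpleNamespace stands in for Airflow's DagRun in this local check.
    print(choose_branch(dag_run=SimpleNamespace(conf={'run_branch': 'branch_b'})))
    print(choose_branch(dag_run=SimpleNamespace(conf={'run_branch': 'typo'})))
```

Falling back silently is a design choice; if a bad value should stop the run instead, index the dictionary directly (BRANCH_TASKS[...]) so the resulting KeyError fails the branching task.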
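Not joining the branches with a properly configured downstream task.
If the branches lead nowhere, the workflow cannot continue past them; if they lead into a task that keeps the default all_success trigger rule, the skipped branch causes that task, and everything after it, to be skipped as well.
Join the branches with an EmptyOperator (the replacement for the deprecated DummyOperator) or another task, and give it trigger_rule='none_failed_min_one_success' so it runs after whichever branch executed.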
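The difference between trigger rules for the join task can be illustrated with a very simplified model. should_run is a hypothetical helper that only approximates three of Airflow's documented rules from the final states of the upstream tasks; it is not Airflow's scheduler logic, and in real Airflow a task whose rule cannot be met because of skipped upstreams is marked skipped:

```python
from typing import List

FAILED_STATES = ('failed', 'upstream_failed')


def should_run(trigger_rule: str, upstream_states: List[str]) -> bool:
    """Simplified model: would a task with this trigger rule run?

    upstream_states holds each upstream task's final state, e.g.
    'success', 'failed', 'upstream_failed' or 'skipped'.
    """
    if trigger_rule == 'all_success':
        # Default rule: every upstream task must have succeeded.
        return all(s == 'success' for s in upstream_states)
    if trigger_rule == 'none_failed':
        # Upstreams may have succeeded or been skipped, but none failed.
        return not any(s in FAILED_STATES for s in upstream_states)
    if trigger_rule == 'none_failed_min_one_success':
        # As none_failed, plus at least one upstream must have succeeded.
        return (not any(s in FAILED_STATES for s in upstream_states)
                and any(s == 'success' for s in upstream_states))
    raise ValueError(f'trigger rule not modelled: {trigger_rule}')


if __name__ == '__main__':
    # Branch A ran, branch B was skipped.
    states = ['success', 'skipped']
    for rule in ('all_success', 'none_failed', 'none_failed_min_one_success'):
        print(rule, should_run(rule, states))
```

With one branch skipped, all_success blocks the join while both none_failed variants let it run; none_failed_min_one_success additionally refuses to run if every upstream was skipped.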
Summary
BranchPythonOperator runs a function that decides which task to run next based on conditions.
You define multiple tasks for different branches and return the chosen task's ID from the function.
Use a join task with a trigger rule such as none_failed_min_one_success to wait for whichever branch runs before continuing the workflow.