from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.utils.dates import days_ago
from airflow.utils.trigger_rule import TriggerRule
# Arguments applied to every task in this DAG unless overridden per-task.
default_args = {
    'start_date': days_ago(1),
}

# Demo DAG for trigger rules: no schedule (manual/triggered runs only),
# and no backfilling of past runs.
dag = DAG(
    dag_id='trigger_rule_example',
    catchup=False,
    schedule_interval=None,
    default_args=default_args,
)
# Upstream task that always ends in SUCCESS.
success_task = BashOperator(
    dag=dag,
    task_id='success_task',
    bash_command='echo Success',
)
# Upstream task that always ends in FAILED (non-zero exit status).
fail_task = BashOperator(
    dag=dag,
    task_id='fail_task',
    bash_command='exit 1',
)
# Fires only when every upstream task succeeded (Airflow's default rule,
# spelled out explicitly here for the demonstration).
all_success_task = BashOperator(
    dag=dag,
    task_id='all_success_task',
    trigger_rule=TriggerRule.ALL_SUCCESS,
    bash_command='echo All upstream succeeded',
)
# Fires as soon as at least one upstream task has succeeded.
one_success_task = BashOperator(
    dag=dag,
    task_id='one_success_task',
    trigger_rule=TriggerRule.ONE_SUCCESS,
    bash_command='echo At least one upstream succeeded',
)
# Fires when no upstream task ended in FAILED.
none_failed_task = BashOperator(
    dag=dag,
    task_id='none_failed_task',
    trigger_rule=TriggerRule.NONE_FAILED,
    bash_command='echo No upstream failed',
)
# Fan both upstream tasks out to each trigger-rule demo task, so every
# demo task sees one succeeded and one failed upstream.
for demo_task in (all_success_task, one_success_task, none_failed_task):
    success_task >> demo_task
    fail_task >> demo_task
# This DAG defines five tasks:
# - success_task: always succeeds.
# - fail_task: always fails.
# - all_success_task: runs only if all upstream tasks succeed (all_success).
# - one_success_task: runs if at least one upstream task succeeds (one_success).
# - none_failed_task: runs if no upstream tasks failed (none_failed).
# The last three tasks depend on both success_task and fail_task to show how
# trigger rules affect execution.