
Task failure callbacks in Apache Airflow - Commands & Configuration

Introduction
Sometimes tasks in a workflow fail. Task failure callbacks let you run custom code automatically when a task fails, so you can react immediately, for example by sending alerts or cleaning up resources.
When you want to send an email notification if a task fails in your workflow
When you need to log extra information for debugging after a task failure
When you want to trigger a cleanup process if a task does not complete successfully
When you want to restart or rerun certain tasks automatically after failure
When you want to update a dashboard or monitoring system about task failures
DAG File - my_dag.py
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.email import send_email

def task_that_fails():
    raise Exception('This task fails on purpose')

def failure_callback(context):
    # Airflow passes the task's context dict as the single argument
    task_instance = context.get('task_instance')
    dag_run = context.get('dag_run')
    message = f"Task {task_instance.task_id} failed in DAG {dag_run.dag_id}"
    print(message)
    # Requires SMTP to be configured in airflow.cfg
    send_email(to='alert@example.com', subject='Task Failure Alert', html_content=message)

def success_task():
    print('This task runs if previous tasks succeed')

with DAG(
    dag_id='example_failure_callback',
    start_date=datetime(2024, 6, 1),  # fixed date instead of the deprecated days_ago()
    schedule_interval='@daily',
    catchup=False,
) as dag:
    failing_task = PythonOperator(
        task_id='failing_task',
        python_callable=task_that_fails,
        on_failure_callback=failure_callback
    )

    success = PythonOperator(
        task_id='success_task',
        python_callable=success_task
    )

    failing_task >> success

This DAG defines two tasks: one that always fails and one that runs after it.

The failure_callback function runs automatically when failing_task fails. It prints a message and sends an email alert.

The on_failure_callback parameter links the callback function to the task.
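Because the callback is just a plain function taking a context dict, you can exercise it outside Airflow by passing a stand-in context. The `FakeTaskInstance` and `FakeDagRun` classes below are illustrative stand-ins for the real objects Airflow supplies, not Airflow APIs:

```python
# Minimal stand-ins for the objects Airflow puts in the callback context.
# These are illustrative fakes, not real Airflow classes.
class FakeTaskInstance:
    task_id = 'failing_task'

class FakeDagRun:
    dag_id = 'example_failure_callback'

def failure_callback(context):
    task_instance = context.get('task_instance')
    dag_run = context.get('dag_run')
    exception = context.get('exception')  # Airflow includes the exception that failed the task
    return f"Task {task_instance.task_id} failed in DAG {dag_run.dag_id}: {exception}"

# Simulate the dict Airflow would pass when failing_task raises.
msg = failure_callback({
    'task_instance': FakeTaskInstance(),
    'dag_run': FakeDagRun(),
    'exception': Exception('This task fails on purpose'),
})
print(msg)
```

Testing the callback this way catches signature and formatting bugs before the DAG ever runs.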

Commands
List all available DAGs to confirm your DAG is recognized by Airflow.
Terminal
airflow dags list
Expected Output
example_failure_callback
Manually start the DAG run to test the failure callback in action.
Terminal
airflow dags trigger example_failure_callback
Expected Output
Created <DagRun example_failure_callback @ 2024-06-01T00:00:00+00:00, manual__2024-06-01T00:00:00+00:00, externally triggered: True>
List tasks in the DAG to verify task names and order.
Terminal
airflow tasks list example_failure_callback
Expected Output
failing_task
success_task
Run the failing task manually for the given date to see the failure and callback in action.
Terminal
airflow tasks test example_failure_callback failing_task 2024-06-01
Expected Output
Running: ['python', '-m', 'airflow', 'tasks', 'run', 'example_failure_callback', 'failing_task', '2024-06-01', '--local']
[2024-06-01 00:00:00,000] {python.py:114} INFO - Executing <Task(PythonOperator): failing_task> on 2024-06-01
[2024-06-01 00:00:00,100] {python.py:120} INFO - Running task_that_fails
[2024-06-01 00:00:00,200] {taskinstance.py:1234} ERROR - This task fails on purpose
[2024-06-01 00:00:00,300] {taskinstance.py:1250} INFO - Task failing_task failed
Task failing_task failed in DAG example_failure_callback
[2024-06-01 00:00:00,400] {email.py:56} INFO - Sending email to alert@example.com
Key Concept

If you remember nothing else from this pattern, remember: on_failure_callback lets you run custom code automatically whenever a task fails.

Common Mistakes
Not passing the context parameter to the failure callback function
Without context, the callback cannot access task details like task_id or dag_id, so it cannot report useful information.
Define the callback function to accept a single 'context' argument and use it to get task and DAG info.
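The signature matters because Airflow invokes the callback with the context dict as a single positional argument, so a zero-argument function raises TypeError at failure time. A quick sketch (the `FakeTaskInstance` class is a hypothetical stand-in for illustration):

```python
# Wrong: Airflow calls on_failure_callback with one positional argument
# (the context dict), so a zero-argument callback raises TypeError.
def bad_callback():
    print('task failed')

# Right: accept the context dict and pull task details from it.
def good_callback(context):
    return f"Task {context['task_instance'].task_id} failed"

# Hypothetical stand-in for the task instance Airflow would provide.
class FakeTaskInstance:
    task_id = 'failing_task'

context = {'task_instance': FakeTaskInstance()}

try:
    bad_callback(context)  # what Airflow effectively does on failure
except TypeError as err:
    print(f"bad_callback raised: {err}")

print(good_callback(context))
```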
Assigning on_failure_callback to the DAG instead of the individual task
The DAG-level callback runs on DAG failure, not on individual task failures, so task failures won't trigger it.
Set on_failure_callback on the specific task(s) that need failure handling.
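If you want the same callback on every task without repeating the parameter, one common approach is to put it in default_args, which Airflow copies onto each task. A sketch under that assumption (DAG id and task bodies here are illustrative):

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.empty import EmptyOperator

def notify_failure(context):
    print(f"Task {context['task_instance'].task_id} failed")

# default_args entries are applied to every task in the DAG, so each task
# gets its own on_failure_callback; the DAG-level parameter of the same
# name would instead fire once when the whole DAG run fails.
with DAG(
    dag_id='example_default_args_callback',
    start_date=datetime(2024, 6, 1),
    schedule_interval=None,
    catchup=False,
    default_args={'on_failure_callback': notify_failure},
) as dag:
    first = EmptyOperator(task_id='first')
    second = EmptyOperator(task_id='second')
    first >> second
```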
Not importing or configuring email sending properly in the callback
The callback tries to send email but fails silently or errors out, so alerts are not sent.
Import airflow.utils.email.send_email and configure SMTP settings in Airflow to enable email sending.
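send_email relies on Airflow's SMTP settings. A typical [smtp] block in airflow.cfg looks like the sketch below; the host, credentials, and ports are placeholders you must replace with your mail provider's values:

```ini
[smtp]
# All values here are placeholders; use your mail provider's settings.
smtp_host = smtp.example.com
smtp_starttls = True
smtp_ssl = False
smtp_user = airflow@example.com
smtp_password = change-me
smtp_port = 587
smtp_mail_from = airflow@example.com
```

These settings can also be supplied as environment variables of the form AIRFLOW__SMTP__SMTP_HOST, which is often preferable for keeping credentials out of the config file.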
Summary
Define a Python function that accepts a context argument to handle task failure events.
Assign this function to the on_failure_callback parameter of the task you want to monitor.
Trigger the DAG and observe the callback running automatically when the task fails.