How to Mark a Task as Failed in Airflow: Simple Guide

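In Airflow, the reliable way to mark a task as failed is to raise an exception inside the task's Python function. An uncaught exception tells Airflow the task did not complete successfully and triggers failure handling, including retries if they are configured. In Airflow 2.x you can also call task_instance.set_state('failed') from the task context, but a state set this way can be overwritten when the function returns normally, so treat it as a supplement to raising an exception rather than a replacement.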

Syntax

There are two ways to mark a task as failed from inside the task function:

  • Raising an exception: Airflow marks the task as failed (or up for retry, if retries remain) whenever an exception leaves the function uncaught.
  • task_instance.set_state('failed'): Writes the failed state to the task instance directly. If the function then returns normally, Airflow can still record the task as successful, so follow the call with a raised exception when the failure must stick.
python
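def my_task(**context):
    # Airflow passes the running TaskInstance in the context as 'ti'
    ti = context['ti']
    # Mark task as failed explicitly (Airflow 2.x TaskInstance API)
    ti.set_state('failed')
    # Raise as well: a normal return can be recorded as success
    raise RuntimeError('Task marked as failed')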
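When a failure should not be retried, Airflow provides AirflowFailException, which fails the task immediately regardless of its retries setting. The sketch below assumes a hypothetical row-count check: validate_row_count and its parameters are illustrative names, not part of Airflow, and the fallback class exists only so the snippet also runs where Airflow is not installed.

```python
try:
    from airflow.exceptions import AirflowFailException
except ImportError:
    # Stand-in so this sketch runs without Airflow installed (assumption for illustration).
    class AirflowFailException(Exception):
        pass


def validate_row_count(row_count, minimum=1):
    """Hypothetical check: fail the task without retrying if too few rows arrived."""
    if row_count < minimum:
        # AirflowFailException skips any remaining retries and fails the task.
        raise AirflowFailException(
            f"Expected at least {minimum} rows, got {row_count}"
        )
    return row_count
```

A function like this could be passed as python_callable to a PythonOperator. Raising a plain Exception instead would let Airflow retry the task first when retries are configured.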

Example

This example defines two PythonOperator tasks. The first fails by raising an exception, which Airflow catches and records as a failure automatically. The second sets the failed state through the task instance.

python
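from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator

def fail_task():
    # An uncaught exception is what makes Airflow mark the task as failed
    raise Exception('Failing this task intentionally')

def explicit_fail_task(**context):
    ti = context['ti']
    # Set the state directly (Airflow 2.x TaskInstance API)
    ti.set_state('failed')
    # Raise too, since a normal return can be recorded as success
    raise Exception('Task state set to failed explicitly')

# A fixed start_date replaces the deprecated days_ago helper;
# schedule_interval=None means the DAG only runs when triggered manually
with DAG(dag_id='fail_task_example', start_date=datetime(2024, 1, 1), schedule_interval=None) as dag:
    task1 = PythonOperator(
        task_id='fail_by_exception',
        python_callable=fail_task
    )

    task2 = PythonOperator(
        task_id='fail_explicitly',
        python_callable=explicit_fail_task
    )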
Output
When this DAG runs, task 'fail_by_exception' fails and its log shows the exception message "Failing this task intentionally". Task 'fail_explicitly' calls set_state('failed'); that state is only guaranteed to persist if the function also raises an exception rather than returning normally.
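Once a task fails, Airflow's failure handling can run a callback supplied through the operator's on_failure_callback argument. The sketch below is one possible shape for such a callback. notify_failure and the fake context are hypothetical names used for illustration, and the simulated context stands in for the dictionary Airflow would pass, so the snippet runs on its own.

```python
from types import SimpleNamespace


def notify_failure(context):
    """Build a short failure message from the callback context."""
    ti = context["task_instance"]
    error = context.get("exception")
    message = f"Task {ti.dag_id}.{ti.task_id} failed: {error}"
    print(message)  # swap in email, chat, or paging in a real setup
    return message


# Simulated context (assumption for illustration); Airflow builds the real one.
fake_context = {
    "task_instance": SimpleNamespace(
        dag_id="fail_task_example", task_id="fail_by_exception"
    ),
    "exception": Exception("Failing this task intentionally"),
}
notify_failure(fake_context)
```

To attach it, pass on_failure_callback=notify_failure to the PythonOperator. Adding retries=2 would make Airflow retry the task twice before the failure callback fires.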

Common Pitfalls

Common mistakes when marking tasks as failed include:

  • Returning normally after detecting a problem, so Airflow records the task as successful.
  • Calling set_state('failed') outside the task context, where no task_instance is available.
  • Swallowing exceptions in a try-except block without re-raising, which hides the failure from Airflow.
python
def wrong_fail_task():
    try:
        # Some code that fails
        raise Exception('Error')
    except Exception:
        pass  # This hides the failure, task will be marked success

def right_fail_task():
    # Just raise the exception to fail the task
    raise Exception('Error')
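Catching an exception is fine when the handler needs to log or clean up, as long as it re-raises afterwards. The sketch below illustrates that pattern. load_data and task_with_logging are hypothetical names, and the empty-source check stands in for whatever real work might fail.

```python
import logging

log = logging.getLogger(__name__)


def load_data(source):
    """Hypothetical step that fails when the source is empty."""
    if not source:
        raise ValueError("source is empty")
    return f"loaded {source}"


def task_with_logging(source):
    try:
        return load_data(source)
    except ValueError:
        # Record the details, then re-raise so the failure still reaches Airflow.
        log.exception("load_data failed for source=%r", source)
        raise
```

The bare raise keeps the original exception and traceback, so the task log shows where the error actually occurred.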

Quick Reference

Summary tips to mark a task as failed in Airflow:

  • Raise an exception inside the task function to fail automatically.
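  • Use task_instance.set_state('failed') only together with a raised exception, since a normal return can be recorded as success.
  • Access task_instance through the context by accepting **context in the callable. Airflow 2 passes the context automatically; provide_context=True was only needed in Airflow 1.x.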
  • Avoid catching exceptions without re-raising to prevent false success.

Key Takeaways

Raise an exception inside your task to mark it as failed automatically.
Use task_instance.set_state('failed') only alongside a raised exception; on its own, the failed state may not persist.
Always access task_instance from context to change task state programmatically.
Avoid catching exceptions without re-raising to ensure Airflow detects failures.
Marking tasks as failed triggers Airflow's failure handling and retries if configured.