How to Mark a Task as Failed in Airflow: Simple Guide
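In Airflow, the usual way to mark a task as failed is to raise an exception inside the task's Python function. You can also call task_instance.set_state('failed') from the task context, but the state Airflow records when the callable returns can overwrite it, so treat that call as a fallback. A failed state tells Airflow the task did not complete successfully and triggers failure handling.
Syntax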
To mark a task as failed programmatically, raise an exception inside the task function or call the task_instance.set_state() method on the task instance taken from the context.
- task_instance.set_state('failed'): Sets the task state to failed explicitly. If the callable then returns normally, Airflow may overwrite this state with success.
- Raising an exception: Automatically marks the task as failed when the exception is not caught.
```python
def my_task(**context):
    # Airflow passes the running task instance in the context under 'ti'
    ti = context['ti']
    # Mark task as failed explicitly
    ti.set_state('failed')
```
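The exception route can be sketched in the same way. The function name `validate_row_count` and its parameters are hypothetical and not part of Airflow. The only point illustrated is that an uncaught exception leaving the callable is what Airflow records as a failure.

```python
def validate_row_count(row_count, minimum=1):
    """Hypothetical task callable: fail the task when too few rows arrived."""
    if row_count < minimum:
        # An uncaught exception is what Airflow treats as a task failure.
        raise ValueError(f"Expected at least {minimum} rows, got {row_count}")
    return row_count
```

Passed as `python_callable` to a PythonOperator, a function like this would let the task succeed on valid input and fail on invalid input without touching the task instance at all.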
Example
This example defines two PythonOperator tasks. The first marks itself as failed by raising an exception, which Airflow catches and records as a failure automatically. The second calls set_state('failed') on its own task instance.
```python
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago


def fail_task():
    # Any uncaught exception fails the task
    raise Exception('Failing this task intentionally')


def explicit_fail_task(**context):
    # Set the state directly on the running task instance
    ti = context['ti']
    ti.set_state('failed')


# Note: days_ago and schedule_interval are deprecated in newer Airflow 2.x releases
with DAG(dag_id='fail_task_example',
         start_date=days_ago(1),
         schedule_interval=None) as dag:
    task1 = PythonOperator(
        task_id='fail_by_exception',
        python_callable=fail_task
    )
    task2 = PythonOperator(
        task_id='fail_explicitly',
        python_callable=explicit_fail_task
    )
```
Output
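When running this DAG, task 'fail_by_exception' fails and its log shows the exception message "Failing this task intentionally". Task 'fail_explicitly' calls set_state('failed'); because its callable then returns normally, confirm the final state in the Airflow UI, since it can end up recorded as success.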
Common Pitfalls
Common mistakes when marking tasks as failed include:
- Not raising an exception or calling set_state('failed'), so Airflow thinks the task succeeded.
- Calling set_state('failed') outside the task context or without proper access to task_instance.
- Swallowing exceptions with try-except blocks without re-raising, which prevents failure detection.
```python
def wrong_fail_task():
    try:
        # Some code that fails
        raise Exception('Error')
    except Exception:
        pass  # This hides the failure, task will be marked success


def right_fail_task():
    # Just raise the exception to fail the task
    raise Exception('Error')
```
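If the exception has to be caught, for example to log extra detail, re-raising it keeps the failure visible to Airflow. The following is a minimal sketch using only the standard library. `load_data` is a hypothetical stand-in for real work that fails.

```python
import logging

logger = logging.getLogger(__name__)


def load_data():
    """Hypothetical step that always fails, standing in for real work."""
    raise RuntimeError("Error")


def logged_fail_task():
    try:
        load_data()
    except RuntimeError:
        # Record the traceback, then re-raise so the failure still propagates.
        logger.exception("load_data failed")
        raise
```

The bare `raise` preserves the original exception and traceback, so the task log shows where the error actually occurred.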
Quick Reference
Summary tips to mark a task as failed in Airflow:
- Raise an exception inside the task function to fail automatically.
- Use task_instance.set_state('failed') to fail explicitly.
- Ensure you have access to task_instance via context. Airflow 2 passes the context automatically to callables that accept **context; provide_context=True in PythonOperator is only needed on Airflow 1.x.
- Avoid catching exceptions without re-raising to prevent false success.
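As a rough illustration of reading the task instance from the context, the sketch below builds a failure message from `ti.dag_id` and `ti.task_id` and then raises. The function name `fail_with_context` is hypothetical, and the `SimpleNamespace` object is only a stand-in for the task instance so the snippet runs outside Airflow.

```python
from types import SimpleNamespace


def fail_with_context(**context):
    """Hypothetical callable: fail with a message naming the DAG and task."""
    ti = context["ti"]
    raise RuntimeError(f"{ti.dag_id}.{ti.task_id} failed an input check")


# Stand-in for the object Airflow supplies as context["ti"]; local testing only.
fake_ti = SimpleNamespace(dag_id="fail_task_example", task_id="fail_by_exception")

try:
    fail_with_context(ti=fake_ti)
except RuntimeError as exc:
    message = str(exc)
```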
Key Takeaways
Raise an exception inside your task to mark it as failed automatically.
Use task_instance.set_state('failed') to explicitly mark failure when needed.
Always access task_instance from context to change task state programmatically.
Avoid catching exceptions without re-raising to ensure Airflow detects failures.
A failed task triggers Airflow's failure handling; when retries are configured, a task that raises an exception is retried before it is finally marked as failed.
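When retrying cannot help, Airflow provides `AirflowFailException` in `airflow.exceptions`, which fails the task without using its remaining retries. The sketch below assumes that behavior. The fallback class exists only so the snippet runs where Airflow is not installed, and `check_credentials` with its `api_key` parameter is hypothetical.

```python
try:
    from airflow.exceptions import AirflowFailException
except ImportError:
    # Stand-in so this sketch runs where Airflow is not installed.
    class AirflowFailException(Exception):
        pass


def check_credentials(api_key):
    """Hypothetical task callable: fail immediately on a missing key."""
    if not api_key:
        # Retrying would hit the same problem, so skip the retries.
        raise AirflowFailException("Missing API key; retrying will not help")
    return True
```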