
Idempotent task design in Apache Airflow - Commands & Configuration

Introduction
When tasks run repeatedly, you want to avoid accidentally doing the same work twice. Idempotent task design means writing tasks so they can run many times without causing errors or duplicate results. This matters in situations like these:
When a task might be retried automatically after failure and you want to avoid duplicate data processing.
When you schedule tasks to run regularly and want to ensure they do not repeat the same work unnecessarily.
When multiple workers might run the same task and you want to prevent conflicts or repeated side effects.
When tasks update external systems and you want to keep data consistent even if the task runs multiple times.
When debugging or testing tasks by running them multiple times without causing errors.
Config File - my_dag.py
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
import os

def write_unique_file():
    # Only create the file if it does not already exist, so retries
    # and reruns skip the write instead of repeating it.
    filename = '/tmp/unique_output.txt'
    if not os.path.exists(filename):
        with open(filename, 'w') as f:
            f.write('This file is created only once.')
    else:
        print('File already exists, skipping write.')

def print_message():
    print('Task ran successfully.')

default_args = {
    'owner': 'airflow',
    # A fixed start_date is preferred over the deprecated days_ago() helper.
    'start_date': datetime(2024, 1, 1),
}

dag = DAG(
    'idempotent_task_example',
    default_args=default_args,
    schedule_interval='@daily',
    catchup=False,
)

write_file_task = PythonOperator(
    task_id='write_unique_file',
    python_callable=write_unique_file,
    dag=dag
)

print_message_task = PythonOperator(
    task_id='print_message',
    python_callable=print_message,
    dag=dag
)

write_file_task >> print_message_task

This Airflow DAG defines two tasks. The first task write_unique_file writes a file only if it does not already exist, making it idempotent. The second task print_message simply prints a message. The DAG runs daily without catching up missed runs.

The key part is the check if not os.path.exists(filename), which prevents repeated writes and their side effects.
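The existence check is one pattern; another common one is to key each run's output on Airflow's logical date, so a rerun overwrites its own output instead of duplicating it. A minimal sketch (the path, helper name, and temp-dir stand-in are illustrative, not part of the DAG above):

```python
import os
import tempfile

def write_partitioned_file(ds=None, **context):
    # Airflow passes the run's logical date (e.g. "2024-06-01") as `ds`
    # when the callable accepts it. Keying the output path on it means a
    # rerun of the same DAG run targets the same file.
    # (tempfile.gettempdir() stands in for a real date-partitioned store.)
    filename = os.path.join(tempfile.gettempdir(), f'output_{ds}.txt')
    # 'w' mode makes the write itself idempotent: a retry rewrites the
    # same content rather than appending a second copy.
    with open(filename, 'w') as f:
        f.write(f'Data for {ds}')
    return filename
```

Such a callable would be wired up with a PythonOperator exactly like write_unique_file; Airflow supplies ds automatically at run time.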

Commands
List all available DAGs to confirm the new DAG is recognized by Airflow.
Terminal
airflow dags list
Expected Output
idempotent_task_example
example_bash_operator
example_python_operator
Manually trigger the DAG to run the tasks immediately and test idempotency.
Terminal
airflow dags trigger idempotent_task_example
Expected Output
Created <DagRun id=idempotent_task_example__2024-06-01T00:00:00+00:00, dag_id=idempotent_task_example, run_id=manual__2024-06-01T00:00:00+00:00, state=running>
Run the write_unique_file task in isolation with airflow tasks test, which executes it once and prints its log to the terminal, to see whether the file was created or the write was skipped. (The Airflow CLI has no tasks logs subcommand.)
Terminal
airflow tasks test idempotent_task_example write_unique_file 2024-06-01
Expected Output
[2024-06-01 00:00:01,000] {logging_mixin.py:112} INFO - File already exists, skipping write.
Run the print_message task the same way to confirm it completes successfully.
Terminal
airflow tasks test idempotent_task_example print_message 2024-06-01
Expected Output
[2024-06-01 00:00:02,000] {logging_mixin.py:112} INFO - Task ran successfully.
Key Concept

If you remember nothing else from this pattern, remember: design tasks so they can run multiple times without causing duplicate work or errors.
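The difference can be shown in a few lines of plain Python (function names are made up for this sketch): appending on every run duplicates work, while overwriting converges to the same state however many times the task runs.

```python
def record_run_append(path):
    # NOT idempotent: every execution adds another line,
    # so a retry leaves the file in a different state.
    with open(path, 'a') as f:
        f.write('processed\n')

def record_run_overwrite(path):
    # Idempotent: running this once or ten times leaves
    # the file in exactly the same state.
    with open(path, 'w') as f:
        f.write('processed\n')
```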

Common Mistakes
Writing tasks that always perform the same action without checking previous state.
This causes duplicate data, errors, or inconsistent system state when tasks retry or rerun.
Add checks like file existence, database flags, or idempotent APIs to avoid repeating side effects.
Assuming tasks run only once and ignoring retries or manual reruns.
Airflow retries failed tasks and allows manual reruns, so tasks must handle multiple executions safely.
Always write tasks with idempotency in mind to handle retries and reruns gracefully.
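For tasks that write to external systems, the usual fix is an upsert keyed on the run, so a retried task overwrites its own row instead of inserting a duplicate. A sketch using sqlite3 only as a stand-in for a real warehouse (the table and column names are illustrative):

```python
import sqlite3

def load_row(conn, run_date, value):
    conn.execute(
        'CREATE TABLE IF NOT EXISTS daily_metrics '
        '(run_date TEXT PRIMARY KEY, value REAL)'
    )
    # INSERT OR REPLACE keyed on run_date: rerunning the load for the
    # same logical date replaces the existing row rather than adding one.
    conn.execute(
        'INSERT OR REPLACE INTO daily_metrics (run_date, value) '
        'VALUES (?, ?)',
        (run_date, value),
    )
    conn.commit()
```

Running the load twice for the same date leaves exactly one row, which is the behavior a retried Airflow task needs.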
Summary
Create tasks that check if work is already done before acting to avoid duplicates.
Use Airflow PythonOperator to run idempotent Python functions.
Verify task behavior by checking logs after manual DAG triggers.