
Atomic operations in pipelines in Apache Airflow - Commands & Configuration

Introduction
Sometimes, when running a series of tasks in a pipeline, you want to make sure that either all tasks succeed or none do. Atomic operations help by making the whole set of tasks behave like one single step that fully completes or fully fails, avoiding partial results.
When you want to load data into a database only if all data files are processed successfully.
When you need to update multiple systems and want to avoid partial updates that cause inconsistency.
When running a multi-step data transformation where failure in one step means the whole process should be retried.
When you want to ensure that temporary files are cleaned up only if all previous tasks succeed.
When you want to avoid triggering downstream tasks if any upstream task fails.
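
The all-or-nothing idea behind every case above can be sketched outside Airflow in plain Python: stage each step's result and only commit when every step succeeds. This is an illustrative sketch, not Airflow code; the function names (run_atomically, commit, rollback) are invented for the example.

```python
def run_atomically(steps, commit, rollback):
    """Run each step in order; commit only if every step succeeds, else roll back."""
    staged = []
    try:
        for step in steps:
            staged.append(step())   # any exception aborts the whole set
    except Exception:
        rollback(staged)            # discard partial results
        raise
    commit(staged)                  # all steps succeeded: publish results
    return staged

# Usage: three steps where the last one fails -> nothing is committed.
committed = []
try:
    run_atomically(
        steps=[lambda: "a", lambda: "b", lambda: 1 / 0],
        commit=committed.extend,
        rollback=lambda partial: None,
    )
except ZeroDivisionError:
    pass
print(committed)  # [] -- no partial results were published
```

The same discipline is what the DAG below approximates: a failure anywhere stops the chain before later steps can publish anything.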
Config File - atomic_pipeline_dag.py
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator

def task1():
    print("Task 1 completed")

def task2():
    print("Task 2 completed")

def task3():
    print("Task 3 completed")

with DAG(
    dag_id='atomic_pipeline',
    start_date=datetime(2024, 1, 1),
    schedule_interval=None,  # run only when triggered manually
    catchup=False            # do not backfill past runs
) as dag:

    t1 = PythonOperator(
        task_id='task1',
        python_callable=task1
    )

    t2 = PythonOperator(
        task_id='task2',
        python_callable=task2
    )

    t3 = PythonOperator(
        task_id='task3',
        python_callable=task3
    )

    t1 >> t2 >> t3  # sequential chain: each task runs only if its upstream task succeeded

This Airflow DAG defines three tasks that run one after another.

The tasks are simple Python functions printing messages.

The order t1 >> t2 >> t3 ensures they run sequentially.

If any task fails, the downstream tasks will not run, because Airflow's default trigger rule (all_success) requires every upstream task to succeed. The pipeline therefore either completes fully or stops at the failure, making it atomic in behavior.
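
A rough, stdlib-only simulation of that stop-on-failure behavior (this is not Airflow internals, just a sketch of the all_success semantics) might look like:

```python
def run_chain(tasks):
    """Run (name, fn) tasks in order; everything downstream of the first failure is not run."""
    states = {}
    failed = False
    for name, fn in tasks:
        if failed:
            states[name] = "upstream_failed"  # mirrors Airflow's upstream_failed state
            continue
        try:
            fn()
            states[name] = "success"
        except Exception:
            states[name] = "failed"
            failed = True
    return states

def boom():
    raise RuntimeError("task2 blew up")

print(run_chain([("task1", lambda: None), ("task2", boom), ("task3", lambda: None)]))
# {'task1': 'success', 'task2': 'failed', 'task3': 'upstream_failed'}
```

Note that task3 never executes at all: atomicity here comes from refusing to start downstream work, not from undoing it.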

Commands
List all available DAGs to confirm the new atomic_pipeline DAG is recognized by Airflow.
Terminal
airflow dags list
Expected Output
dag_id                | filepath
atomic_pipeline       | /usr/local/airflow/dags/atomic_pipeline_dag.py
example_bash_operator | /usr/local/airflow/dags/example_bash_operator.py
Trigger the atomic_pipeline DAG to start running the tasks in order.
Terminal
airflow dags trigger atomic_pipeline
Expected Output
Created <DagRun atomic_pipeline @ 2024-06-01T12:00:00+00:00: manual__2024-06-01T12:00:00+00:00, externally triggered: True>
List all tasks in the atomic_pipeline DAG to see the steps that will run.
Terminal
airflow tasks list atomic_pipeline
Expected Output
task1
task2
task3
Run task2 alone for the given date to test it independently. The airflow tasks test command executes the task without recording state in the metadata database, so it will not affect real DAG runs.
Terminal
airflow tasks test atomic_pipeline task2 2024-06-01
Expected Output
[2024-06-01 12:00:00,000] {taskinstance.py:xxxx} INFO - Executing <Task(PythonOperator): task2> on 2024-06-01
Task 2 completed
[2024-06-01 12:00:00,100] {taskinstance.py:xxxx} INFO - Marking task as SUCCESS. dag_id=atomic_pipeline, task_id=task2, execution_date=2024-06-01
Key Concept

If you remember nothing else from this pattern, remember: chaining tasks with dependencies ensures the pipeline behaves atomically by stopping on failure.

Common Mistakes
Not setting task dependencies, so tasks run in parallel and produce partial results.
Without dependencies, tasks do not wait for each other, so failures in one task do not stop others, breaking atomicity.
Always define task order using >> or set_upstream/set_downstream to enforce sequential execution.
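
For reference, the bitshift operators and the explicit methods are interchangeable ways to declare the same dependency. This fragment assumes t1 and t2 are operators already defined inside a DAG (as in the file above); it is a configuration sketch, not a standalone script.

```python
# All of these declare the same thing: t1 runs before t2.
t1 >> t2                 # bitshift syntax (most common)
t2 << t1                 # reversed bitshift, identical dependency
t1.set_downstream(t2)    # explicit method form
t2.set_upstream(t1)      # equivalent from the other side
```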
Ignoring task failures and manually marking tasks as success.
This hides real errors and causes downstream tasks to run even if previous steps failed.
Let Airflow handle task states automatically and fix errors instead of forcing success.
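
Rather than forcing a failed task to success, the usual fix is to make the task idempotent and let retries re-run it. The retry loop is roughly the following plain-Python sketch of what Airflow's retries and retry_delay task arguments do (this is an illustration, not Airflow's actual implementation):

```python
import time

def run_with_retries(fn, retries=2, retry_delay=0.0):
    """Attempt fn up to 1 + retries times; re-raise if every attempt fails."""
    for attempt in range(retries + 1):
        try:
            return fn()
        except Exception:
            if attempt == retries:
                raise          # out of retries: surface the real error
            time.sleep(retry_delay)

# Usage: a flaky step that fails twice, then succeeds on the third attempt.
calls = {"n": 0}
def flaky():
    calls["n"] += 1
    if calls["n"] < 3:
        raise ConnectionError("transient failure")
    return "done"

print(run_with_retries(flaky, retries=2))  # done (after 3 attempts)
```

Because the failure is retried rather than masked, a step that never recovers still fails the run and stops the chain, which is exactly the atomic behavior this pattern relies on.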
Summary
Define tasks as PythonOperators or other operators in an Airflow DAG.
Set task dependencies to run tasks sequentially for atomic behavior.
Trigger the DAG and monitor task execution to ensure all tasks succeed or the pipeline stops.