How to Use Airflow for an ML Pipeline: Step-by-Step Guide
Use Apache Airflow to create a directed acyclic graph (DAG) that defines your ML pipeline steps as tasks. Each task runs one part of the ML workflow, such as data preprocessing, training, or evaluation, and Airflow schedules and monitors the whole pipeline.
Syntax
An Airflow ML pipeline is defined as a DAG (Directed Acyclic Graph) where each node is a task. Tasks are created using operators like PythonOperator to run Python functions. The DAG defines task order and scheduling.
- DAG: The workflow container with schedule and default args.
- Task: A unit of work, e.g., data cleaning or model training.
- Operator: Defines what the task does, e.g., run Python code.
- Dependencies: Set task order with `task1 >> task2`.
```python
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

def preprocess():
    print('Preprocessing data')

def train():
    print('Training model')

def evaluate():
    print('Evaluating model')

with DAG('ml_pipeline',
         start_date=datetime(2024, 1, 1),
         schedule_interval='@daily',
         catchup=False) as dag:
    task_preprocess = PythonOperator(task_id='preprocess', python_callable=preprocess)
    task_train = PythonOperator(task_id='train', python_callable=train)
    task_evaluate = PythonOperator(task_id='evaluate', python_callable=evaluate)

    task_preprocess >> task_train >> task_evaluate
```
Example
This example shows a simple ML pipeline with three tasks: preprocessing data, training a model, and evaluating it. Each task prints a message to simulate the step. The tasks run in order, once per day.
```python
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

def preprocess():
    print('Preprocessing data')

def train():
    print('Training model')

def evaluate():
    print('Evaluating model')

with DAG('ml_pipeline_example',
         start_date=datetime(2024, 1, 1),
         schedule_interval='@daily',
         catchup=False) as dag:
    preprocess_task = PythonOperator(task_id='preprocess', python_callable=preprocess)
    train_task = PythonOperator(task_id='train', python_callable=train)
    evaluate_task = PythonOperator(task_id='evaluate', python_callable=evaluate)

    preprocess_task >> train_task >> evaluate_task

# To run this DAG, place it in Airflow's DAG folder and start the
# Airflow scheduler and webserver.
```
Output
Preprocessing data
Training model
Evaluating model
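To check this output without deploying the file to a DAG folder, newer Airflow versions (2.5+) provide `DAG.test()`, which runs one DAG instance in-process and prints each task's logs. This is a minimal sketch, assuming Airflow 2.5+ is installed and its local metadata database has been initialized:

```python
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

def preprocess():
    print('Preprocessing data')

def train():
    print('Training model')

def evaluate():
    print('Evaluating model')

with DAG('ml_pipeline_local_test',
         start_date=datetime(2024, 1, 1),
         schedule_interval='@daily',
         catchup=False) as dag:
    t1 = PythonOperator(task_id='preprocess', python_callable=preprocess)
    t2 = PythonOperator(task_id='train', python_callable=train)
    t3 = PythonOperator(task_id='evaluate', python_callable=evaluate)
    t1 >> t2 >> t3

if __name__ == '__main__':
    # Runs all three tasks in order in the current process, so the
    # three messages from the Output section appear in the task logs.
    dag.test()
```

This is handy during development; production runs should still go through the scheduler.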
Common Pitfalls
- Not setting task dependencies: tasks without dependencies can run in any order (and possibly in parallel), so a missing `>>` leads to the wrong execution order.
- Running heavy computations inside tasks: avoid long blocking code in worker processes; offload large training jobs to external services or batch systems.
- Ignoring task retries and failure handling: set retries and alerts to handle failures gracefully.
- Not using Airflow Variables or Connections: hardcoding credentials or paths reduces flexibility and security.
```python
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

def train():
    print('Training model')

def evaluate():
    print('Evaluating model')

with DAG('wrong_pipeline',
         start_date=datetime(2024, 1, 1),
         schedule_interval='@daily',
         catchup=False) as dag:
    train_task = PythonOperator(task_id='train', python_callable=train)
    evaluate_task = PythonOperator(task_id='evaluate', python_callable=evaluate)

    # Missing dependency: 'evaluate' may run before 'train' (wrong).
    # Correct way:
    # train_task >> evaluate_task
```
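The retries and Variables pitfalls can be addressed together. The sketch below is one minimal illustration, not the only pattern; the Variable key `ml_data_path` and the alert address are made-up names for this example:

```python
from airflow import DAG
from airflow.models import Variable
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta

# default_args are inherited by every task in the DAG: here each task
# retries twice with a 5-minute delay and emails on failure.
default_args = {
    'retries': 2,
    'retry_delay': timedelta(minutes=5),
    'email_on_failure': True,
    'email': ['alerts@example.com'],  # hypothetical alert address
}

def preprocess():
    # Read the input path from an Airflow Variable instead of hardcoding it.
    # 'ml_data_path' is a made-up key; set it under Admin -> Variables.
    data_path = Variable.get('ml_data_path', default_var='/tmp/data.csv')
    print(f'Preprocessing data from {data_path}')

with DAG('resilient_pipeline',
         start_date=datetime(2024, 1, 1),
         schedule_interval='@daily',
         catchup=False,
         default_args=default_args) as dag:
    PythonOperator(task_id='preprocess', python_callable=preprocess)
```

Credentials for databases or storage should go in Airflow Connections rather than Variables, so they are stored encrypted.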
Quick Reference
Remember these tips when building ML pipelines with Airflow:
- Define your workflow as a `DAG` with clear task dependencies.
- Use `PythonOperator` for Python code tasks.
- Schedule pipelines with `schedule_interval` (e.g., `'@daily'`).
- Set `start_date` and `catchup=False` to control runs.
- Use the Airflow UI to monitor and troubleshoot tasks.
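Besides the UI, individual tasks can be exercised from the command line. Assuming a standard Airflow 2.x install, the `airflow tasks test` subcommand runs one task instance without the scheduler, which is useful for troubleshooting:

```shell
# List the DAGs Airflow has parsed from the DAG folder
airflow dags list

# Run a single task instance in isolation (no scheduler needed);
# its log output, e.g. 'Preprocessing data', is printed to the terminal
airflow tasks test ml_pipeline preprocess 2024-01-01
```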
Key Takeaways
- Airflow manages ML pipelines by defining tasks in a DAG with clear dependencies.
- Use `PythonOperator` to run Python functions for each ML step, such as preprocessing and training.
- Always set task order with `>>` to ensure the correct execution sequence.
- Schedule pipelines with `schedule_interval` and control runs with `start_date` and `catchup`.
- Monitor pipelines via the Airflow UI and handle failures with retries and alerts.