ML · Python · How-To · Beginner · 4 min read

How to Use Airflow for ML Pipeline: Step-by-Step Guide

Use Apache Airflow to create a directed acyclic graph (DAG) that defines your ML pipeline steps as tasks. Each task runs a part of the ML workflow like data preprocessing, training, and evaluation, scheduled and monitored by Airflow.
📝

Syntax

An Airflow ML pipeline is defined as a DAG (Directed Acyclic Graph) where each node is a task. Tasks are created using operators like PythonOperator to run Python functions. The DAG defines task order and scheduling.

  • DAG: The workflow container with schedule and default args.
  • Task: A unit of work, e.g., data cleaning or model training.
  • Operator: Defines what the task does, e.g., run Python code.
  • Dependencies: Set task order with task1 >> task2.
python
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

def preprocess():
    print('Preprocessing data')

def train():
    print('Training model')

def evaluate():
    print('Evaluating model')

with DAG('ml_pipeline', start_date=datetime(2024, 1, 1), schedule_interval='@daily', catchup=False) as dag:
    task_preprocess = PythonOperator(task_id='preprocess', python_callable=preprocess)
    task_train = PythonOperator(task_id='train', python_callable=train)
    task_evaluate = PythonOperator(task_id='evaluate', python_callable=evaluate)

    task_preprocess >> task_train >> task_evaluate
💻

Example

This example shows a simple ML pipeline with three tasks: preprocessing data, training a model, and evaluating it. Each task prints a message to simulate the step. The tasks run in order daily.

python
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

def preprocess():
    print('Preprocessing data')

def train():
    print('Training model')

def evaluate():
    print('Evaluating model')

with DAG('ml_pipeline_example', start_date=datetime(2024, 1, 1), schedule_interval='@daily', catchup=False) as dag:
    preprocess_task = PythonOperator(task_id='preprocess', python_callable=preprocess)
    train_task = PythonOperator(task_id='train', python_callable=train)
    evaluate_task = PythonOperator(task_id='evaluate', python_callable=evaluate)

    preprocess_task >> train_task >> evaluate_task

# To run this DAG, place it in Airflow's DAG folder and start the Airflow scheduler and webserver.
Output
Preprocessing data
Training model
Evaluating model

(In practice, each line appears in the log of its own task in the Airflow UI rather than as one combined output.)
⚠️

Common Pitfalls

  • Not setting task dependencies: without >>, tasks have no defined order and may run in parallel, so steps execute in the wrong sequence.
  • Using heavy computations in tasks: Avoid long blocking code; use external services or batch jobs.
  • Ignoring task retries and failure handling: Set retries and alerts to handle failures gracefully.
  • Not using Airflow variables or connections: Hardcoding credentials or paths reduces flexibility and security.
python
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

def train():
    print('Training model')

def evaluate():
    print('Evaluating model')

with DAG('wrong_pipeline', start_date=datetime(2024, 1, 1), schedule_interval='@daily', catchup=False) as dag:
    train_task = PythonOperator(task_id='train', python_callable=train)
    evaluate_task = PythonOperator(task_id='evaluate', python_callable=evaluate)

    # Missing dependency: tasks run in parallel (wrong)

# Correct way:
# train_task >> evaluate_task
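The retry and configuration pitfalls above can be addressed with a default_args dictionary that every task in the DAG inherits. A minimal sketch with illustrative values (the retry counts and alert address are assumptions, not from the original):

```python
from datetime import timedelta

# Illustrative retry/alert settings using Airflow's default_args convention;
# the specific values and email address are assumptions.
default_args = {
    'retries': 3,                         # re-run a failed task up to 3 times
    'retry_delay': timedelta(minutes=5),  # wait 5 minutes between attempts
    'email_on_failure': True,             # alert once retries are exhausted
    'email': ['ml-alerts@example.com'],   # hypothetical address
}

# Pass the dict to the DAG so every task inherits these settings:
# with DAG('ml_pipeline', start_date=datetime(2024, 1, 1),
#          schedule_interval='@daily', catchup=False,
#          default_args=default_args) as dag:
#     ...

# For paths and credentials, prefer Airflow Variables over hardcoding:
# from airflow.models import Variable
# model_path = Variable.get('model_path')
```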
📊

Quick Reference

Remember these tips when building ML pipelines with Airflow:

  • Define your workflow as a DAG with clear task dependencies.
  • Use PythonOperator for Python code tasks.
  • Schedule pipelines with schedule_interval (e.g., '@daily'); Airflow 2.4+ prefers the schedule parameter.
  • Set start_date and catchup=False to control runs.
  • Use Airflow UI to monitor and troubleshoot tasks.
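Real pipelines usually need to hand data from one step to the next. PythonOperator pushes a callable's return value to XCom automatically, and a downstream task can pull it by task_id. A sketch assuming Airflow 2.x context injection (the ti argument); the row and feature counts are made up:

```python
def preprocess():
    # The return value is pushed to XCom automatically by PythonOperator.
    return {'rows': 100, 'features': 10}

def train(ti=None):
    # Airflow 2.x injects the task instance as 'ti'; xcom_pull fetches the
    # upstream task's return value by its task_id.
    stats = ti.xcom_pull(task_ids='preprocess')
    return f"training on {stats['rows']} rows, {stats['features']} features"

# Wired into the DAG exactly as before:
# preprocess_task = PythonOperator(task_id='preprocess', python_callable=preprocess)
# train_task = PythonOperator(task_id='train', python_callable=train)
# preprocess_task >> train_task
```

Keep XCom payloads small (metadata, file paths); pass large datasets through external storage and share only the location.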
✅

Key Takeaways

Airflow manages ML pipelines by defining tasks in a DAG with clear dependencies.
Use PythonOperator to run Python functions for each ML step like preprocessing and training.
Always set task order with >> to ensure correct execution sequence.
Schedule pipelines with schedule_interval and control runs with start_date and catchup.
Monitor pipelines via Airflow UI and handle failures with retries and alerts.