MLOps / DevOps · ~5 min read

Apache Airflow for ML orchestration in MLOps - Commands & Configuration

Introduction
Apache Airflow automates and schedules the tasks in a machine learning project, so multi-step workflows run in the right order without manual intervention. Typical situations where it helps:
- Running data preprocessing, model training, and evaluation in sequence, automatically.
- Retrying a failed ML task without restarting the whole pipeline.
- Tracking the order and timing of each pipeline step.
- Running ML workflows on a schedule, such as daily model retraining.
- Visualizing the progress and status of your ML tasks.
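Before looking at Airflow itself, the core idea (run steps in a fixed order, retry a failed step rather than restarting everything) can be sketched in plain Python. This is an illustrative simulation only, not Airflow code; the `run_pipeline` helper is hypothetical.

```python
import time

def run_pipeline(steps, retries=1, retry_delay=0.0):
    """Run (name, callable) pairs in order; retry a failed step in place."""
    completed = []
    for name, step in steps:
        for attempt in range(retries + 1):
            try:
                step()
                completed.append(name)
                break
            except Exception:
                if attempt == retries:
                    raise  # out of retries: the pipeline fails here
                time.sleep(retry_delay)  # wait, then retry only this step
    return completed

# Hypothetical ML steps, mirroring the DAG file in this article.
steps = [
    ("preprocess", lambda: print("Data preprocessing step")),
    ("train_model", lambda: print("Model training step")),
    ("evaluate", lambda: print("Model evaluation step")),
    ("notify", lambda: print("Notify completion")),
]

print(run_pipeline(steps))
```

Airflow provides exactly this (plus scheduling, logging, and a UI) without you writing the runner yourself.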
Config File - my_dag.py
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta

def preprocess():
    print('Data preprocessing step')

def train_model():
    print('Model training step')

def evaluate():
    print('Model evaluation step')

def notify():
    print('Notify completion')

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2024, 1, 1),
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    'ml_pipeline',
    default_args=default_args,
    description='Simple ML pipeline',
    schedule_interval=timedelta(days=1),  # renamed to 'schedule' in Airflow 2.4+
    catchup=False,
)

preprocess_task = PythonOperator(
    task_id='preprocess',
    python_callable=preprocess,
    dag=dag,
)

train_task = PythonOperator(
    task_id='train_model',
    python_callable=train_model,
    dag=dag,
)

evaluate_task = PythonOperator(
    task_id='evaluate',
    python_callable=evaluate,
    dag=dag,
)

notify_task = PythonOperator(
    task_id='notify',
    python_callable=notify,
    dag=dag,
)

preprocess_task >> train_task >> evaluate_task >> notify_task

This file defines a Directed Acyclic Graph (DAG) named 'ml_pipeline' that runs daily starting from January 1, 2024.

Each PythonOperator runs a Python function representing a step: preprocess, train_model, evaluate, and notify.

The tasks are linked in order so they run one after another.

Retries are set to 1 with a 5-minute delay if a task fails.
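In Airflow 2, the same pipeline can also be written with the TaskFlow API (`@dag` and `@task` decorators), which many teams find more compact. A minimal sketch, assuming Airflow 2.x is installed; the DAG id `ml_pipeline_taskflow` is a name chosen here for illustration:

```python
from datetime import datetime, timedelta

from airflow.decorators import dag, task

@dag(
    dag_id="ml_pipeline_taskflow",
    start_date=datetime(2024, 1, 1),
    schedule=timedelta(days=1),  # 'schedule_interval' before Airflow 2.4
    catchup=False,
    default_args={"retries": 1, "retry_delay": timedelta(minutes=5)},
)
def ml_pipeline_taskflow():
    @task
    def preprocess():
        print("Data preprocessing step")

    @task
    def train_model():
        print("Model training step")

    @task
    def evaluate():
        print("Model evaluation step")

    @task
    def notify():
        print("Notify completion")

    # Set the same linear order as the >> chain in my_dag.py.
    t1 = preprocess()
    t2 = train_model()
    t3 = evaluate()
    t4 = notify()
    t1 >> t2 >> t3 >> t4

ml_pipeline_taskflow()  # instantiate the DAG so Airflow can discover it
```

Both files define an equivalent four-step DAG; the PythonOperator style in my_dag.py remains fully supported.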

Commands
This command lists all the DAGs Airflow knows about, to check if your ML pipeline DAG is recognized.
Terminal
airflow dags list
Expected output:
dag_id      | owner   | paused
ml_pipeline | airflow | False
This command manually starts the ML pipeline DAG to run the workflow immediately.
Terminal
airflow dags trigger ml_pipeline
Expected output:
Created <DagRun ml_pipeline @ 2024-06-01T12:00:00+00:00: manual__2024-06-01T12:00:00+00:00, externally triggered: True>
This command shows all the tasks defined in the ml_pipeline DAG to verify the steps.
Terminal
airflow tasks list ml_pipeline
Expected output:
preprocess
train_model
evaluate
notify
This command runs the 'preprocess' task for the given date without scheduling the whole DAG, useful for testing a single step.
Terminal
airflow tasks test ml_pipeline preprocess 2024-06-01
Expected output:
[2024-06-01 12:00:00,000] {taskinstance.py:1234} INFO - Running preprocess
Data preprocessing step
[2024-06-01 12:00:01,000] {taskinstance.py:1234} INFO - Task preprocess succeeded
Key Concept

If you remember nothing else from this pattern, remember: Airflow runs your ML steps in order automatically and lets you retry and schedule them easily.

Common Mistakes
Not setting start_date, or setting it to a future date
Airflow will not run the DAG because it considers the start date not yet reached. Set start_date to a past date (or today) so the DAG can run immediately.
Not linking tasks with >>, set_upstream, or set_downstream
Unlinked tasks run independently rather than in the intended order, breaking the workflow. Use >> to chain tasks in the correct sequence.
Running airflow tasks test without the correct execution date
The task may not run, or may run with the wrong context, causing confusion. Always provide a valid execution date that matches your DAG's schedule.
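For the task-linking mistake above, Airflow also ships a `chain()` helper (`airflow.models.baseoperator.chain`) that links a whole sequence in one call, which is easy to audit at a glance. A fragment, assuming the four task objects from my_dag.py are in scope:

```python
from airflow.models.baseoperator import chain

# Equivalent to: preprocess_task >> train_task >> evaluate_task >> notify_task
chain(preprocess_task, train_task, evaluate_task, notify_task)
```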
Summary
Define your ML workflow steps as Python functions and link them in a DAG file.
Use airflow commands to list, trigger, and test your ML pipeline tasks.
Set start_date and retries in default_args to control scheduling and failure handling.