
Pipeline scheduling and triggers in MLOps - Commands & Configuration

Introduction
Scheduling runs your data or model pipelines automatically at set times. Triggers start them when an event occurs, such as new data arriving or new code being pushed. Both remove the need to start pipelines by hand, which saves time and avoids mistakes. Typical cases include:
When you want your machine learning model to retrain every day at midnight without manual work
When you need to run data cleaning steps every hour to keep data fresh
When you want to start a pipeline automatically after new data files arrive in storage
When you want to test your model every time new code is pushed to your project
When you want to run a pipeline only when a specific event, like a manual approval, happens
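The time-based cases above are usually written as five-field cron strings (minute, hour, day of month, month, day of week). As a rough illustration of how such a string selects run times, the sketch below checks a datetime against a cron expression. The helper `cron_matches` is hypothetical and not part of Airflow; it handles only `*` and single integers, not ranges, steps, or lists.

```python
from datetime import datetime


def cron_matches(expr: str, moment: datetime) -> bool:
    """Return True if `moment` falls on a minute selected by `expr`.

    Simplified sketch: each field must be '*' or a single integer, and
    every field must match (real cron treats day-of-month and day-of-week
    specially when both are restricted).
    """
    fields = expr.split()
    if len(fields) != 5:
        raise ValueError("expected 5 cron fields")
    # Cron counts Sunday as 0; isoweekday() returns 7 for Sunday, so wrap it.
    actual = [
        moment.minute,
        moment.hour,
        moment.day,
        moment.month,
        moment.isoweekday() % 7,
    ]
    return all(f == "*" or int(f) == a for f, a in zip(fields, actual))


DAILY_MIDNIGHT = "0 0 * * *"  # retrain every day at 00:00
HOURLY = "0 * * * *"          # clean data at the top of every hour

print(cron_matches(DAILY_MIDNIGHT, datetime(2024, 6, 1, 0, 0)))   # True
print(cron_matches(DAILY_MIDNIGHT, datetime(2024, 6, 1, 12, 0)))  # False
print(cron_matches(HOURLY, datetime(2024, 6, 1, 12, 0)))          # True
```

A real scheduler parses the full cron syntax; the point here is only that a schedule string decides which minutes produce a run.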
Config File - pipeline_schedule.py
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.sensors.filesystem import FileSensor

def train_model():
    # Placeholder for the real training logic
    print("Training model...")

def process_data():
    # Placeholder for the real data-processing logic
    print("Processing data...")

with DAG(
    dag_id='daily_model_training',
    default_args={
        'owner': 'mlops_team',
        'retries': 1,
        'retry_delay': timedelta(minutes=5),
    },
    # Fixed example date; a static start_date replaces the deprecated days_ago()
    start_date=datetime(2024, 1, 1),
    schedule_interval='0 0 * * *',  # every day at midnight (newer Airflow 2.x releases also accept `schedule`)
    catchup=False  # do not backfill runs missed before the DAG was enabled
) as dag:

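    # First task: hold downstream tasks until the data file exists
    wait_for_data = FileSensor(
        task_id='wait_for_data_file',
        filepath='/data/new_data.csv',
        poke_interval=30,  # check every 30 seconds
        timeout=600  # give up after 10 minutes
    )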

    data_task = PythonOperator(
        task_id='process_data',
        python_callable=process_data
    )

    train_task = PythonOperator(
        task_id='train_model',
        python_callable=train_model
    )

    wait_for_data >> data_task >> train_task

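This Airflow DAG defines a pipeline that runs every day at midnight (schedule_interval='0 0 * * *'). With catchup=False, Airflow does not backfill runs for past intervals that were missed.

The FileSensor is the first task in each run. It checks for /data/new_data.csv every 30 seconds (poke_interval) and fails if the file has not appeared within 600 seconds (timeout). Note that it only checks that the path exists; it does not verify that the file is newer than the one used by the previous run.

Once the file is found, the data processing and model training tasks run in order.

The result is a pipeline that starts on a schedule but only proceeds once the data file is present.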
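To make the roles of poke_interval and timeout concrete, here is a minimal sketch of the polling loop a file sensor performs conceptually. It is not Airflow's implementation: `wait_for_file` is a hypothetical helper, and it returns False on timeout where Airflow would fail the task. The demo uses a temporary directory and very short intervals so it finishes quickly.

```python
import os
import tempfile
import time


def wait_for_file(path: str, poke_interval: float, timeout: float) -> bool:
    """Poll until `path` exists; return False if `timeout` seconds pass first."""
    deadline = time.monotonic() + timeout
    while True:
        if os.path.exists(path):          # one "poke"
            return True
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            return False                  # a real sensor would fail the task here
        # Sleep until the next poke, but never past the deadline.
        time.sleep(min(poke_interval, remaining))


with tempfile.TemporaryDirectory() as tmp:
    target = os.path.join(tmp, "new_data.csv")
    # The file does not exist yet, so the wait times out.
    print(wait_for_file(target, poke_interval=0.05, timeout=0.2))  # False
    # Create the file; the next wait succeeds on its first poke.
    open(target, "w").close()
    print(wait_for_file(target, poke_interval=0.05, timeout=0.2))  # True
```

A shorter poke interval detects the file sooner at the cost of more frequent checks; the timeout bounds how long a run can sit waiting.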

Commands
Lists all available pipelines (DAGs) in Airflow to confirm your pipeline is recognized.
Terminal
airflow dags list
Expected Output
dag_id               | filepath
---------------------|---------------------------------------------
daily_model_training | /home/user/airflow/dags/pipeline_schedule.py
Manually triggers the daily_model_training pipeline to test it immediately.
Terminal
airflow dags trigger daily_model_training
Expected Output
Created <DagRun daily_model_training @ 2024-06-01T12:00:00+00:00: manual__2024-06-01T12:00:00+00:00, externally triggered: True>
Lists all tasks in the daily_model_training pipeline. Task IDs are printed alphabetically, not in execution order.
Terminal
airflow tasks list daily_model_training
Expected Output
process_data
train_model
wait_for_data_file
Runs only the train_model task for the given date, without checking upstream dependencies or recording state in the metadata database, to confirm that the task works on its own.
Terminal
airflow tasks test daily_model_training train_model 2024-06-01
Expected Output
[2024-06-01 12:00:00,000] {taskinstance.py:876} INFO - Executing <Task(PythonOperator): train_model> on 2024-06-01
Training model...
[2024-06-01 12:00:00,100] {taskinstance.py:1020} INFO - Marking task as SUCCESS. dag_id=daily_model_training, task_id=train_model, execution_date=2024-06-01
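The manual trigger command can also be issued from a script, for example when an upstream job detects new data. The sketch below only builds the argument list and does not run anything. `build_trigger_command` and the `source_file` key are hypothetical names chosen for illustration; `--conf` is the CLI option for passing a JSON configuration to the run. Executing the command (for instance with `subprocess.run`) assumes the airflow CLI is installed and the DAG is registered.

```python
import json
import shlex
from typing import List, Optional


def build_trigger_command(dag_id: str, conf: Optional[dict] = None) -> List[str]:
    """Build the argv for `airflow dags trigger`, optionally with a JSON conf."""
    command = ["airflow", "dags", "trigger"]
    if conf:
        # The conf payload is passed to the DAG run as a JSON string.
        command += ["--conf", json.dumps(conf, sort_keys=True)]
    command.append(dag_id)
    return command


# Hypothetical event payload: the file that caused the trigger.
cmd = build_trigger_command(
    "daily_model_training",
    {"source_file": "/data/new_data.csv"},
)
# Print a shell-quoted form that could be pasted into a terminal.
print(shlex.join(cmd))
```

Keeping the command as a list rather than a single string avoids shell-quoting problems if it is later passed to `subprocess.run`.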
Key Concept

If you remember nothing else from this pattern, remember: scheduling runs pipelines automatically on a time plan, while triggers start pipelines when events happen.

Common Mistakes
Setting schedule_interval to None and expecting the pipeline to run automatically on a schedule
A None schedule means no automatic runs; the pipeline only runs when manually triggered.
Set schedule_interval to a valid cron expression like '0 0 * * *' for daily runs.
Not using sensors or triggers to wait for required data before starting pipeline tasks
The pipeline may start too early and fail because the data is not ready yet.
Use sensors like FileSensor to wait for data availability before running tasks.
Triggering pipelines without defining task dependencies
Without declared dependencies, tasks may run in any order or in parallel, causing errors or incomplete processing.
Define task dependencies explicitly using >> or set_upstream/set_downstream.
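The effect of declaring dependencies can be illustrated with Python's standard-library graphlib (Python 3.9+). This is a sketch of the ordering idea only, not of how Airflow's scheduler works; the task names mirror the DAG above, and the `upstream` mapping is an assumption made for the example.

```python
from graphlib import TopologicalSorter

# Map each task to the set of tasks it depends on (its upstream tasks),
# mirroring: wait_for_data >> data_task >> train_task
upstream = {
    "wait_for_data_file": set(),
    "process_data": {"wait_for_data_file"},
    "train_model": {"process_data"},
}

# With dependencies declared, there is exactly one valid execution order.
order = list(TopologicalSorter(upstream).static_order())
print(order)  # ['wait_for_data_file', 'process_data', 'train_model']

# With no dependencies declared, every task is runnable immediately,
# so nothing prevents training from starting before the data is ready.
no_deps = TopologicalSorter({name: set() for name in upstream})
no_deps.prepare()
ready = sorted(no_deps.get_ready())
print(ready)  # ['process_data', 'train_model', 'wait_for_data_file']
```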
Summary
Define pipeline schedule using schedule_interval to automate runs at set times.
Use sensors to hold a run until a condition is met, and triggers to start pipelines when specific events happen.
Test pipelines and tasks manually with airflow CLI commands to ensure correct behavior.