
Pipeline scheduling and triggers in MLOps - Commands & Configuration

Introduction
Scheduling pipelines lets you run your data or model tasks automatically at set times or when something important happens. Triggers start pipelines without you needing to run them manually, saving time and avoiding mistakes.
When you want your machine learning model to retrain every day at midnight without manual work
When you need to run data cleaning steps every hour to keep data fresh
When you want to start a pipeline automatically after new data files arrive in storage
When you want to test your model every time new code is pushed to your project
When you want to run a pipeline only when a specific event, like a manual approval, happens
Config File - pipeline_schedule.py
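from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.sensors.filesystem import FileSensor


def train_model():
    # Placeholder for the real training step.
    print("Training model...")


def process_data():
    # Placeholder for the real data-processing step.
    print("Processing data...")


with DAG(
    dag_id='daily_model_training',
    # Use a fixed start date; the dynamic days_ago() helper is deprecated in Airflow 2.x.
    start_date=datetime(2024, 6, 1),
    default_args={
        'owner': 'mlops_team',
        'retries': 1,                         # retry a failed task once...
        'retry_delay': timedelta(minutes=5),  # ...after waiting 5 minutes
    },
    # Every day at midnight. Airflow 2.4+ names this parameter `schedule`;
    # `schedule_interval` still works there but is deprecated.
    schedule_interval='0 0 * * *',
    catchup=False,  # do not backfill runs for intervals missed since start_date
) as dag:

    # Checks every 30 s for the file and fails if it has not appeared after 600 s.
    # Uses the default 'fs_default' filesystem connection.
    wait_for_data = FileSensor(
        task_id='wait_for_data_file',
        filepath='/data/new_data.csv',
        poke_interval=30,
        timeout=600,
    )

    data_task = PythonOperator(
        task_id='process_data',
        python_callable=process_data,
    )

    train_task = PythonOperator(
        task_id='train_model',
        python_callable=train_model,
    )

    # Sensor first, then data processing, then training.
    wait_for_data >> data_task >> train_task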

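This Airflow DAG defines a pipeline that runs every day at midnight (schedule_interval='0 0 * * *'; Airflow evaluates cron schedules in UTC unless you configure another timezone).

Within each run, the FileSensor checks for /data/new_data.csv every 30 seconds (poke_interval) and lets the downstream tasks start only once the file exists. If the file has not appeared after 600 seconds (timeout), the sensor task fails.

Once the file is present, the data processing and model training tasks run in that order.

Together, the schedule starts a run every night and the sensor holds back processing and training until the data file is present. Note that FileSensor only checks that the file exists, not that it is new: a file left over from an earlier day satisfies it immediately, so you may want to move or rename the file after it has been processed.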
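To make the sensor's poke_interval and timeout behaviour concrete, here is a plain-Python sketch of a poke loop. It is not Airflow's implementation; `wait_for_file` is a hypothetical helper that checks for the file, sleeps between checks, and raises once the timeout has elapsed (the point at which Airflow's sensor task would fail).

```python
import os
import tempfile
import time


def wait_for_file(filepath: str, poke_interval: float, timeout: float) -> bool:
    """Poll for `filepath` until it exists or `timeout` seconds have passed.

    Hypothetical helper mirroring FileSensor's poke_interval/timeout arguments:
    returns True as soon as the file exists, raises TimeoutError otherwise.
    """
    deadline = time.monotonic() + timeout
    while True:
        # One "poke": a cheap existence check (it says nothing about freshness).
        if os.path.exists(filepath):
            return True
        if time.monotonic() >= deadline:
            raise TimeoutError(f"{filepath} did not appear within {timeout}s")
        # Wait until the next poke, but never sleep past the deadline.
        time.sleep(min(poke_interval, max(0.0, deadline - time.monotonic())))


# Usage: short intervals so the demo finishes quickly.
with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "new_data.csv")
    try:
        wait_for_file(path, poke_interval=0.05, timeout=0.2)
    except TimeoutError as exc:
        print("sensor timed out:", exc)
    open(path, "w").close()  # the "new data" arrives
    print(wait_for_file(path, poke_interval=0.05, timeout=0.2))  # True
```

Sleeping in a loop like this keeps a worker busy for the whole wait, which is why a long timeout is a cost, not just a safety margin.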

Commands
Lists all available pipelines (DAGs) in Airflow to confirm your pipeline is recognized.
Terminal
airflow dags list
Expected Output
dag_id               | filepath
---------------------+---------------------------------------------
daily_model_training | /home/user/airflow/dags/pipeline_schedule.py
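Manually triggers the daily_model_training pipeline to test it immediately. New DAGs are paused by default, so unpause it first (airflow dags unpause daily_model_training); otherwise the triggered run stays queued.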
Terminal
airflow dags trigger daily_model_training
Expected Output
Created <DagRun daily_model_training @ 2024-06-01T12:00:00+00:00: manual__2024-06-01T12:00:00+00:00, externally triggered: True>
Shows all tasks in the daily_model_training pipeline so you can check its steps (task IDs are listed alphabetically, not in execution order).
Terminal
airflow tasks list daily_model_training
Expected Output
process_data
train_model
wait_for_data_file
Runs the train_model task on its own for the logical date 2024-06-01, without checking its dependencies or recording its state in the database, to confirm the task itself works.
Terminal
airflow tasks test daily_model_training train_model 2024-06-01
Expected Output
[2024-06-01 12:00:00,000] {taskinstance.py:876} INFO - Executing <Task(PythonOperator): train_model> on 2024-06-01
Training model...
[2024-06-01 12:00:00,100] {taskinstance.py:1020} INFO - Marking task as SUCCESS. dag_id=daily_model_training, task_id=train_model, execution_date=2024-06-01
Key Concept

If you remember nothing else from this pattern, remember: scheduling runs pipelines automatically on a time plan, while triggers start pipelines when events happen.

Common Mistakes
Setting schedule_interval to None and expecting the pipeline to run automatically on a schedule
A None schedule means no automatic runs; the pipeline runs only when triggered manually or externally (for example from the CLI or the REST API).
Set schedule_interval to a valid cron expression such as '0 0 * * *' (or a preset such as '@daily') for daily runs.
Not using sensors or triggers to wait for required data before starting pipeline tasks
The pipeline may start too early and fail because the data is not ready yet.
Use sensors like FileSensor to wait for data availability before running tasks.
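Not defining task dependencies before triggering a pipeline
Without dependencies, Airflow treats tasks as independent, so they may run in parallel or in the wrong order, causing errors or incomplete processing.
Define task dependencies explicitly with >> and << or with set_upstream()/set_downstream().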
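A rough way to see why dependencies matter: `>>` only records edges, and a task becomes runnable once its upstream tasks have succeeded (Airflow's default `all_success` trigger rule). The sketch below uses Python's standard `graphlib.TopologicalSorter` (Python 3.9+) as a stand-in for that ordering logic; it is an illustration, not how Airflow's scheduler is implemented.

```python
from graphlib import TopologicalSorter

# Each key maps a task to the set of tasks it depends on (its upstream tasks),
# i.e. wait_for_data_file >> process_data >> train_model from the DAG above.
dependencies = {
    "process_data": {"wait_for_data_file"},
    "train_model": {"process_data"},
}


def execution_order(deps: dict[str, set[str]]) -> list[str]:
    """Return one valid run order; raises graphlib.CycleError on circular dependencies."""
    return list(TopologicalSorter(deps).static_order())


print(execution_order(dependencies))
# ['wait_for_data_file', 'process_data', 'train_model']
```

With the two entries removed, every task is ready at once, which is exactly the situation in which tasks can run in parallel or in an unintended order.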
Summary
Define pipeline schedule using schedule_interval to automate runs at set times.
Use sensors to hold tasks until a condition is met (such as a data file arriving), and event-based triggers to start pipelines when specific events happen.
Test pipelines and tasks manually with the airflow CLI to confirm they behave correctly.

Practice

1. What is the main purpose of pipeline scheduling in MLOps?
easy
A. To store pipeline logs for debugging
B. To manually start pipelines whenever needed
C. To run tasks automatically at specific times without manual intervention
D. To create new machine learning models from scratch

Solution

  1. Step 1: Understand pipeline scheduling

    Pipeline scheduling is designed to run tasks automatically at set times, like daily or hourly, without needing a person to start them.
  2. Step 2: Compare options

    Only option C describes running tasks automatically at specific times. The other options describe manual actions or unrelated tasks.
  3. Final Answer:

    To run tasks automatically at specific times without manual intervention -> Option C
  4. Quick Check:

    Pipeline scheduling = automatic timed runs [OK]
Hint: Scheduling means automatic runs at set times [OK]
Common Mistakes:
  • Confusing scheduling with manual triggering
  • Thinking scheduling stores logs
  • Assuming scheduling creates models directly
2. Which of the following is a correct cron expression to schedule a pipeline to run every day at 3 AM?
easy
A. 3 0 * * *
B. 0 3 * * *
C. * 3 * * *
D. 0 0 3 * * *

Solution

  1. Step 1: Understand cron format

    Cron syntax is: minute hour day-of-month month day-of-week. To run at 3 AM daily, set minute=0 and hour=3, and leave the other three fields as * (any).
  2. Step 2: Match expression

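    "0 3 * * *" means minute 0 of hour 3, every day. "3 0 * * *" swaps the fields and fires at 00:03, "* 3 * * *" fires every minute from 03:00 to 03:59, and "0 0 3 * * *" has six fields instead of the standard five.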
  3. Final Answer:

    0 3 * * * -> Option B
  4. Quick Check:

    Minute=0, Hour=3 daily = 0 3 * * * [OK]
Hint: Cron: minute hour day month weekday; 3 AM is '0 3 * * *' [OK]
Common Mistakes:
  • Swapping hour and minute fields
  • Adding extra fields in cron
  • Using '*' in wrong positions
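To check an expression like `0 3 * * *` by hand, here is a deliberately minimal five-field matcher. It is a sketch: `cron_matches` is a hypothetical helper, it supports only plain integers and `*` (no ranges, lists, or steps), and it simply ANDs all fields, whereas standard cron ORs day-of-month and day-of-week when both are restricted.

```python
from datetime import datetime


def cron_matches(expr: str, when: datetime) -> bool:
    """Return True if `when` falls on a minute selected by a 5-field cron expression.

    Fields: minute hour day-of-month month day-of-week (0 = Sunday).
    Only integers and '*' are supported in this sketch.
    """
    fields = expr.split()
    if len(fields) != 5:
        raise ValueError(f"expected 5 fields, got {len(fields)}: {expr!r}")
    # Python's weekday() is Monday=0..Sunday=6; cron uses Sunday=0..Saturday=6.
    cron_weekday = (when.weekday() + 1) % 7
    actual = [when.minute, when.hour, when.day, when.month, cron_weekday]
    return all(f == "*" or int(f) == value for f, value in zip(fields, actual))


print(cron_matches("0 3 * * *", datetime(2024, 6, 1, 3, 0)))   # True
print(cron_matches("3 0 * * *", datetime(2024, 6, 1, 3, 0)))   # False (fires at 00:03)
print(cron_matches("* 3 * * *", datetime(2024, 6, 1, 3, 59)))  # True (any minute of hour 3)
```

Passing the six-field option D raises `ValueError`, matching the "extra fields" mistake listed above.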
3. Given this pipeline trigger configuration snippet:
{
  "trigger": {
    "event": "data_arrival",
    "filter": {
      "file_type": "csv"
    }
  }
}

What happens when a new JSON file arrives in the data folder?
medium
A. The pipeline does not run because the file type is not CSV
B. The pipeline runs because any new file triggers it
C. The pipeline runs only if the JSON file is large
D. The pipeline runs but ignores the file type

Solution

  1. Step 1: Analyze trigger filter

    The trigger listens for 'data_arrival' events but only runs if the file type is 'csv'.
  2. Step 2: Apply to JSON file

    A JSON file does not match the 'csv' filter, so the pipeline will not run.
  3. Final Answer:

    The pipeline does not run because the file type is not CSV -> Option A
  4. Quick Check:

    Filter file_type=csv blocks JSON files [OK]
Hint: Triggers with filters run only on matching events [OK]
Common Mistakes:
  • Ignoring filter conditions
  • Assuming any file triggers pipeline
  • Confusing event type with file type
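The snippet in question 3 is a generic trigger configuration rather than any specific tool's format. Assuming the semantics the solution describes (the event name must match, and every key under `filter` must equal the corresponding event attribute), the check could be sketched as below; `should_trigger` and the event dictionary's fields are hypothetical.

```python
import json

TRIGGER_CONFIG = json.loads("""
{
  "trigger": {
    "event": "data_arrival",
    "filter": {
      "file_type": "csv"
    }
  }
}
""")


def should_trigger(config: dict, event: dict) -> bool:
    """Hypothetical matcher: event type must match and every filter key must be equal."""
    trigger = config["trigger"]
    if event.get("event") != trigger["event"]:
        return False
    # A missing attribute counts as a mismatch, so unknown files never trigger.
    return all(event.get(key) == expected
               for key, expected in trigger.get("filter", {}).items())


print(should_trigger(TRIGGER_CONFIG, {"event": "data_arrival", "file_type": "json"}))  # False
print(should_trigger(TRIGGER_CONFIG, {"event": "data_arrival", "file_type": "csv"}))   # True
```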
4. You wrote this cron expression to schedule a pipeline every hour:
60 * * * *

Why does the pipeline never run?
medium
A. Because the hour field is missing
B. Because cron requires seconds field
C. Because the asterisks are misplaced
D. Because 60 is not a valid minute value in cron syntax

Solution

  1. Step 1: Check minute field validity

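    Cron minute values must be 0-59, so '60' is out of range and the whole expression is invalid. The scheduler rejects it instead of running the pipeline.
  2. Step 2: Confirm other fields

    The '*' in the hour and remaining fields is correct (every hour, every day); only the minute value is wrong. To run at the start of every hour, use '0 * * * *'.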
  3. Final Answer:

    Because 60 is not a valid minute value in cron syntax -> Option D
  4. Quick Check:

    Minute must be 0-59; 60 is invalid [OK]
Hint: Minutes in cron go 0-59, never 60 [OK]
Common Mistakes:
  • Using 60 as minute value
  • Thinking cron needs seconds field
  • Misplacing asterisks
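Range checking is what catches `60 * * * *`. The sketch below validates each field of a five-field expression against the standard ranges (minute 0-59, hour 0-23, day of month 1-31, month 1-12, day of week 0-7, where 0 and 7 both mean Sunday in many implementations). Like the matcher sketches, it handles only integers and `*`; `validate_cron` is a hypothetical helper, not a scheduler API.

```python
# (field name, lowest allowed, highest allowed) for the five standard cron fields.
CRON_FIELDS = [
    ("minute", 0, 59),
    ("hour", 0, 23),
    ("day of month", 1, 31),
    ("month", 1, 12),
    ("day of week", 0, 7),  # 0 and 7 both mean Sunday in many cron implementations
]


def validate_cron(expr: str) -> list[str]:
    """Return a list of problems with `expr`; an empty list means it passed these checks."""
    fields = expr.split()
    if len(fields) != len(CRON_FIELDS):
        return [f"expected {len(CRON_FIELDS)} fields, got {len(fields)}"]
    problems = []
    for value, (name, low, high) in zip(fields, CRON_FIELDS):
        if value == "*":
            continue
        if not value.isdigit():
            problems.append(f"{name}: unsupported token {value!r} in this sketch")
        elif not low <= int(value) <= high:
            problems.append(f"{name}: {value} is outside {low}-{high}")
    return problems


print(validate_cron("60 * * * *"))  # ['minute: 60 is outside 0-59']
print(validate_cron("0 * * * *"))   # [] (minute 0 of every hour)
```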
5. You want a pipeline to run automatically when new data arrives and also every Sunday at midnight. Which setup correctly combines scheduling and event triggers?
hard
A. Use a cron schedule '0 0 * * 0' and an event trigger for 'data_arrival' together
B. Use only a cron schedule '0 0 * * 0' because event triggers conflict with schedules
C. Use only an event trigger for 'data_arrival' and manually run on Sundays
D. Use a cron schedule '0 0 * * 7' and ignore event triggers

Solution

  1. Step 1: Understand combined triggers

    Pipelines can have both cron schedules and event triggers to run on different conditions.
  2. Step 2: Verify cron expression for Sunday midnight

    '0 0 * * 0' runs at midnight every Sunday. Many cron implementations also accept 7 for Sunday, but POSIX cron defines only 0-6, so 0 is the portable choice.
  3. Step 3: Confirm event trigger for data arrival

    Adding an event trigger for 'data_arrival' ensures pipeline runs when new data arrives.
  4. Final Answer:

    Use a cron schedule '0 0 * * 0' and an event trigger for 'data_arrival' together -> Option A
  5. Quick Check:

    Combine cron and event triggers for full automation [OK]
Hint: Combine cron and event triggers for multiple run conditions [OK]
Common Mistakes:
  • Thinking schedules and triggers cannot coexist
  • Using a non-portable day-of-week value for Sunday (7 instead of 0)
  • Ignoring event triggers for data arrival
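The combined setup in question 5 amounts to an OR of two conditions: the clock matches the weekly cron, or a qualifying event has arrived. The sketch below models that with a hypothetical `should_run` function and a matcher limited to integers and `*`. In Airflow specifically, recent releases offer a combined dataset-or-time schedule (`DatasetOrTimeSchedule`, added in 2.9); check the documentation for your version before relying on it.

```python
from datetime import datetime


def matches_weekly(expr: str, when: datetime) -> bool:
    """Minimal matcher for 'M H * * D' expressions (integers and '*' only).

    Day of week uses cron numbering, where 0 and 7 both mean Sunday.
    """
    minute, hour, dom, month, dow = expr.split()
    if dom != "*" or month != "*":
        raise ValueError("this sketch only supports '*' for day of month and month")
    cron_weekday = (when.weekday() + 1) % 7  # Python Monday=0 -> cron Sunday=0
    dow_value = None if dow == "*" else (0 if int(dow) == 7 else int(dow))
    return ((minute == "*" or int(minute) == when.minute)
            and (hour == "*" or int(hour) == when.hour)
            and (dow_value is None or dow_value == cron_weekday))


def should_run(when: datetime, new_data_arrived: bool, schedule: str = "0 0 * * 0") -> bool:
    """Hypothetical combined trigger: run on the cron tick OR when new data arrives."""
    return new_data_arrived or matches_weekly(schedule, when)


sunday_midnight = datetime(2024, 6, 2, 0, 0)  # 2 June 2024 was a Sunday
wednesday_noon = datetime(2024, 6, 5, 12, 0)
print(should_run(sunday_midnight, new_data_arrived=False))  # True (schedule)
print(should_run(wednesday_noon, new_data_arrived=True))    # True (event)
print(should_run(wednesday_noon, new_data_arrived=False))   # False
```

Because this matcher also accepts 7 for Sunday, option D's expression would fire on Sundays in implementations that allow 7; what makes D wrong is that it drops the event trigger.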