Apache Airflow for ML orchestration in MLOps - Step-by-Step Execution

Process Flow - Apache Airflow for ML orchestration
Define DAG and tasks
Schedule DAG run
Trigger task 1: Data ingestion
Trigger task 2: Data preprocessing
Trigger task 3: Model training
Trigger task 4: Model evaluation
Trigger task 5: Model deployment
DAG run complete
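Airflow represents the workflow as a Directed Acyclic Graph (DAG) of tasks. For each scheduled DAG run, the scheduler starts a task only after its upstream tasks have succeeded, so every step of the ML workflow runs automatically and in order.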
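Because the flow above is a linear chain, the only valid order is the five steps in sequence. A minimal plain-Python sketch (using the standard library's graphlib, Python 3.9+, rather than Airflow itself) shows how a dependency graph determines that order; the task names are hypothetical labels for the five steps.

```python
from graphlib import TopologicalSorter

# Hypothetical task names for the five steps in the process flow.
# Each key maps a task to the set of tasks it depends on (its upstream tasks).
pipeline = {
    "ingest_data": set(),
    "preprocess_data": {"ingest_data"},
    "train_model": {"preprocess_data"},
    "evaluate_model": {"train_model"},
    "deploy_model": {"evaluate_model"},
}

# static_order() yields each task only after all of its dependencies,
# the same ordering guarantee Airflow's scheduler enforces for a DAG.
execution_order = list(TopologicalSorter(pipeline).static_order())
print(execution_order)
```

In a real Airflow DAG the same structure is declared with operators and the >> operator; this sketch only models the ordering rule.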
Execution Sample
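from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator  # Airflow 2.x import path

def ingest():
    # Placeholder for loading raw data
    print('Data ingested')

def preprocess():
    # Placeholder for cleaning and feature engineering
    print('Data preprocessed')

# catchup=False stops Airflow from creating a run for every past day since start_date.
# Airflow 2.4+ prefers schedule='@daily'; Airflow 3 removes schedule_interval.
with DAG('ml_pipeline', start_date=datetime(2024, 1, 1),
         schedule_interval='@daily', catchup=False) as dag:
    t1 = PythonOperator(task_id='ingest', python_callable=ingest)
    t2 = PythonOperator(task_id='preprocess', python_callable=preprocess)
    # preprocess is downstream of ingest: it starts only after ingest succeeds
    t1 >> t2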
Defines a simple Airflow DAG with two tasks: ingest data and preprocess data, where preprocess runs after ingest.
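The line t1 >> t2 works because Airflow operators overload Python's >> operator: it records the right-hand task as downstream of the left-hand one and returns the right-hand task, which is what makes chains like a >> b >> c possible. The Task class below is a hypothetical toy stand-in used only to illustrate that mechanism; it is not Airflow's operator class (real operators also accept lists, e.g. t1 >> [t2, t3]).

```python
class Task:
    """Toy stand-in for an Airflow operator (hypothetical, for illustration only)."""

    def __init__(self, task_id):
        self.task_id = task_id
        self.downstream = []  # tasks that must wait for this one

    def __rshift__(self, other):
        # `self >> other` records that `other` runs after `self`.
        self.downstream.append(other)
        # Returning the right-hand task lets `a >> b >> c` chain,
        # since Python evaluates it as `(a >> b) >> c`, i.e. `b >> c`.
        return other


t1 = Task("ingest")
t2 = Task("preprocess")
t1 >> t2
print([t.task_id for t in t1.downstream])  # ['preprocess']
```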
Process Table
Step | Task Triggered | Task Status | Output | Next Task
1 | ingest | running | Data ingested | preprocess
2 | ingest | success | Data ingested | preprocess
3 | preprocess | running | Data preprocessed | none
4 | preprocess | success | Data preprocessed | DAG complete
💡 All tasks completed successfully, DAG run finished.
Status Tracker
Variable | Start | After Step 1 | After Step 2 | After Step 3 | After Step 4 | Final
Task Status (ingest) | none | running | success | success | success | success
Task Status (preprocess) | none | none | none | running | success | success
DAG Run Status | none | running | running | running | running | success
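The Status Tracker can be reproduced with a small plain-Python replay of a linear run. This is a simplified model under stated assumptions (each task moves none → running → success, one task at a time, and the DAG run is marked success only after the last task succeeds), not Airflow's scheduler; trace_linear_run is a hypothetical helper.

```python
def trace_linear_run(task_ids):
    """Replay a linear DAG run, recording every task's state after each step.

    Simplified model (not Airflow's scheduler): each task goes
    none -> running -> success, and a task starts only after the
    previous one has succeeded.
    """
    states = {t: "none" for t in task_ids}
    dag_state = "running"
    history = [("Start", dict(states), "none")]
    step = 0
    for task in task_ids:
        for new_state in ("running", "success"):
            states[task] = new_state
            step += 1
            history.append((f"After Step {step}", dict(states), dag_state))
    # Once every task has succeeded, the DAG run itself is marked success.
    if all(s == "success" for s in states.values()):
        dag_state = "success"
    history.append(("Final", dict(states), dag_state))
    return history


for label, task_states, dag_state in trace_linear_run(["ingest", "preprocess"]):
    print(f"{label:13} ingest={task_states['ingest']:8} "
          f"preprocess={task_states['preprocess']:8} dag_run={dag_state}")
```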
Key Moments - 2 Insights
Why does the 'preprocess' task wait before running?
Because of the dependency t1 >> t2: 'preprocess' is downstream of 'ingest', so it starts only after 'ingest' reaches 'success' (Step 2 in the Process Table) and begins running at Step 3.
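What happens if a task fails during execution?
If the task has retries left, Airflow marks it up_for_retry and runs it again after its retry_delay. Once retries are exhausted (by default a task gets none), the task is marked failed, downstream tasks such as 'preprocess' are marked upstream_failed and never run, and the DAG run fails. In this trace every task succeeds, as the 'success' statuses in the Process Table show.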
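A minimal plain-Python sketch of the failure behaviour described above. It is a simplified model of Airflow's default all_success trigger rule, not Airflow code: run_chain and the task functions are hypothetical, and the model skips retry_delay and the intermediate up_for_retry state.

```python
def run_chain(tasks, retries=0):
    """Run (task_id, callable) pairs in order, mimicking Airflow's default behaviour.

    Simplified model: a failing task is retried up to `retries` times; once
    retries are exhausted it is 'failed', every downstream task becomes
    'upstream_failed' (it never runs), and the DAG run fails.
    """
    states = {}
    upstream_ok = True
    for task_id, fn in tasks:
        if not upstream_ok:
            states[task_id] = "upstream_failed"
            continue
        for _ in range(retries + 1):
            try:
                fn()
                states[task_id] = "success"
                break
            except Exception:
                # Airflow would mark the task 'up_for_retry' between attempts.
                states[task_id] = "failed"
        upstream_ok = states[task_id] == "success"
    dag_run = "success" if all(s == "success" for s in states.values()) else "failed"
    return states, dag_run


def ingest():
    raise ConnectionError("source unavailable")  # hypothetical failure


def preprocess():
    print("Data preprocessed")


print(run_chain([("ingest", ingest), ("preprocess", preprocess)]))
# ({'ingest': 'failed', 'preprocess': 'upstream_failed'}, 'failed')
```

With retries=1, a task that fails once and then succeeds would end in 'success', and the rest of the chain would run normally.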
Visual Quiz - 3 Questions
Test your understanding
Look at the Process Table: what is the status of 'ingest' at Step 2?
A. running
B. failed
C. success
D. queued
💡 Hint
Check the Task Status column for 'ingest' at Step 2 in the Process Table.
At which step does the 'preprocess' task start running?
A. Step 3
B. Step 2
C. Step 1
D. Step 4
💡 Hint
Look for the step where 'preprocess' first has status 'running' in the Process Table.
If the 'ingest' task failed at Step 2, what would happen to the DAG run?
A. The DAG would continue to 'preprocess' anyway
B. The DAG run would stop or retry 'ingest'
C. The DAG would skip 'preprocess' and finish
D. The DAG would restart from the beginning
💡 Hint
Refer to the Key Moments explanation of how a task failure affects the DAG run.
Concept Snapshot
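Apache Airflow runs ML workflows as DAGs.
Define each task with an operator, and set the order between tasks with >>.
Airflow schedules and runs tasks automatically.
By default, a task runs only after all of its upstream tasks succeed.
A failed task is retried if retries are configured; otherwise its downstream tasks are marked upstream_failed.
Use PythonOperator to run a Python function as a task.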
Full Transcript
Apache Airflow automates ML workflows by defining them as tasks in a DAG. Each task waits until its upstream tasks have succeeded: in the example, data ingestion runs first, then preprocessing. The Process Table shows each task's status changing from running to success. If a task fails, Airflow retries it when retries are configured; otherwise its downstream tasks do not run and the DAG run is marked failed. This step-by-step trace shows how Airflow manages an ML pipeline.

Practice

1. What is the main purpose of Apache Airflow in ML orchestration?
easy
A. To store large datasets for ML training
B. To write ML model code in Python
C. To visualize ML model performance metrics
D. To automate and schedule ML workflows as directed tasks

Solution

  1. Step 1: Understand Airflow's role

    Apache Airflow is designed to automate workflows by scheduling and running tasks in order.
  2. Step 2: Differentiate from other ML tools

    It does not store data, visualize metrics, or write model code; it manages task execution.
  3. Final Answer:

    To automate and schedule ML workflows as directed tasks -> Option D
  4. Quick Check:

    Airflow = workflow automation [OK]
Hint: Airflow schedules tasks, not data or model code [OK]
Common Mistakes:
  • Confusing Airflow with data storage tools
  • Thinking Airflow writes ML model code
  • Assuming Airflow visualizes model metrics
2. Which of the following is the correct way to define a DAG in Apache Airflow using Python?
easy
A. dag = DAG('my_dag', run_every='daily')
B. dag = DAG('my_dag', schedule_interval='@daily')
C. dag = DAG('my_dag', interval='daily')
D. dag = DAG('my_dag', schedule='daily')

Solution

  1. Step 1: Recall DAG initialization syntax

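    In Airflow 2.x the DAG's timing is set with schedule_interval; from Airflow 2.4 the preferred name is schedule, and Airflow 3 accepts only schedule. run_every and interval are not DAG parameters, and option D fails in either case: before Airflow 2.4 schedule is not a DAG parameter, and in later versions 'daily' is not a valid cron expression or preset (the preset is written '@daily').
  2. Step 2: Verify the example

    dag = DAG('my_dag', schedule_interval='@daily') is valid Airflow 2.x syntax for daily runs; a real DAG would also pass start_date, as in the earlier examples.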
  3. Final Answer:

    dag = DAG('my_dag', schedule_interval='@daily') -> Option B
  4. Quick Check:

    Use schedule_interval (Airflow 2.x) or schedule (Airflow 2.4+) to set DAG timing [OK]
Hint: Use schedule_interval to set DAG timing; newer Airflow versions call it schedule [OK]
Common Mistakes:
  • Using incorrect parameter names like run_every
  • Writing 'daily' instead of the '@daily' preset
  • Forgetting to use quotes around '@daily'
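Why '@daily' works and 'daily' does not: Airflow schedule strings are either cron presets or cron expressions. The sketch below is a rough plain-Python check, a simplification and not Airflow's parser (it ignores month/day names and timedelta schedules); looks_like_valid_schedule is a hypothetical helper, while the preset-to-cron mapping follows Airflow's documented presets.

```python
import re

# Airflow cron presets and the cron expressions they stand for.
# '@once' is special: it has no cron equivalent and runs a single time.
CRON_PRESETS = {
    "@hourly": "0 * * * *",
    "@daily": "0 0 * * *",
    "@weekly": "0 0 * * 0",
    "@monthly": "0 0 1 * *",
    "@yearly": "0 0 1 1 *",
}


def looks_like_valid_schedule(value):
    """Rough check: accept a known preset, '@once', or five numeric cron fields."""
    if value in CRON_PRESETS or value == "@once":
        return True
    fields = value.split()
    return len(fields) == 5 and all(re.fullmatch(r"[\d*/,-]+", f) for f in fields)


for candidate in ["@daily", "daily", "0 0 * * *"]:
    print(candidate, looks_like_valid_schedule(candidate))
```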
3. Given the following Airflow DAG snippet, what will be the order of task execution?
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

def task_a():
    print('Task A')

def task_b():
    print('Task B')

def task_c():
    print('Task C')

dag = DAG('example_dag', start_date=datetime(2024, 1, 1), schedule_interval='@once')

t1 = PythonOperator(task_id='a', python_callable=task_a, dag=dag)
t2 = PythonOperator(task_id='b', python_callable=task_b, dag=dag)
t3 = PythonOperator(task_id='c', python_callable=task_c, dag=dag)

t1 >> t2 >> t3
medium
A. Task A, then Task B, then Task C
B. Task C, then Task B, then Task A
C. Task A, Task B, and Task C run in parallel
D. Task B, then Task A, then Task C

Solution

  1. Step 1: Understand task dependencies

    The operator chaining t1 >> t2 >> t3 means t1 runs first, then t2, then t3.
  2. Step 2: Confirm execution order

    Each task's print output appears in its own task log, in the order Task A, Task B, Task C.
  3. Final Answer:

    Task A, then Task B, then Task C -> Option A
  4. Quick Check:

    Operator chaining sets task order [OK]
Hint: >> means run left task before right task [OK]
Common Mistakes:
  • Assuming tasks run in parallel without dependencies
  • Misreading the >> operator order
  • Confusing task IDs with execution order
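To see why option C (parallel execution) is wrong, a plain-Python sketch can group tasks into "waves": every task in a wave has all of its upstream tasks finished, so tasks in the same wave could run in parallel. It uses the standard library's graphlib (Python 3.9+), not Airflow; execution_waves is a hypothetical helper, and the fan-out graph is an illustrative contrast, not part of the question.

```python
from graphlib import TopologicalSorter


def execution_waves(upstream):
    """Group tasks into waves of tasks whose dependencies are all done."""
    sorter = TopologicalSorter(upstream)
    sorter.prepare()
    waves = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())  # sorted only for a stable printout
        waves.append(ready)
        sorter.done(*ready)
    return waves


# t1 >> t2 >> t3 from the question: each task depends on the previous one.
chain = {"a": set(), "b": {"a"}, "c": {"b"}}
# Hypothetical fan-out for contrast, as in t1 >> [t2, t3].
fan_out = {"a": set(), "b": {"a"}, "c": {"a"}}

print(execution_waves(chain))    # [['a'], ['b'], ['c']]
print(execution_waves(fan_out))  # [['a'], ['b', 'c']]
```

With the chain, each wave holds a single task, so nothing can run in parallel; only the fan-out lets b and c run together.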
4. You wrote this Airflow DAG code but get an error: TypeError: DAG.__init__() got an unexpected keyword argument 'start'
What is the likely cause?
dag = DAG('my_dag', start='2024-01-01', schedule_interval='@daily')
medium
A. The parameter should be start_date, not start
B. The schedule_interval value '@daily' is invalid
C. DAG name cannot be 'my_dag'
D. Missing import for datetime module

Solution

  1. Step 1: Identify incorrect parameter

    The error says start is unexpected; Airflow expects start_date.
  2. Step 2: Confirm correct parameter usage

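    Replacing start with start_date fixes the TypeError. Pass it a datetime object, as in the earlier examples, e.g. start_date=datetime(2024, 1, 1), which also requires from datetime import datetime.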
  3. Final Answer:

    The parameter should be start_date, not start -> Option A
  4. Quick Check:

    Use start_date, not start [OK]
Hint: Use start_date, not start, for DAG start time [OK]
Common Mistakes:
  • Using 'start' instead of 'start_date'
  • Assuming '@daily' is invalid schedule
  • Ignoring error message details
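The TypeError in this question comes from Python's own argument checking: a call that passes a keyword the function does not declare fails before any code in the function runs. The sketch below uses a hypothetical create_dag function with a DAG-like signature (it is not Airflow's DAG class) to reproduce the error and the fix.

```python
from datetime import datetime


def create_dag(dag_id, start_date=None, schedule_interval=None):
    """Hypothetical stand-in with a DAG-like signature (not Airflow's DAG class)."""
    return {"dag_id": dag_id, "start_date": start_date, "schedule_interval": schedule_interval}


try:
    # Wrong keyword: 'start' is not a declared parameter.
    create_dag("my_dag", start="2024-01-01", schedule_interval="@daily")
except TypeError as err:
    print(err)  # ... got an unexpected keyword argument 'start'

# Fixed call: correct keyword, and a datetime rather than a string.
dag = create_dag("my_dag", start_date=datetime(2024, 1, 1), schedule_interval="@daily")
print(dag["start_date"])  # 2024-01-01 00:00:00
```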
5. You want to create an Airflow DAG that runs an ML training task only if data preprocessing succeeded. Which Airflow feature should you use to enforce this dependency?
hard
A. Schedule both tasks to run at the same time
B. Use Airflow Variables to store task status
C. Set task dependencies using >> operator between preprocessing and training tasks
D. Write a single Python function combining both tasks

Solution

  1. Step 1: Understand task dependency in Airflow

    Airflow uses task dependencies to control execution order, ensuring one task runs after another succeeds.
  2. Step 2: Apply dependency operator

    Using the >> operator makes the training task downstream of preprocessing; under the default trigger rule (all_success), training runs only after preprocessing completes successfully.
  3. Final Answer:

    Set task dependencies using >> operator between preprocessing and training tasks -> Option C
  4. Quick Check:

    Use >> to enforce task order [OK]
Hint: Use >> to link tasks in order [OK]
Common Mistakes:
  • Thinking Variables control task order
  • Scheduling tasks simultaneously without dependencies
  • Combining both steps into one function, which loses per-step retries and monitoring