
Pipeline components and DAGs in MLOps - Commands & Configuration

Introduction
When you build machine learning workflows, you need to organize tasks so they run in the right order. Pipeline components are the building blocks of these workflows, and a DAG (Directed Acyclic Graph) defines how the tasks connect: each edge means one task must finish before another starts, and the graph contains no loops.
Typical situations where they help:
When you want to automate data preprocessing, model training, and evaluation steps in one flow
When you need to run tasks in a specific order and handle dependencies between them
When you want to reuse parts of your workflow as separate components for different projects
When you want to visualize the flow of your machine learning pipeline clearly
When you want to schedule and monitor your ML workflows reliably
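To make the idea concrete, here is a minimal sketch in plain Python, not tied to any orchestrator. It stores this page's three steps as a DAG that maps each task to the tasks it depends on, then uses the standard library's graphlib.TopologicalSorter (Python 3.9+) to derive a valid run order. The task names are illustrative.

```python
from graphlib import TopologicalSorter

# Each key is a task; its value is the set of tasks that must finish first.
pipeline = {
    "preprocess": set(),
    "train": {"preprocess"},
    "evaluate": {"train"},
}

# static_order() yields tasks so that every dependency comes before its dependents.
run_order = list(TopologicalSorter(pipeline).static_order())
print(run_order)  # ['preprocess', 'train', 'evaluate']
```

Orchestrators such as Kubeflow Pipelines and Airflow do this ordering for you; the point is only that edges, not the order in which you write the tasks, decide when each task runs.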
Commands
This Python script defines three pipeline components: preprocess_op, train_op, and evaluate_op. The pipeline function calls each component to create a task and uses .after() to chain the tasks into a three-node DAG (preprocess, then train, then evaluate). Finally, it compiles the pipeline to ml_pipeline.yaml, a file Kubeflow Pipelines can run.
Terminal
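from kfp import compiler, dsl

# Each @dsl.component function becomes one pipeline step (a node in the DAG).
@dsl.component
def preprocess_op():
    print('Preprocessing data')

@dsl.component
def train_op():
    print('Training model')

@dsl.component
def evaluate_op():
    print('Evaluating model')

# The pipeline function creates tasks from the components; .after() adds DAG edges.
@dsl.pipeline(name='ml-pipeline')
def ml_pipeline():
    preprocess = preprocess_op()
    train = train_op()
    train.after(preprocess)    # edge: preprocess -> train
    evaluate = evaluate_op()
    evaluate.after(train)      # edge: train -> evaluate

if __name__ == '__main__':
    # Write the pipeline definition to a YAML file that Kubeflow Pipelines can run.
    compiler.Compiler().compile(pipeline_func=ml_pipeline, package_path='ml_pipeline.yaml')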
Expected Output
No output beyond any SDK warnings; the compiled pipeline is written to ml_pipeline.yaml.
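This command uses the KFP CLI to submit the compiled pipeline YAML to Kubeflow Pipelines, starting a run named 'run1' under the experiment 'ml-experiment' and triggering execution of the DAG. The flags shown are from the KFP SDK 2.x CLI, where the subcommand is run create; 1.x releases called it run submit. Depending on your setup, you may also need the top-level --endpoint option to point the CLI at your Kubeflow Pipelines API server.
Terminal
kfp run create --experiment-name ml-experiment --run-name run1 --package-file ml_pipeline.yaml
Expected Output
The CLI prints details of the new run, including its run ID; the exact format depends on the KFP version.
--experiment-name - Names the experiment that groups related runs
--run-name - Gives a name to this specific run
--package-file - Path to the compiled pipeline YAML file to run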
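This command lists the runs in an experiment so you can check the status of your pipeline executions. The CLI filters by experiment ID rather than by name; kfp experiment list shows the ID of each experiment, including 'ml-experiment'.
Terminal
kfp run list --experiment-id <EXPERIMENT_ID>
Expected Output
Illustrative output (IDs, timestamps, and column layout vary by KFP version):
Run ID   Run Name   Status      Created At
12345    run1       Succeeded   2024-06-01 10:00:00
--experiment-id - Filters runs by the ID of their parent experiment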
Key Concept

If you remember nothing else from this pattern, remember: pipeline components are tasks and DAGs define their order without loops.

Code Example
MLOps
from kfp import dsl

@dsl.component
def preprocess_op():
    print('Preprocessing data')

@dsl.component
def train_op():
    print('Training model')

@dsl.component
def evaluate_op():
    print('Evaluating model')

@dsl.pipeline(name='ml-pipeline')
def ml_pipeline():
    preprocess = preprocess_op()
    train = train_op()
    train.after(preprocess)
    evaluate = evaluate_op()
    evaluate.after(train)

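if __name__ == '__main__':
    from kfp import compiler
    # Write the pipeline definition to a YAML file that Kubeflow Pipelines can run.
    compiler.Compiler().compile(pipeline_func=ml_pipeline, package_path='ml_pipeline.yaml')
    print('Pipeline compiled to ml_pipeline.yaml')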
Output
Pipeline compiled to ml_pipeline.yaml
Common Mistakes
Not specifying the order of components in the pipeline
The tasks may run in any order or in parallel, causing errors or wrong results
Declare the order explicitly with .after(), or pass one task's output as another task's input, which makes KFP add the dependency automatically
Creating cycles in the DAG by making tasks depend on each other circularly
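DAGs must not have loops: in a cycle no task can ever start first, so orchestrators reject the pipeline (Airflow, for example, raises an error when it detects a cycle) or the run cannot proceed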
Ensure dependencies form a directed acyclic graph with no loops
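Both mistakes can be reproduced with a small plain-Python sketch (an illustration using the standard library's graphlib, not how Kubeflow Pipelines schedules tasks internally). The hypothetical helper ready_batches groups tasks into batches that could run at the same time: with no dependencies declared, every task is ready at once, and a circular dependency is rejected with CycleError.

```python
from graphlib import CycleError, TopologicalSorter

def ready_batches(graph):
    """Group tasks into batches that may run at the same time, in order."""
    sorter = TopologicalSorter(graph)
    sorter.prepare()  # raises CycleError if the graph contains a loop
    batches = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())  # sorted only for stable output
        batches.append(ready)
        sorter.done(*ready)
    return batches

# Mistake 1: no dependencies declared, so all three tasks are ready at once.
print(ready_batches({"preprocess": set(), "train": set(), "evaluate": set()}))
# [['evaluate', 'preprocess', 'train']]

# Fix: declare the dependencies explicitly.
print(ready_batches({"train": {"preprocess"}, "evaluate": {"train"}}))
# [['preprocess'], ['train'], ['evaluate']]

# Mistake 2: train needs evaluate and evaluate needs train, which is a cycle.
try:
    ready_batches({"train": {"evaluate"}, "evaluate": {"train"}})
except CycleError as err:
    print("cycle detected:", err.args[1])
```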
Summary
Define pipeline components as small tasks using the @dsl.component decorator.
Create a pipeline function that calls the components and orders them with .after() (or by passing outputs as inputs) to form a DAG.
Compile the pipeline to a YAML file and submit it to run on Kubeflow Pipelines.
List runs to monitor pipeline execution status.

Practice

1. What does a Directed Acyclic Graph (DAG) represent in an MLOps pipeline?
easy
A. Tasks and their dependencies without any cycles
B. A loop of tasks that repeat indefinitely
C. Random tasks executed in parallel without order
D. Only the final output of a pipeline

Solution

  1. Step 1: Understand DAG structure

    A DAG is a graph with nodes and edges where edges show dependencies and no cycles exist.
  2. Step 2: Relate DAG to pipeline tasks

    In MLOps, tasks are nodes and dependencies are edges, ensuring tasks run in order without loops.
  3. Final Answer:

    Tasks and their dependencies without any cycles -> Option A
  4. Quick Check:

    DAG = tasks + dependencies without loops [OK]
Hint: DAG means no loops, just tasks linked in order [OK]
Common Mistakes:
  • Thinking DAG allows loops
  • Confusing DAG with random task order
  • Assuming DAG only shows final output
2. Which of the following is the correct syntax to define a simple DAG in Apache Airflow?
easy
A. dag = DAG('my_dag', interval='daily')
B. dag = DAG('my_dag' schedule='daily')
C. dag = DAG('my_dag', schedule='everyday')
D. dag = DAG('my_dag', schedule_interval='@daily')

Solution

  1. Step 1: Check Airflow DAG syntax

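    The DAG constructor takes the DAG ID as its first argument plus keyword arguments; in Airflow 2.x, schedule_interval sets the timing and accepts presets such as '@daily' or cron expressions. Airflow 2.4+ prefers the equivalent schedule parameter, and Airflow 3 removes schedule_interval; this question follows the older name.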
  2. Step 2: Validate options

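    dag = DAG('my_dag', schedule_interval='@daily') uses the parameter schedule_interval with the valid preset '@daily'. Option A passes interval, which DAG does not accept; B is missing the comma between arguments, which is a syntax error; C uses 'everyday', which is neither a preset nor a cron expression.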
  3. Final Answer:

    dag = DAG('my_dag', schedule_interval='@daily') -> Option D
  4. Quick Check:

    Correct DAG syntax uses schedule_interval [OK]
Hint: Use schedule_interval='@daily' for daily DAGs [OK]
Common Mistakes:
  • Passing a parameter name DAG does not accept, such as interval
  • Using schedule values that are neither presets nor cron expressions, such as 'everyday'
  • Missing commas between parameters
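Option C fails because 'everyday' is neither a cron expression nor a schedule preset. The sketch below is a rough, hypothetical checker, not Airflow's own validation (which also accepts timedelta objects, more presets such as @once, and richer cron syntax). It maps a few documented presets to the cron expressions they stand for and otherwise only checks for five whitespace-separated cron fields.

```python
# A few Airflow schedule presets and the cron expressions they stand for.
PRESETS = {
    "@hourly": "0 * * * *",
    "@daily": "0 0 * * *",
    "@weekly": "0 0 * * 0",
    "@monthly": "0 0 1 * *",
    "@yearly": "0 0 1 1 *",
}

def looks_like_valid_schedule(value: str) -> bool:
    """Hypothetical, simplified check: a known preset or a 5-field cron string."""
    if value in PRESETS:
        return True
    return len(value.split()) == 5 and not value.startswith("@")

for candidate in ["@daily", "0 0 * * *", "daily", "everyday"]:
    print(candidate, looks_like_valid_schedule(candidate))
```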
3. Given this Airflow DAG snippet, what is the order of task execution?
task1 = DummyOperator(task_id='task1', dag=dag)
task2 = DummyOperator(task_id='task2', dag=dag)
task3 = DummyOperator(task_id='task3', dag=dag)
task1 >> task2 >> task3
medium
A. task3, then task2, then task1
B. task1, then task2, then task3
C. task2, then task1, then task3
D. All tasks run in parallel

Solution

  1. Step 1: Analyze task dependencies

    The '>>' operator sets order: task1 before task2, task2 before task3.
  2. Step 2: Determine execution sequence

    Tasks run in sequence: task1 first, then task2, then task3.
  3. Final Answer:

    task1, then task2, then task3 -> Option B
  4. Quick Check:

    task1 >> task2 >> task3 means sequential order [OK]
Hint: >> means run left task before right task [OK]
Common Mistakes:
  • Assuming tasks run in reverse order
  • Thinking tasks run in parallel
  • Ignoring the '>>' operator meaning
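Airflow builds these edges through Python operator overloading. The hypothetical Task class below is a stand-in that mimics the idea (it is not Airflow's BaseOperator): a >> b records that b depends on a and returns b. Because >> is left-associative, task1 >> task2 >> task3 is evaluated as (task1 >> task2) >> task3, which produces a chain.

```python
class Task:
    """Hypothetical stand-in for an Airflow operator; it only records edges."""

    def __init__(self, task_id):
        self.task_id = task_id
        self.upstream = []  # tasks that must finish before this one

    def __rshift__(self, other):
        # self >> other: other runs after self
        other.upstream.append(self)
        return other  # returning the right operand lets >> be chained

task1, task2, task3 = Task("task1"), Task("task2"), Task("task3")
task1 >> task2 >> task3

for t in (task1, task2, task3):
    print(t.task_id, "waits for", [u.task_id for u in t.upstream])
```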
4. You wrote this DAG code but get an error: TypeError: 'DAG' object is not iterable. What is the likely cause?
with DAG('example_dag', schedule_interval='@daily') as dag:
    task1 = DummyOperator(task_id='task1')
    task2 = DummyOperator(task_id='task2')
    task1 >> task2

for task in dag:
    print(task.task_id)
medium
A. DAG object is not iterable, so 'for task in dag' causes error
B. DummyOperator requires a 'dag' parameter outside the context
C. Missing import for DummyOperator
D. schedule_interval '@daily' is invalid

Solution

  1. Step 1: Identify error cause

    The error says 'DAG' object is not iterable, likely from trying to loop over dag object.
  2. Step 2: Understand DAG iterability

    DAG objects in Airflow are not iterable directly; looping over them causes this error.
  3. Final Answer:

    DAG object is not iterable, so 'for task in dag' causes error -> Option A
  4. Quick Check:

    DAG is not iterable; use dag.tasks list instead [OK]
Hint: DAG is not iterable; use dag.tasks to loop [OK]
Common Mistakes:
  • Trying to loop directly over DAG object
  • Assuming DummyOperator needs dag param outside context
  • Misreading error as import issue
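The rule behind this question is plain Python: a for loop needs an object that implements the iteration protocol. The sketch below uses a hypothetical MiniDag class, a stand-in rather than Airflow's DAG, that keeps its tasks in a list attribute, much as Airflow exposes them through dag.tasks. Looping over the object raises the same kind of TypeError, while looping over the attribute works. In real Airflow code, dag.tasks holds operator objects, so you would print task.task_id.

```python
class MiniDag:
    """Hypothetical stand-in: stores tasks in a list but defines no __iter__."""

    def __init__(self, dag_id, task_ids):
        self.dag_id = dag_id
        self.tasks = list(task_ids)

dag = MiniDag("example_dag", ["task1", "task2"])

try:
    for task in dag:  # fails: MiniDag defines neither __iter__ nor __getitem__
        print(task)
except TypeError as err:
    print("Error:", err)  # Error: 'MiniDag' object is not iterable

for task_id in dag.tasks:  # works: iterate over the list attribute instead
    print(task_id)
```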
5. You want to create a pipeline where task A runs first, then tasks B and C run in parallel, and finally task D runs after both B and C finish. Which DAG structure correctly represents this?
hard
A. [A, B] >> C >> D
B. A >> B >> C >> D
C. A >> [B, C] >> D
D. A >> D >> [B, C]

Solution

  1. Step 1: Understand task order requirements

    Task A runs first, then B and C run at the same time, then D runs after both finish.
  2. Step 2: Translate to DAG syntax

    In Airflow's bitshift syntax, A >> [B, C] makes both B and C depend on A, so they can run in parallel once A finishes, and [B, C] >> D makes D wait for both.
  3. Final Answer:

    A >> [B, C] >> D -> Option C
  4. Quick Check:

    Parallel tasks in list brackets between sequential tasks [OK]
Hint: Use brackets [] for parallel tasks in DAG [OK]
Common Mistakes:
  • Placing tasks in wrong order
  • Not using brackets for parallel tasks
  • Assuming linear order for all tasks
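Airflow accepts a list on either side of >> because its operators overload both >> and the reflected form Python falls back to when the left operand is a list. The hypothetical Task class below mimics that idea (it is not Airflow's implementation) so you can see which dependencies A >> [B, C] >> D records: B and C share the single upstream task A, so they can start together, and D waits for both.

```python
class Task:
    """Hypothetical stand-in for an Airflow operator that records upstream task IDs."""

    def __init__(self, task_id):
        self.task_id = task_id
        self.upstream = set()

    def __rshift__(self, other):
        # task >> task  or  task >> [task, ...]
        targets = other if isinstance(other, list) else [other]
        for target in targets:
            target.upstream.add(self.task_id)
        return other

    def __rrshift__(self, other):
        # [task, ...] >> task: lists have no >> for Task, so Python calls this
        for source in other:
            self.upstream.add(source.task_id)
        return self

A, B, C, D = (Task(name) for name in "ABCD")
A >> [B, C] >> D

for t in (A, B, C, D):
    print(t.task_id, "waits for", sorted(t.upstream))
```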