
Unit testing DAGs in Apache Airflow - Step-by-Step Execution

Process Flow - Unit testing DAGs
Write DAG code
Write unit test code
Run unit tests
Check test results
Deploy (if the tests pass)
Back to Write unit test code (if the tests fail)
This flow shows how you write DAGs, create unit tests for them, run tests, and then either deploy or fix issues based on results.
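The test-then-decide loop described above can be sketched with Python's built-in unittest runner. This is only an illustration: the test case is a placeholder that does not load a real DAG, and the names ExampleDagStructureTest and next_step, as well as the task IDs other than 'start_task', are hypothetical.

```python
import unittest


class ExampleDagStructureTest(unittest.TestCase):
    """Placeholder test; a real suite would load and inspect the DAG."""

    def test_task_count(self):
        # Hypothetical task IDs standing in for a real DAG's tasks.
        task_ids = ["start_task", "middle_task", "end_task"]
        self.assertEqual(len(task_ids), 3)


def next_step(test_case):
    """Run the test case and return the next step in the flow."""
    suite = unittest.defaultTestLoader.loadTestsFromTestCase(test_case)
    result = unittest.TextTestRunner(verbosity=0).run(suite)
    # Deploy only when every test passed; otherwise go back and fix.
    return "deploy" if result.wasSuccessful() else "fix and re-test"


step = next_step(ExampleDagStructureTest)
print(step)
```

In practice the same gate is usually enforced by a CI job that runs the test suite and blocks deployment on a non-zero exit code.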
Execution Sample
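from airflow.models import DagBag

def test_dag_structure():
    # Parse the configured DAGs folder; get_dag returns None for an unknown ID
    dag = DagBag(include_examples=False).get_dag('example_dag')
    assert dag is not None
    assert len(dag.tasks) == 3
    assert dag.tasks[0].task_id == 'start_task'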
This test checks that the DAG named 'example_dag' exists, has 3 tasks, and the first task is 'start_task'.
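To try the same assertions without an Airflow installation, get_dag can be backed by a plain dictionary of stand-in objects. This is a minimal sketch, not Airflow's API: the registry _DAGS and the task IDs 'middle_task' and 'end_task' are assumptions made for illustration, and only the attribute names tasks and task_id mirror the sample above.

```python
from types import SimpleNamespace

# Stand-in registry (hypothetical); a real test would load DAGs from Airflow.
_DAGS = {
    "example_dag": SimpleNamespace(
        dag_id="example_dag",
        tasks=[
            SimpleNamespace(task_id="start_task"),
            SimpleNamespace(task_id="middle_task"),  # hypothetical ID
            SimpleNamespace(task_id="end_task"),  # hypothetical ID
        ],
    )
}


def get_dag(dag_id):
    """Return the stand-in DAG with this ID, or None if it is not registered."""
    return _DAGS.get(dag_id)


def test_dag_structure():
    dag = get_dag("example_dag")
    assert dag is not None
    assert len(dag.tasks) == 3
    assert dag.tasks[0].task_id == "start_task"


# Raises AssertionError if any of the three checks fails.
test_dag_structure()
```

A test runner such as pytest would discover and call test_dag_structure on its own; the explicit call at the end is only there so the sketch can be run as a script.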
Process Table
Step | Action | Evaluation | Result
1 | Call get_dag('example_dag') | DAG object found? | True
2 | Check dag is not None | dag is not None | Pass
3 | Check number of tasks | len(dag.tasks) == 3 | Pass
4 | Check first task id | dag.tasks[0].task_id == 'start_task' | Pass
5 | All assertions passed | Test result | Pass
💡 All assertions passed, test completes successfully
Status Tracker
Variable | Start | After Step 1 | After Step 2 | After Step 3 | After Step 4 | Final
dag | None | DAG object for 'example_dag' | Same | Same | Same | Same
len(dag.tasks) | N/A | N/A | N/A | 3 | Same | Same
dag.tasks[0].task_id | N/A | N/A | N/A | N/A | 'start_task' | 'start_task'
Key Moments - 3 Insights
Why do we check if dag is not None before other assertions?
Because if dag is None, accessing dag.tasks would raise an AttributeError instead of a clear assertion failure. Step 2 in the Process Table ensures the DAG exists before the tasks are checked.
What happens if the number of tasks is not 3?
The assertion in Step 3 would fail, stopping the test early and indicating the DAG structure is incorrect.
Why check the first task's task_id?
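To verify that the expected task exists under the expected name (Step 4). Note that in Airflow dag.tasks lists tasks in the order they were added to the DAG, which is not necessarily their dependency order, so an index-based check confirms definition order only.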
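The failure cases discussed above can be reproduced with stand-in objects. In this sketch, check_structure and first_failure are hypothetical helpers, the assertion messages are added for illustration, and the SimpleNamespace objects only imitate the tasks and task_id attributes used in the sample test.

```python
from types import SimpleNamespace


def check_structure(dag):
    """Apply the same three assertions as the sample test to a given object."""
    assert dag is not None, "DAG not found"
    assert len(dag.tasks) == 3, f"expected 3 tasks, found {len(dag.tasks)}"
    assert dag.tasks[0].task_id == "start_task", "unexpected first task"


def first_failure(dag):
    """Return the message of the first failing assertion, or None if all pass."""
    try:
        check_structure(dag)
    except AssertionError as exc:
        return str(exc)
    return None


# A stand-in DAG with four tasks, none of which is named 'start_task'.
four_tasks = SimpleNamespace(
    tasks=[SimpleNamespace(task_id=f"task_{i}") for i in range(4)]
)

print(first_failure(None))        # DAG not found
print(first_failure(four_tasks))  # expected 3 tasks, found 4
```

The four-task stand-in would also fail the first-task check, but only the task-count message is reported because the first failing assertion stops the test.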
Visual Quiz - 3 Questions
Test your understanding
Look at the Process Table. What is the value of 'dag' after Step 1?
A. None
B. 3
C. DAG object for 'example_dag'
D. 'start_task'
💡 Hint
Check the Status Tracker row for 'dag' under 'After Step 1'
At which step does the test verify the number of tasks in the DAG?
A. Step 3
B. Step 4
C. Step 2
D. Step 5
💡 Hint
Look at the 'Action' column in the Process Table for the task count check
If the DAG had 4 tasks instead of 3, what would happen in the Process Table?
A. Step 4 would fail
B. Step 3 would fail
C. Step 3 would pass
D. Step 5 would pass
💡 Hint
Step 3 checks if len(dag.tasks) == 3; if not, assertion fails there
Concept Snapshot
Unit testing DAGs in Airflow:
- Write tests to check DAG existence and structure
- Use assertions to verify the task count and task IDs
- Run tests before deployment
- Fix errors if tests fail
- Ensures DAG correctness early
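A positional check such as dag.tasks[0] depends on list order, so a complementary way to verify task IDs is to compare them as a set. The sketch below assumes a hypothetical helper task_id_diff and uses stand-in objects rather than a real Airflow DAG; every task ID other than 'start_task' is invented for the example.

```python
from types import SimpleNamespace


def task_id_diff(dag, expected_ids):
    """Return (missing, unexpected) task IDs relative to the expected set."""
    actual = {task.task_id for task in dag.tasks}
    expected = set(expected_ids)
    return sorted(expected - actual), sorted(actual - expected)


# Stand-in DAG: one expected task is absent and one extra task is present.
dag = SimpleNamespace(
    tasks=[
        SimpleNamespace(task_id=name)
        for name in ("end_task", "start_task", "extra_task")
    ]
)

missing, unexpected = task_id_diff(dag, ["start_task", "middle_task", "end_task"])
print(missing, unexpected)
```

Reporting the missing and unexpected IDs separately tends to make a failing test easier to diagnose than a bare count mismatch.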
Full Transcript
Unit testing DAGs means writing small test functions that check whether your DAG exists and has the right tasks. First, you get the DAG object by name. Then you check that it is not None, so later checks do not fail with an unrelated error. Next, you verify that the number of tasks matches what you expect. Finally, you check specific task IDs to confirm the tasks are named as expected. If all assertions pass, the test succeeds and you can deploy with more confidence. If any assertion fails, you fix the DAG or the tests before moving on. This process helps catch mistakes early and keeps your workflows reliable.