How to Create a DAG in Apache Airflow: Step-by-Step Guide
To create a DAG in Airflow, define a Python script that instantiates a DAG object with a schedule and a set of tasks. Use Operators to define the tasks, and set dependencies between them inside the DAG context.
Syntax
A DAG in Airflow is defined using the DAG class (imported from airflow, or equivalently airflow.models). You specify parameters like dag_id (unique name), schedule_interval (when to run), and start_date (when to start). Tasks are created using Operators and linked with dependencies.
- dag_id: Unique identifier for the DAG.
- schedule_interval: Cron expression or presets like '@daily'.
- start_date: DateTime when DAG starts running.
- Operators: Define individual tasks (e.g., BashOperator).
- Task dependencies: Set order using task1 >> task2.
```python
from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime

default_args = {
    'start_date': datetime(2024, 1, 1),
}

dag = DAG(
    dag_id='example_dag',
    default_args=default_args,
    schedule_interval='@daily',
    catchup=False
)

task1 = BashOperator(
    task_id='print_date',
    bash_command='date',
    dag=dag
)

task2 = BashOperator(
    task_id='sleep',
    bash_command='sleep 5',
    dag=dag
)

task1 >> task2
```
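Under the hood, `task1 >> task2` is ordinary Python operator overloading: Airflow operators implement `__rshift__` to register downstream tasks. The following is a simplified model of that idea, not Airflow's actual classes, just a sketch of how the dependency syntax works:

```python
class Task:
    """Minimal stand-in for an Airflow operator, only to illustrate
    the >> dependency syntax. Not Airflow's real class."""

    def __init__(self, task_id):
        self.task_id = task_id
        self.downstream = []  # tasks that run after this one
        self.upstream = []    # tasks that must finish before this one

    def __rshift__(self, other):
        # task1 >> task2 means: task2 runs after task1
        self.downstream.append(other)
        other.upstream.append(self)
        return other  # returning other allows chaining: a >> b >> c

extract = Task('extract')
transform = Task('transform')
load = Task('load')

extract >> transform >> load  # extract -> transform -> load

print([t.task_id for t in extract.downstream])  # ['transform']
print([t.task_id for t in load.upstream])       # ['transform']
```

Because `__rshift__` returns its right-hand operand, chains like `a >> b >> c` read left to right, which is why Airflow pipelines can be declared in a single line.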
Example
This example creates a simple DAG that runs daily. It has two tasks: one prints the current date, and the other sleeps for 5 seconds. The tasks run in sequence.
```python
from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime

default_args = {
    'start_date': datetime(2024, 1, 1),
}

dag = DAG(
    dag_id='simple_dag',
    default_args=default_args,
    schedule_interval='@daily',
    catchup=False
)

print_date = BashOperator(
    task_id='print_date',
    bash_command='date',
    dag=dag
)

sleep_task = BashOperator(
    task_id='sleep_task',
    bash_command='sleep 5',
    dag=dag
)

print_date >> sleep_task
Output
INFO - Running command: date
Thu Jun 6 12:00:00 UTC 2024
INFO - Running command: sleep 5
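The `catchup=False` flag in the examples matters because, by default, Airflow schedules a run for every missed interval between start_date and now. A rough back-of-the-envelope calculation (plain Python, no Airflow required; `daily_backfill_runs` is an illustrative helper, not an Airflow function) shows how many runs a `@daily` DAG would backfill:

```python
from datetime import datetime

def daily_backfill_runs(start_date, today):
    """Approximate number of runs a @daily DAG would backfill when
    catching up from start_date to today: one run per elapsed day."""
    return max((today - start_date).days, 0)

# A DAG started months in the past accumulates one missed run per day
runs = daily_backfill_runs(datetime(2024, 1, 1), datetime(2024, 6, 6))
print(runs)  # 157
```

With `catchup=False`, the scheduler skips those historical intervals and only runs the latest one, which is usually what you want for a new DAG.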
Common Pitfalls
- Forgetting to set start_date causes the DAG not to run.
- Not setting catchup=False can trigger many unexpected backfill runs.
- Defining tasks outside the DAG context or omitting the dag=dag argument leads to errors.
- Incorrect task dependencies cause tasks to run in the wrong order or not at all.
```python
from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime

# Wrong: missing start_date
# dag = DAG(dag_id='bad_dag', schedule_interval='@daily')

# Right:
default_args = {'start_date': datetime(2024, 1, 1)}
dag = DAG(
    dag_id='good_dag',
    default_args=default_args,
    schedule_interval='@daily',
    catchup=False
)

# Wrong: task without the dag argument
# task = BashOperator(task_id='task1', bash_command='echo hi')

# Right:
task = BashOperator(task_id='task1', bash_command='echo hi', dag=dag)
```
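The missing dag=dag pitfall disappears entirely when the DAG is used as a context manager: every operator created inside the `with` block is attached to the DAG automatically. A sketch of the same pipeline in that style (a DAG-definition fragment; the dag_id `context_dag` is an illustrative name):

```python
from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime

with DAG(
    dag_id='context_dag',
    start_date=datetime(2024, 1, 1),
    schedule_interval='@daily',
    catchup=False,
) as dag:
    # No dag=dag needed: the context manager assigns these tasks to the DAG
    print_date = BashOperator(task_id='print_date', bash_command='date')
    sleep_task = BashOperator(task_id='sleep_task', bash_command='sleep 5')

    print_date >> sleep_task
```

This style also makes the "inside the DAG context" rule visually obvious: anything indented under the `with` belongs to the DAG.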
Quick Reference
Tips for creating DAGs:
- Always set start_date and schedule_interval.
- Use catchup=False to avoid unexpected backfills.
- Define tasks inside the DAG context and link them with >> or <<.
- Use descriptive dag_id and task_id names.
- Test DAGs locally before deploying.
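A quick local sanity check before deploying is simply importing the DAG file: most deployment failures are import-time errors that the scheduler would hit too. A generic sketch of that check (the `check_dag_file` helper is illustrative, not part of Airflow):

```python
import importlib.util

def check_dag_file(path):
    """Import a Python file the way the scheduler would; any syntax or
    import-time error surfaces here instead of silently in the Airflow UI."""
    spec = importlib.util.spec_from_file_location("dag_under_test", path)
    module = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(module)  # raises on any import-time error
    return module

# Usage (hypothetical path): check_dag_file("dags/simple_dag.py")
```

Running `python dags/simple_dag.py` from the shell achieves the same thing; if it raises, the scheduler cannot load the DAG either.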
Key Takeaways
Define a DAG using the DAG class with a unique dag_id, start_date, and schedule_interval.
Create tasks using Operators and assign them to the DAG with the dag parameter.
Set task order using dependencies like task1 >> task2 to control execution flow.
Always specify start_date and consider catchup=False to avoid unwanted backfills.
Test your DAG code to ensure tasks run as expected before deploying.