How to Define a DAG in Python for Apache Airflow
To define a DAG in Python for Apache Airflow, import DAG from airflow and create an instance with required arguments like dag_id, start_date, and schedule_interval. Then, define tasks as operators and set their dependencies inside this DAG object.
Syntax
Defining a DAG in Python involves creating a DAG object with key parameters:
- dag_id: Unique name for the DAG.
- start_date: When the DAG should start running.
- schedule_interval: How often the DAG runs (e.g., daily).
- default_args: Optional dictionary for default task arguments.
Tasks are defined as operators and linked inside the DAG.
python
from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime

default_args = {
    'owner': 'airflow',
    'start_date': datetime(2024, 1, 1),
}

dag = DAG(
    dag_id='example_dag',
    default_args=default_args,
    schedule_interval='@daily',
    catchup=False
)

# Define tasks
start = BashOperator(
    task_id='start',
    bash_command='echo Start',
    dag=dag
)

end = BashOperator(
    task_id='end',
    bash_command='echo End',
    dag=dag
)

# Set task dependencies
start >> end
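Airflow also supports defining the DAG as a context manager with a with block, which attaches tasks to the DAG automatically so you do not need to pass dag=dag to each operator. The following is a minimal sketch of the same two-task pipeline in that style; the dag_id example_dag_ctx is just an illustrative name, not part of the example above.
python
from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime

# Same two-task pipeline, written with the DAG as a context manager.
# Tasks created inside the `with` block are attached to the DAG automatically,
# so no dag=dag argument is needed on each operator.
with DAG(
    dag_id='example_dag_ctx',       # illustrative name only
    start_date=datetime(2024, 1, 1),
    schedule_interval='@daily',
    catchup=False,
) as dag:
    start = BashOperator(task_id='start', bash_command='echo Start')
    end = BashOperator(task_id='end', bash_command='echo End')

    start >> end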
Example
This example shows a simple DAG with two tasks: start and end. The start task runs first, and the end task runs once it completes.
python
from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime

default_args = {
    'owner': 'airflow',
    'start_date': datetime(2024, 1, 1),
}

dag = DAG(
    dag_id='simple_dag',
    default_args=default_args,
    schedule_interval='@daily',
    catchup=False
)

start = BashOperator(
    task_id='start',
    bash_command='echo Starting DAG',
    dag=dag
)

end = BashOperator(
    task_id='end',
    bash_command='echo Ending DAG',
    dag=dag
)

start >> end
Output
[2024-01-01 00:00:00] Starting DAG
[2024-01-01 00:00:01] Ending DAG
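To check the DAG locally before deploying it, newer Airflow releases (2.5 and later) let you run the DAG file directly. This is a minimal sketch, assuming that version and the simple_dag definition in the same file:
python
# A minimal sketch, assuming Airflow 2.5+ where DAG.test() is available.
# Running the DAG file directly (python simple_dag.py) executes one local debug run.
if __name__ == "__main__":
    dag.test()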
Common Pitfalls
Common mistakes when defining DAGs include:
- Not setting start_date properly, causing the DAG not to run.
- Using schedule_interval=None unintentionally, which disables scheduling.
- Forgetting to set task dependencies, so tasks run in the wrong order or all at once.
- Defining tasks outside the DAG context or missing dag=dag in operators.
python
from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime

# Wrong: Missing start_date
# dag = DAG(dag_id='bad_dag', schedule_interval='@daily')

# Right: Include start_date
default_args = {'start_date': datetime(2024, 1, 1)}
dag = DAG(dag_id='good_dag', default_args=default_args, schedule_interval='@daily')

# Wrong: Tasks without dag parameter
# task1 = BashOperator(task_id='task1', bash_command='echo hi')

# Right: Tasks linked to DAG
task1 = BashOperator(task_id='task1', bash_command='echo hi', dag=dag)
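For the dependency pitfall listed above, the fix is simply to declare the ordering explicitly. Here is a minimal self-contained sketch; the dag_id dependency_demo and the extract/load task names are hypothetical and used only for illustration.
python
from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime

# Hypothetical DAG used only to illustrate the dependency pitfall.
dag = DAG(
    dag_id='dependency_demo',
    start_date=datetime(2024, 1, 1),
    schedule_interval='@daily',
    catchup=False,
)

# Without an explicit dependency, extract and load have no ordering
# and may run in parallel or in the wrong order.
extract = BashOperator(task_id='extract', bash_command='echo extract', dag=dag)
load = BashOperator(task_id='load', bash_command='echo load', dag=dag)

# Right: declare the ordering so extract always runs before load.
extract >> load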
Quick Reference
Key points to remember when defining a DAG:
- dag_id: Unique name for your DAG.
- start_date: When your DAG starts running.
- schedule_interval: How often it runs (cron or presets like '@daily').
- default_args: Common task settings like retries or owner.
- Tasks: Define operators and set dependencies with >> or .set_upstream() (sketched below).
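As a quick illustration of the last two points, the sketch below shows the same ordering written with >> and with .set_upstream(), and uses a cron expression instead of the '@daily' preset. The dag_id and task names are hypothetical, chosen only for this example.
python
from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime

with DAG(
    dag_id='quick_reference_demo',      # hypothetical id for illustration
    start_date=datetime(2024, 1, 1),
    schedule_interval='0 6 * * *',      # cron expression: every day at 06:00
    catchup=False,
) as dag:
    extract = BashOperator(task_id='extract', bash_command='echo extract')
    transform = BashOperator(task_id='transform', bash_command='echo transform')
    load = BashOperator(task_id='load', bash_command='echo load')

    # These two lines express the same kind of ordering:
    extract >> transform            # bitshift style: extract runs before transform
    load.set_upstream(transform)    # method style: transform must finish before load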
Key Takeaways
Always create a DAG object with dag_id, start_date, and schedule_interval.
Define tasks inside the DAG and link them with dependencies.
Set start_date correctly to ensure your DAG runs as expected.
Include dag=dag in each task to associate it with the DAG.
Use schedule_interval to control how often the DAG runs.