Airflow · How-To · Beginner · 3 min read

How to Define a DAG in Python for Apache Airflow

To define a DAG in Python for Apache Airflow, import the DAG class from airflow and create an instance with the required arguments: a unique dag_id, a start_date, and a schedule_interval. Then define tasks as operators and set their dependencies on that DAG object.
📐 Syntax

Defining a DAG in Python involves creating a DAG object with key parameters:

  • dag_id: Unique name for the DAG.
  • start_date: When the DAG should start running.
  • schedule_interval: How often the DAG runs (e.g., '@daily'; renamed to schedule in Airflow 2.4+).
  • default_args: Optional dictionary for default task arguments.

Tasks are defined as operators and linked inside the DAG.

python
from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime

default_args = {
    'owner': 'airflow',
    'start_date': datetime(2024, 1, 1),
}

dag = DAG(
    dag_id='example_dag',
    default_args=default_args,
    schedule_interval='@daily',
    catchup=False
)

# Define tasks
start = BashOperator(
    task_id='start',
    bash_command='echo Start',
    dag=dag
)

end = BashOperator(
    task_id='end',
    bash_command='echo End',
    dag=dag
)

# Set task dependencies
start >> end
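The >> used above for dependencies is ordinary Python operator overloading, not special syntax. A toy sketch (this Task class is illustrative, not Airflow's real implementation) shows the idea: each task implements __rshift__ to record the task on its right as a downstream dependency.

```python
# Toy illustration of how `start >> end` can record a dependency.
# This is NOT Airflow's implementation -- just a sketch of the pattern.
class Task:
    def __init__(self, task_id):
        self.task_id = task_id
        self.downstream = []  # tasks that must run after this one

    def __rshift__(self, other):
        # `self >> other` marks `other` as downstream of `self`
        self.downstream.append(other)
        return other  # returning `other` allows chaining: a >> b >> c

start = Task('start')
end = Task('end')
start >> end

print([t.task_id for t in start.downstream])  # ['end']
```

Because __rshift__ returns its right-hand operand, chains like a >> b >> c work left to right, which is exactly how longer Airflow pipelines are usually written.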
💻 Example

This example shows a simple DAG with two tasks: start and end. The start task runs first, then the end task runs after it.

python
from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime

default_args = {
    'owner': 'airflow',
    'start_date': datetime(2024, 1, 1),
}

dag = DAG(
    dag_id='simple_dag',
    default_args=default_args,
    schedule_interval='@daily',
    catchup=False
)

start = BashOperator(
    task_id='start',
    bash_command='echo Starting DAG',
    dag=dag
)

end = BashOperator(
    task_id='end',
    bash_command='echo Ending DAG',
    dag=dag
)

start >> end
Output
[2024-01-01 00:00:00] Starting DAG
[2024-01-01 00:00:01] Ending DAG
⚠️ Common Pitfalls

Common mistakes when defining DAGs include:

  • Not setting start_date properly, which prevents the DAG from running.
  • Setting schedule_interval=None unintentionally, which disables scheduling (the DAG then runs only when triggered manually).
  • Forgetting to set task dependencies, so tasks run in the wrong order or all at once.
  • Defining tasks outside the DAG context or omitting dag=dag in operators.
python
from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime

# Wrong: Missing start_date
# dag = DAG(dag_id='bad_dag', schedule_interval='@daily')

# Right: Include start_date

default_args = {'start_date': datetime(2024, 1, 1)}
dag = DAG(dag_id='good_dag', default_args=default_args, schedule_interval='@daily')

# Wrong: Tasks without dag parameter
# task1 = BashOperator(task_id='task1', bash_command='echo hi')

# Right: Tasks linked to DAG

task1 = BashOperator(task_id='task1', bash_command='echo hi', dag=dag)
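Rather than passing dag=dag to every operator, the DAG class also works as a context manager: tasks created inside the with block are attached to the DAG automatically. A minimal sketch, assuming Airflow 2.x is installed:

```python
from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime

# Tasks defined inside the `with` block are attached to the DAG
# automatically -- no dag=dag argument needed on each operator.
with DAG(
    dag_id='context_dag',
    start_date=datetime(2024, 1, 1),
    schedule_interval='@daily',
    catchup=False,
) as dag:
    start = BashOperator(task_id='start', bash_command='echo Start')
    end = BashOperator(task_id='end', bash_command='echo End')
    start >> end
```

This style makes the "task defined outside the DAG context" pitfall harder to hit, since any operator created inside the block is linked for you.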
📊 Quick Reference

Key points to remember when defining a DAG:

  • dag_id: Unique name for your DAG.
  • start_date: When your DAG starts running.
  • schedule_interval: How often it runs (cron or presets like '@daily').
  • default_args: Common task settings like retries or owner.
  • Tasks: Define operators and set dependencies with >> or .set_upstream().
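The schedule presets are shorthand for cron expressions. A quick lookup sketch (mappings as given in the Airflow scheduling docs; the PRESETS name is just illustrative):

```python
# Airflow schedule presets and the cron expressions they expand to
# (per the Airflow scheduling docs; `PRESETS` is an illustrative name).
PRESETS = {
    '@hourly':  '0 * * * *',    # once an hour, on the hour
    '@daily':   '0 0 * * *',    # once a day at midnight
    '@weekly':  '0 0 * * 0',    # once a week, Sunday at midnight
    '@monthly': '0 0 1 * *',    # first day of the month at midnight
    '@yearly':  '0 0 1 1 *',    # January 1st at midnight
}

print(PRESETS['@daily'])  # 0 0 * * *
```

Any cron expression in this five-field format also works directly as the schedule_interval, so presets are purely a readability convenience.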

Key Takeaways

  • Always create a DAG object with dag_id, start_date, and schedule_interval.
  • Define tasks inside the DAG and link them with dependencies.
  • Set start_date correctly to ensure your DAG runs as expected.
  • Include dag=dag in each task to associate it with the DAG.
  • Use schedule_interval to control how often the DAG runs.