
SqlOperator for database queries in Apache Airflow - Commands & Configuration

Introduction
Sometimes you need to run SQL commands automatically as part of your data workflows. SqlOperator in Airflow (and its database-specific variants such as PostgresOperator) lets you run SQL queries against a database without writing your own connection-handling code.
When you want to run a simple SQL query like inserting or updating data as part of a scheduled task.
When you need to automate database maintenance tasks such as cleaning old records regularly.
When you want to chain SQL queries with other tasks in your data pipeline.
When you want to run a query on a database and use the result in later steps of your workflow.
When you want to keep your SQL code organized inside Airflow DAGs for easy management.
Config File - example_sql_operator_dag.py
from airflow import DAG
from airflow.providers.postgres.operators.postgres import PostgresOperator
from datetime import datetime

with DAG(
    dag_id='example_sql_operator_dag',
    start_date=datetime(2024, 1, 1),
    schedule_interval='@daily',
    catchup=False
) as dag:

    create_table = PostgresOperator(
        task_id='create_table',
        postgres_conn_id='my_postgres_conn',
        sql="""
        CREATE TABLE IF NOT EXISTS users (
            id SERIAL PRIMARY KEY,
            name VARCHAR(50),
            email VARCHAR(50)
        );
        """
    )

    insert_user = PostgresOperator(
        task_id='insert_user',
        postgres_conn_id='my_postgres_conn',
        sql="""
        INSERT INTO users (name, email) VALUES ('Alice', 'alice@example.com');
        """
    )

    create_table >> insert_user

This Python file defines an Airflow DAG that runs two SQL commands on a Postgres database.

create_table: This task creates a table named users if it does not exist.

insert_user: This task inserts a new user record into the users table.

The postgres_conn_id refers to a connection configured in Airflow to connect to your Postgres database.

The tasks run in order, as set by create_table >> insert_user: first create the table, then insert the user.
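Since the DAG itself only runs inside Airflow, here is a standalone sketch of what the two SQL statements do, using Python's built-in sqlite3 module instead of Postgres so it runs anywhere (Postgres's SERIAL is swapped for SQLite's INTEGER PRIMARY KEY):

```python
import sqlite3

# Standalone sketch of the two tasks' SQL, against an in-memory SQLite database.
conn = sqlite3.connect(":memory:")

# Equivalent of the create_table task (SERIAL -> INTEGER PRIMARY KEY for SQLite).
conn.execute("""
    CREATE TABLE IF NOT EXISTS users (
        id INTEGER PRIMARY KEY,
        name VARCHAR(50),
        email VARCHAR(50)
    );
""")

# Equivalent of the insert_user task.
conn.execute("INSERT INTO users (name, email) VALUES ('Alice', 'alice@example.com');")

# Verify the row landed.
rows = conn.execute("SELECT name, email FROM users").fetchall()
print(rows)  # [('Alice', 'alice@example.com')]
conn.close()
```

In the real DAG, Airflow's Postgres hook opens and closes the connection for you; the point here is only to show the effect of the SQL itself.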

Commands
This command lists all the DAGs Airflow knows about, so you can check if your DAG is recognized.
Terminal
airflow dags list
Expected Output
example_sql_operator_dag
This command triggers a manual run of the DAG 'example_sql_operator_dag', executing its SQL tasks right away.
Terminal
airflow dags trigger example_sql_operator_dag
Expected Output
Created <DagRun example_sql_operator_dag @ 2024-06-01T12:00:00+00:00: manual__2024-06-01T12:00:00+00:00, externally triggered: True>
This command shows the tasks inside the DAG so you can verify the tasks are defined correctly.
Terminal
airflow tasks list example_sql_operator_dag
Expected Output
create_table
insert_user
This command runs the 'create_table' task for the DAG on the given date to test the SQL execution without scheduling.
Terminal
airflow tasks test example_sql_operator_dag create_table 2024-06-01
Expected Output
[2024-06-01 12:00:00,000] {taskinstance.py:xxxx} INFO - Executing task create_table
[2024-06-01 12:00:01,000] {taskinstance.py:xxxx} INFO - Task create_table succeeded
Key Concept

If you remember nothing else from this pattern, remember: SqlOperator lets you run SQL commands inside Airflow tasks easily by specifying the SQL and database connection.

Common Mistakes
Mistake: Not setting up the database connection in Airflow before using postgres_conn_id.
Consequence: Airflow cannot connect to the database, and the task fails with a connection error.
Fix: Create a connection in the Airflow UI under Admin > Connections with the correct database credentials, and use its connection ID in the operator.
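Besides the UI, Airflow can also resolve connections from environment variables named AIRFLOW_CONN_&lt;CONN_ID&gt; (upper-cased) containing a connection URI. A minimal sketch, where the host, user, password, and database name are all placeholders for your real credentials:

```python
import os

# Airflow resolves connections from environment variables named
# AIRFLOW_CONN_<CONN_ID> (the conn_id upper-cased). The URI below uses
# placeholder credentials -- substitute your real host, user, and password.
os.environ["AIRFLOW_CONN_MY_POSTGRES_CONN"] = (
    "postgresql://airflow_user:secret@db-host:5432/analytics"
)
```

Set this in the environment of the scheduler and workers; the operator's postgres_conn_id='my_postgres_conn' will then match it without any UI setup.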
Mistake: Writing SQL with syntax errors inside the sql parameter.
Consequence: The query fails and the task does not complete successfully.
Fix: Test your SQL queries separately in a database client before adding them to the operator.
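One lightweight way to pre-check a statement is to run it against a throwaway SQLite database first. SQLite's SQL dialect differs slightly from Postgres, so treat this only as a first-pass sanity check, not a full validation:

```python
import sqlite3

def sql_parses(statement: str) -> bool:
    """Run a statement against a scratch in-memory database and report
    whether it executed; a crude pre-flight check for obvious syntax errors."""
    conn = sqlite3.connect(":memory:")
    # Minimal schema so statements referencing the users table can run.
    conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT, email TEXT);")
    try:
        conn.execute(statement)
        return True
    except sqlite3.Error:
        return False
    finally:
        conn.close()

print(sql_parses("INSERT INTO users (name, email) VALUES ('Alice', 'a@b.com');"))  # True
print(sql_parses("INSERT INTO users (name email) VALUES ('Alice');"))  # False (missing comma)
```

For anything Postgres-specific (SERIAL, ON CONFLICT, etc.), test against a real Postgres instance instead.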
Mistake: Forgetting to set catchup=False in the DAG when testing.
Consequence: Airflow tries to run the DAG for every past schedule since start_date, causing unexpected load and confusion.
Fix: Set catchup=False in the DAG definition to run only the latest schedule.
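To see why this matters for the example DAG, a quick back-of-the-envelope calculation: with a start_date of 2024-01-01 and a @daily schedule, enabling catchup on 2024-06-01 would backfill roughly one run per elapsed day.

```python
from datetime import date

# Rough illustration: number of daily runs catchup=True would backfill
# for this DAG's start_date if it were first enabled on 2024-06-01.
start_date = date(2024, 1, 1)
enabled_on = date(2024, 6, 1)
backfill_runs = (enabled_on - start_date).days
print(backfill_runs)  # 152
```

Each of those 152 runs would execute the INSERT statement, so a quick test could quietly write 152 duplicate rows.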
Summary
Define SQL tasks in Airflow using SqlOperator (or a database-specific variant such as PostgresOperator) by specifying the SQL query and database connection ID.
Use airflow CLI commands to list, trigger, and test your DAG and tasks.
Ensure your database connection is configured in Airflow and your SQL syntax is correct before running.