Airflow · How-To · Beginner · 4 min read

How to Use PostgresHook in Airflow: Simple Guide

Use PostgresHook in Airflow to connect to a PostgreSQL database by specifying the connection ID. Then call methods like get_records() or run() to execute SQL queries within your DAG tasks.
📐 Syntax

The PostgresHook is initialized with a connection ID that refers to your PostgreSQL connection configured in Airflow. You use methods like get_records(sql) to fetch data or run(sql) to execute SQL commands.

Key parts:

  • postgres_conn_id: The Airflow connection ID for PostgreSQL.
  • get_records(sql): Runs a SELECT query and returns results as a list of tuples.
  • run(sql): Executes any SQL command (INSERT, UPDATE, DELETE, etc.).
```python
from airflow.providers.postgres.hooks.postgres import PostgresHook

# Initialize the hook with your connection ID
hook = PostgresHook(postgres_conn_id='my_postgres_conn')

# Fetch records example
records = hook.get_records(sql='SELECT * FROM my_table;')

# Run a SQL command example
hook.run(sql='INSERT INTO my_table (col1) VALUES (123);')
```
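Both methods also accept a `parameters` argument (inherited from `DbApiHook`), which lets the psycopg2 driver escape values for you instead of splicing them into the SQL string. A minimal sketch; the `fetch_user_by_id` helper and the `users` table are illustrative, not part of the hook's API:

```python
def fetch_user_by_id(hook, user_id):
    # hook: a PostgresHook (or any object exposing get_records).
    # The %s placeholder is filled in by the driver, which escapes the
    # value safely -- never build SQL with f-strings or string concat.
    return hook.get_records(
        sql="SELECT id, name FROM users WHERE id = %s;",
        parameters=(user_id,),
    )

# With a real connection (requires apache-airflow-providers-postgres):
#   from airflow.providers.postgres.hooks.postgres import PostgresHook
#   hook = PostgresHook(postgres_conn_id='my_postgres_conn')
#   rows = fetch_user_by_id(hook, 42)
```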
💻 Example

This example shows how to use PostgresHook inside an Airflow PythonOperator to fetch data from a PostgreSQL table and log the results.

```python
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook
from datetime import datetime

def fetch_data():
    hook = PostgresHook(postgres_conn_id='my_postgres_conn')
    records = hook.get_records(sql='SELECT id, name FROM users LIMIT 5;')
    for record in records:
        print(f"User ID: {record[0]}, Name: {record[1]}")

default_args = {
    'start_date': datetime(2024, 1, 1),
}

dag = DAG('postgres_hook_example', default_args=default_args, schedule_interval='@daily', catchup=False)

fetch_task = PythonOperator(
    task_id='fetch_users',
    python_callable=fetch_data,
    dag=dag
)
```
Output

```
User ID: 1, Name: Alice
User ID: 2, Name: Bob
User ID: 3, Name: Carol
User ID: 4, Name: Dave
User ID: 5, Name: Eve
```
⚠️ Common Pitfalls

Common mistakes when using PostgresHook include:

  • Not setting up the PostgreSQL connection in the Airflow UI or environment, causing connection errors.
  • Using a postgres_conn_id that does not match any configured connection.
  • Forgetting to commit transactions when running data-changing SQL commands (use run(sql, autocommit=True) if needed).
  • Not handling exceptions, which can lead to silent failures.
```python
from airflow.providers.postgres.hooks.postgres import PostgresHook

# Wrong: missing connection setup or wrong ID
hook = PostgresHook(postgres_conn_id='wrong_conn_id')

# Right: ensure connection exists and use autocommit for INSERT
hook = PostgresHook(postgres_conn_id='my_postgres_conn')
hook.run(sql='INSERT INTO my_table (col1) VALUES (123);', autocommit=True)
```
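To avoid the silent-failure pitfall, wrap hook calls so errors are logged before the task fails. A hedged sketch; `safe_insert` is a hypothetical helper, and the table and hook names follow the examples above:

```python
import logging

log = logging.getLogger(__name__)

def safe_insert(hook, value):
    # hook: a PostgresHook; value goes into my_table.col1.
    try:
        hook.run(
            sql="INSERT INTO my_table (col1) VALUES (%s);",
            parameters=(value,),
            autocommit=True,
        )
        return True
    except Exception:
        # Log the full traceback, then return False (or re-raise) so the
        # Airflow task fails visibly instead of swallowing the error.
        log.exception("Insert into my_table failed")
        return False
```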
📊 Quick Reference

| Method | Description |
| --- | --- |
| `PostgresHook(postgres_conn_id)` | Create hook with connection ID |
| `get_records(sql)` | Run SELECT query and return list of tuples |
| `run(sql, autocommit=False)` | Execute SQL command; set `autocommit=True` for changes |
| `get_conn()` | Get raw psycopg2 connection object |
| `get_cursor()` | Get raw cursor object for advanced use |
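When several statements must succeed or fail together, `get_conn()` exposes the underlying psycopg2 connection so you can control the transaction yourself. A sketch under the assumptions of the earlier examples; the `audit_log` table and `insert_with_audit` helper are hypothetical:

```python
def insert_with_audit(hook, value):
    # conn is the raw psycopg2 connection behind the hook.
    conn = hook.get_conn()
    try:
        with conn.cursor() as cur:
            cur.execute("INSERT INTO my_table (col1) VALUES (%s);", (value,))
            cur.execute("INSERT INTO audit_log (event) VALUES (%s);", ("insert",))
        conn.commit()  # both rows are committed together, or neither is
    except Exception:
        conn.rollback()
        raise
    finally:
        conn.close()
```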

Key Takeaways

  • Initialize PostgresHook with the correct Airflow connection ID to connect to PostgreSQL.
  • Use get_records() to fetch data and run() to execute SQL commands inside Airflow tasks.
  • Ensure your PostgreSQL connection is properly configured in the Airflow UI before using the hook.
  • Use autocommit=True in run() when executing data-changing SQL statements to avoid transaction issues.
  • Handle exceptions to catch and debug database connection or query errors effectively.