How to Use PostgresHook in Airflow: Simple Guide
Use PostgresHook in Airflow to connect to a PostgreSQL database by specifying the connection ID, then call methods like get_records() or run() to execute SQL queries within your DAG tasks.
Syntax
The PostgresHook is initialized with a connection ID that refers to your PostgreSQL connection configured in Airflow. You use methods like get_records(sql) to fetch data or run(sql) to execute SQL commands.
Key parts:
- postgres_conn_id: The Airflow connection ID for PostgreSQL.
- get_records(sql): Runs a SELECT query and returns results as a list of tuples.
- run(sql): Executes any SQL command (INSERT, UPDATE, DELETE, etc.).
```python
from airflow.providers.postgres.hooks.postgres import PostgresHook

# Initialize the hook with your connection ID
hook = PostgresHook(postgres_conn_id='my_postgres_conn')

# Fetch records example
records = hook.get_records(sql='SELECT * FROM my_table;')

# Run a SQL command example
hook.run(sql='INSERT INTO my_table (col1) VALUES (123);')
```
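Both get_records() and run() also accept a parameters argument, which lets the database driver escape values instead of you building SQL strings by hand. A minimal sketch — the users table and the get_user helper are illustrative, not part of Airflow, and the hook is passed in so the pattern can be exercised without a live database:

```python
def get_user(hook, user_id):
    """Fetch one user by ID with a parameterized query.

    `hook` is expected to behave like a PostgresHook; this helper
    and the users table are hypothetical examples.
    """
    # Passing values via `parameters` lets the driver escape them,
    # avoiding SQL injection from string formatting.
    return hook.get_records(
        sql="SELECT id, name FROM users WHERE id = %s;",
        parameters=(user_id,),
    )
```

In a real task you would call get_user(PostgresHook(postgres_conn_id='my_postgres_conn'), 7).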
Example
This example shows how to use PostgresHook inside an Airflow PythonOperator to fetch data from a PostgreSQL table and log the results.
```python
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook

def fetch_data():
    hook = PostgresHook(postgres_conn_id='my_postgres_conn')
    records = hook.get_records(sql='SELECT id, name FROM users LIMIT 5;')
    for record in records:
        print(f"User ID: {record[0]}, Name: {record[1]}")

default_args = {
    'start_date': datetime(2024, 1, 1),
}

dag = DAG(
    'postgres_hook_example',
    default_args=default_args,
    schedule_interval='@daily',
    catchup=False,
)

fetch_task = PythonOperator(
    task_id='fetch_users',
    python_callable=fetch_data,
    dag=dag,
)
```
Output
User ID: 1, Name: Alice
User ID: 2, Name: Bob
User ID: 3, Name: Carol
User ID: 4, Name: Dave
User ID: 5, Name: Eve
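One way to verify the formatting logic without a database is to split it out of the task callable. A sketch under that assumption — format_users is a hypothetical helper, not an Airflow API:

```python
def format_users(records):
    """Turn (id, name) tuples into the log lines shown above.

    Hypothetical helper -- keeping pure formatting separate from the
    task callable makes it testable without a database connection.
    """
    return [f"User ID: {rec[0]}, Name: {rec[1]}" for rec in records]

def fetch_data():
    # Importing the hook inside the callable keeps DAG parsing light.
    from airflow.providers.postgres.hooks.postgres import PostgresHook

    hook = PostgresHook(postgres_conn_id='my_postgres_conn')
    for line in format_users(hook.get_records(sql='SELECT id, name FROM users LIMIT 5;')):
        print(line)
```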
Common Pitfalls
Common mistakes when using PostgresHook include:
- Not setting up the PostgreSQL connection in Airflow UI or environment, causing connection errors.
- Using a wrong postgres_conn_id that does not match the configured connection.
- Forgetting to commit transactions when running data-changing SQL commands (use run(sql, autocommit=True) if needed).
- Not handling exceptions, which can cause silent failures.
```python
from airflow.providers.postgres.hooks.postgres import PostgresHook

# Wrong: missing connection setup or wrong ID
hook = PostgresHook(postgres_conn_id='wrong_conn_id')

# Right: ensure the connection exists and use autocommit for INSERT
hook = PostgresHook(postgres_conn_id='my_postgres_conn')
hook.run(sql='INSERT INTO my_table (col1) VALUES (123);', autocommit=True)
```
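The last pitfall, silent failures, can be handled by wrapping the query and re-raising so Airflow marks the task failed. A sketch with the hook passed in so both paths are easy to exercise — fetch_safely is a hypothetical helper, not part of the hook's API:

```python
def fetch_safely(hook, sql):
    """Run a query and surface errors instead of swallowing them.

    `hook` is any object with a get_records() method (e.g. a
    PostgresHook); this wrapper is illustrative, not an Airflow API.
    """
    try:
        return hook.get_records(sql=sql)
    except Exception as exc:
        # Log context, then re-raise so the Airflow task fails visibly.
        print(f"Query failed: {sql!r}: {exc}")
        raise
```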
Quick Reference
| Method | Description |
|---|---|
| PostgresHook(postgres_conn_id) | Create hook with connection ID |
| get_records(sql) | Run SELECT query and return list of tuples |
| run(sql, autocommit=False) | Execute SQL command; set autocommit=True for changes |
| get_conn() | Get raw psycopg2 connection object |
| get_first(sql) | Run SELECT query and return only the first row |
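When the convenience methods aren't enough (e.g. streaming rows or multi-statement work), get_conn() hands you the raw DB-API connection. A sketch of manual cursor handling — fetch_first_column is a hypothetical helper, and because it only relies on the DB-API interface, any compliant connection works in its place:

```python
def fetch_first_column(conn, sql):
    """Return the first column of every row via a raw DB-API connection.

    `conn` can be the psycopg2 connection from hook.get_conn();
    this helper is illustrative, not part of the hook's API.
    """
    cur = conn.cursor()
    try:
        cur.execute(sql)
        return [row[0] for row in cur.fetchall()]
    finally:
        # Always close cursors you open on a raw connection.
        cur.close()
```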
Key Takeaways
- Initialize PostgresHook with the correct Airflow connection ID to connect to PostgreSQL.
- Use get_records() to fetch data and run() to execute SQL commands inside Airflow tasks.
- Ensure your PostgreSQL connection is configured in the Airflow UI before using the hook.
- Use autocommit=True in run() when executing data-changing SQL statements to avoid transaction issues.
- Handle exceptions to catch and debug database connection or query errors effectively.