from datetime import datetime

from airflow import DAG
from airflow.providers.postgres.operators.postgres import PostgresOperator

default_args = {'start_date': datetime(2024, 1, 1)}

dag = DAG('test_sql_operator', default_args=default_args, schedule_interval='@once')

run_query = PostgresOperator(
    task_id='run_query',
    postgres_conn_id='my_postgres',
    sql='SELECT COUNT(*) FROM users;',
    dag=dag,
)
The PostgresOperator executes the query against the database and marks the task successful if no error occurs, but it does not fetch or log the result rows by default. To see query results, you need to fetch them explicitly (for example, with a PostgresHook inside a PythonOperator) or use an operator that returns rows.
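The fetch-it-yourself pattern looks roughly like this. The sketch below uses the stdlib sqlite3 module as a stand-in for a PostgresHook connection so it runs anywhere; in a real DAG you would obtain the connection from PostgresHook('my_postgres'). The function name log_row_count is illustrative.

```python
import sqlite3


def log_row_count(conn):
    # Execute the query, then fetch and print the result explicitly --
    # this is the step PostgresOperator skips by default.
    cur = conn.execute('SELECT COUNT(*) FROM users;')
    count = cur.fetchone()[0]
    print(f'users row count: {count}')
    return count


# In-memory stand-in for the Postgres database.
conn = sqlite3.connect(':memory:')
conn.execute('CREATE TABLE users (id INTEGER)')
conn.executemany('INSERT INTO users VALUES (?)', [(1,), (2,), (3,)])

log_row_count(conn)  # prints 'users row count: 3'
```

In a DAG, this function would be the python_callable of a PythonOperator, with the connection coming from the hook rather than sqlite3.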
The sql parameter accepts the SQL as a string (or a list of statement strings). Reading the file content yourself and passing it as a string works. Because sql is a templated field with template_ext ('.sql',), passing a relative path ending in .sql (resolved against the DAG folder or template_searchpath) also works, and Airflow renders the file's contents. What does not work is inventing a command like RUN my_file.sql; there is no such SQL statement.
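The read-the-file-yourself approach can be sketched as follows; the file name count_users.sql and its contents are illustrative, and a temporary directory stands in for your DAG folder.

```python
import tempfile
from pathlib import Path

# Illustrative setup: create a .sql file like one you might keep
# alongside your DAGs.
sql_dir = Path(tempfile.mkdtemp())
sql_file = sql_dir / 'count_users.sql'
sql_file.write_text('SELECT COUNT(*) FROM users;')

# Read the file's content and pass the resulting string as sql=...
sql_text = sql_file.read_text()
print(sql_text)
```

The string in sql_text is what you would hand to PostgresOperator(sql=sql_text, ...); alternatively, pass the relative path 'count_users.sql' and let Airflow's templating read the file for you.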
The error 'could not connect to server' usually means the connection details are wrong or missing. The postgres_conn_id must match a connection defined in Airflow (via the UI under Admin > Connections, the CLI, or an environment variable) with a valid host, port, login, and password.
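Two common ways to define the connection are an environment variable in AIRFLOW_CONN_&lt;CONN_ID&gt; URI form, or the airflow connections add CLI command. The host, database, user, and password below are placeholders for your own values.

```shell
# Environment-variable form: Airflow picks up AIRFLOW_CONN_MY_POSTGRES
# as the 'my_postgres' connection (URI format, placeholder credentials).
export AIRFLOW_CONN_MY_POSTGRES='postgres://airflow_user:secret@db.example.com:5432/mydb'

# Or register the same connection with the CLI instead:
airflow connections add my_postgres \
    --conn-type postgres \
    --conn-host db.example.com \
    --conn-port 5432 \
    --conn-login airflow_user \
    --conn-password secret \
    --conn-schema mydb
```

Either way, PostgresOperator(postgres_conn_id='my_postgres', ...) will resolve the credentials from the connection rather than from your DAG code.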
create_table = PostgresOperator(
    task_id='create_table',
    postgres_conn_id='my_postgres',
    sql='CREATE TABLE test(id INT);',
    dag=dag,
)
insert_data = PostgresOperator(
    task_id='insert_data',
    postgres_conn_id='my_postgres',
    sql='INSERT INTO test VALUES (1);',
    dag=dag,
)
verify_data = PostgresOperator(
    task_id='verify_data',
    postgres_conn_id='my_postgres',
    sql='SELECT COUNT(*) FROM test;',
    dag=dag,
)

create_table >> insert_data >> verify_data
In Airflow, the '>>' operator sets a downstream dependency (a >> b means b runs only after a succeeds), so chaining create_table >> insert_data >> verify_data runs the tasks sequentially in that order.
Airflow Connections securely store credentials and allow operators to reference them by ID. This avoids exposing secrets in code or files.