SqlOperator for database queries in Apache Airflow - Step-by-Step Execution

Process Flow - PostgresOperator for Postgres database queries
Define SQL query string
Create PostgresOperator with query and connection
Airflow schedules and triggers task
PostgresOperator connects to DB
Executes SQL query
Commits transaction if necessary
Task completes with success or failure
This flow shows how PostgresOperator runs a SQL query in Airflow: define query, create operator, Airflow runs it, connects to DB, executes query, and finishes.
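The connect, execute, commit, and finish steps above can be sketched in plain Python. This is only an illustration of the lifecycle, not PostgresOperator's implementation: it assumes the standard-library sqlite3 module as a stand-in for Postgres so that it runs without a database server, and run_sql_task is a hypothetical helper name.

```python
import sqlite3


def run_sql_task(sql, database=":memory:"):
    """Mimic the lifecycle described above: connect, execute, commit, close.

    sqlite3 is an assumed stand-in for a Postgres connection.
    """
    conn = sqlite3.connect(database)  # connect to the database
    try:
        conn.execute(sql)             # execute the SQL query
        conn.commit()                 # commit the transaction if necessary
        return "success"
    except sqlite3.Error:
        conn.rollback()               # undo partial work on error
        return "failed"
    finally:
        conn.close()                  # the connection is closed after the task


print(run_sql_task("CREATE TABLE users (id INTEGER, name TEXT);"))
print(run_sql_task("SELECT * FROM missing_table;"))
```

The first call ends in "success" and the second in "failed", mirroring the two outcomes a task can finish with.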
Execution Sample
from airflow.providers.postgres.operators.postgres import PostgresOperator

sql_task = PostgresOperator(
    task_id='run_query',
    postgres_conn_id='my_postgres',
    sql='SELECT * FROM users;'
)
This code creates a PostgresOperator task that runs a SELECT query on a Postgres database using Airflow.
Process Table
Step | Action | Evaluation | Result
1 | Define SQL query string | sql = 'SELECT * FROM users;' | Query string stored
2 | Create PostgresOperator with task_id, postgres_conn_id, and sql | PostgresOperator(task_id='run_query', postgres_conn_id='my_postgres', sql=sql) | Operator instance created
3 | Airflow scheduler triggers task | Task scheduled to run | Task starts execution
4 | PostgresOperator connects to Postgres DB using 'my_postgres' | Connection established | DB connection ready
5 | Execute SQL query 'SELECT * FROM users;' | Query sent to DB | DB executes query
6 | Finalize query execution and transaction | Transaction handled | Execution complete
7 | Task completes successfully | No errors | Task marked success
💡 Task finishes after successful query execution
Status Tracker
Variable | Start | After Step 1 | After Step 2 | After Step 4 | After Step 6 | Final
sql | undefined | SELECT * FROM users; | SELECT * FROM users; | SELECT * FROM users; | SELECT * FROM users; | SELECT * FROM users;
sql_task | undefined | undefined | PostgresOperator instance | PostgresOperator instance | PostgresOperator instance | PostgresOperator instance
db_connection | undefined | undefined | undefined | Connected | Connected | Closed after task
query_result | undefined | undefined | undefined | undefined | undefined | undefined
Key Moments - 3 Insights
Why do we need to specify 'postgres_conn_id' in PostgresOperator?
The 'postgres_conn_id' tells Airflow which stored database connection to use. Without it, PostgresOperator won't know where to run the query. See step 4 in the Process Table, where the connection is established.
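As a rough illustration of what a connection ID stands for: one way Airflow can look up a connection is an environment variable named AIRFLOW_CONN_ followed by the upper-cased ID, holding a connection URI. The sketch below only builds that variable name and splits a URI into its parts using the standard library. The URI, user, password, host, and database name are made-up placeholders, and conn_env_var and parse_conn_uri are hypothetical helper names, not Airflow functions.

```python
from urllib.parse import urlsplit


def conn_env_var(conn_id):
    """Name of the environment variable that can hold this connection's URI."""
    return "AIRFLOW_CONN_" + conn_id.upper()


def parse_conn_uri(uri):
    """Split a connection URI into the fields a database client needs."""
    parts = urlsplit(uri)
    return {
        "host": parts.hostname,
        "port": parts.port,
        "login": parts.username,
        "schema": parts.path.lstrip("/"),  # the database name
    }


# Hypothetical credentials, for illustration only.
example_uri = "postgres://airflow_user:secret@db.example.com:5432/analytics"

print(conn_env_var("my_postgres"))
print(parse_conn_uri(example_uri))
```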
Does PostgresOperator automatically fetch query results?
No. In the flow shown here, PostgresOperator executes the query but does not fetch rows or push them to XCom; execution simply completes, as shown in step 6. To work with SELECT results, use a PythonOperator whose callable calls get_records() on a PostgresHook.
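A minimal sketch of such a callable follows. It assumes the hook object exposes get_records(sql), as Airflow's PostgresHook does; FakeHook is a hypothetical stand-in with made-up rows so the example runs without Airflow or a database. In a real DAG you would instead pass PostgresHook(postgres_conn_id='my_postgres'), imported from airflow.providers.postgres.hooks.postgres.

```python
def fetch_users(hook, sql="SELECT * FROM users;"):
    """Return all rows for `sql` from any hook that exposes get_records()."""
    return hook.get_records(sql)


class FakeHook:
    """Hypothetical stand-in for PostgresHook, used only for this demo."""

    def __init__(self, rows):
        self._rows = rows

    def get_records(self, sql, parameters=None):
        # A real hook would run `sql` against the database; this one
        # simply returns the canned rows it was given.
        return list(self._rows)


# Made-up sample rows, for illustration only.
rows = fetch_users(FakeHook([(1, "ada"), (2, "linus")]))
print(rows)
```

Passing the hook in as an argument keeps the callable easy to test, since the database access can be replaced by a stub.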
What happens if the SQL query has an error?
If the query fails, the task fails and Airflow marks it as failed. This stops the flow unless retries or error handling are configured. The Process Table shows only the success path; a query error would end the run with a failed task instead of the success in step 7.
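How a failing query and a retry count interact can be sketched in plain Python. This is not Airflow's scheduler logic: it assumes sqlite3 as a stand-in database and a hypothetical run_with_retries helper, and it mirrors only the idea that a task is attempted once plus up to `retries` additional times before being marked failed.

```python
import sqlite3


def run_with_retries(sql, retries=0):
    """Attempt the query once, plus up to `retries` more times on failure.

    Returns a (status, attempts) tuple.
    """
    attempts = 0
    for _ in range(retries + 1):
        attempts += 1
        conn = sqlite3.connect(":memory:")  # assumed stand-in for Postgres
        try:
            conn.execute(sql)
            return "success", attempts
        except sqlite3.Error:
            continue  # this attempt failed; try again if any retries remain
        finally:
            conn.close()
    return "failed", attempts


print(run_with_retries("SELECT 1;", retries=2))
print(run_with_retries("SELEC 1;", retries=2))  # deliberate syntax error
```

A query with a syntax error fails identically on every attempt, so retries mainly help with transient problems such as a briefly unavailable database.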
Visual Quiz - 3 Questions
Test your understanding
Looking at the Process Table, at which step does PostgresOperator connect to the database?
A. Step 4
B. Step 2
C. Step 6
D. Step 1
💡 Hint
Check the 'Action' column of the Process Table for the step where the connection is established.
According to the Status Tracker, what is the value of 'query_result' after step 6?
A. Fetched rows
B. PostgresOperator instance
C. undefined
D. Closed after task
💡 Hint
Look at the 'query_result' row under 'After Step 6' in the Status Tracker.
If the SQL query string changes, which step in the Process Table reflects this change?
A. Step 4
B. Step 1
C. Step 7
D. Step 3
💡 Hint
Step 1 shows defining the SQL query string.
Concept Snapshot
PostgresOperator runs SQL queries in Airflow tasks.
Define SQL string, create operator with postgres_conn_id and SQL.
Airflow runs task, operator connects to DB, executes query.
Task completes with success or failure.
Use postgres_conn_id to specify target database.
Full Transcript
PostgresOperator in Airflow lets you run SQL queries as tasks. First, you write your SQL query as a string. Then, you create a PostgresOperator instance, giving it a task ID, the postgres_conn_id, and the SQL query. When Airflow runs the task, PostgresOperator connects to the database using the connection ID. It sends the SQL query to the database and waits for completion. After the query runs, it handles the transaction and marks the task as successful if no errors occur. If the query fails, the task fails. This process allows you to automate database queries inside Airflow workflows easily.