How to Use MySqlHook in Airflow: Syntax and Example
Use MySqlHook in Airflow by importing it from airflow.providers.mysql.hooks.mysql, then create an instance with your connection ID. Call methods like get_records() or run() to execute SQL queries on your MySQL database.

Syntax
The MySqlHook class is imported from airflow.providers.mysql.hooks.mysql. You create an instance by passing the Airflow connection ID for your MySQL database. Use methods like get_records(sql) to fetch data or run(sql) to execute commands.
Key parts:
- mysql_conn_id: Airflow connection ID configured in the UI or environment.
- get_records(sql): Runs a SELECT query and returns results as a list of tuples.
- run(sql): Runs any SQL command without returning results.
```python
from airflow.providers.mysql.hooks.mysql import MySqlHook

# Create hook instance
hook = MySqlHook(mysql_conn_id='my_mysql_conn')

# Fetch records example
records = hook.get_records(sql='SELECT * FROM my_table')

# Run a SQL command example
hook.run(sql='INSERT INTO my_table (col1) VALUES ("value")')
```
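Because MySqlHook inherits from Airflow's DbApiHook, its query methods also accept a `parameters` argument, which lets the database driver escape values instead of you formatting them into the SQL string. A minimal sketch (the connection ID, table, and column names are placeholders, and a configured MySQL connection is assumed):

```python
from airflow.providers.mysql.hooks.mysql import MySqlHook

hook = MySqlHook(mysql_conn_id='my_mysql_conn')

# The driver substitutes %s placeholders from `parameters`,
# avoiding manual string interpolation and SQL injection risks
rows = hook.get_records(
    sql='SELECT id, name FROM users WHERE id = %s',
    parameters=(42,),
)
```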
Example
This example shows how to use MySqlHook inside an Airflow task to fetch data from a MySQL table and print the results.
```python
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.mysql.hooks.mysql import MySqlHook
from datetime import datetime

def fetch_and_print():
    hook = MySqlHook(mysql_conn_id='my_mysql_conn')
    records = hook.get_records(sql='SELECT id, name FROM users LIMIT 5')
    for row in records:
        print(f"User ID: {row[0]}, Name: {row[1]}")

with DAG(
    dag_id='mysqlhook_example_dag',
    start_date=datetime(2024, 1, 1),
    schedule_interval='@once',
    catchup=False,
) as dag:
    task = PythonOperator(
        task_id='fetch_users',
        python_callable=fetch_and_print,
    )
```
Output
User ID: 1, Name: Alice
User ID: 2, Name: Bob
User ID: 3, Name: Carol
User ID: 4, Name: Dave
User ID: 5, Name: Eve
Common Pitfalls
- Not setting up the Airflow MySQL connection (mysql_conn_id) properly in the Airflow UI or environment causes connection failures.
- Using incorrect SQL syntax or forgetting to commit transactions when running insert/update commands.
- Assuming get_records() returns dictionaries; it returns a list of tuples instead.
- Not handling exceptions for connection or query errors can cause DAG failures.
```python
from airflow.providers.mysql.hooks.mysql import MySqlHook

# Wrong: Using a non-existent connection ID
hook = MySqlHook(mysql_conn_id='wrong_conn')  # Fails once the hook tries to connect

# Right: Use the correct connection ID configured in Airflow
hook = MySqlHook(mysql_conn_id='my_mysql_conn')
```
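For the commit pitfall above: `run()` commits on your behalf once the statements finish, and also accepts an `autocommit` flag; forgetting to commit is mainly a risk when you work with a raw connection yourself. A sketch under those assumptions (connection ID, table, and column names are placeholders):

```python
from airflow.providers.mysql.hooks.mysql import MySqlHook

hook = MySqlHook(mysql_conn_id='my_mysql_conn')

# run() handles the commit; autocommit can also be enabled explicitly for DML
hook.run(
    sql='INSERT INTO my_table (col1) VALUES (%s)',
    parameters=('value',),
    autocommit=True,
)
```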
Quick Reference
| Method | Description |
|---|---|
| MySqlHook(mysql_conn_id) | Create hook with Airflow MySQL connection ID |
| get_records(sql) | Run SELECT query and return list of tuples |
| run(sql) | Execute any SQL command without returning results |
| get_conn() | Get raw MySQL connection object for advanced use |
| get_cursor() | Get raw cursor object for custom queries |
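The last two rows of the table cover raw access. When you take a connection from get_conn(), cursor management and the commit become your responsibility; a hedged sketch (the connection ID and table name are placeholders):

```python
from airflow.providers.mysql.hooks.mysql import MySqlHook

hook = MySqlHook(mysql_conn_id='my_mysql_conn')
conn = hook.get_conn()
try:
    cur = conn.cursor()
    cur.execute('INSERT INTO my_table (col1) VALUES (%s)', ('value',))
    conn.commit()  # forgetting this loses the insert on a raw connection
    cur.close()
finally:
    conn.close()  # the hook does not close raw connections for you
```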
Key Takeaways
- Import MySqlHook from airflow.providers.mysql.hooks.mysql and create it with your Airflow MySQL connection ID.
- Use get_records() to fetch query results and run() to execute SQL commands without results.
- Ensure your Airflow MySQL connection is correctly configured to avoid connection errors.
- Remember get_records() returns a list of tuples, not dictionaries.
- Handle exceptions to prevent DAG failures when database issues occur.
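As a plain-Python illustration of the tuples-vs-dictionaries point: when dict-style access is more convenient, you can zip each row with the column names yourself (the rows below simulate a result set, so no database is needed; the column names are assumptions):

```python
# get_records() returns rows as tuples, e.g. from 'SELECT id, name FROM users';
# simulated here without a database
rows = [(1, 'Alice'), (2, 'Bob')]
columns = ['id', 'name']

# Zip each tuple with the column names to get dict-style access
dict_rows = [dict(zip(columns, row)) for row in rows]
print(dict_rows[0]['name'])  # Alice
```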