
How to Use MySqlHook in Airflow: Syntax and Example

Use MySqlHook in Airflow by importing it from airflow.providers.mysql.hooks.mysql, then create an instance with your connection ID. Call methods like get_records() or run() to execute SQL queries on your MySQL database.

Syntax

The MySqlHook class is imported from airflow.providers.mysql.hooks.mysql. You create an instance by passing the Airflow connection ID for your MySQL database. Use methods like get_records(sql) to fetch data or run(sql) to execute commands.

Key parts:

  • mysql_conn_id: ID of the Airflow connection, configured in the Airflow UI or through an environment variable.
  • get_records(sql): Runs a SELECT query and returns the results as a list of tuples.
  • run(sql): Executes any SQL statement, such as INSERT, UPDATE, or DDL. By default it returns no rows.
python
from airflow.providers.mysql.hooks.mysql import MySqlHook

# Create hook instance
hook = MySqlHook(mysql_conn_id='my_mysql_conn')

# Fetch records example
records = hook.get_records(sql='SELECT * FROM my_table')

# Run a SQL command example (the value is bound as a parameter, not inlined)
hook.run(sql='INSERT INTO my_table (col1) VALUES (%s)', parameters=('value',))
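
Both get_records() and run() also accept a parameters argument, so the MySQL driver escapes values instead of having them pasted into the SQL string. The sketch below assumes the users and my_table tables used in this article. find_users_by_name and add_value are hypothetical helper names, and the hook is passed in as an argument so the functions can be exercised without a live database.

```python
def find_users_by_name(hook, name):
    """Return (id, name) tuples for users whose name matches exactly."""
    sql = "SELECT id, name FROM users WHERE name = %s"
    # The driver substitutes %s with a safely escaped value.
    return hook.get_records(sql=sql, parameters=(name,))


def add_value(hook, value):
    """Insert one row into my_table, binding the value as a parameter."""
    hook.run(
        sql="INSERT INTO my_table (col1) VALUES (%s)",
        parameters=(value,),
    )
```

Inside a task, pass MySqlHook(mysql_conn_id='my_mysql_conn') as the hook argument.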

Example

This example shows how to use MySqlHook inside an Airflow task to fetch data from a MySQL table and print the results.

python
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.mysql.hooks.mysql import MySqlHook
from datetime import datetime

def fetch_and_print():
    hook = MySqlHook(mysql_conn_id='my_mysql_conn')
    records = hook.get_records(sql='SELECT id, name FROM users LIMIT 5')
    for row in records:
        print(f"User ID: {row[0]}, Name: {row[1]}")

# `schedule` requires Airflow 2.4+; older releases use `schedule_interval` instead.
with DAG(
    dag_id='mysqlhook_example_dag',
    start_date=datetime(2024, 1, 1),
    schedule='@once',
    catchup=False,
) as dag:
    task = PythonOperator(
        task_id='fetch_users',
        python_callable=fetch_and_print
    )
Output
User ID: 1, Name: Alice
User ID: 2, Name: Bob
User ID: 3, Name: Carol
User ID: 4, Name: Dave
User ID: 5, Name: Eve
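
The task above reads each row by position (row[0], row[1]) because get_records() returns tuples. If named access reads better, one option is to pair each tuple with the column names from the SELECT list. This is a minimal sketch: rows_to_dicts is a hypothetical helper, and the sample rows stand in for a real query result.

```python
def rows_to_dicts(rows, columns):
    """Pair each tuple with column names, in SELECT order."""
    return [dict(zip(columns, row)) for row in rows]


# Sample data shaped like the result of get_records() for
# 'SELECT id, name FROM users LIMIT 5'.
records = [(1, "Alice"), (2, "Bob")]
users = rows_to_dicts(records, ["id", "name"])
print(users[0]["name"])  # prints: Alice
```

The column list has to match the order of the SELECT clause, so this works best with explicit column names rather than SELECT *.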

Common Pitfalls

  • Not configuring the MySQL connection (the ID passed as mysql_conn_id) in the Airflow UI or environment, which causes connection failures.
  • Using incorrect SQL syntax, or forgetting to commit after insert/update statements issued through get_conn() or a raw cursor. run() commits for you once the statements finish.
  • Assuming get_records() returns dictionaries; it returns a list of tuples.
  • Leaving connection or query errors unhandled, which fails the task and with it the DAG run.
python
from airflow.providers.mysql.hooks.mysql import MySqlHook

# Wrong: a connection ID that does not exist in Airflow
hook = MySqlHook(mysql_conn_id='wrong_conn')  # Fails once a query looks up the connection

# Right: Use the correct connection ID configured in Airflow
hook = MySqlHook(mysql_conn_id='my_mysql_conn')
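
For the exception-handling pitfall, one possible pattern is to log the failing query and then re-raise, so the task is still marked as failed and any configured retries apply. This is a sketch under that assumption rather than a required approach. fetch_or_fail is a hypothetical helper name, and the hook is passed in so the function can be tried with any object that offers get_records().

```python
import logging

log = logging.getLogger(__name__)


def fetch_or_fail(hook, sql, parameters=None):
    """Run a query; on error, log the SQL and re-raise."""
    try:
        return hook.get_records(sql=sql, parameters=parameters)
    except Exception:
        # Record which query failed, with the traceback, then let the
        # error propagate so the scheduler sees a failed task.
        log.exception("Query failed: %s", sql)
        raise
```

Swallowing the exception instead would mark the task as successful even though no data was read, which tends to hide problems from downstream tasks.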

Quick Reference

| Method | Description |
| --- | --- |
| MySqlHook(mysql_conn_id) | Create hook with Airflow MySQL connection ID |
| get_records(sql) | Run SELECT query and return list of tuples |
| run(sql) | Execute any SQL command without returning results |
| get_conn() | Get raw MySQL connection object for advanced use |
| get_cursor() | Get raw cursor object for custom queries |
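
As an illustration of the "advanced use" that get_conn() allows, here is a minimal sketch of a batch insert on the raw connection. It assumes a users table with a name column and a connection with autocommit off, so the commit is explicit. bulk_insert_names is a hypothetical helper, and the hook is passed in as an argument.

```python
from contextlib import closing


def bulk_insert_names(hook, names):
    """Insert one row per name via the raw connection; return the row count."""
    rows = [(name,) for name in names]
    # closing() calls close() on the connection and cursor even on error.
    with closing(hook.get_conn()) as conn:
        try:
            with closing(conn.cursor()) as cur:
                cur.executemany("INSERT INTO users (name) VALUES (%s)", rows)
            conn.commit()  # a raw connection is not committed for you
        except Exception:
            conn.rollback()  # discard the partial batch before re-raising
            raise
    return len(rows)
```

For single statements, run() is usually simpler, since it opens the connection, executes, and commits in one call.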

Key Takeaways

  • Import MySqlHook from airflow.providers.mysql.hooks.mysql and create it with your Airflow MySQL connection ID.
  • Use get_records() to fetch query results and run() to execute SQL commands without results.
  • Ensure your Airflow MySQL connection is correctly configured to avoid connection errors.
  • Remember get_records() returns a list of tuples, not dictionaries.
  • Handle exceptions to prevent DAG failures when database issues occur.