
How to Create a Custom Hook in Apache Airflow

To create a custom hook in Airflow, define a new Python class that inherits from BaseHook and implement connection logic in it. Then use this hook in your tasks to interact with external systems in a reusable way.

Syntax

A custom hook in Airflow is a Python class that inherits from BaseHook. You override methods to add connection and interaction logic.

  • class MyCustomHook(BaseHook): - defines the hook class.
  • def __init__(self, conn_id): - initializes with Airflow connection ID.
  • def get_conn(self): - creates and returns the external connection object.
python
from airflow.hooks.base import BaseHook  # Airflow 2.x path; airflow.hooks.base_hook is the deprecated 1.10 location

class MyCustomHook(BaseHook):
    def __init__(self, conn_id):
        super().__init__()
        self.conn_id = conn_id

    def get_conn(self):
        connection = self.get_connection(self.conn_id)
        # Placeholder: a real hook would build a client from connection.host,
        # connection.login, and connection.password and return that instead
        return connection

Example

This example shows a custom hook that connects to a fictional API using connection info stored in Airflow. It demonstrates how to get connection details and use them.

python
from airflow.hooks.base import BaseHook  # Airflow 2.x path; airflow.hooks.base_hook is the deprecated 1.10 location
import requests

class MyApiHook(BaseHook):
    def __init__(self, conn_id):
        super().__init__()
        self.conn_id = conn_id

    def get_conn(self):
        connection = self.get_connection(self.conn_id)
        return requests.Session(), connection.host, connection.login, connection.password

    def call_api(self, endpoint):
        session, host, user, password = self.get_conn()
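        # Assumes the connection's host includes the scheme, e.g. https://api.example.com
        url = f"{host.rstrip('/')}/{endpoint.lstrip('/')}"
        response = session.get(url, auth=(user, password), timeout=30)
        response.raise_for_status()
        return response.json()

# Usage example (requires an Airflow connection with the ID 'my_api_conn')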
hook = MyApiHook('my_api_conn')
result = hook.call_api('data')
print(result)
Example output (the actual response depends on the API)
{'key': 'value', 'data': [1, 2, 3]}
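
The example joins the connection's host and the endpoint directly, which only works when the host is stored with its scheme. As a minimal sketch of a more forgiving approach, the helper below builds the URL from a connection-like object. The name build_url, the default_scheme parameter, and the SimpleNamespace stand-in for an Airflow connection are assumptions made for illustration, and the sketch assumes a host that does not already embed a port or path.

```python
from types import SimpleNamespace


def build_url(conn, endpoint, default_scheme="https"):
    """Join a connection's host (and optional port) with an endpoint path.

    Hypothetical helper: `conn` only needs `host` and, optionally, `port`.
    """
    host = (conn.host or "").strip().rstrip("/")
    if not host:
        raise ValueError("connection has no host configured")
    # Add a scheme only when the stored host does not already carry one.
    if "://" not in host:
        host = f"{default_scheme}://{host}"
    port = getattr(conn, "port", None)
    if port:
        host = f"{host}:{port}"
    return f"{host}/{endpoint.lstrip('/')}"


# Stand-in for the object returned by get_connection()
conn = SimpleNamespace(host="api.example.com", port=8443)
print(build_url(conn, "/data"))  # prints https://api.example.com:8443/data
```

Inside a hook, the same logic could live in get_conn() so that every method shares one way of building URLs.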

Common Pitfalls

Common mistakes when creating custom hooks include:

  • Not inheriting from BaseHook, which breaks Airflow integration.
  • Not using Airflow's get_connection() method to retrieve connection info.
  • Hardcoding credentials instead of using Airflow connections.
  • Not handling connection errors or exceptions properly.

Always use Airflow's connection management and handle errors gracefully.

python
from airflow.hooks.base import BaseHook  # Airflow 2.x path; airflow.hooks.base_hook is the deprecated 1.10 location

# Wrong: Not inheriting BaseHook and hardcoding credentials
class BadHook:
    def get_conn(self):
        return 'http://api.example.com', 'user', 'pass'

# Right: Inherit BaseHook and use get_connection
class GoodHook(BaseHook):
    def __init__(self, conn_id):
        super().__init__()
        self.conn_id = conn_id

    def get_conn(self):
        connection = self.get_connection(self.conn_id)
        return connection.host, connection.login, connection.password

Quick Reference

Tips for creating custom hooks in Airflow:

  • Always inherit from BaseHook.
  • Use get_connection(conn_id) to access connection details.
  • Keep connection logic reusable and simple.
  • Handle exceptions so that failures surface with clear error messages.
  • Test your hook independently before using in DAGs.
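
Testing a hook on its own can be sketched with the standard library's unittest.mock, so no Airflow metadata database or real connection is needed. The snippet below is an illustration under stated assumptions: the fallback BaseHook class exists only so the sketch also runs where Airflow is not installed, and the SimpleNamespace object stands in for a stored Airflow connection.

```python
from types import SimpleNamespace
from unittest import mock

try:
    from airflow.hooks.base import BaseHook  # import path in Airflow 2.x
except ImportError:
    # Stand-in (assumption) so this sketch runs without Airflow installed.
    class BaseHook:
        def get_connection(self, conn_id):
            raise NotImplementedError("requires Airflow")


class GoodHook(BaseHook):
    def __init__(self, conn_id):
        super().__init__()
        self.conn_id = conn_id

    def get_conn(self):
        connection = self.get_connection(self.conn_id)
        return connection.host, connection.login, connection.password


def test_get_conn_reads_airflow_connection():
    # Fake connection with only the fields the hook reads.
    fake = SimpleNamespace(
        host="https://api.example.com", login="user", password="secret"
    )
    # Replace the connection lookup so the test never touches Airflow's backend.
    with mock.patch.object(GoodHook, "get_connection", return_value=fake) as lookup:
        hook = GoodHook("my_api_conn")
        assert hook.get_conn() == ("https://api.example.com", "user", "secret")
    lookup.assert_called_once_with("my_api_conn")


test_get_conn_reads_airflow_connection()
```

The same function can be collected by a test runner such as pytest instead of being called directly.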

Key Takeaways

  • Create custom hooks by subclassing Airflow's BaseHook and implementing connection logic.
  • Use Airflow's get_connection method to securely access stored connection details.
  • Avoid hardcoding credentials; always rely on Airflow connections for security.
  • Handle exceptions in your hook so that failures surface with clear error messages.
  • Test custom hooks separately before integrating them into DAGs.