
Handling schema changes in data pipelines in Apache Airflow - Commands & Configuration

Introduction
Data pipelines often break when the structure of incoming data changes unexpectedly. Handling schema changes means updating your pipeline to accept new or modified data formats without stopping the flow.
Use these techniques in the following situations:
When a new column is added to your data source and your pipeline needs to process it.
When a column is removed or renamed in the incoming data and your pipeline must adapt.
When the data type of a column changes, such as from integer to string.
When you want to validate incoming data schema before processing to avoid errors.
When you want to maintain historical data compatibility while accepting new schema versions.
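The type-change case in the list above (for example, an integer that starts arriving as a string) can be sketched in plain Python. This is only an illustration under assumptions: `coerce_record` and `EXPECTED_SCHEMA` are hypothetical names, not part of Airflow, and converting values is just one possible policy; rejecting the record outright is an equally valid choice.

```python
# Hypothetical helper: try to convert values whose type has changed upstream.
EXPECTED_SCHEMA = {"id": int, "name": str, "age": int}


def coerce_record(record, expected_schema):
    """Return a copy of record with known fields converted to their expected types."""
    coerced = dict(record)  # unknown fields are kept untouched
    for key, expected_type in expected_schema.items():
        if key in coerced and not isinstance(coerced[key], expected_type):
            try:
                coerced[key] = expected_type(coerced[key])
            except (TypeError, ValueError) as exc:
                raise TypeError(
                    f"Cannot convert {key}={coerced[key]!r} to {expected_type.__name__}"
                ) from exc
    return coerced


# 'id' arrives as a string instead of an integer
result = coerce_record({"id": "7", "name": "Alice", "age": 30}, EXPECTED_SCHEMA)
print(result)  # {'id': 7, 'name': 'Alice', 'age': 30}
```

Values that cannot be converted (such as an `id` of "abc") still raise an error, so bad data is not silently passed downstream.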
Config File - my_dag.py
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator

# Required fields and their expected types; extra fields are allowed.
EXPECTED_SCHEMA = {'id': int, 'name': str, 'age': int}

def validate_schema(**kwargs):
    # Pull the records returned by fetch_data (stored as an XCom)
    data = kwargs['ti'].xcom_pull(task_ids='fetch_data')
    if data is None:
        raise ValueError("No data found in XCom from task 'fetch_data'")
    for record in data:
        for key, expected_type in EXPECTED_SCHEMA.items():
            if key not in record:
                raise ValueError(f"Missing key {key} in record")
            if not isinstance(record[key], expected_type):
                raise TypeError(f"Key {key} expected type {expected_type.__name__}, got {type(record[key]).__name__}")

def fetch_data():
    # Simulated data fetch with schema change (extra 'email' field)
    return [
        {'id': 1, 'name': 'Alice', 'age': 30, 'email': 'alice@example.com'},
        {'id': 2, 'name': 'Bob', 'age': 25, 'email': 'bob@example.com'}
    ]

def process_data(**kwargs):
    data = kwargs['ti'].xcom_pull(task_ids='fetch_data')
    # Keep only the fields named in the schema, ignoring unknown ones
    processed = [{k: v for k, v in record.items() if k in EXPECTED_SCHEMA} for record in data]
    print(f"Processed data: {processed}")

def handle_schema_change():
    print("Schema change handled by ignoring unknown fields and validating required ones.")

default_args = {
    'owner': 'airflow',
    # A fixed start date (days_ago is deprecated in newer Airflow releases)
    'start_date': datetime(2024, 1, 1),
}

dag = DAG(
    'handle_schema_changes',
    default_args=default_args,
    # Manual runs only; Airflow 2.4+ also accepts this as schedule=None
    schedule_interval=None,
    catchup=False
)

fetch_task = PythonOperator(
    task_id='fetch_data',
    python_callable=fetch_data,
    dag=dag
)

validate_task = PythonOperator(
    task_id='validate_schema',
    python_callable=validate_schema,
    dag=dag
)

process_task = PythonOperator(
    task_id='process_data',
    python_callable=process_data,
    dag=dag
)

handle_change_task = PythonOperator(
    task_id='handle_schema_change',
    python_callable=handle_schema_change,
    dag=dag
)

fetch_task >> validate_task >> process_task >> handle_change_task

This Airflow DAG simulates handling schema changes in a data pipeline.

fetch_data simulates fetching data with an extra 'email' field not in the expected schema.

validate_schema checks required fields and their types, ignoring extra fields.

process_data processes only the expected fields, ignoring unknown ones.

handle_schema_change is a placeholder to show where you might add logic to adapt to schema changes.

The DAG runs tasks in order: fetch, validate, process, then handle schema changes.
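As one idea for what the handle_schema_change placeholder could do, the sketch below compares a record with the expected schema and reports what changed. It is plain Python with no Airflow dependency, and `detect_schema_drift` is a hypothetical helper name chosen for this illustration; in a real DAG the report might be logged or sent as an alert.

```python
EXPECTED_SCHEMA = {"id": int, "name": str, "age": int}


def detect_schema_drift(record, expected_schema):
    """Report fields that were added, are missing, or changed type in one record."""
    drift = {"added": [], "missing": [], "type_changed": []}
    # Fields present in the data but unknown to the schema
    for key in record:
        if key not in expected_schema:
            drift["added"].append(key)
    # Fields the schema expects that are absent or have a different type
    for key, expected_type in expected_schema.items():
        if key not in record:
            drift["missing"].append(key)
        elif not isinstance(record[key], expected_type):
            drift["type_changed"].append(
                (key, expected_type.__name__, type(record[key]).__name__)
            )
    return drift


# 'email' is new, 'name' is gone, and 'age' arrives as a string
sample = {"id": 1, "age": "30", "email": "alice@example.com"}
report = detect_schema_drift(sample, EXPECTED_SCHEMA)
print(report)
# {'added': ['email'], 'missing': ['name'], 'type_changed': [('age', 'int', 'str')]}
```

Separating detection from validation lets the pipeline treat added fields as a warning while still failing on missing or retyped required fields.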

Commands
List all available DAGs to confirm the new DAG 'handle_schema_changes' is recognized by Airflow.
Terminal
airflow dags list
Expected Output (dag_id column only)
handle_schema_changes
example_bash_operator
example_python_operator
Trigger the DAG manually to run the tasks and test schema change handling.
Terminal
airflow dags trigger handle_schema_changes
Expected Output
Created <DagRun handle_schema_changes @ 2024-06-01T12:00:00+00:00: manual__2024-06-01T12:00:00+00:00, externally triggered: True>
List all tasks in the DAG to understand the workflow steps.
Terminal
airflow tasks list handle_schema_changes
Expected Output (task IDs are listed alphabetically, not in execution order)
fetch_data
handle_schema_change
process_data
validate_schema
Run the 'fetch_data' task alone for the given date to verify data fetching with schema changes.
Terminal
airflow tasks test handle_schema_changes fetch_data 2024-06-01
Expected Output (abridged; timestamps and surrounding log lines vary by Airflow version)
[2024-06-01 12:00:01,000] {python.py:118} INFO - Done. Returned value was: [{'id': 1, 'name': 'Alice', 'age': 30, 'email': 'alice@example.com'}, {'id': 2, 'name': 'Bob', 'age': 25, 'email': 'bob@example.com'}]
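Run the 'validate_schema' task to check if the data matches the expected schema, ignoring extra fields. This task reads its input from the XCom written by 'fetch_data'; if no XCom exists for that date, xcom_pull returns None and the task fails.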
Terminal
airflow tasks test handle_schema_changes validate_schema 2024-06-01
Expected Output (abridged; timestamps and surrounding log lines vary by Airflow version)
[2024-06-01 12:01:01,000] {python.py:118} INFO - Done. Returned value was: None
Key Concept

If you remember nothing else, remember: validate required fields strictly but ignore unknown fields to handle schema changes smoothly.

Common Mistakes
Mistake: Failing the pipeline when extra fields appear in the data.
Why it hurts: This causes unnecessary pipeline failures for harmless schema additions.
Fix: Validate only required fields and ignore extra fields to allow forward compatibility.

Mistake: Not validating data types, leading to errors downstream.
Why it hurts: Incorrect data types can cause crashes or wrong results later in the pipeline.
Fix: Always check that required fields have the expected data types before processing.

Mistake: Hardcoding schema without a way to update it when changes occur.
Why it hurts: Makes the pipeline brittle and hard to maintain as data evolves.
Fix: Design schema validation to be easy to update and handle optional fields gracefully.
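One way to avoid a hardcoded schema is to describe it as data, with each field marked required or optional. The sketch below is a minimal illustration under assumptions: the `SCHEMA` layout and the `normalize_record` helper are hypothetical, not an Airflow feature, and a real project might load the definition from a config file or use a validation library instead.

```python
# Hypothetical schema definition kept as data, so a change is a one-line edit.
SCHEMA = {
    "id": {"type": int, "required": True},
    "name": {"type": str, "required": True},
    "age": {"type": int, "required": True},
    # Newer field: optional, so older records without it still pass.
    "email": {"type": str, "required": False, "default": None},
}


def normalize_record(record, schema=SCHEMA):
    """Validate one record and return it with exactly the schema's fields."""
    normalized = {}
    for field, rule in schema.items():
        if field not in record:
            if rule["required"]:
                raise ValueError(f"Missing required field {field!r}")
            # Optional field absent: fall back to its default
            normalized[field] = rule.get("default")
            continue
        value = record[field]
        if not isinstance(value, rule["type"]):
            raise TypeError(
                f"Field {field!r} expected {rule['type'].__name__}, "
                f"got {type(value).__name__}"
            )
        normalized[field] = value
    return normalized


old_record = {"id": 1, "name": "Alice", "age": 30}
new_record = {"id": 2, "name": "Bob", "age": 25,
              "email": "bob@example.com", "phone": "555-0100"}
old_result = normalize_record(old_record)
new_result = normalize_record(new_record)
print(old_result)  # {'id': 1, 'name': 'Alice', 'age': 30, 'email': None}
print(new_result)  # {'id': 2, 'name': 'Bob', 'age': 25, 'email': 'bob@example.com'}
```

Old records gain a default for the optional field, and the unknown 'phone' field is dropped, so both schema versions produce the same output shape.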
Summary
Create an Airflow DAG that fetches data, validates schema, processes data, and handles schema changes.
Validate required fields and their types but ignore unknown fields to avoid pipeline failures.
Test tasks individually to ensure each step handles schema changes correctly.
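Besides the CLI, the validation callable can be exercised without a running Airflow installation by passing it a stand-in for the task instance. In this sketch, `FakeTaskInstance` is a hypothetical test double that implements only `xcom_pull`, the single method the callable uses; `validate_schema` is repeated from the DAG file so the example is self-contained.

```python
class FakeTaskInstance:
    """Hypothetical stand-in for Airflow's task instance; supports xcom_pull only."""

    def __init__(self, xcom_values):
        self._xcom_values = xcom_values  # maps task_id -> returned value

    def xcom_pull(self, task_ids=None):
        return self._xcom_values.get(task_ids)


def validate_schema(**kwargs):
    # Same logic as the DAG's validate_schema callable
    expected_schema = {"id": int, "name": str, "age": int}
    data = kwargs["ti"].xcom_pull(task_ids="fetch_data")
    for record in data:
        for key, expected_type in expected_schema.items():
            if key not in record:
                raise ValueError(f"Missing key {key} in record")
            if not isinstance(record[key], expected_type):
                raise TypeError(
                    f"Key {key} expected type {expected_type.__name__}, "
                    f"got {type(record[key]).__name__}"
                )


# Extra 'email' field: validation passes because unknown fields are ignored
good = FakeTaskInstance(
    {"fetch_data": [{"id": 1, "name": "Alice", "age": 30, "email": "a@example.com"}]}
)
validate_schema(ti=good)

# 'age' arrives as a string: validation rejects the record
bad = FakeTaskInstance({"fetch_data": [{"id": 2, "name": "Bob", "age": "25"}]})
try:
    validate_schema(ti=bad)
    outcome = "passed"
except TypeError as exc:
    outcome = f"rejected: {exc}"
print(outcome)  # rejected: Key age expected type int, got str
```

The same pattern fits a pytest suite, with one test per schema change you expect the pipeline to tolerate or reject.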