from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago
import json
def validate_schema(**kwargs):
    """Validate records pulled from the 'fetch_data' task against the expected schema.

    Checks that every record contains the required keys ('id', 'name', 'age')
    with the expected types. Extra keys (e.g. a newly added 'email' field) are
    deliberately ignored so additive schema changes do not break the pipeline.

    Raises:
        ValueError: if no data was pushed by 'fetch_data' or a required key is missing.
        TypeError: if a required key holds a value of the wrong type.
    """
    expected_schema = {'id': int, 'name': str, 'age': int}
    data = kwargs['ti'].xcom_pull(task_ids='fetch_data')
    # xcom_pull returns None when the upstream task pushed nothing; fail with
    # a clear message instead of an opaque "'NoneType' is not iterable".
    if data is None:
        raise ValueError("No data available from task 'fetch_data'")
    for record in data:
        for key, expected_type in expected_schema.items():
            if key not in record:
                raise ValueError(f"Missing key {key} in record")
            if not isinstance(record[key], expected_type):
                raise TypeError(f"Key {key} expected type {expected_type.__name__}, got {type(record[key]).__name__}")
def fetch_data():
    """Simulate an upstream fetch whose payload gained an extra 'email' field.

    The 'email' key is not part of the expected schema; downstream tasks
    must tolerate it. The return value is what Airflow pushes to XCom.
    """
    records = [
        {'id': 1, 'name': 'Alice', 'age': 30, 'email': 'alice@example.com'},
        {'id': 2, 'name': 'Bob', 'age': 25, 'email': 'bob@example.com'},
    ]
    return records
def process_data(**kwargs):
    """Project the fetched records down to the expected schema fields and print them.

    Unknown keys (such as 'email') are dropped, so additive upstream schema
    changes do not leak into downstream processing.
    """
    records = kwargs['ti'].xcom_pull(task_ids='fetch_data')
    processed = []
    for record in records:
        trimmed = {}
        # Iterate the record itself (not the allow-list) so key order is preserved.
        for key, value in record.items():
            if key in ('id', 'name', 'age'):
                trimmed[key] = value
        processed.append(trimmed)
    print(f"Processed data: {processed}")
def handle_schema_change():
    """Placeholder hook for reacting to upstream schema drift.

    Currently only reports that the change was absorbed by validation plus
    field filtering; real adaptation logic would live here.
    """
    message = "Schema change handled by ignoring unknown fields and validating required ones."
    print(message)
# Shared operator defaults; start_date is required by Airflow even though
# this DAG is only run on manual trigger.
default_args = {
    'owner': 'airflow',
    'start_date': days_ago(1),
}
# Manual-trigger-only DAG (schedule_interval=None); catchup=False prevents
# backfill runs for past dates.
dag = DAG(
    'handle_schema_changes',
    default_args=default_args,
    schedule_interval=None,
    catchup=False
)
# Task 1: simulate the upstream fetch; its return value is pushed to XCom
# for the downstream tasks to pull.
fetch_task = PythonOperator(
    task_id='fetch_data',
    python_callable=fetch_data,
    dag=dag
)
# Task 2: fail fast if required keys/types are missing from the fetched records.
validate_task = PythonOperator(
    task_id='validate_schema',
    python_callable=validate_schema,
    dag=dag
)
# Task 3: strip unknown fields and emit the trimmed records.
process_task = PythonOperator(
    task_id='process_data',
    python_callable=process_data,
    dag=dag
)
# Task 4: placeholder hook where schema-change adaptation logic would go.
handle_change_task = PythonOperator(
    task_id='handle_schema_change',
    python_callable=handle_schema_change,
    dag=dag
)
# Linear pipeline: fetch -> validate -> process -> handle schema change.
fetch_task >> validate_task >> process_task >> handle_change_task
# This Airflow DAG simulates handling schema changes in a data pipeline.
# - fetch_data simulates fetching data with an extra 'email' field that is not in the expected schema.
# - validate_schema checks the required fields and their types, ignoring extra fields.
# - process_data processes only the expected fields, ignoring unknown ones.
# - handle_schema_change is a placeholder showing where logic to adapt to schema changes would go.
# The DAG runs the tasks in order: fetch, validate, process, then handle schema changes.