
How to Store JSON in a Variable in Airflow

In Airflow, you can store JSON data in a Python variable by assigning it directly as a Python dictionary or string. To share JSON between tasks, use XCom by pushing the JSON object in one task and pulling it in another.
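The standard-library json module converts between the dictionary and string forms. Here is a minimal sketch with illustrative values:

```python
import json

# JSON held as a Python dict (the form this article recommends)
json_data = {'name': 'Airflow', 'version': 2}

# The same data as a JSON-formatted string
json_str = json.dumps(json_data)

# Parse the string back into a dict before accessing its keys
parsed = json.loads(json_str)

print(json_str)        # {"name": "Airflow", "version": 2}
print(parsed['name'])  # Airflow
```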

Syntax

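To store JSON in a variable in Airflow, you typically use a Python dictionary or a JSON-formatted string. To share it between tasks, call task_instance.xcom_push(key=..., value=...) to push the JSON and task_instance.xcom_pull(task_ids=..., key=...) to retrieve it. Pass both arguments by keyword: the first positional parameter of xcom_pull is task_ids, not key.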

  • json_data = {'key': 'value'}: Store JSON as a Python dict.
  • ti.xcom_push(key='json_key', value=json_data): Push JSON to XCom.
  • ti.xcom_pull(task_ids='push_task', key='json_key'): Pull JSON from XCom ('push_task' stands for the ID of the task that pushed it).
python
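# In the pushing task's callable (ti is the task instance Airflow passes in):
json_data = {'name': 'Airflow', 'version': 2}
ti.xcom_push(key='my_json', value=json_data)

# In a downstream task's callable; 'push_task' is a placeholder task_id:
retrieved_json = ti.xcom_pull(task_ids='push_task', key='my_json')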

Example

This example shows how to store JSON in a variable inside a PythonOperator and share it with another task using XCom.

python
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

def push_json(ti):
    json_data = {'user': 'alice', 'action': 'login'}
    ti.xcom_push(key='user_action', value=json_data)

def pull_json(ti):
    # task_ids names the task that pushed the value
    data = ti.xcom_pull(task_ids='push_json', key='user_action')
    print(f"Pulled JSON: {data}")

default_args = {'start_date': datetime(2024, 1, 1)}

# schedule replaces the deprecated schedule_interval argument (Airflow 2.4+)
dag = DAG('json_variable_example', default_args=default_args, schedule='@once')

push_task = PythonOperator(task_id='push_json', python_callable=push_json, dag=dag)
pull_task = PythonOperator(task_id='pull_json', python_callable=pull_json, dag=dag)

push_task >> pull_task
Output (in the pull_json task log)
Pulled JSON: {'user': 'alice', 'action': 'login'}
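Because these callables only touch ti.xcom_push and ti.xcom_pull, you can exercise them outside Airflow with a stand-in object. The sketch below uses a hypothetical FakeTI class that keeps XComs in a plain dict keyed by (task_id, key). It mimics only the keyword arguments used in this article and is not how Airflow stores XComs.

```python
class FakeTI:
    """Hypothetical stand-in for Airflow's task instance, for local testing only."""

    def __init__(self, task_id, store):
        self.task_id = task_id
        self.store = store  # dict shared by all fake tasks, keyed by (task_id, key)

    def xcom_push(self, key, value):
        # Record the value under the pushing task's ID and the given key
        self.store[(self.task_id, key)] = value

    def xcom_pull(self, task_ids, key):
        # Look up by the pushing task's ID; None when nothing matches
        return self.store.get((task_ids, key))


# The callables from the example above, passing task_ids explicitly to xcom_pull
def push_json(ti):
    json_data = {'user': 'alice', 'action': 'login'}
    ti.xcom_push(key='user_action', value=json_data)


def pull_json(ti):
    data = ti.xcom_pull(task_ids='push_json', key='user_action')
    print(f"Pulled JSON: {data}")


store = {}
push_json(FakeTI('push_json', store))
pull_json(FakeTI('pull_json', store))  # prints: Pulled JSON: {'user': 'alice', 'action': 'login'}
```

Keying the fake store by task ID as well as key mirrors why xcom_pull needs task_ids: two tasks can push values under the same key.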

Common Pitfalls

Common mistakes include:

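  • Pushing JSON as a string and then using it as a dict without parsing it back with json.loads().
  • Calling xcom_push or xcom_pull without the task instance. Accept ti as a parameter of the callable so Airflow passes it in from the task context.
  • Calling xcom_pull without task_ids. Name the pushing task explicitly so the intended value is returned.
  • Forgetting that, with the default XCom backend, values must be JSON-serializable.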
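To catch the serializability pitfall before a task fails, you can test a value with the standard json module. is_json_serializable below is a hypothetical helper, not part of Airflow. Newer Airflow versions may serialize some extra types (such as datetimes) in XCom, so treat it as a conservative pre-flight check.

```python
import json
from datetime import datetime


def is_json_serializable(value):
    """Hypothetical helper: True if the standard json module can encode value."""
    try:
        json.dumps(value)
        return True
    except (TypeError, ValueError):
        # TypeError: unsupported type; ValueError: e.g. a circular reference
        return False


print(is_json_serializable({'user': 'alice', 'tags': ['a', 'b']}))  # True
print(is_json_serializable({'when': datetime(2024, 1, 1)}))          # False
```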

Always ensure your JSON data is a Python dict or list before pushing to XCom.

python
def wrong_push(ti):
    json_str = '{"key": "value"}'  # JSON as a string
    ti.xcom_push(key='json_key', value=json_str)  # Works, but xcom_pull returns a str you must json.loads()

def right_push(ti):
    json_obj = {"key": "value"}  # Python dict
    ti.xcom_push(key='json_key', value=json_obj)  # Recommended: xcom_pull returns a dict
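If a downstream task may receive either form, a small normalizing step in the consumer avoids the string pitfall. parse_json_value below is a hypothetical helper (not an Airflow API) that parses strings with json.loads and passes dicts and lists through unchanged.

```python
import json


def parse_json_value(pulled):
    """Hypothetical helper: return parsed JSON whether the pulled value is a str or already a dict/list."""
    if isinstance(pulled, str):
        # A JSON string was pushed (as in wrong_push), so parse it back
        return json.loads(pulled)
    # Dicts and lists (as pushed by right_push) need no conversion
    return pulled


print(parse_json_value('{"key": "value"}'))  # {'key': 'value'}
print(parse_json_value({"key": "value"}))    # {'key': 'value'}
```

In a downstream callable you would wrap the pull, for example data = parse_json_value(ti.xcom_pull(task_ids='push_task', key='json_key')), where 'push_task' is a placeholder task ID.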

Quick Reference

| Action | Code example | Description |
|---|---|---|
| Store JSON in a variable | json_data = {'a': 1, 'b': 2} | Store JSON as a Python dict |
| Push JSON to XCom | ti.xcom_push(key='data', value=json_data) | Share JSON between tasks |
| Pull JSON from XCom | data = ti.xcom_pull(task_ids='push_task', key='data') | Retrieve shared JSON ('push_task' is the pushing task's ID) |
| Ensure serializable | Use dicts or lists, not complex objects | XCom requires JSON-serializable data |

Key Takeaways

  • Store JSON as Python dictionaries or lists in ordinary Python variables.
  • Use XCom's xcom_push and xcom_pull to share JSON between tasks, passing task_ids to xcom_pull.
  • Make sure JSON data is JSON-serializable before pushing it to XCom.
  • Avoid storing JSON as raw strings unless you parse them back with json.loads().
  • Use the task instance (ti) to interact with XCom in PythonOperator callables.