Apache Airflow · DevOps · ~5 mins

Sharing data between tasks effectively in Apache Airflow - Commands & Configuration

Introduction
When you run multiple tasks in Airflow, sometimes one task needs to give information to another. Sharing data between tasks helps them work together smoothly without repeating work or losing important details.
When a task processes data and the next task needs that processed data to continue.
When you want to pass small results like file names or counts from one task to another.
When you want to avoid storing data outside Airflow and keep it inside the workflow.
When tasks run in sequence and depend on each other's outputs.
When you want to keep your workflow clean by sharing only necessary data between tasks.
Config File - my_dag.py
my_dag.py
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator

def task_one(ti):
    # Push a small value to XCom under the key 'message'
    data = 'Hello from task one'
    ti.xcom_push(key='message', value=data)

def task_two(ti):
    # Pull the value that task_one pushed in this same DAG run
    message = ti.xcom_pull(key='message', task_ids='task_one')
    print(f'Received message: {message}')

default_args = {
    'owner': 'airflow',
}

dag = DAG(
    'example_xcom_dag',
    default_args=default_args,
    start_date=datetime(2024, 6, 1),
    schedule_interval=None,
    catchup=False,
)

# Use distinct variable names so the operators do not shadow the callables above
push_task = PythonOperator(
    task_id='task_one',
    python_callable=task_one,
    dag=dag,
)

pull_task = PythonOperator(
    task_id='task_two',
    python_callable=task_two,
    dag=dag,
)

push_task >> pull_task

This DAG defines two tasks using PythonOperator. task_one pushes a message using ti.xcom_push. task_two pulls that message with ti.xcom_pull and prints it. The tasks run in order, so task_two receives data from task_one.
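Because the callables talk to Airflow only through ti.xcom_push and ti.xcom_pull, the hand-off can be sanity-checked without a running scheduler by faking the task instance. The FakeTI class below is an illustrative stand-in, not part of Airflow:

```python
# Minimal stand-in for the TaskInstance object Airflow passes to callables
class FakeTI:
    def __init__(self):
        self._xcoms = {}

    def xcom_push(self, key, value):
        self._xcoms[key] = value

    def xcom_pull(self, key, task_ids=None):
        # A real TaskInstance scopes pulls by task_ids and DAG run;
        # this stub keys only on the XCom key, which is enough here.
        return self._xcoms.get(key)

def task_one(ti):
    ti.xcom_push(key='message', value='Hello from task one')

def task_two(ti):
    message = ti.xcom_pull(key='message', task_ids='task_one')
    print(f'Received message: {message}')

ti = FakeTI()
task_one(ti)
task_two(ti)  # prints: Received message: Hello from task one
```

This is also a convenient way to unit-test XCom-dependent callables in CI without an Airflow metadata database.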

Commands
List all available DAGs to confirm your DAG file is loaded by Airflow.
Terminal
airflow dags list
Expected Output
example_xcom_dag
Trigger the DAG to start running the tasks and share data between them.
Terminal
airflow dags trigger example_xcom_dag
Expected Output
Created <DagRun example_xcom_dag @ 2024-06-01T12:00:00+00:00: manual__2024-06-01T12:00:00+00:00, externally triggered: True>
Check the logs of task_two to see the message it received from task_one. The Airflow CLI has no subcommand for reading task logs, so either open the task in the web UI or read its log file directly. The path layout below matches Airflow 2.3+ defaults and varies by version.
Terminal
cat $AIRFLOW_HOME/logs/dag_id=example_xcom_dag/run_id=manual__2024-06-01T12:00:00+00:00/task_id=task_two/attempt=1.log
Expected Output
[2024-06-01 12:00:10,000] {logging_mixin.py:112} INFO - Received message: Hello from task one
run_id - Identifies which DAG run's logs to read; manually triggered runs use the form manual__<logical date>
Key Concept

If you remember nothing else from this pattern, remember: use XComs to push data in one task and pull it in another to share information inside Airflow workflows.

Common Mistakes
Trying to share large files or big data directly through XComs.
XComs are meant for small pieces of data; large data can cause performance issues or failures.
Store large data in external storage like S3 or a database and pass only references or file paths via XComs.
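One way to sketch that fix: upload the payload with whatever storage client you use and exchange only its location through XCom. The bucket, path, and task names below are illustrative, and the actual upload/download calls are left out:

```python
def extract(ti):
    # Write the large payload outside Airflow first, e.g. with boto3
    # or an Airflow S3 hook (upload step omitted in this sketch)
    path = 's3://my-bucket/daily/2024-06-01/output.parquet'
    # Push only the small reference, never the payload itself
    ti.xcom_push(key='output_path', value=path)

def load(ti):
    # Pull the reference and read the payload from storage inside this task
    path = ti.xcom_pull(key='output_path', task_ids='extract')
    print(f'Reading data from {path}')
    return path
```

The XCom table only ever stores a short string, so the workflow stays fast regardless of how large the real dataset grows.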
Not specifying the correct task_id when pulling data with ti.xcom_pull.
If the task_id is wrong, the data won't be found and the task will get None or fail.
Always use the exact task_id of the task that pushed the data when calling ti.xcom_pull.
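A defensive version of the pulling callable makes a wrong or missing task_id fail loudly instead of quietly passing None downstream:

```python
def task_two(ti):
    message = ti.xcom_pull(key='message', task_ids='task_one')
    # xcom_pull returns None (it does not raise) when no matching XCom
    # exists, e.g. because task_ids is misspelled or the push never ran
    if message is None:
        raise ValueError("no XCom 'message' found from task 'task_one'")
    print(f'Received message: {message}')
```

Raising here marks the task instance as failed in the UI, which is far easier to spot than a later task crashing on an unexpected None.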
Assuming data pushed in one DAG run is available in another DAG run.
XCom data is tied to a specific DAG run and execution date; it does not persist across runs.
Use external storage or variables if you need data to persist beyond a single DAG run.
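Airflow's built-in option for state that must survive across runs is Variables (airflow.models.Variable.set / Variable.get), which live in the metadata database. The sketch below shows the same persist-outside-XCom idea with a plain JSON file so it runs anywhere; the file path and key names are illustrative:

```python
import json
from pathlib import Path

# Illustrative location for durable state; Airflow Variables or a
# database would play this role in a real deployment
STATE_FILE = Path('/tmp/example_xcom_dag_state.json')

def save_state(last_processed):
    # Persist across DAG runs by writing outside the XCom table
    STATE_FILE.write_text(json.dumps({'last_processed': last_processed}))

def read_state():
    # Returns None on the very first run, when no state exists yet
    if not STATE_FILE.exists():
        return None
    return json.loads(STATE_FILE.read_text())['last_processed']
```

Whichever backend you choose, the pattern is the same: the durable store holds the cross-run state, and XComs carry only within-run hand-offs.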
Summary
Define tasks that push and pull data using ti.xcom_push and ti.xcom_pull.
Trigger the DAG and verify data sharing by checking task logs.
Use XComs for small data sharing within the same DAG run and task sequence.