How to Use XCom with TaskFlow API in Airflow
In Airflow's
TaskFlow API, use the @task decorator to create tasks that return values, which are automatically pushed to XCom. To pull data from another task, call the task function with .output to get the XCom value, enabling easy data sharing between tasks.Syntax
The TaskFlow API uses Python functions decorated with @task to define tasks. When a task function returns a value, Airflow automatically pushes it to XCom. To use the value in another task, call the first task function and access its .output attribute.
Key parts:
@task: marks a Python function as an Airflow task.- Return value: automatically stored in XCom.
.output: used to pull the XCom value from a task.
python
from airflow.decorators import dag, task from airflow.utils.dates import days_ago @dag(start_date=days_ago(1), schedule_interval='@daily') def example_dag(): @task def push_data(): return 'hello from XCom' @task def pull_data(data): print(f'Received: {data}') data = push_data() pull_data(data.output) example_dag_instance = example_dag()
Example
This example shows two tasks: one pushes a string to XCom by returning it, and the other pulls it by accessing .output. The second task prints the received value.
python
from airflow.decorators import dag, task from airflow.utils.dates import days_ago @dag(start_date=days_ago(1), schedule_interval='@daily', catchup=False) def xcom_taskflow_example(): @task def push_message(): return 'Data passed via XCom' @task def pull_message(message): print(f'Pulled message: {message}') msg = push_message() pull_message(msg.output) example_dag = xcom_taskflow_example()
Output
Pulled message: Data passed via XCom
Common Pitfalls
- Not returning a value from the task function means nothing is pushed to XCom.
- Trying to access XCom values without using
.outputon the task instance will fail. - Using
ti.xcom_pull()manually is unnecessary with TaskFlow API and can complicate code.
python
from airflow.decorators import dag, task from airflow.utils.dates import days_ago @dag(start_date=days_ago(1), schedule_interval='@daily', catchup=False) def wrong_xcom_usage(): @task def no_return(): print('No return, no XCom') @task def wrong_pull(): # This will fail because no_return() returns None print(no_return()) # Wrong: prints None, no XCom used no_return() wrong_pull() example_dag = wrong_xcom_usage()
Quick Reference
| Concept | Usage | Notes |
|---|---|---|
| @task decorator | Marks a function as a task | Required for TaskFlow API tasks |
| Return value | Returned value is pushed to XCom | Must return to share data |
| .output | Access XCom value from a task | Use on task function call |
| ti.xcom_pull() | Legacy way to pull XCom | Avoid with TaskFlow API |
| Task dependencies | Set by calling tasks in order | Ensures correct execution sequence |
Key Takeaways
Use @task decorator to create tasks that automatically push return values to XCom.
Access XCom data by calling the task function and using .output in downstream tasks.
Always return data from tasks to share it via XCom with TaskFlow API.
Avoid manual ti.xcom_pull() calls when using TaskFlow API for cleaner code.
Define task order by calling tasks in sequence to manage dependencies.