Complete the code to define a task that runs only if it hasn't run before.
def task_function(**kwargs): ti = kwargs['ti'] if ti.xcom_pull(task_ids='task_id') is [1]: print('Running task')
The task checks if the previous run result is None, meaning it hasn't run yet.
Complete the code to push a result to XCom after task completion.
def task_function(**kwargs): ti = kwargs['ti'] result = 'done' ti.xcom_push(key='status', value=[1])
The variable result holds the value to push, so it should be passed directly.
Fix the error in the task decorator to ensure idempotency with retries.
@task(retries=[1]) def my_task(): pass
The retries parameter must be an integer, not a string, to work correctly.
Fill both blanks to create an idempotent task that skips if output exists.
def task_function(**kwargs): ti = kwargs['ti'] output = ti.xcom_pull(task_ids=[1], key=[2]) if output is not None: print('Skipping task')
The task pulls the 'result' key from the 'previous_task' to check if it ran before.
Fill all three blanks to define an idempotent task that updates a database only once.
def update_db_task(**kwargs): ti = kwargs['ti'] already_done = ti.xcom_pull(task_ids=[1], key=[2]) if not already_done: # perform update ti.xcom_push(key=[3], value=True)
The task checks 'done' key from 'check_update' task and pushes 'update_status' after updating.