Apache Airflow | devops | ~20 mins

PythonOperator for custom logic in Apache Airflow - Practice Problems & Coding Challenges

Challenge - 5 Problems
💻 Command Output
intermediate
Output of PythonOperator task execution

Given this Airflow DAG snippet, what will be the output logged when the python_task runs?

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

def greet():
    print('Hello from Airflow!')

default_args = {'start_date': datetime(2024, 1, 1)}

dag = DAG('greet_dag', default_args=default_args, schedule_interval='@daily')

python_task = PythonOperator(
    task_id='greet_task',
    python_callable=greet,
    dag=dag
)
A. Task greet_task failed due to missing return
B. greet_task executed successfully
C. No output is logged by PythonOperator
D. Hello from Airflow!
💡 Hint

Think about what the print statement inside the python_callable does.
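To see the mechanism outside of Airflow, here is a plain-Python sketch (a simplification, not Airflow's actual implementation): the operator's execute step essentially just invokes the callable, and anything the callable prints to stdout ends up in the task's log.

```python
import contextlib
import io

def greet():
    print('Hello from Airflow!')

# Simulate the operator invoking the callable and capturing its stdout,
# the way task output ends up in the Airflow task log.
buf = io.StringIO()
with contextlib.redirect_stdout(buf):
    greet()

print(buf.getvalue().strip())  # Hello from Airflow!
```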

Configuration
intermediate
Correct PythonOperator usage with arguments

Which option correctly passes keyword arguments to a Python function using PythonOperator?

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

def add_numbers(a, b):
    print(a + b)

default_args = {'start_date': datetime(2024, 1, 1)}

dag = DAG('add_dag', default_args=default_args, schedule_interval='@daily')
A. PythonOperator(task_id='add_task', python_callable=add_numbers, op_kwargs={'a': 5, 'b': 3}, dag=dag)
B. PythonOperator(task_id='add_task', python_callable=add_numbers(5, 3), dag=dag)
C. PythonOperator(task_id='add_task', python_callable=add_numbers, args=(5, 3), dag=dag)
D. PythonOperator(task_id='add_task', python_callable=add_numbers, op_args=[5, 3], dag=dag)
💡 Hint

Check how keyword arguments are passed to the callable in Airflow.
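As a rough sketch of the forwarding behaviour (simplified; the real PythonOperator does more bookkeeping): the operator calls `python_callable(*op_args, **op_kwargs)`, so positional arguments go through `op_args` and keyword arguments through `op_kwargs`.

```python
def add_numbers(a, b):
    print(a + b)

# Simplified stand-in for how PythonOperator forwards arguments
# to the callable at execution time.
def execute_callable(python_callable, op_args=None, op_kwargs=None):
    return python_callable(*(op_args or []), **(op_kwargs or {}))

execute_callable(add_numbers, op_kwargs={'a': 5, 'b': 3})  # prints 8
execute_callable(add_numbers, op_args=[5, 3])              # prints 8
```

Note that passing `python_callable=add_numbers(5, 3)` would call the function at DAG-parse time and hand its return value (here `None`) to the operator, which is why that form is wrong.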

Troubleshoot
advanced
Error raised by PythonOperator with missing callable

What error will Airflow raise if python_callable is set to None in PythonOperator?

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

default_args = {'start_date': datetime(2024, 1, 1)}

dag = DAG('error_dag', default_args=default_args, schedule_interval='@daily')

python_task = PythonOperator(task_id='error_task', python_callable=None, dag=dag)
A. AirflowSkipException: Task skipped due to None callable
B. ValueError: python_callable must be a callable
C. TypeError: 'NoneType' object is not callable
D. No error, task runs with no operation
💡 Hint

Think about what happens when Python tries to call None as a function.
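The underlying Python behaviour the hint points at can be seen without Airflow at all (note: recent Airflow versions also validate the callable when the operator is instantiated; this sketch only illustrates what plain Python does when `None` is invoked):

```python
# Treat None as if it were the callable and try to invoke it.
python_callable = None
try:
    python_callable()
except TypeError as exc:
    print(exc)  # 'NoneType' object is not callable
```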

🔀 Workflow
advanced
Order of execution with PythonOperator tasks

Given these two PythonOperator tasks in a DAG, which option shows the correct order of execution?

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

def task1():
    print('Task 1')

def task2():
    print('Task 2')

default_args = {'start_date': datetime(2024, 1, 1)}

dag = DAG('order_dag', default_args=default_args, schedule_interval='@daily')

t1 = PythonOperator(task_id='task1', python_callable=task1, dag=dag)
t2 = PythonOperator(task_id='task2', python_callable=task2, dag=dag)

t1 >> t2
A. Task 1 runs before Task 2
B. Task 2 runs before Task 1
C. Tasks run in parallel with no order
D. Task 2 runs twice, Task 1 once
💡 Hint

Look at the dependency operator >> between tasks.
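Conceptually, `t1 >> t2` overloads Python's right-shift operator to record a dependency edge. Here is a toy sketch (a made-up `Task` class, not Airflow's API) of what that registration looks like; a scheduler would then run every upstream task before its downstream tasks.

```python
# Toy model of dependency registration via the bitshift operator.
class Task:
    def __init__(self, task_id):
        self.task_id = task_id
        self.upstream = []  # tasks that must finish before this one

    def __rshift__(self, other):
        other.upstream.append(self)
        return other  # returning the right-hand side allows chaining: a >> b >> c

t1, t2 = Task('task1'), Task('task2')
t1 >> t2

print([t.task_id for t in t2.upstream])  # ['task1']
```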

Best Practice
expert
Best practice for returning values from PythonOperator

Which option correctly captures the return value of a Python function executed by PythonOperator for use in downstream tasks?

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.python import get_current_context
from datetime import datetime

def generate_value():
    return 42

def print_value():
    context = get_current_context()
    value = context['ti'].xcom_pull(task_ids='generate_task')
    print(f'Received value: {value}')

default_args = {'start_date': datetime(2024, 1, 1)}

dag = DAG('xcom_dag', default_args=default_args, schedule_interval='@daily')

generate_task = PythonOperator(task_id='generate_task', python_callable=generate_value, dag=dag)
print_task = PythonOperator(task_id='print_task', python_callable=print_value, dag=dag)
generate_task >> print_task
A. Store value in a global variable accessible by both tasks
B. Use return statement in the first task and pull with xcom_pull in the second task
C. Write value to a file in the first task and read it in the second task
D. Pass value as an argument directly to the second task's python_callable
💡 Hint

Think about how Airflow shares data between tasks.
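As a mental model: when the callable returns a value, PythonOperator pushes it to XCom under the task's id, and a downstream task pulls it back with `xcom_pull`. The toy sketch below uses a plain dict in place of Airflow's metadata database (an assumption for illustration only; real XComs are persisted and scoped per DAG run).

```python
# Dict standing in for Airflow's XCom storage.
xcom_store = {}

def run_task(task_id, python_callable):
    result = python_callable()
    if result is not None:
        # Stands in for the operator's implicit xcom_push of the return value.
        xcom_store[task_id] = result

def generate_value():
    return 42

def print_value():
    # Stands in for context['ti'].xcom_pull(task_ids='generate_task').
    value = xcom_store.get('generate_task')
    print(f'Received value: {value}')

run_task('generate_task', generate_value)
run_task('print_task', print_value)  # prints: Received value: 42
```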