Apache Airflow · DevOps · ~30 mins

GCP operators (BigQuery, GCS, Dataflow) in Apache Airflow - Mini Project: Build & Apply

Using GCP Operators in Airflow to Manage Data Pipelines
📖 Scenario: You work as a data engineer managing data pipelines on Google Cloud Platform (GCP). You want to automate tasks like uploading files to Google Cloud Storage (GCS), running queries in BigQuery, and launching Dataflow jobs using Apache Airflow.
🎯 Goal: Build an Airflow DAG that uses GCP operators to upload a file to GCS, run a BigQuery SQL query, and start a Dataflow job.
📋 What You'll Learn
Create a Python dictionary with GCP connection details
Define a GCS bucket name variable
Use the GCSCreateBucketOperator to create a bucket
Use the BigQueryExecuteQueryOperator to run a SQL query
Use the DataflowCreatePythonJobOperator to launch a Dataflow job
💡 Why This Matters
🌍 Real World
Automating cloud data workflows is common in data engineering to ensure reliable and repeatable data processing.
💼 Career
Knowing how to use Airflow with GCP operators is valuable for roles involving cloud data pipelines and workflow orchestration.
1
Set up GCP connection details dictionary
Create a Python dictionary called gcp_conn with these exact entries: 'project_id': 'my-gcp-project', 'location': 'us-central1', and 'gcp_conn_id': 'google_cloud_default'.
Hint: Use curly braces to create a dictionary, and include the exact keys and values as strings.
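Putting the hint into practice, a minimal sketch of the dictionary this step asks for. The project ID and region are placeholder values for this exercise; `'google_cloud_default'` is the ID of Airflow's default GCP connection.

```python
# Connection details reused by the operators in later steps.
# 'my-gcp-project' and 'us-central1' are placeholder values;
# 'google_cloud_default' is Airflow's default GCP connection ID.
gcp_conn = {
    'project_id': 'my-gcp-project',
    'location': 'us-central1',
    'gcp_conn_id': 'google_cloud_default',
}
```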

2
Define the GCS bucket name variable
Create a variable called bucket_name and set it to the string 'my-data-bucket'.
Hint: Assign the exact string to the variable bucket_name.
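For reference, this step is a single assignment. GCS bucket names are globally unique in practice, so `'my-data-bucket'` stands in for whatever name you would actually use:

```python
# Placeholder bucket name; real GCS bucket names must be globally unique.
bucket_name = 'my-data-bucket'
```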

3
Create a GCS bucket using GCSCreateBucketOperator
Create a variable called create_bucket and assign it a GCSCreateBucketOperator with these parameters: task_id='create_bucket', bucket_name=bucket_name, project_id=gcp_conn['project_id'], and gcp_conn_id=gcp_conn['gcp_conn_id'].
Hint: Import the operator and use the exact parameter names and values.

4
Add BigQuery and Dataflow operators to the DAG
Add two variables: run_query, assigned a BigQueryExecuteQueryOperator with task_id='run_query', sql='SELECT 1', use_legacy_sql=False, and gcp_conn_id=gcp_conn['gcp_conn_id']; and start_dataflow, assigned a DataflowCreatePythonJobOperator with task_id='start_dataflow', py_file='gs://my-data-bucket/dataflow_job.py', location=gcp_conn['location'], and gcp_conn_id=gcp_conn['gcp_conn_id'].
Hint: Import both operators and use the exact parameters as shown.