Apache Airflow · devops · ~10 mins

GCP operators (BigQuery, GCS, Dataflow) in Apache Airflow - Commands & Configuration

Introduction
When you want to automate tasks that use Google Cloud services such as BigQuery, Cloud Storage, or Dataflow, you can use the GCP operators that ship with Airflow's Google provider. These operators let you run jobs and move data without writing custom client code for each service. Typical situations:

  • Loading data from Cloud Storage into BigQuery automatically every day.
  • Running a Dataflow job to process streaming data as part of your workflow.
  • Moving files between Cloud Storage buckets on a schedule.
  • Creating a BigQuery table or running a query as part of a data pipeline.
  • Chaining multiple Google Cloud tasks together in a reliable order.
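For the first situation above, a daily GCS-to-BigQuery load, the Google provider also ships a dedicated transfer operator. A minimal sketch (bucket, object, and table names are placeholders; it assumes a `dag` object like the one defined in the file below):

```python
# Sketch only: load a CSV from Cloud Storage into a BigQuery table using the
# Google provider's transfer operator. All names here are placeholders.
from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator

load_gcs_to_bq = GCSToBigQueryOperator(
    task_id='load_gcs_to_bq',
    bucket='my-data-bucket',                  # source bucket (placeholder)
    source_objects=['input/data.csv'],        # objects to load
    destination_project_dataset_table='my-project.my_dataset.my_table',
    source_format='CSV',
    write_disposition='WRITE_APPEND',         # append rows to an existing table
    dag=dag,                                  # attach to an existing DAG object
)
```

The operator handles the load job submission and polling for you; you only describe the source and destination.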
Config File - gcp_data_pipeline_dag.py
from airflow import DAG
from airflow.utils.dates import days_ago
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
from airflow.providers.google.cloud.operators.gcs import GCSListObjectsOperator
from airflow.providers.google.cloud.operators.dataflow import DataflowStartFlexTemplateOperator
from datetime import timedelta

# Shared defaults applied to every task in the DAG
default_args = {
    'owner': 'airflow',
    'start_date': days_ago(1),
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    'gcp_data_pipeline',
    default_args=default_args,
    schedule_interval='@daily',   # run once per day
    catchup=False                 # do not backfill runs for past dates
)

list_gcs_files = GCSListObjectsOperator(
    task_id='list_gcs_files',
    bucket='my-data-bucket',
    prefix='input/',
    dag=dag
)

run_bigquery_query = BigQueryInsertJobOperator(
    task_id='run_bigquery_query',
    configuration={
        'query': {
            'query': 'SELECT * FROM `my-project.my_dataset.my_table` LIMIT 10',
            'useLegacySql': False
        }
    },
    location='US',
    dag=dag
)

start_dataflow_job = DataflowStartFlexTemplateOperator(
    task_id='start_dataflow_job',
    body={
        'launchParameter': {
            'jobName': 'example-dataflow-job',
            'containerSpecGcsPath': 'gs://my-templates/dataflow-template.json',
            'parameters': {
                'inputFile': 'gs://my-data-bucket/input/data.csv',
                'outputTable': 'my-project:my_dataset.output_table'
            }
        }
    },
    location='us-central1',
    dag=dag
)

list_gcs_files >> run_bigquery_query >> start_dataflow_job

This Airflow DAG defines a simple data pipeline using GCP operators.

  • GCSListObjectsOperator lists files in a Cloud Storage bucket.
  • BigQueryInsertJobOperator runs a SQL query on BigQuery.
  • DataflowStartFlexTemplateOperator starts a Dataflow job from a Flex Template.
  • Tasks are linked to run in order: list files, then query BigQuery, then start Dataflow.
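The `>>` chaining on the last line of the DAG file works because Airflow operators overload Python's `__rshift__` to register downstream dependencies. A minimal pure-Python sketch of that idea (the `Task` class here is illustrative, not Airflow's actual class):

```python
# Illustrative sketch (not Airflow's real classes): how `a >> b` can build
# a dependency graph by overloading __rshift__.
class Task:
    def __init__(self, task_id):
        self.task_id = task_id
        self.downstream = []   # tasks that must run after this one

    def __rshift__(self, other):
        # `self >> other` records that `other` depends on `self`
        self.downstream.append(other)
        return other           # returning `other` lets you chain a >> b >> c

a = Task('list_gcs_files')
b = Task('run_bigquery_query')
c = Task('start_dataflow_job')
a >> b >> c

print([t.task_id for t in a.downstream])  # ['run_bigquery_query']
print([t.task_id for t in b.downstream])  # ['start_dataflow_job']
```

In real Airflow, `>>` calls `set_downstream` under the hood, and the scheduler uses the resulting graph to decide execution order.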
Commands
List all available DAGs to confirm the new GCP data pipeline DAG is recognized by Airflow.
Terminal
airflow dags list
Expected Output
gcp_data_pipeline
example_dag
Manually trigger the GCP data pipeline DAG to start the workflow immediately.
Terminal
airflow dags trigger gcp_data_pipeline
Expected Output
Created <DagRun gcp_data_pipeline @ 2024-06-01T12:00:00+00:00: manual__2024-06-01T12:00:00+00:00, externally triggered: True>
List all tasks in the gcp_data_pipeline DAG to see the steps involved.
Terminal
airflow tasks list gcp_data_pipeline
Expected Output
list_gcs_files
run_bigquery_query
start_dataflow_job
Test the list_gcs_files task for the given date to verify it lists files from the GCS bucket correctly.
Terminal
airflow tasks test gcp_data_pipeline list_gcs_files 2024-06-01
Expected Output
[2024-06-01 12:00:00,000] {taskinstance.py:XXXX} INFO - Starting task: list_gcs_files
[2024-06-01 12:00:01,000] {gcs.py:XXXX} INFO - Found files: ['input/data.csv', 'input/data2.csv']
[2024-06-01 12:00:02,000] {taskinstance.py:XXXX} INFO - Task list_gcs_files succeeded
Key Concept

If you remember nothing else from this pattern, remember: GCP operators in Airflow automate Google Cloud tasks by wrapping complex API calls into simple, composable tasks.

Common Mistakes
Mistake: Not setting up a Google Cloud connection in Airflow before running GCP operators.
Why: The operators need credentials to access GCP services; without a valid connection, tasks fail with authentication errors.
Fix: Create and configure a Google Cloud connection in the Airflow UI or via the CLI, using a service account with the right permissions, before running the DAG.
Mistake: Using legacy SQL syntax in BigQuery queries without setting 'useLegacySql' to False.
Why: The BigQuery jobs API defaults to legacy SQL, which causes errors when the query is written in standard SQL.
Fix: Always set 'useLegacySql': False in the BigQueryInsertJobOperator configuration when using standard SQL.
Mistake: Not specifying the correct location for Dataflow jobs or BigQuery queries.
Why: GCP services use the location to route requests; a missing or wrong location causes job failures.
Fix: Set the 'location' parameter to match the region of your GCP resources.
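As a sketch of the connection fix above, a Google Cloud connection can be created from the CLI (the key path and project ID are placeholders, and the exact extra-field names can vary by Google provider version):

```shell
# Sketch: create the default Google Cloud connection used by the GCP operators.
# /path/to/key.json and my-project are placeholders for your own values.
airflow connections add google_cloud_default \
    --conn-type google_cloud_platform \
    --conn-extra '{"key_path": "/path/to/key.json", "project": "my-project"}'
```

The operators in the DAG above pick up `google_cloud_default` automatically unless you pass a different `gcp_conn_id`.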
Summary
Use GCP operators in Airflow to automate tasks with BigQuery, Cloud Storage, and Dataflow.
Define a DAG with tasks that list GCS files, run BigQuery queries, and start Dataflow jobs.
Trigger and test the DAG using Airflow CLI commands to verify the workflow.