
Dataflow for stream/batch processing in GCP - Commands & Configuration

Introduction
Dataflow lets you process data as it arrives (streaming) or in scheduled batches, and it handles large volumes without you managing servers. Typical situations where it fits:

  • Analyzing live data from sensors or user activity in real time.
  • Cleaning and transforming large files stored in cloud storage.
  • Combining data from different sources and preparing it for reports.
  • Building a pipeline that runs automatically on a schedule.
  • Scaling your data processing without worrying about infrastructure.
Config File - dataflow_pipeline.py
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions


class SimpleOptions(PipelineOptions):
    """Custom pipeline options exposing the input and output paths."""

    @classmethod
    def _add_argparse_args(cls, parser):
        parser.add_argument('--input', help='Input file to process')
        parser.add_argument('--output', help='Output file to write results')


def run():
    # Parse options from the command line; view_as exposes our custom flags.
    options = PipelineOptions()
    simple_options = options.view_as(SimpleOptions)

    # Read lines, drop the empty ones, and write the rest out.
    with beam.Pipeline(options=options) as p:
        (p
         | 'ReadInput' >> beam.io.ReadFromText(simple_options.input)
         | 'FilterEmpty' >> beam.Filter(lambda line: line.strip() != '')
         | 'WriteOutput' >> beam.io.WriteToText(simple_options.output))


if __name__ == '__main__':
    run()

This Python file defines a simple Dataflow pipeline using Apache Beam.

  • SimpleOptions: Defines input and output file arguments.
  • run function: Creates the pipeline, reads lines from the input file, filters out empty lines, and writes the results to the output file.
  • This pipeline can run on the Google Cloud Dataflow service or locally.
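The predicate passed to beam.Filter in the 'FilterEmpty' step is plain Python, so you can sanity-check its behavior without installing Apache Beam. A minimal sketch (the sample lines below are made up for illustration):

```python
# The same predicate the pipeline passes to beam.Filter:
# keep a line only if it is non-empty after stripping whitespace.
keep = lambda line: line.strip() != ''

# Sample lines standing in for the input file (illustrative values).
lines = ['alpha', '', '  ', 'beta', '\t', 'gamma']

# Apply the predicate the way the FilterEmpty step would.
filtered = [line for line in lines if keep(line)]
print(filtered)  # ['alpha', 'beta', 'gamma']
```

Blank lines and whitespace-only lines are both dropped, which is usually what you want when cleaning text files.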
Commands
This command runs the Dataflow pipeline locally or on the cloud, reading from the input file and writing filtered results to the output location.
Terminal
python3 dataflow_pipeline.py --input gs://example-bucket/input.txt --output gs://example-bucket/output/result
Expected Output
INFO:apache_beam.runners.direct.direct_runner:Starting pipeline execution
INFO:apache_beam.runners.direct.direct_runner:Pipeline execution complete
--input - Specifies the input file path in Google Cloud Storage.
--output - Specifies the output file prefix in Google Cloud Storage.
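For quick iteration you can also run the same script locally: with no --runner flag, Beam falls back to the DirectRunner, which accepts local paths. A sketch, assuming Apache Beam is installed (`pip install apache-beam`) and using illustrative file names:

```shell
# Create a small sample input with a blank line (illustrative).
printf 'alpha\n\nbeta\n' > input.txt

# Run locally; without --runner, Beam uses the DirectRunner.
python3 dataflow_pipeline.py --input input.txt --output result

# WriteToText produces sharded files such as result-00000-of-00001.
cat result-*
```

This is handy for verifying pipeline logic before paying for a cloud run.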
Lists all Dataflow jobs in your Google Cloud project to check the status of running or completed pipelines.
Terminal
gcloud dataflow jobs list
Expected Output
JOB_ID               NAME                  TYPE   CREATION_TIME         STATE
2023-06-01_01_23_45  dataflow_pipeline.py  Batch  2023-06-01T01:23:45Z  Running
Shows detailed information about a specific Dataflow job, including its current state and any errors.
Terminal
gcloud dataflow jobs describe 2023-06-01_01_23_45
Expected Output
id: 2023-06-01_01_23_45
name: dataflow_pipeline.py
type: Batch
currentState: Running
startTime: 2023-06-01T01:23:45Z
Key Concept

If you remember nothing else from this pattern, remember: Dataflow lets you build pipelines that process data continuously or in batches without managing servers.

Common Mistakes
Mistake: Using local file paths instead of cloud storage paths for input/output when running on the Dataflow service.
Why it fails: The Dataflow service cannot access local files on your computer; it requires files in cloud storage.
Fix: Always use Google Cloud Storage paths starting with gs:// for input and output files when running on Dataflow.

Mistake: Not specifying pipeline options correctly, causing the job to run locally instead of on the cloud.
Why it fails: Without the proper options, the pipeline falls back to the default runner and won't scale or run in the cloud.
Fix: Use PipelineOptions to specify the Dataflow runner and project details when running on Google Cloud.
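To illustrate that second fix, here is a sketch of launching the same script on the Dataflow service. The --runner, --project, --region, and --temp_location flags are standard Beam pipeline options; the project ID, region, and bucket names below are placeholders you would replace with your own:

```shell
python3 dataflow_pipeline.py \
  --runner DataflowRunner \
  --project my-project-id \
  --region us-central1 \
  --temp_location gs://example-bucket/temp \
  --input gs://example-bucket/input.txt \
  --output gs://example-bucket/output/result
```

The temp_location bucket is where Dataflow stages files during the run; without it, submission to the service will fail.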
Summary
  • Create a Python pipeline file that reads, processes, and writes data using Apache Beam.
  • Run the pipeline with input and output paths pointing to Google Cloud Storage.
  • Use gcloud commands to list and check the status of Dataflow jobs.