0
0
GoHow-ToBeginner · 4 min read

How to Implement Pipeline Pattern in Go: Simple Guide

In Go, you implement a pipeline by chaining functions that communicate via channels, where each stage processes data and passes it to the next. Use goroutines for each stage and channels to connect them, enabling concurrent and efficient data flow.
📐

Syntax

A pipeline in Go consists of multiple stages connected by channels. Each stage is a function that takes an input channel and returns an output channel. Inside, it runs a goroutine that reads from the input, processes data, and sends results to the output.

  • in <-chan T: input channel to receive data
  • out chan<- T: output channel to send processed data
  • Goroutine: runs the processing concurrently
go
func stage(in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for v := range in {
            out <- v * 2 // example processing
        }
    }()
    return out
}
💻

Example

This example shows a simple pipeline with three stages: generating numbers, doubling them, and converting to strings. Each stage runs concurrently and passes data through channels.

go
package main

import (
    "fmt"
    "strconv"
)

func gen(nums ...int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for _, n := range nums {
            out <- n
        }
    }()
    return out
}

func double(in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for n := range in {
            out <- n * 2
        }
    }()
    return out
}

func toString(in <-chan int) <-chan string {
    out := make(chan string)
    go func() {
        defer close(out)
        for n := range in {
            out <- strconv.Itoa(n)
        }
    }()
    return out
}

func main() {
    nums := gen(1, 2, 3, 4)
    doubled := double(nums)
    strings := toString(doubled)

    for s := range strings {
        fmt.Println(s)
    }
}
Output
2 4 6 8
⚠️

Common Pitfalls

Common mistakes when implementing pipelines in Go include:

  • Not closing output channels, which causes downstream stages to hang.
  • Blocking on channel sends or receives without goroutines, leading to deadlocks.
  • Not using defer close(out) to ensure channels close properly.
  • Mixing buffered and unbuffered channels without understanding blocking behavior.
go
package main

func wrongStage(in <-chan int) <-chan int {
    out := make(chan int)
    // Missing goroutine causes deadlock
    for v := range in {
        out <- v * 2
    }
    close(out)
    return out
}

func rightStage(in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for v := range in {
            out <- v * 2
        }
    }()
    return out
}
📊

Quick Reference

  • Use channels to connect pipeline stages.
  • Run each stage in its own goroutine.
  • Always close output channels to signal completion.
  • Process data inside the goroutine loop.
  • Use unidirectional channels (<-chan and chan<-) for clarity.

Key Takeaways

Implement pipelines by chaining stages with channels and goroutines for concurrency.
Always close output channels to avoid blocking downstream stages.
Run processing loops inside goroutines to prevent deadlocks.
Use unidirectional channels to clarify data flow direction.
Test pipelines with simple examples to understand data flow.