How to Implement Pipeline Pattern in Go: Simple Guide
In Go, you implement a
pipeline by chaining functions that communicate via channels, where each stage processes data and passes it to the next. Use goroutines for each stage and channels to connect them, enabling concurrent and efficient data flow.Syntax
A pipeline in Go consists of multiple stages connected by channels. Each stage is a function that takes an input channel and returns an output channel. Inside, it runs a goroutine that reads from the input, processes data, and sends results to the output.
in <-chan T: input channel to receive dataout chan<- T: output channel to send processed data- Goroutine: runs the processing concurrently
go
func stage(in <-chan int) <-chan int { out := make(chan int) go func() { defer close(out) for v := range in { out <- v * 2 // example processing } }() return out }
Example
This example shows a simple pipeline with three stages: generating numbers, doubling them, and converting to strings. Each stage runs concurrently and passes data through channels.
go
package main import ( "fmt" "strconv" ) func gen(nums ...int) <-chan int { out := make(chan int) go func() { defer close(out) for _, n := range nums { out <- n } }() return out } func double(in <-chan int) <-chan int { out := make(chan int) go func() { defer close(out) for n := range in { out <- n * 2 } }() return out } func toString(in <-chan int) <-chan string { out := make(chan string) go func() { defer close(out) for n := range in { out <- strconv.Itoa(n) } }() return out } func main() { nums := gen(1, 2, 3, 4) doubled := double(nums) strings := toString(doubled) for s := range strings { fmt.Println(s) } }
Output
2
4
6
8
Common Pitfalls
Common mistakes when implementing pipelines in Go include:
- Not closing output channels, which causes downstream stages to hang.
- Blocking on channel sends or receives without goroutines, leading to deadlocks.
- Not using
defer close(out)to ensure channels close properly. - Mixing buffered and unbuffered channels without understanding blocking behavior.
go
package main func wrongStage(in <-chan int) <-chan int { out := make(chan int) // Missing goroutine causes deadlock for v := range in { out <- v * 2 } close(out) return out } func rightStage(in <-chan int) <-chan int { out := make(chan int) go func() { defer close(out) for v := range in { out <- v * 2 } }() return out }
Quick Reference
- Use channels to connect pipeline stages.
- Run each stage in its own goroutine.
- Always close output channels to signal completion.
- Process data inside the goroutine loop.
- Use unidirectional channels (
<-chanandchan<-) for clarity.
Key Takeaways
Implement pipelines by chaining stages with channels and goroutines for concurrency.
Always close output channels to avoid blocking downstream stages.
Run processing loops inside goroutines to prevent deadlocks.
Use unidirectional channels to clarify data flow direction.
Test pipelines with simple examples to understand data flow.