36.3 Pipeline Pattern: Chaining Goroutine Stages
Right, let’s talk about the pipeline pattern. You’ve probably got some data that needs a series of operations performed on it: fetch it, process it, filter it, transform it, store it. You could write one big, gnarly function that does all of that. But then you’d have a monster that’s impossible to test, reason about, or modify without breaking three other things. We’re better than that.
The pipeline pattern is our escape hatch. We break that big process into discrete stages, each a separate goroutine, connected by channels. Data flows in one end, gets worked on, and flows out the other. It’s like an assembly line for your data, and it’s one of the most elegant ways to structure concurrent programs in Go. It makes your code modular, testable, and frankly, a joy to work with.
The Basic Blueprint
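A stage is just a function that does three things:
- It receives values from an inbound channel (the first stage, the source, has no inbound channel; it produces values instead).
- It does some work on each value (your actual logic).
- It sends the results to an outbound channel, which it returns so the next stage can read from it.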
When the upstream stage closes a stage’s inbound channel, the stage finishes receiving whatever values are still arriving and then closes its own outbound channel. That tells the next stage nothing more is coming. This chain of closes is how a graceful shutdown propagates through the entire pipeline.
Here’s a dead-simple example: a pipeline that takes integers, squares them, and then adds a constant to each.
package main

import "fmt"

// gen: First stage, a "generator". Turns a slice of ints into a channel of ints.
func gen(nums ...int) <-chan int {
	out := make(chan int)
	go func() {
		for _, n := range nums {
			out <- n
		}
		close(out) // Crucial: we close when we're done sending.
	}()
	return out
}

// sq: Second stage, squares the numbers.
func sq(in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		for n := range in { // Loops until 'in' is closed and drained.
			out <- n * n
		}
		close(out) // Again, close our outbound channel when we're done.
	}()
	return out
}

// add: Third stage, adds a value.
func add(in <-chan int, additive int) <-chan int {
	out := make(chan int)
	go func() {
		for n := range in {
			out <- n + additive
		}
		close(out)
	}()
	return out
}

func main() {
	// Set up the pipeline. It's just a chain of function calls.
	c := gen(2, 3, 4)
	out := add(sq(c), 10)

	// Consume the output. The range will break when 'out' is closed.
	for result := range out {
		fmt.Println(result) // Prints: 14 (2*2+10), 19 (3*3+10), 26 (4*4+10)
	}
}
The beauty here is in the composition. The main function is incredibly readable. It’s just add(sq(gen(...)), 10). We can easily swap stages, add new ones, or test each function in isolation by just giving it a channel and reading its output. This is the core of the pattern.
Fan-Out, Fan-In: The Real Superpower
Sometimes one stage is a bottleneck. Maybe sq involves an incredibly complex calculation. Why should one goroutine do all the work? We can fan out: start multiple copies of the same stage, all reading from the same input channel. Each value is received by exactly one of them, so items are processed concurrently (and in parallel, if there are multiple CPU cores to run them on).
But then we have multiple output channels. We need to fan-in back to a single channel to feed the next stage. This sounds complicated, but Go makes it straightforward with a handy little pattern using sync.WaitGroup.
// This continues the same package main: gen and sq are unchanged from the
// previous example, this main replaces the earlier one, and the import
// block must now include sync alongside fmt.
import (
	"fmt"
	"sync"
)

// merge: A fan-in function to merge multiple channels into one.
func merge(channels ...<-chan int) <-chan int {
	var wg sync.WaitGroup
	out := make(chan int)

	// output copies every value from one input channel to out.
	// We start one goroutine running it per input channel.
	output := func(c <-chan int) {
		defer wg.Done()
		for n := range c {
			out <- n
		}
	}

	wg.Add(len(channels))
	for _, c := range channels {
		go output(c)
	}

	// Start a goroutine to close out once all input channels are done.
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

func main() {
	in := gen(2, 3, 4, 5, 6, 7, 8)

	// Fan-out: Distribute the work across two sq goroutines.
	c1 := sq(in)
	c2 := sq(in) // Both c1 and c2 read from the same 'in' channel.

	// Fan-in: Merge the results from c1 and c2 into a single channel.
	results := merge(c1, c2)
	for result := range results {
		fmt.Println(result) // 4, 9, 16, 25, 36, 49, 64, in an order that can vary run to run.
	}
}
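Notice that the order of the results can change from run to run. That’s the point: each worker emits a result as soon as it’s ready, so we gain throughput and give up ordering. If you need the original order, a single-goroutine chain like the first example preserves it; with fan-out you have to restore it yourself, for example by tagging each item with its index.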
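Here is a minimal sketch of restoring order after fan-out: tag each value with its input position, let several workers process it, and write each result into its slot at the end. It assumes the input length is known up front; the item type and the squareWorker and orderedSquares names are hypothetical, introduced only for this illustration.

```go
package main

import (
	"fmt"
	"sync"
)

// item (hypothetical) pairs a value with its position in the input.
type item struct {
	idx int
	val int
}

// squareWorker (hypothetical) squares values, keeping each index attached.
func squareWorker(in <-chan item, out chan<- item, wg *sync.WaitGroup) {
	defer wg.Done()
	for it := range in {
		out <- item{idx: it.idx, val: it.val * it.val}
	}
}

// orderedSquares (hypothetical) fans the work out to several workers,
// then uses the attached indexes to put results back in input order.
func orderedSquares(nums []int, workers int) []int {
	if workers < 1 {
		workers = 1 // With zero workers the generator would block forever.
	}
	in := make(chan item)
	out := make(chan item)

	// Generator: tag each value with its index.
	go func() {
		for i, n := range nums {
			in <- item{idx: i, val: n}
		}
		close(in)
	}()

	// Fan-out: all workers read from the same channel.
	var wg sync.WaitGroup
	wg.Add(workers)
	for w := 0; w < workers; w++ {
		go squareWorker(in, out, &wg)
	}

	// Close out once every worker has finished.
	go func() {
		wg.Wait()
		close(out)
	}()

	// Fan-in and reorder: results arrive in any order, and the index
	// says which slot each one belongs in.
	results := make([]int, len(nums))
	for it := range out {
		results[it.idx] = it.val
	}
	return results
}

func main() {
	fmt.Println(orderedSquares([]int{2, 3, 4, 5, 6, 7, 8}, 3)) // [4 9 16 25 36 49 64]
}
```

Here the workers share one output channel instead of going through merge; both are valid fan-in shapes. This version buffers every result before returning, which is fine for a bounded batch; for an unbounded stream you would instead hold back only the out-of-order results and emit each one once all earlier indexes have been sent.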
Pitfalls and the Art of Cleanup
Here’s where most people get tripped up: goroutine leaks. A goroutine that is blocked forever on a channel send or receive is never garbage-collected, and neither is anything it references. Goroutines are cheap, but a pipeline that leaks a few of them on every request will grow without bound until it exhausts memory.
The golden rule: a stage should always close its outbound channel once it has finished sending, and it should keep receiving from its inbound channel until that channel is closed. The for n := range in loop handles the receiving part for you beautifully. If you break out of that loop early, you must drain whatever is left (or otherwise give the sender a way to stop), or the upstream sender will block forever on its next send.
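Draining works when the input is finite, but it wastes work and never finishes if the source is unbounded. A common alternative, described in the Go blog’s pipelines article, is explicit cancellation: every stage also receives a done channel and selects on it alongside each send, so closing done unblocks every sender at once. A minimal sketch, in which the done parameter on gen and sq and the firstSquare helper are this sketch’s own hypothetical additions:

```go
package main

import "fmt"

// gen emits nums in order, stopping early if done is closed.
func gen(done <-chan struct{}, nums ...int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for _, n := range nums {
			select {
			case out <- n:
			case <-done: // Downstream gave up: stop sending and exit.
				return
			}
		}
	}()
	return out
}

// sq squares each value from in, stopping early if done is closed.
func sq(done <-chan struct{}, in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for n := range in {
			select {
			case out <- n * n:
			case <-done:
				return
			}
		}
	}()
	return out
}

// firstSquare (hypothetical) reads a single result and abandons the rest.
// The deferred close(done) unblocks any stage still waiting to send,
// so neither goroutine leaks even though the input was never drained.
func firstSquare(nums ...int) int {
	done := make(chan struct{})
	defer close(done)
	out := sq(done, gen(done, nums...))
	return <-out
}

func main() {
	fmt.Println(firstSquare(3, 4, 5)) // 9
}
```

The same shape works with a context.Context: pass ctx instead of done and select on ctx.Done().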
Another common “gotcha” is error handling: what should a stage do when its work fails? There are two typical idioms. One is to send a result struct that carries either a value or an error on the stage’s single output channel. The other is a separate error channel (usually a chan error) alongside the results, in which case the receiver has to select on both channels and keep track of when each one closes. Either way it’s a bit more boilerplate, but it’s the price of robustness.
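A minimal sketch of the result-struct idiom follows: each value travels with its own error on a single channel, so the consumer never has to coordinate two channels. The result type and the genStrings and parse stages are hypothetical examples, with strconv.Atoi standing in for work that can fail.

```go
package main

import (
	"fmt"
	"strconv"
)

// result (hypothetical) carries either a value or the error that
// prevented producing it.
type result struct {
	val int
	err error
}

// genStrings (hypothetical) is a generator stage for strings.
func genStrings(ss ...string) <-chan string {
	out := make(chan string)
	go func() {
		defer close(out)
		for _, s := range ss {
			out <- s
		}
	}()
	return out
}

// parse (hypothetical) converts strings to ints, forwarding failures
// downstream instead of dropping them or panicking.
func parse(in <-chan string) <-chan result {
	out := make(chan result)
	go func() {
		defer close(out)
		for s := range in {
			n, err := strconv.Atoi(s)
			out <- result{val: n, err: err}
		}
	}()
	return out
}

func main() {
	for r := range parse(genStrings("1", "two", "3")) {
		if r.err != nil {
			// Skip bad input and keep going. Stopping early instead would
			// mean draining or cancelling the upstream stages.
			fmt.Println("error:", r.err)
			continue
		}
		fmt.Println("value:", r.val)
	}
}
```

Keeping the error next to the value it belongs to also preserves context: the consumer knows exactly which input failed without correlating two streams.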
So, use pipelines. They turn a tangle of ad hoc goroutines into a clean, composable, concurrent data flow. Just remember to always signal when you’re done by closing your channels. It’s the polite thing to do in the world of goroutines.