参考: Go Concurrency Patterns: Pipelines and cancellation - The Go Programming Language
Pipeline是并发编程之中的官方推荐范式. 本文将通过Demo形式介绍一个并发编程范式: pipeline, 与相关适配模式: fan-in, fan-out, cancellation.
Go并发原语使得编码人员可以轻易的构建数据流管道pipeline从而更有效的利用I/O, CPU等资源.
定义
pipeline并没有一个标准定义, 只是一种并发编程范式. pipeline指的是将复杂的工作拆分为多个阶段stage, 每个stage可以视为一个函数, 内部逻辑由多个或一个goroutine构成; 这些goroutine可以拆分为三步:
- 通过
inbound-channels接收数据; - 操作数据,达到业务目的;
- 发送结果到
outbound-channels.
那么顾名思义: 第一个stage, 仅包含有outbound-channels, 称为producer; 最后一个stage, 仅包含有inbound-channels, 称为consumer; 其余中间阶段一般同时具备inbound-channels, outbound-channels.
经过上面的定义, 我们可以大概明确pipeline的伪代码如下:
1 | // CompleteWork 复杂工作拆分为三个阶段并发完成 |
下面给将通过Demo方式展示其过程, 以便于更好理解;
Demo
我们以数字的平方为例, 给定一个数据源[]int, 要求输出每个数字元素的平方;
Step1: basic
那可以分为两个stage:
stage1将数据源[]int发送到channel: ch1中;stage2读取ch1并将结果发送到channel: ch2中;stage3读取ch2并打印输出即可;Stage1:示例代码:
1
2
3
4
5
6
7
8
9
10
11func Gen(nums ...int) <-chan int {
out := make(chan int)
go func() {
for _, n := range nums {
fmt.Printf("Send value: %v to channel \n", n)
out <- n
}
close(out)
}()
return out
}stage1是一个函数, 其将源数据nums转换到channel: out中. 在函数内部, 首先定义了一个outbound-channel: out, 随后启用一个goroutine将数字send到out, 同时通过fmt输出方便用户了解执行过程, 在完成发送完毕后, 通过close关闭channel: out;Stage2:示例代码:
1
2
3
4
5
6
7
8
9
10
11func Sq(in <-chan int) <-chan int {
out := make(chan int)
go func() {
for n := range in {
fmt.Printf("Receive value: %v from channel \n", n)
out <- n * n
}
close(out)
}()
return out
}stage2是一个函数, 其接受一个inbound-channel: in, 并返回平方结果outbound-channel: out. 在函数内部, 首先定义了一个outbound-channel: out, 随后启用一个goroutine将读取数据平方后发送发送到out中, 同时通过fmt输出方便用户了解执行过程, 在完成发送完毕后, 通过close关闭channel: out;
stage定义完毕后, 我这边通过单测的方式实现数字的平方业务逻辑: logic:
1 | func TestPipeline(t *testing.T) { |
通过单测可以查看实现效果:
1 | [root@VM-0-9-centos base]# go test -v . |
由于并发的不确定性, 上面的fmt输出结果不完全固定, 但是可以明确的是: Gen, Sq, logic三个goroutine是并发的, 这有效利用了CPU.
这就是pipeline.
Step2: with fan-in fan-out
在分布式系统中, 每一个Worker都可以占用CPU & I/O, 为了更有效的利用其优势, 可以同时发起多个函数, 读取同一个channel,直到channel关闭, 这种方式称为: fan-out;
相对的, 一个函数可以同时读取多个inbound-channels: ins, 将结果输出到一个outbound-channel: out, 直到所有的ins均已关闭, 这种方式称为fan-in;
业务代码logic部分, 调整为分布式处理模式(fan-in, fan-out):
1 | func TestPipeline(t *testing.T) { |
fan-out: 这里假设将工作分为两个worker处理, 每个worker逻辑一摸一样无需修改代码, 每个worker读取同一个channel: ch1;fan-in: 定义了一个新函数merge, 其将读取channle列表, 将结果输出到同一个result channle中:代码示例如下:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25func merge(ins ...<-chan int) <-chan int {
var wg sync.WaitGroup
out := make(chan int)
// Start an output goroutine for each input channel in cs. output
// copies values from c to out until c is closed, then calls wg.Done.
output := func(c <-chan int) {
for n := range c {
out <- n
}
wg.Done()
}
wg.Add(len(ins))
for _, c := range ins {
go output(c)
}
// Start a goroutine to close out once all the output goroutines are
// done. This must start after the wg.Add call.
go func() {
wg.Wait()
close(out)
}()
return out
}在
merge中, 首先定义一个outbound-channel: out用来接收所有ins结果. 随后采用goroutine方式实现并发读取入参channel操作, 采用goroutine方式实现并发关闭channel out操作;由于向closed channel out发送数据会导致panic, 这里采用了wg sync.WaitGroup进行并发同步控制.每一个
inbound-channel采用goroutine实现:1
2
3
4
5
6func (c <-chan int) {
for n:= range c {
out <- n
}
wg.Done()
}对于范式中需要注意的是:
- 在所有
Send操作完毕后, 需要close outbound-channel; 通过wg机制实现. - 在
inbound-channel close前, 需要持续receive value. 通过for range机制实现.
- 在所有
Step3: with cancellation
上面的例子中存在缺陷: 采用了unbuffered channel 是一种阻塞通道. 也就是说如果某一步stage不达预期的话, 会导致相关goroutine阻塞, 从而造成资源泄露, GC机制无法回收阻塞的goroutine;
并且在实际应用过程中, 某些值也许具备特殊含义, 例如0 == Error, 这也需要相关协程均进行销毁操作;下面是一个阻塞例子:
1 | func TestPipeline(t *testing.T) { |
在上面的例子中, 仅消费了一次, 会导致Gen, Sq*2, Merge内部的协程均挂起;
需要有一个机制进行取消信号的广播, 告知所有的协程, 该退出了.
那么该如何处理呢? 那就需要pipeline stage添加Cancel Channel 作为入参;并结合Select语句, 同时感知多个channel操作:
例如: Sq函数:
1 | func sq(done <-chan struct{}, in <-chan int) <-chan int { |
相比较于之前函数, 在入参列表中添加了 done channel用于标识是否已外部已完成操作, 内部采用select同时监控out channel & done channel operation, 如果监控到done动作, 则return, 销毁此goroutine;
对于logic中则需要定义done channel, 并在退出前发送done信号:
1 | func TestPipeline(t *testing.T) { |
如上所示, 这里采用close操作, 是因为从closed channel获取值, 会默认获得类型零值. 也就是说close是一种广播操作;
整体代码如下:
1 | package cancel |
结论
本文通过Demo方式展示了pipeline的并发编程范式, 并一步步展示了fan-in, fan-out, cancellation的适配结果, 逐步完善了pipeline工作模式. 期望可以在日常应用中, 不断实践, 并总结更优雅的并发编程范式.