参考: Go Concurrency Patterns: Pipelines and cancellation - The Go Programming Language
Pipeline
是并发编程之中的官方推荐范式. 本文将通过Demo
形式介绍一个并发编程范式: pipeline
, 与相关适配模式: fan-in, fan-out, cancellation
.
Go
并发原语使得编码人员可以轻易的构建数据流管道pipeline
从而更有效的利用I/O, CPU
等资源.
定义
pipeline
并没有一个标准定义, 只是一种并发编程范式. pipeline
指的是将复杂的工作拆分为多个阶段stage
, 每个stage
可以视为一个函数, 内部逻辑由多个或一个goroutine
构成; 这些goroutine
可以拆分为三步:
- 通过
inbound-channels
接收数据; - 操作数据,达到业务目的;
- 发送结果到
outbound-channels
.
那么顾名思义: 第一个stage
, 仅包含有outbound-channels
, 称为producer
; 最后一个stage
, 仅包含有inbound-channels
, 称为consumer
; 其余中间阶段一般同时具备inbound-channels, outbound-channels
.
经过上面的定义, 我们可以大概明确pipeline
的伪代码如下:
1 | // CompleteWork 复杂工作拆分为三个阶段并发完成 |
下面给将通过Demo
方式展示其过程, 以便于更好理解;
Demo
我们以数字的平方
为例, 给定一个数据源[]int
, 要求输出每个数字元素的平方;
Step1: basic
那可以分为两个stage
:
stage1
将数据源[]int
发送到channel: ch1
中;stage2
读取ch1
并将结果发送到channel: ch2
中;stage3
读取ch2
并打印输出即可;Stage1
:示例代码:
1
2
3
4
5
6
7
8
9
10
11func Gen(nums ...int) <-chan int {
out := make(chan int)
go func() {
for _, n := range nums {
fmt.Printf("Send value: %v to channel \n", n)
out <- n
}
close(out)
}()
return out
}stage1
是一个函数, 其将源数据nums
转换到channel: out
中. 在函数内部, 首先定义了一个outbound-channel: out
, 随后启用一个goroutine
将数字send
到out
, 同时通过fmt
输出方便用户了解执行过程, 在完成发送完毕后, 通过close
关闭channel: out
;Stage2
:示例代码:
1
2
3
4
5
6
7
8
9
10
11func Sq(in <-chan int) <-chan int {
out := make(chan int)
go func() {
for n := range in {
fmt.Printf("Receive value: %v from channel \n", n)
out <- n * n
}
close(out)
}()
return out
}stage2
是一个函数, 其接受一个inbound-channel: in
, 并返回平方结果outbound-channel: out
. 在函数内部, 首先定义了一个outbound-channel: out
, 随后启用一个goroutine
将读取数据平方后发送发送到out
中, 同时通过fmt
输出方便用户了解执行过程, 在完成发送完毕后, 通过close
关闭channel: out
;
stage
定义完毕后, 我这边通过单测的方式实现数字的平方
业务逻辑: logic
:
1 | func TestPipeline(t *testing.T) { |
通过单测可以查看实现效果:
1 | [root@VM-0-9-centos base]# go test -v . |
由于并发的不确定性, 上面的fmt
输出结果不完全固定, 但是可以明确的是: Gen
, Sq
, logic
三个goroutine
是并发的, 这有效利用了CPU
.
这就是pipeline.
Step2: with fan-in fan-out
在分布式系统中, 每一个Worker
都可以占用CPU & I/O
, 为了更有效的利用其优势, 可以同时发起多个函数, 读取同一个channel
,直到channel
关闭, 这种方式称为: fan-out
;
相对的, 一个函数可以同时读取多个inbound-channels: ins
, 将结果输出到一个outbound-channel: out
, 直到所有的ins
均已关闭, 这种方式称为fan-in
;
业务代码logic
部分, 调整为分布式处理模式(fan-in, fan-out
):
1 | func TestPipeline(t *testing.T) { |
fan-out
: 这里假设将工作分为两个worker
处理, 每个worker
逻辑一摸一样无需修改代码
, 每个worker
读取同一个channel: ch1
;fan-in
: 定义了一个新函数merge
, 其将读取channle
列表, 将结果输出到同一个result channle
中:代码示例如下:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25func merge(ins ...<-chan int) <-chan int {
var wg sync.WaitGroup
out := make(chan int)
// Start an output goroutine for each input channel in cs. output
// copies values from c to out until c is closed, then calls wg.Done.
output := func(c <-chan int) {
for n := range c {
out <- n
}
wg.Done()
}
wg.Add(len(ins))
for _, c := range ins {
go output(c)
}
// Start a goroutine to close out once all the output goroutines are
// done. This must start after the wg.Add call.
go func() {
wg.Wait()
close(out)
}()
return out
}在
merge
中, 首先定义一个outbound-channel: out
用来接收所有ins
结果. 随后采用goroutine
方式实现并发读取入参channel
操作, 采用goroutine
方式实现并发关闭channel out
操作;由于向closed channel out
发送数据会导致panic
, 这里采用了wg sync.WaitGroup
进行并发同步控制.每一个
inbound-channel
采用goroutine
实现:1
2
3
4
5
6func (c <-chan int) {
for n:= range c {
out <- n
}
wg.Done()
}对于范式中需要注意的是:
- 在所有
Send
操作完毕后, 需要close outbound-channel
; 通过wg
机制实现. - 在
inbound-channel close
前, 需要持续receive value
. 通过for range
机制实现.
- 在所有
Step3: with cancellation
上面的例子中存在缺陷: 采用了unbuffered channel
是一种阻塞通道. 也就是说如果某一步stage
不达预期的话, 会导致相关goroutine
阻塞, 从而造成资源泄露, GC
机制无法回收阻塞的goroutine
;
并且在实际应用过程中, 某些值也许具备特殊含义, 例如0 == Error
, 这也需要相关协程均进行销毁操作;下面是一个阻塞例子:
1 | func TestPipeline(t *testing.T) { |
在上面的例子中, 仅消费了一次, 会导致Gen, Sq*2, Merge
内部的协程均挂起;
需要有一个机制进行取消信号的广播, 告知所有的协程, 该退出了.
那么该如何处理呢? 那就需要pipeline stage
添加Cancel Channel
作为入参;并结合Select
语句, 同时感知多个channel
操作:
例如: Sq
函数:
1 | func sq(done <-chan struct{}, in <-chan int) <-chan int { |
相比较于之前函数, 在入参列表中添加了 done channel
用于标识是否已外部已完成操作, 内部采用select
同时监控out channel & done channel operation
, 如果监控到done
动作, 则return
, 销毁此goroutine
;
对于logic
中则需要定义done channel
, 并在退出前发送done
信号:
1 | func TestPipeline(t *testing.T) { |
如上所示, 这里采用close
操作, 是因为从closed channel
获取值, 会默认获得类型零值. 也就是说close
是一种广播操作;
整体代码如下:
1 | package cancel |
结论
本文通过Demo
方式展示了pipeline
的并发编程范式, 并一步步展示了fan-in, fan-out, cancellation
的适配结果, 逐步完善了pipeline
工作模式. 期望可以在日常应用中, 不断实践, 并总结更优雅的并发编程范式.