0%

Go Concurrency programming Pipeline

参考: Go Concurrency Patterns: Pipelines and cancellation - The Go Programming Language

Pipeline是并发编程之中的官方推荐范式. 本文将通过Demo形式介绍一个并发编程范式: pipeline, 与相关适配模式: fan-in, fan-out, cancellation.

Go并发原语使得编码人员可以轻易的构建数据流管道pipeline从而更有效的利用I/O, CPU等资源.

定义

pipeline并没有一个标准定义, 只是一种并发编程范式. pipeline指的是将复杂的工作拆分为多个阶段stage, 每个stage可以视为一个函数, 内部逻辑由多个或一个goroutine构成; 这些goroutine可以拆分为三步:

  1. 通过inbound-channels接收数据;
  2. 操作数据,达到业务目的;
  3. 发送结果到outbound-channels.

那么顾名思义: 第一个stage, 仅包含有outbound-channels, 称为producer; 最后一个stage, 仅包含有inbound-channels, 称为consumer; 其余中间阶段一般同时具备inbound-channels, outbound-channels.

经过上面的定义, 我们可以大概明确pipeline的伪代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
// CompleteWork 复杂工作拆分为三个阶段并发完成
func CompleteWork(sourceData Data) ResultData {
ch1 := startStage(sourceData)
ch2 := middleStage(ch1)
resultData := endStage(ch2)
return resultData
}

// startStage 第一阶段并发处理源数据, 生成中间数据1
func startStage(sourceData Data) <-chan Data {
var outbound chan Data
go func(sourceData Data) {
// operate sourceData
outbound <- operate(sourceData)
}()
return
}

// middleStage 第二阶段并发处理中间数据1, 生成中间数据2
func middleStage(inbound <-chan Data) <-chan Data2 {
var outbound chan Data2
go func(inbound chan Data) {
// operate inbound
outbound <- operate(sourceData)
}()
return
}

// endStage 第三阶段并发处理中间数据2, 生成最终结果
func endStage(inbound <-chan Data2) ResultData {
var result ResultData
go func(inbound chan Data2) {
// operate inbound
result <- operate(inbound)
}()
return result
}

下面给将通过Demo方式展示其过程, 以便于更好理解;

Demo

我们以数字的平方为例, 给定一个数据源[]int, 要求输出每个数字元素的平方;

Step1: basic

那可以分为两个stage:

  • stage1 将数据源[]int发送到channel: ch1中;

  • stage2读取ch1并将结果发送到channel: ch2中;

  • stage3读取ch2并打印输出即可;

  • Stage1:

    示例代码:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    func Gen(nums ...int) <-chan int {
    out := make(chan int)
    go func() {
    for _, n := range nums {
    fmt.Printf("Send value: %v to channel \n", n)
    out <- n
    }
    close(out)
    }()
    return out
    }

    stage1是一个函数, 其将源数据nums转换到channel: out中. 在函数内部, 首先定义了一个outbound-channel: out, 随后启用一个goroutine将数字sendout, 同时通过fmt输出方便用户了解执行过程, 在完成发送完毕后, 通过close关闭channel: out;

  • Stage2:

    示例代码:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    func Sq(in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
    for n := range in {
    fmt.Printf("Receive value: %v from channel \n", n)
    out <- n * n
    }
    close(out)
    }()
    return out
    }

    stage2是一个函数, 其接受一个inbound-channel: in, 并返回平方结果outbound-channel: out. 在函数内部, 首先定义了一个outbound-channel: out, 随后启用一个goroutine将读取数据平方后发送发送到out中, 同时通过fmt输出方便用户了解执行过程, 在完成发送完毕后, 通过close关闭channel: out;

stage定义完毕后, 我这边通过单测的方式实现数字的平方业务逻辑: logic:

1
2
3
4
5
6
7
8
9
10
11
func TestPipeline(t *testing.T) {
// Set up pipeline
// Stage1
ch1 := Gen(1, 2, 3, 4, 5, 6, 7, 8, 9)
// Stage2
out := Sq(ch1)
// Stage3: Consume result
for o := range out {
fmt.Printf("Got result: %v \n", o)
}
}

通过单测可以查看实现效果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
[root@VM-0-9-centos base]# go test -v .
=== RUN TestPipeline
Send value: 1 to channel
Send value: 2 to channel
Receive value: 1 from channel
Receive value: 2 from channel
Send value: 3 to channel
Got result: 1
Got result: 4
Receive value: 3 from channel
Got result: 9
--- PASS: TestPipeline (0.00s)
PASS
ok learngo/pkg/concurrency/pipeline/base 0.002s

由于并发的不确定性, 上面的fmt输出结果不完全固定, 但是可以明确的是: Gen, Sq, logic三个goroutine是并发的, 这有效利用了CPU.

这就是pipeline.

Step2: with fan-in fan-out

在分布式系统中, 每一个Worker都可以占用CPU & I/O, 为了更有效的利用其优势, 可以同时发起多个函数, 读取同一个channel,直到channel关闭, 这种方式称为: fan-out;

相对的, 一个函数可以同时读取多个inbound-channels: ins, 将结果输出到一个outbound-channel: out, 直到所有的ins均已关闭, 这种方式称为fan-in;

业务代码logic部分, 调整为分布式处理模式(fan-in, fan-out):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
func TestPipeline(t *testing.T) {
// Set up pipeline
// Stage1
ch1 := Gen(1, 2, 3)
// Stage2
// fan-out: Distribute the work across two goroutines
out1 := Sq(ch1)
out2 := Sq(ch1)
// Stage3: Consume result
// fan-in: Consume fan-in result
for o := range merge(out1, out2) {
fmt.Printf("Got result: %v \n", o)
}
}
  • fan-out: 这里假设将工作分为两个worker处理, 每个worker逻辑一摸一样无需修改代码, 每个worker读取同一个channel: ch1;

  • fan-in: 定义了一个新函数merge, 其将读取channle列表, 将结果输出到同一个result channle中:代码示例如下:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    func merge(ins ...<-chan int) <-chan int {
    var wg sync.WaitGroup
    out := make(chan int)
    // Start an output goroutine for each input channel in cs. output
    // copies values from c to out until c is closed, then calls wg.Done.
    output := func(c <-chan int) {
    for n := range c {
    out <- n
    }
    wg.Done()
    }

    wg.Add(len(ins))
    for _, c := range ins {
    go output(c)
    }
    // Start a goroutine to close out once all the output goroutines are
    // done. This must start after the wg.Add call.
    go func() {
    wg.Wait()
    close(out)
    }()
    return out
    }

    merge中, 首先定义一个outbound-channel: out用来接收所有ins结果. 随后采用goroutine方式实现并发读取入参channel操作, 采用goroutine方式实现并发关闭channel out操作;由于向closed channel out发送数据会导致panic, 这里采用了wg sync.WaitGroup进行并发同步控制.

    每一个inbound-channel采用goroutine实现:

    1
    2
    3
    4
    5
    6
    func (c <-chan int) {
    for n:= range c {
    out <- n
    }
    wg.Done()
    }

    对于范式中需要注意的是:

    • 在所有Send操作完毕后, 需要close outbound-channel; 通过wg机制实现.
    • inbound-channel close前, 需要持续receive value. 通过for range机制实现.

Step3: with cancellation

上面的例子中存在缺陷: 采用了unbuffered channel 是一种阻塞通道. 也就是说如果某一步stage不达预期的话, 会导致相关goroutine阻塞, 从而造成资源泄露, GC机制无法回收阻塞的goroutine;

并且在实际应用过程中, 某些值也许具备特殊含义, 例如0 == Error, 这也需要相关协程均进行销毁操作;下面是一个阻塞例子:

1
2
3
4
5
6
7
8
9
10
11
func TestPipeline(t *testing.T) {
ch1 := Gen(1, 2, 3)
// fan-out: Distribute the work across two goroutines
out1 := Sq(ch1)
out2 := Sq(ch1)
// fan-in: Consume fan-in result
result := merge(out1, out2)
// WARNING!!!!! Resource Leak
fmt.Printf("Got result: %v \n", <-result)
return
}

在上面的例子中, 仅消费了一次, 会导致Gen, Sq*2, Merge内部的协程均挂起;

需要有一个机制进行取消信号的广播, 告知所有的协程, 该退出了.

那么该如何处理呢? 那就需要pipeline stage添加Cancel Channel 作为入参;并结合Select语句, 同时感知多个channel操作:

例如: Sq函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
func sq(done <-chan struct{}, in <-chan int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for n := range in {
select {
case out <- n * n:
case <-done:
return
}
}
}()
return out
}

相比较于之前函数, 在入参列表中添加了 done channel用于标识是否已外部已完成操作, 内部采用select同时监控out channel & done channel operation, 如果监控到done动作, 则return, 销毁此goroutine;

对于logic中则需要定义done channel, 并在退出前发送done信号:

1
2
3
4
5
func TestPipeline(t *testing.T) {
done := make(chan struct{})
defer close(done)
....
}

如上所示, 这里采用close操作, 是因为从closed channel获取值, 会默认获得类型零值. 也就是说close是一种广播操作;

整体代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
package cancel

import (
"fmt"
"testing"
)

func TestPipeline(t *testing.T) {
// Set up a done channel that's shared by the whole pipeline,
// and close that channel when this pipeline exits, as a signal
// for all the goroutines we started to exit.
done := make(chan struct{})
defer close(done)

in := gen(done, 2, 3)

// Distribute the sq work across two goroutines that both read from in.
c1 := sq(done, in)
c2 := sq(done, in)

// Consume the first value from output.
out := merge(done, c1, c2)
fmt.Println(<-out) // 4 or 9

// done will be closed by the deferred call.
}
func gen(done <-chan struct{}, nums ...int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for n := range nums {
select {
case out <- n:
case <-done:
return
}
}
}()
return out
}

func sq(done <-chan struct{}, in <-chan int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for n := range in {
select {
case out <- n * n:
case <-done:
return
}
}
}()
return out
}

func merge(done <-chan struct{}, cs ...<-chan int) <-chan int {
var wg sync.WaitGroup
out := make(chan int)

// Start an output goroutine for each input channel in cs. output
// copies values from c to out until c or done is closed, then calls
// wg.Done.
output := func(c <-chan int) {
defer wg.Done()
for n := range c {
select {
case out <- n:
case <-done:
return
}
}
}
wg.Add(len(cs))
for _, c := range cs {
go output(c)
}

// Start a goroutine to close out once all the output goroutines are
// done. This must start after the wg.Add call.
go func() {
wg.Wait()
close(out)
}()
return out
}

结论

本文通过Demo方式展示了pipeline的并发编程范式, 并一步步展示了fan-in, fan-out, cancellation的适配结果, 逐步完善了pipeline工作模式. 期望可以在日常应用中, 不断实践, 并总结更优雅的并发编程范式.