0%

Go Concurrency Programming Context

参考: Go Concurrency Patterns: Context - The Go Programming Language

本文将简单介绍下context pkg内的关键类型与函数, 随后通过两个用例的方式展示context在并发编程中的使用场景;

缘由

Server端一次请求的发生, 往往需要启动新的goroutine, 例如用来请求数据库或RPC. 这就提出了一些合理的需求: 有时需要对goroutine进行生命周期的控制, 例如超过了预期时间则视为请求失败, 应当销毁相关goroutine; 有时需要传递请求相关的内容, 例如权限信息, Trace-ID等.

基于此Google提出了context包, 在并发场景下实现请求生命周期控制;

定义

Context是用来在诸多请求相关的goroutine间传递信息的工具, 一般用于goroutine生命周期管理; 其可以:

  • 支持手动触发取消cancel信号: 信号会传播给所有的derived context;
  • 支持设置超时时间: 超时后, 完成信号自动传播给所有derived context;
  • 支持传递request-scope值: 传递Trace-ID, user-ID等请求相关的数据;
1
2
3
// Package context defines the Context type, which carries deadlines,
// cancellation signals, and other request-scoped values across API boundaries
// and between processes.

Context Typecontext包的核心类型, 结合源码, 可以有更好的理解:

  • Context接口定义:

    1
    2
    3
    4
    5
    6
    7
    type Context interface {
    Deadline() (deadline time.Time, ok bool)
    Done() <-chan struct{}
    Err() error
    Value(key any) any
    }

    • Deadline(): 返回当前context的终止时间; 如果ok == false, 标识没有设置终止时间;

    • Done(): 返回一个channel用于标识当前context是否是终止状态, 终止状态的设置有三种:

      • cancel: 调用WithCancel(ctx)获得的cancel函数, 会传播cancel signal到所有derived context中;
      • deadline: 调用WithDeadline(ctx, time.Time), 会在时间到达deadline后, 自动传播deadline signal到所有derived context中;
      • timeout: 调用WithTimeout(ctx, time.Time)会在时间到达timeout后, 自动传播deadline signal到所有derived context中;

      一般对于并发编程中, 结合select语句使用更佳, 示例:

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      //  func Stream(ctx context.Context, out chan<- Value) error {
      // for {
      // v, err := DoSomething(ctx)
      // if err != nil {
      // return err
      // }
      // select {
      // case <-ctx.Done():
      // return ctx.Err()
      // case out <- v:
      // }
      // }
      // }
    • Error: 一般在ctx终止后, 用于传递错误信息;

    • Value: 用于传递request-scope值;

  • Context关键方法:

    • Backgroud() Context: 返回一个空Context, 用于根context用处;
    • WithCancel(parent Context) (ctx Context, cancel CancelFunc): 接收一个parent context返回一个child contextcancel函数; 调用cancel函数就可以将child context与其衍生context发送取消信号;
    • WithDeadline(parent Context, d time.Time) (Context, CancelFunc): 接收一个parent context与超时时间(绝对时间)d time.Time, 返回一个child contextcancel函数; 用户可以手动调用cancel发送取消信号, 或者超时后child context会自动发送超时信号;
    • WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc): 接收一个parent context与时间间隔(相对时间)timeout time.Duration, 返回一个child contextcancel函数; 用户可以手动调用cancel发送取消信号, 或者超时后child context会自动发送超时信号;
    • WithValue(parent Context, key, val any) Context: 会将key value添加到parent context后返回一个新的context, 用于传递请求相关的数据;

使用

Demo

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
package main

import (
"context"
"fmt"
"time"
)

func main() {
parentCtx := context.Background()

// Using WithTimeout
timeoutCtx, cancel := context.WithTimeout(parentCtx, 2*time.Second)
defer cancel()

// Using WithDeadline
deadlineCtx, cancel := context.WithDeadline(parentCtx, time.Now().Add(2*time.Second))
defer cancel()

// Simulating long-running operations
go func() {
select {
case <-timeoutCtx.Done():
fmt.Println("Timeout Context canceled")
}
}()

go func() {
select {
case <-deadlineCtx.Done():
fmt.Println("Deadline Context canceled")
}
}()

// Waiting for a while to observe the difference
time.Sleep(3 * time.Second)
}

这里给出了一段示例代码, 其通过BackGroud方法获得了root ctx, 随后分别使用了WithTimeout, WithDeadLine设置了两个带超时信号的child ctx; 通过go 关键字启动了两个协程, 协程内分别通过select语句监控ctx终止信号, 并输出打印信息; 通过运行可以发现, 在2 s钟后两个goroutine均输出, 并终止;

Example

本部分将展示pipeline并发编程模式下, 通过context发送取消信号, 从而保证在主业务退出后, 所有goroutine正常销毁;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
package cancel

import (
"context"
"fmt"
"sync"
)

func gen(ctx context.Context, nums ...int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for _, n := range nums {
select {
case out <- n:
fmt.Printf("stage one send : %v \n", n)
case <-ctx.Done():
fmt.Printf("OH CANCEL STAGE ONE \n")
return
}
}
}()
return out
}

func sq(ctx context.Context, in <-chan int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for n := range in {
select {
case out <- n * n:
fmt.Printf("stage two send : %v \n", n*n)
case <-ctx.Done():
fmt.Printf("OH CANCEL STAGE TWO \n")
return
}
}
}()
return out
}

func merge(ctx context.Context, cs ...<-chan int) <-chan int {
var wg sync.WaitGroup
out := make(chan int)
output := func(c <-chan int) {
defer wg.Done()
for n := range c {
select {
case out <- n:
case <-ctx.Done():
fmt.Printf("OH CANCEL STAGE THREE \n")
return
}
}
}
wg.Add(len(cs))
for _, c := range cs {
go output(c)
}
go func() {
wg.Wait()
close(out)
}()
return out
}

func TestPipeline(t *testing.T) {
parentCtx := context.Background()
ctx, cancel := context.WithCancel(parentCtx)
defer cancel()

in := gen(ctx, 2, 3, 4, 5)
c1 := sq(ctx, in)
c2 := sq(ctx, in)
out := merge(ctx, c1, c2)
fmt.Println(<-out)
}

在上面的示例代码中, 主业务逻辑TestPipeline, 通过WithCancel获得了用于发送取消信号的ctx, 随后通过defer cancel()保证在业务逻辑退出后, 向所有相关协程发送取消信号, 防止资源泄露;

结论

context 是用于控制请求生命周期的工具, 其天然适配并发编程. 在实际编码中, 形成了约定俗称的签名限制: 函数第一个入参均为ctx context.Context, 使得跨API的访问中, 交互更加方便;