0%

Go Data Structure Source Code chan

引言

channelGo提供的用于goroutine间通信的数据结构, 是核心数据结构之一. 本文假设读者熟悉channel的使用方式, 将从源码的角度解读其数据结构, 并通过源码解释各个操作下channel内实现流程.

数据结构

源码位置: src/runtime/chan.go, channel的核心数据结构为: hchan.

结构

1
2
3
4
5
6
7
8
9
10
11
12
13
type hchan struct {
qcount uint // channel 内的存量数据数量
dataqsiz uint // channel 的Size
buf unsafe.Pointer // 指向环形队列的指针, 环形队列存储了数据
elemsize uint16 // 元素大小, 如int64为8, 标识为8字节
closed uint32 // channel 状态, 0标识为未关闭, 否则为关闭
elemtype *_type // 元素类型信息
sendx uint // 环形队列send操作的Index, 初始化为0
recvx uint // 环形队列receive操作的Index, 初始化为0
recvq waitq // 阻塞的receiver goroutines 列表, 其为链表结构
sendq waitq // 阻塞的sender goroutines 列表, 其为链表结构
lock mutex // 互斥锁
}

上面是channel结构体定义,定义并不复杂, 其核心结构为buf: 环形队列. 下面i将通过操作示例的方式, 展示各种操作下的数据结构实例.

空channel初始化

初始化一个size == 0int32类型channel: make(chan int32):

初始化空channel

图片展示了channel实例, 简单解释下各个属性的赋值原因, 后面示例中不重复解释:

  • qcount = 0: 初始化后channel元素数量为0;
  • dataqsize = 0: 空channelsize默认为0, 且不会改变;
  • buf: 指向buf属性本身的地址;
  • elemsize = 4: 由于元素为int32, 四个字节大小;
  • closed = 0: 未关闭状态的channel;
  • elemtype = _int32: 元素类型为_int32, 包含有若干信息;
  • sendx = 0: 发送操作需要的索引, 默认为0, 由于为空channel, 所以不会使用的;
  • recvx = 0: 接收操作需要的索引, 默认为0, 由于为空channel, 所以不会使用的;
  • recvq = nil: 尚没有阻塞的接收协程出现;
  • rendq = nil: 尚没有阻塞的发送协程出现;

需要注意的是在size == 0时, c.buf环形队列指针, 指向了c.buf本身的地址, 这是因为在通道的读写操作中,通常会涉及到对 buf 字段的读写操作,而我们不希望这些操作与像 close() 这样的操作发生竞争。

buffer channel 初始化

初始化一个size == 6int32类型channel: make(chan int32, 6):

初始化Buffer channel

该实例中, 可以看到buf属性, 指向了一个环形队列.

环形队列

1
2
3
4
5
c := make(chan int, 6)
c <- 1 // send
c <- 1 // send
c <- 1 // send
<- c // receive

上面首先定义了一个size == 6int类型的channel; 其次连续进行三次send操作, 最后进行一次receive操作. 此时数据结构为:

  • sendx: 三次send操作, 使得snedx = 3;
  • recvx: 一次receive操作, 使得revcx = 1;

等待队列

1
2
3
4
c := make(chan int, 6)
go func () {<- c} // 阻塞
go func () {<- c} // 阻塞
go func () {<- c} // 阻塞
  • recvq: 由于对于存量数据为空的channel, 连续出现三个协程进行了receive操作, 到这这三个goroutine均被阻塞, 并挂在recvq属性上.

操作流程

Go提供了四种内置channel操作: make, send, receive, close. 而channel的常见操作特性想必大家都知道, 例如close一个nil channel会导致程序奔溃. 那么为何会奔溃, 哪一步逻辑导致的奔溃呢? 这就需要结合源码进行理解.

下面我将采用流程图的方式展示, 部分源码细节过于繁琐, 例如lock操作, 等将会被忽略, 尽可能梳理主干逻辑.

Make

  • 函数签名: func makechan(t *chantype, size int) *hchan {...}

  • 流程图:

  • 解读: 代码简单, 可读性强;

    1. 首先会检查元素的大小和对齐方式是否合法,
    2. 然后计算需要分配的内存大小。根据元素是否包含指针,选择不同的分配方式。
    3. 最后初始化channel的一些属性,比如元素大小、元素类型、缓冲区大小等。如果开启了debug模式,还会打印一些信息。总的来说,就是为了创建一个符合要求的channel。

Send

  • 函数签名:func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {...}

  • 流程图:

  • 解读:代码可读性一般, 实现了通道发送操作的逻辑,包括了非阻塞发送、阻塞发送和竞争检测等功能。

    1. 首先会判断通道 c 是否为 nil,如果为 nil 且不需要阻塞,则直接返回 false;否则会阻塞当前 goroutine

    2. 如果开启了 race 检测,会调用 racereadpc() 方法做竞争检测。

    3. 如果不需要阻塞且通道未关闭且通道已满,则直接返回 false。

    4. 如果通道已关闭,则解锁并 panic 报错。

    5. 尝试从接收 goroutine 列表中取出一个 goroutine,如果成功则直接发送数据并返回 true

    6. 如果通道缓冲区中有空间可以发送数据,则将数据拷贝到缓冲区中,并更新发送索引等信息,最后返回 true

    7. 如果不需要阻塞,则直接返回 false

    8. 如果需要阻塞,则创建一个 sudog 结构并加入发送 goroutine 列表,然后调用 gopark() 进行阻塞等待。

    9. 当有接收 goroutine 唤醒发送 goroutine 时,处理唤醒逻辑并返回 true

Receive

  • 函数签名:func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {}

  • 流程图:

  • 解读: 代码可读性差, 下面是大概概括, 无法与上面流程图完全一致;

    1. 会判断通道是否为空,如果是非阻塞接收且通道为空,则直接返回;
    2. 如果通道已关闭并且为空,则清空数据并返回。
    3. 如果有数据可接收,则直接接收并返回。
    4. 如果是阻塞接收,则将当前 goroutine 挂起等待数据。

Close

  • 函数签名:func closechan(c *hchan) {...}

  • 流程图:

  • 解读: 这段代码清晰而易读;

    1. 首先会判断通道 c 是否为 nil,如果是则 panic 报错。
    2. 然后判断通道是否已经关闭,如果已关闭则解锁并 panic 报错。
    3. 如果开启了 race 检测,会进行相应的竞争检测(图中未展示)。
    4. 释放所有接收 goroutine 的资源,包括清空接收元素和标记等操作。
    5. 释放所有发送 goroutine 的资源,将发送元素清空,标记等操作。

结论

channel底层是实现核心为循环队列, 并不复杂. 本文结合源码总结了channel数据结构与常见操作的源码流程. 愿可以帮助到你.