0%

Go Source: sync.RWMutex

引言

本文是Go Source Code解读系列之sync/RWMutex读写锁. 本系列会以代码示例作为切入点, 解释源码.

问题

为什么不允许对sync.RWMutex通过recursive read的方式使用.

最近项目中遇到了random deadlock问题, 经过排查发现是因为在使用读写锁sync/RWMutex时, 使用了recursive read方式读取资源的同时触发了并发读, 这有可能触发deadlock.

示例代码抽象如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
package mutex

import (
"fmt"
"sync"
"time"
)

var rw sync.RWMutex

func readData(i int) {
fmt.Printf("Try to get R lock: %v \n", i)
rw.RLock()
fmt.Printf("Get R lock: %v \n", i)
defer rw.RUnlock()
innerRead(i)
}

// recursive read, oh no!!!
func innerRead(i int) {
fmt.Printf("Try to get Inner R lock: %v \n", i)
rw.RLock()
fmt.Printf("Get inner R lock: %v \n", i)
defer rw.RUnlock()
time.Sleep(5 * time.Second)
fmt.Printf("Finish read something: %v \n", i)
}

func writeData(i int) {
fmt.Printf("Try to get W lock: %v \n", i)
rw.Lock()
fmt.Printf("Get W lock: %v \n", i)
defer rw.Unlock()
time.Sleep(5 * time.Second)
fmt.Printf("Finish write something: %v \n", i)
}

func RWCompetion() {
rw.Lock()
fmt.Printf("Ready??!! \n")
go readData(1)
go writeData(2)
fmt.Printf("Go ... \n")
rw.Unlock()
time.Sleep(20 * time.Second)
}

上面的代码中通过RWCompetion函数模拟了读写锁的竞争场景.

其中readData函数中模拟了读取数据操作, 存在recursive read现象, 其尝试内外两层获取读锁, 读取相关资源后, 依次释放内外读锁.

writeData函数模拟了写入数据操作, 其尝试获取写锁, 并在写入资源后, 释放写锁.

RWCompetion最后通过time.Sleep(20 * time.Second)保证读取操作有充分的时间完成.

现象

示例代码中一共发生了三次锁获取操作, 其中两次为读锁(RLock, innerRLock), 一次为写锁(WLock); 由于并发时机的不确定性, 其实际调用顺序也是不确定的, 可以分为:

  1. RLock -> innerRLock -> WLock : 先读后写;

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    [root@VM-0-9-centos learnGo]# go run ./cmd/main.go 
    Ready??!!
    Go ...
    Try to get R lock: 1
    Get R lock: 1
    Try to get Inner R lock: 1
    Get inner R lock: 1
    Try to get W lock: 2
    Finish read something: 1
    Get W lock: 2
    Finish write something: 2
  2. RLock -> WLock -> innerRLock: 没读完就写;

    1
    2
    3
    4
    5
    6
    7
    [root@VM-0-9-centos learnGo]# go run ./cmd/main.go 
    Ready??!!
    Go ...
    Try to get R lock: 1
    Get R lock: 1
    Try to get W lock: 2
    Try to get Inner R lock: 1
  3. WLock -> RLock -> innerRLock: 先写后读;

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    [root@VM-0-9-centos learnGo]# go run ./cmd/main.go 
    Ready??!!
    Go ...
    Try to get W lock: 2
    Get W lock: 2
    Try to get R lock: 1
    Finish write something: 2
    Get R lock: 1
    Try to get Inner R lock: 1
    Get inner R lock: 1
    Finish read something: 1

    通过控制台输出内容, 可以发现在第二种调用顺序 RLock -> WLock -> innerRLock时: 发生了DeadLock. 接下来会结合源码, 进行解释.

源码

RWMutex读写锁的特点:

  • 允许任意数量的读锁
  • 仅允许单独的写锁
  • 默认零值锁为unlock状态

核心代码结构包括 一个代码结构体 RWMutex, 4个方法RLock, RUnlock, Lock, Unlock;

  • 结构体: RWMutex

    1
    2
    3
    4
    5
    6
    7
    type RWMutex struct {
    w Mutex // 互斥锁, 用于标识是否存在写锁
    writerSem uint32 // 信号量, 用于标识写锁, 在释放掉当前存在的读锁后, 消费掉此信号
    readerSem uint32 // 信号量, 用于标识读锁, 在释放掉当前存在的写锁后, 消费掉此信号
    readerCount atomic.Int32 // number of pending readers 所有读锁计数
    readerWait atomic.Int32 // number of departing readers 被写锁排斥的读锁计数
    }
  • 方法: RLock, RUnlock, Lock, Unlock

    • RLock

      1
      2
      3
      4
      5
      6
      7
      8
      func (rw *RWMutex) RLock() {
      ...
      if rw.readerCount.Add(1) < 0 {
      // A writer is pending, wait for it.
      runtime_SemacquireRWMutexR(&rw.readerSem, false, 0)
      }
      ...
      }

      代码通过rw.readerCount.Add(1)对读者计数进行加1操作,并判断加1后的结果是否小于0。如果小于0,表示有一个写者正在等待,需要等待写者完成。

    • RUnlock

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      func (rw *RWMutex) RUnlock() {
      ...
      if r := rw.readerCount.Add(-1); r < 0 {
      // Outlined slow-path to allow the fast-path to be inlined
      rw.rUnlockSlow(r)
      }
      ...
      }
      func (rw *RWMutex) rUnlockSlow(r int32) {
      ...
      // A writer is pending.
      if rw.readerWait.Add(-1) == 0 {
      // The last reader unblocks the writer.
      runtime_Semrelease(&rw.writerSem, false, 1)
      }
      }

      代码通过rw.readerCount.Add(-1)对读者计数进行减1操作,并判断减1后的结果是否小于0。如果小于0,表示可能有其他的读者或写者在等待,需要进入慢路径处理。

      然后,代码通过调用rw.rUnlockSlow()方法进入慢路径处理: 代码判断通过rw.readerWait.Add(-1)将等待的读者数量减1。如果减1后的结果为0,表示最后一个读者解锁了,需要唤醒等待的写者。

    • Lock

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      func (rw *RWMutex) Lock() {
      ...
      // First, resolve competition with other writers.
      rw.w.Lock()
      // Announce to readers there is a pending writer.
      r := rw.readerCount.Add(-rwmutexMaxReaders) + rwmutexMaxReaders
      // Wait for active readers.
      if r != 0 && rw.readerWait.Add(r) != 0 {
      runtime_SemacquireRWMutex(&rw.writerSem, false, 0)
      }
      ...
      }

      代码通过rw.w.Lock()来解决与其他写者的竞争。这表示在获取写锁之前,需要先解决与其他写者的竞争。

      接着,代码通过rw.readerCount.Add(-rwmutexMaxReaders)将读者计数减去最大读者数,并将结果加上最大读者数。这是为了向读者们宣告有一个等待的写者。

      然后,代码通过rw.readerWait.Add(r)等待活动读者。如果等待的读者数量不为0,代码会通过runtime_SemacquireRWMutex()函数等待写者信号量。

    • Unlock

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      func (rw *RWMutex) Unlock() {
      ...
      // Announce to readers there is no active writer.
      r := rw.readerCount.Add(rwmutexMaxReaders)
      if r >= rwmutexMaxReaders {
      race.Enable()
      fatal("sync: Unlock of unlocked RWMutex")
      }
      // Unblock blocked readers, if any.
      for i := 0; i < int(r); i++ {
      runtime_Semrelease(&rw.readerSem, false, 0)
      }
      // Allow other writers to proceed.
      rw.w.Unlock()
      ...
      }

      代码通过rw.readerCount.Add(rwmutexMaxReaders)将读者计数加上最大读者数,并将结果赋值给变量r。这是为了向读者们宣告没有活动的写者。

      接着,代码通过一个循环来释放读者信号量。循环的次数为r的值,即之前计算的读者计数加上最大读者数。在循环中,通过调用runtime_Semrelease()函数来释放读者信号量。

      然后,代码通过rw.w.Unlock()来释放写锁。这表示写者已经完成了写操作,其他的写者可以继续执行。

缘由

经过上面的源码解释, 可以分析出: RLock -> WLock -> innerRLock 的流程为:

  • 顺利获得RLock, 此时rw.readerCount == 1, 标识正在使用的读者有一个;
  • 试图去拿WLock, 发现存在读者在读, 进入了等待写信号量状态, 此时:rw.w处于锁定状态, rw.readerCount == 1- 1<<30;
  • 试图去拿innerRLock, 发现前面存在一个写锁操作, 进入了等待读信号量状态.

上面的三步操作, 其依赖关系为:

  • innerRLock -> WLock: innerRLock在等待WLock释放;
  • RLock -> innerRLock: RLock在等待innerRLock释放;
  • WLock -> RLock: WLock在等待RLock释放;

从而导致了死锁现象.

通过代码中添加RWMutex输出逻辑, 可以佐证上面的解释:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
package mutex

import (
"fmt"
"sync"
"time"
)

var rw sync.RWMutex

func readData(i int) {
fmt.Printf("Try to get R lock: %v \n", i)
mutexInfo("TryRLock: ")
rw.RLock()
fmt.Printf("Get R lock: %v \n", i)
mutexInfo("GetRLock: ")
defer rw.RUnlock()
time.Sleep(1 * time.Second)
innerRead(i)
}

func innerRead(i int) {
mutexInfo("TryInnerRLock: ")
fmt.Printf("Try to get Inner R lock: %v \n", i)
rw.RLock()
mutexInfo("GetInnerRLock: ")
fmt.Printf("Get inner R lock: %v \n", i)
defer rw.RUnlock()
time.Sleep(5 * time.Second)
fmt.Printf("Finish read something: %v \n", i)
}

func writeData(i int) {
fmt.Printf("Try to get W lock: %v \n", i)
mutexInfo("TryWLock: ")
rw.Lock()
fmt.Printf("Get W lock: %v \n", i)
mutexInfo("GetWLock: ")
defer rw.Unlock()
time.Sleep(5 * time.Second)
fmt.Printf("Finish write something: %v \n", i)
}

func mutexInfo(pre string) {
fmt.Printf("%s: RWMutext stats: %#v \n", pre, rw)
}

func RWCompetion() {
rw.Lock()
fmt.Printf("Ready??!! \n")
go readData(1)
go writeData(2)
fmt.Printf("Go ... \n")
rw.Unlock()
time.Sleep(20 * time.Second)
}

1
2
3
4
5
6
7
8
9
10
11
[root@VM-0-9-centos learnGo]# go run ./cmd/main.go 
Ready??!!
Go ...
Try to get R lock: 1
TryRLock: : RWMutext stats: ... readerCount:atomic.Int32{_:atomic.noCopy{}, v:0}, readerWait:atomic.Int32{_:atomic.noCopy{}, v:0}}
Get R lock: 1
GetRLock: : RWMutext stats:... readerCount:atomic.Int32{_:atomic.noCopy{}, v:1}, readerWait:atomic.Int32{_:atomic.noCopy{}, v:0}}
Try to get W lock: 2
TryWLock: : RWMutext stats: ... readerCount:atomic.Int32{_:atomic.noCopy{}, v:1}, readerWait:atomic.Int32{_:atomic.noCopy{}, v:0}}
TryInnerRLock: : RWMutext stats: ... readerCount:atomic.Int32{_:atomic.noCopy{}, v:-1073741823}, readerWait:atomic.Int32{_:atomic.noCopy{}, v:1}}
Try to get Inner R lock: 1

PS: 由于并发的特性, 需要多试几次, 才可以复现上面的输出结果. 无关信息通过...忽略处理;

结论

对于sync/RWMutex中的读锁操作允许重复读, 但是不允许recursive读. 这对导致内外层锁之间的依赖性, 破坏了原子性;

对于sync/RWMutex中的写锁操作仅允许单独写, 其会阻塞后续读锁操作(防止饥饿现象).

在使用过程中, 禁止recursive读操作即可;