引言
本文是Go Source Code
解读系列之sync/RWMutex
读写锁. 本系列会以代码示例作为切入点, 解释源码.
问题
为什么不允许对sync.RWMutex
通过recursive read
的方式使用.
最近项目中遇到了random deadlock
问题, 经过排查发现是因为在使用读写锁sync/RWMutex
时, 使用了recursive read
方式读取资源的同时触发了并发读, 这有可能触发deadlock
.
示例代码抽象如下:
1 | package mutex |
上面的代码中通过RWCompetion
函数模拟了读写锁的竞争场景.
其中readData
函数中模拟了读取数据操作, 存在recursive read
现象, 其尝试内外两层获取读锁, 读取相关资源后, 依次释放内外读锁.
writeData
函数模拟了写入数据操作, 其尝试获取写锁, 并在写入资源后, 释放写锁.
在RWCompetion
最后通过time.Sleep(20 * time.Second)
保证读取操作有充分的时间完成.
现象
示例代码中一共发生了三次锁获取操作, 其中两次为读锁(RLock, innerRLock
), 一次为写锁(WLock
); 由于并发时机的不确定性, 其实际调用顺序也是不确定的, 可以分为:
RLock -> innerRLock -> WLock
: 先读后写;1
2
3
4
5
6
7
8
9
10
11[root@VM-0-9-centos learnGo]# go run ./cmd/main.go
Ready??!!
Go ...
Try to get R lock: 1
Get R lock: 1
Try to get Inner R lock: 1
Get inner R lock: 1
Try to get W lock: 2
Finish read something: 1
Get W lock: 2
Finish write something: 2RLock -> WLock -> innerRLock
: 没读完就写;1
2
3
4
5
6
7[root@VM-0-9-centos learnGo]# go run ./cmd/main.go
Ready??!!
Go ...
Try to get R lock: 1
Get R lock: 1
Try to get W lock: 2
Try to get Inner R lock: 1WLock -> RLock -> innerRLock
: 先写后读;1
2
3
4
5
6
7
8
9
10
11[root@VM-0-9-centos learnGo]# go run ./cmd/main.go
Ready??!!
Go ...
Try to get W lock: 2
Get W lock: 2
Try to get R lock: 1
Finish write something: 2
Get R lock: 1
Try to get Inner R lock: 1
Get inner R lock: 1
Finish read something: 1通过控制台输出内容, 可以发现在第二种调用顺序
RLock -> WLock -> innerRLock
时: 发生了DeadLock
. 接下来会结合源码, 进行解释.
源码
RWMutex
读写锁的特点:
- 允许任意数量的读锁
- 仅允许单独的写锁
- 默认零值锁为
unlock
状态
核心代码结构包括 一个代码结构体 RWMutex
, 4个方法RLock, RUnlock, Lock, Unlock
;
结构体:
RWMutex
1
2
3
4
5
6
7type RWMutex struct {
w Mutex // 互斥锁, 用于标识是否存在写锁
writerSem uint32 // 信号量, 用于标识写锁, 在释放掉当前存在的读锁后, 消费掉此信号
readerSem uint32 // 信号量, 用于标识读锁, 在释放掉当前存在的写锁后, 消费掉此信号
readerCount atomic.Int32 // number of pending readers 所有读锁计数
readerWait atomic.Int32 // number of departing readers 被写锁排斥的读锁计数
}方法:
RLock, RUnlock, Lock, Unlock
RLock
1
2
3
4
5
6
7
8func (rw *RWMutex) RLock() {
...
if rw.readerCount.Add(1) < 0 {
// A writer is pending, wait for it.
runtime_SemacquireRWMutexR(&rw.readerSem, false, 0)
}
...
}代码通过
rw.readerCount.Add(1)
对读者计数进行加1操作,并判断加1后的结果是否小于0。如果小于0,表示有一个写者正在等待,需要等待写者完成。RUnlock
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16func (rw *RWMutex) RUnlock() {
...
if r := rw.readerCount.Add(-1); r < 0 {
// Outlined slow-path to allow the fast-path to be inlined
rw.rUnlockSlow(r)
}
...
}
func (rw *RWMutex) rUnlockSlow(r int32) {
...
// A writer is pending.
if rw.readerWait.Add(-1) == 0 {
// The last reader unblocks the writer.
runtime_Semrelease(&rw.writerSem, false, 1)
}
}代码通过
rw.readerCount.Add(-1)
对读者计数进行减1操作,并判断减1后的结果是否小于0。如果小于0,表示可能有其他的读者或写者在等待,需要进入慢路径处理。然后,代码通过调用
rw.rUnlockSlow()
方法进入慢路径处理: 代码判断通过rw.readerWait.Add(-1)
将等待的读者数量减1。如果减1后的结果为0,表示最后一个读者解锁了,需要唤醒等待的写者。Lock
1
2
3
4
5
6
7
8
9
10
11
12func (rw *RWMutex) Lock() {
...
// First, resolve competition with other writers.
rw.w.Lock()
// Announce to readers there is a pending writer.
r := rw.readerCount.Add(-rwmutexMaxReaders) + rwmutexMaxReaders
// Wait for active readers.
if r != 0 && rw.readerWait.Add(r) != 0 {
runtime_SemacquireRWMutex(&rw.writerSem, false, 0)
}
...
}代码通过
rw.w.Lock()
来解决与其他写者的竞争。这表示在获取写锁之前,需要先解决与其他写者的竞争。接着,代码通过
rw.readerCount.Add(-rwmutexMaxReaders)
将读者计数减去最大读者数,并将结果加上最大读者数。这是为了向读者们宣告有一个等待的写者。然后,代码通过
rw.readerWait.Add(r)
等待活动读者。如果等待的读者数量不为0,代码会通过runtime_SemacquireRWMutex()
函数等待写者信号量。Unlock
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16func (rw *RWMutex) Unlock() {
...
// Announce to readers there is no active writer.
r := rw.readerCount.Add(rwmutexMaxReaders)
if r >= rwmutexMaxReaders {
race.Enable()
fatal("sync: Unlock of unlocked RWMutex")
}
// Unblock blocked readers, if any.
for i := 0; i < int(r); i++ {
runtime_Semrelease(&rw.readerSem, false, 0)
}
// Allow other writers to proceed.
rw.w.Unlock()
...
}代码通过
rw.readerCount.Add(rwmutexMaxReaders)
将读者计数加上最大读者数,并将结果赋值给变量r
。这是为了向读者们宣告没有活动的写者。接着,代码通过一个循环来释放读者信号量。循环的次数为
r
的值,即之前计算的读者计数加上最大读者数。在循环中,通过调用runtime_Semrelease()
函数来释放读者信号量。然后,代码通过
rw.w.Unlock()
来释放写锁。这表示写者已经完成了写操作,其他的写者可以继续执行。
缘由
经过上面的源码解释, 可以分析出: RLock -> WLock -> innerRLock
的流程为:
- 顺利获得
RLock
, 此时rw.readerCount == 1
, 标识正在使用的读者有一个; - 试图去拿
WLock
, 发现存在读者在读, 进入了等待写信号量状态, 此时:rw.w
处于锁定状态,rw.readerCount == 1- 1<<30
; - 试图去拿
innerRLock
, 发现前面存在一个写锁操作, 进入了等待读信号量状态.
上面的三步操作, 其依赖关系为:
innerRLock -> WLock
:innerRLock
在等待WLock
释放;RLock -> innerRLock
:RLock
在等待innerRLock
释放;WLock -> RLock
:WLock
在等待RLock
释放;
从而导致了死锁现象.
通过代码中添加RWMutex
输出逻辑, 可以佐证上面的解释:
1 | package mutex |
1 | [root@VM-0-9-centos learnGo]# go run ./cmd/main.go |
PS: 由于并发的特性, 需要多试几次, 才可以复现上面的输出结果. 无关信息通过...
忽略处理;
结论
对于sync/RWMutex
中的读锁操作允许重复读, 但是不允许recursive
读. 这对导致内外层锁之间的依赖性, 破坏了原子性;
对于sync/RWMutex
中的写锁操作仅允许单独写, 其会阻塞后续读锁操作(防止饥饿现象).
在使用过程中, 禁止recursive
读操作即可;