Golang 语言标准库 sync 包的 RWMutex 读写互斥锁怎么使用?

微信扫一扫,分享到朋友圈

Golang 语言标准库 sync 包的 RWMutex 读写互斥锁怎么使用?

01

介绍

Mutex 互斥锁严格锁定读和写,这在读多写少的场景,未免显得有些「浪费」,在 Go 语言中,sync 包中的 RWMutex 类型可以解决这类问题,RWMutex 是基于 Mutex 实现的,RWMutex 是读写(reader/writer)互斥锁,RWMutex 在某一特定时间内,只能由若干 reader(读操作) 持有锁或只能由单个 writer(写操作) 持有锁。

例如,如果某个执行读操作的 goroutine 持有锁(共享锁),其他读操作的 goroutine 将不会阻塞,而是可以并发访问共享变量,提升读性能;如果某个执行写操作的 goroutine 持有锁(排它锁),其他的 goroutine,无论是执行读操作,还是执行写操作,都会阻塞,直到这个持有锁的写操作 goroutine 释放锁。

02

使用场景

读者通过阅读 Part 01 的内容,相信已经明白,RWMutex 类型适用于读多写少的场景。

如果我们在开始写程序的时候,就可以预估是读多写少的场景,那就直接使用 RWMutex 类型的读写互斥锁,否则,可以先使用 Mutex 类型的互斥锁,后续代码优化的时候,再根据实际情况来看是否可以改用 RWMutex 类型的读写互斥锁来优化代码的读性能。

RWMutex 类型一共有 6 个方法,

通过阅读 Go 源码 /usr/local/go/src/sync/rwmutex.go,

我们可以发现分别是 RLock、RUnlock、rUnlockSlow、Lock、Unlock 和 RLocker。

下面分别介绍一下这几个方法:

RLock/RUnlock:RLock 锁定写操作,如果锁已被写操作持有,RLock 方法会被阻塞,直到锁释放;如果锁已被读操作持有,RLock 方法会直接返回。RUnlock 是读操作对应的释放锁的方法。一般用于读操作的场景。

Lock/Unlock: Lock 锁定读写操作,不管是读操作持有锁,还是写操作持有锁,Lock 方法都会被阻塞,直到锁释放。Unlock 是对应的释放锁方法。一般用于写操作的场景。

rUnlockSlow:检查读操作是否全部释放锁,如果读锁全部释放,才可以唤醒写操作去请求写锁。

RLocker:RLocker 为读操作返回一个 Locker 接口,它的 Lock 方法会调用 RWMutex 类型的 RLock方法,它的 Unlock 方法会调用 RWMutex 类型的 RUnlock方法。

03

实现原理

在 Go 语言中,标准库 sync 包的 RWMutex 类型是采用「写优先」(Write-preferring)的设计,一个写调用持有锁,新的读调用会被阻塞。

RWMutex 类型的字段:

type RWMutex struct {
w Mutex // held if there are pending writers
writerSem uint32 // semaphore for writers to wait for completing readers
readerSem uint32 // semaphore for readers to wait for completing writers
readerCount int32 // number of pending readers
readerWait int32 // number of departing readers
}


const rwmutexMaxReaders = 1 << 30

阅读源码,可以发现 RWMutex 类型共有 5 个字段,其中一个是 Mutex 类型,剩余 4 个字段是辅助字段。

  • w:帮助解决多个写操作竞争锁的问题。

  • writerSem:writer 信号量。

  • readerSem:reader 信号量。

  • readerCount:记录当前 reader 的数量。

  • readerWait:记录 waiter 请求锁时,需要等待完成的 reader 数量。

rwmutexMaxReaders 常量,定义的是 reader 的最大值。

RLock 方法:

func (rw *RWMutex) RLock() {
if race.Enabled {
_ = rw.w.state
race.Disable()
}
if atomic.AddInt32(&rw.readerCount, 1) < 0 {
// A writer is pending, wait for it.
runtime_SemacquireMutex(&rw.readerSem, false, 0)
}
if race.Enabled {
race.Enable()
race.Acquire(unsafe.Pointer(&rw.readerSem))
}
}

阅读源码,第 6 行代码,对 readerCount 进行加 1 操作,如果 readerCount 的值为负数,代表此时有 writer 等待请求锁,因为,RWMutex 是采用写优先的方案设计的,此时,需要优先处理 writer 操作,暂时把新来的 reader 阻塞。

RUnlock 方法和 

rUnlockSlow


方法:

func (rw *RWMutex) RUnlock() {
if race.Enabled {
_ = rw.w.state
race.ReleaseMerge(unsafe.Pointer(&rw.writerSem))
race.Disable()
}
if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 {
// Outlined slow-path to allow the fast-path to be inlined
rw.rUnlockSlow(r)
}
if race.Enabled {
race.Enable()
}
}


func (rw *RWMutex) rUnlockSlow(r int32) {
if r+1 == 0 || r+1 == -rwmutexMaxReaders {
race.Enable()
throw("sync: RUnlock of unlocked RWMutex")
}
// A writer is pending.
if atomic.AddInt32(&rw.readerWait, -1) == 0 {
// The last reader unblocks the writer.
runtime_Semrelease(&rw.writerSem, false, 1)
}
}

阅读源码,第 7 行代码,对 readerCount 进行减 1 操作,如果 readerCount 的值为负数,代表此时有 writer 等待请求锁,第 9 行代码,通过调用 rUnlockSlow 方法,检查 reader 是否全部释放读锁了,如果已全部释放读锁,就可以唤醒请求写锁的 writer 了。

通过 Rlock 和 RUnlock 方法的源码,我们可以得出的结论是,writer 请求锁的优先处理权只限定于新 reader,如果在 writer 请求锁时,已有 reader 持有锁,仍然需要等待持有锁的 reader 释放锁。

Lock 方法:

func (rw *RWMutex) Lock() {
if race.Enabled {
_ = rw.w.state
race.Disable()
}
// First, resolve competition with other writers.
rw.w.Lock()
// Announce to readers there is a pending writer.
r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders
// Wait for active readers.
if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 {
runtime_SemacquireMutex(&rw.writerSem, false, 0)
}
if race.Enabled {
race.Enable()
race.Acquire(unsafe.Pointer(&rw.readerSem))
race.Acquire(unsafe.Pointer(&rw.writerSem))
}
}

阅读源码,可以发现 RWMutex 的 Lock 方法,使用 Mutex 的 Lock 方法,当有一个 writer 持有互斥锁时,通过将 readerCount 减去 rwmutexMaxReaders 常量,使 readerCount 变为负数,即保存了reader 数量,也代表了当前有 writer 请求锁。

第 9 行代码,还记录了当前持有锁的 reader 的数量,如果持有锁的 reader 的数量不等于 0,第 11 行代码,将 readerCount 赋值给 readerWait,同时当前 writer 进入阻塞状态,等待所有持有锁的 reader 全部释放锁,才会唤醒当前被阻塞的 writer。

Unlock 方法:

func (rw *RWMutex) Unlock() {
if race.Enabled {
_ = rw.w.state
race.Release(unsafe.Pointer(&rw.readerSem))
race.Disable()
}


// Announce to readers there is no active writer.
r := atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders)
if r >= rwmutexMaxReaders {
race.Enable()
throw("sync: Unlock of unlocked RWMutex")
}
// Unblock blocked readers, if any.
for i := 0; i < int(r); i++ {
runtime_Semrelease(&rw.readerSem, false, 0)
}
// Allow other writers to proceed.
rw.w.Unlock()
if race.Enabled {
race.Enable()
}
}

阅读源码,可以发现当前 writer 释放锁时,第 9 行代码,会通过将 readerCount 加上 rwmutexMaxReaders 常量,将 readerCount 变为正数,代表当前没有 writer 持有锁了,第 15 行代码,开始唤醒阻塞等待的 reader,第 19 行代码,释放互斥锁,等待其他 writer 请求锁。

细心的读者可能已经发现,Lock 方法是先持有互斥锁,再修改字段,Unlock 方法是最后释放互斥锁,再修改字段,采用这种顺序,是为了保证修改字段也受到互斥锁的保护。

04

踩坑

RWMutex 读写互斥锁的锁操作必须成对出现,Lock 和 RLock 操作,如果在未成对调用 Unlock 和 RUnlock 的情况下,重复调用 Lock 和 RLock,因为锁还没有被释放,可能会导致死锁;

Unlock 和 RUnlock 操作,如果在未对调用 Lock 和 RLock 的情况下,直接给一个未加锁的 RWMutex 释放锁,会导致程序 panic。

05

总结

本文开篇先介绍为什么使用 RWMutex 读写互斥锁,接着介绍 RWMutex 的方法和使用场景,然后介绍了 RLock/RUnlock 和 Lock/Unlock 的实现原理,最后列举了一个非常容易踩的「坑」。

免费领取资料

扫描下方二维码关注微信公众号「Golang 语言开发栈」,回复「 资料
」关键字,免费获取 Golang 语言学习资料,回复「 微信群
」关键字,申请加入微信群,和我一起学习 Golang。

微信扫一扫,分享到朋友圈

Golang 语言标准库 sync 包的 RWMutex 读写互斥锁怎么使用?

如何消除汽车盲区?敏视3D全景环视系统助你驾驶更安全

上一篇

手把手教你使用容器服务 TKE 集群审计排查问题

下一篇

你也可能喜欢

Golang 语言标准库 sync 包的 RWMutex 读写互斥锁怎么使用?

长按储存图像,分享给朋友