手摸手Go 简单聊聊sync.RWMutex

光华路程序猿

共 5972字,需浏览 12分钟

 · 2021-03-09

那一天我二十一岁,在我一生的黄金时代,我有好多奢侈。我想爱,想吃,还想在一瞬间变成天上半明半暗的云,后来我才知道,生活就是个缓慢受锤的过程,人一天天老下去,奢望也一天天消逝,最后变得像挨了锤的牛一样。可是我过二十一岁生日时没有预见到这一点。我觉得自己会永远生猛下去,什么也锤不了我。---王小波

2ab9c57869c21812d62e31bd9b983957.webp

各位早上好~今天来聊聊Go提供的读写互斥锁sync.RWMutex,它可以加任意数量的读锁或者一个写锁。读写锁占用规则:

  • 读锁占用的情况下,会组织写锁的获取,但是不会阻止其他goroutine获取读锁
  • 写锁占用的情况下,则不允许任何(读锁/写锁)请求,将整个锁独占

其零值表示未上锁状态。


基本使用

使用sync.RWMutex可以很容易实现一个协程安全的字典结构。

package main

import (
 "fmt"
 "math/rand"
 "sync"
 "time"
)

func init() {
 rand.Seed(time.Now().Unix())
}

type Key interface{}
type Value interface{}
type Dictionary struct {
 m    sync.RWMutex
 data map[Key]Value
}

func (d *Dictionary) Add(k Key, v Value) {
 d.m.Lock()
 defer d.m.Unlock()
 if d.data == nil {
  d.data = make(map[Key]Value)
 }
 d.data[k] = v
}

func (d *Dictionary) Get(k Key) Value {
 d.m.RLock()
 defer d.m.RUnlock()
 return d.data[k]
}
func main() {
 d := &Dictionary{}
 wg := sync.WaitGroup{}

 for i := 0; i < 10; i++ {
  wg.Add(2)
  weight := rand.Intn(100)
  go func(w int) {
   time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond)
   d.Add("leo", fmt.Sprintf("leo超帅的 +%d", w))
   wg.Done()
  }(weight)
  go func() {
   time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond)
   fmt.Println(d.Get("leo"))
   wg.Done()
  }()
 }
 wg.Wait()
}

sync.RWMutex源码分析

数据结构

type RWMutex struct {
 w           Mutex  // 写操作需要先尝试持有
 writerSem   uint32 // 等待读操作完成的写等待的信号量
 readerSem   uint32 // 等待写操作完成的读等待的信号量
 readerCount int32  // 阻塞的读操作数量
 readerWait  int32  // 写操作 来之前 读操作数量
}
// 最大读操作数量
const rwmutexMaxReaders = 1 << 30

上面几个属性,第一次看到readerWait有点儿懵 这个跟readerCount有啥关系呢?上图吧 其实也不复杂 具体可以配合下面代码分析一起可能会更好理解。

8a4e629a793fd4cbe685abc156158691.webp

rwmutex attribute

假设一个场景,不同操作时不同属性的值变化如下表:

操作writerSemreaderSemreaderCountreaderWaitrw.w
4次Rlock()且均未释放未阻塞写操作未阻塞读操作400
假设执行一次Unlock()未阻塞写操作未阻塞读操作4-1=300
尝试执行Lock()阻塞1个写操作未阻塞读操作3-(1<<30)30
Lock()等待readerWait个读操作执行完毕




执行2次RUnlock()同时执行2次Rlock()阻塞1个写操作阻塞2个读操作3-(1<<30)-2+23-20
第一次Lock()未获得锁 再次执行Lock() 将被阻塞在rw.w上阻塞1个写操作阻塞2个读操作3-(1<<30)-2+23-21
第4次RUnlock执行完毕时 会唤醒阻塞的第一个Lock未阻塞写阻塞2个读操作3-(1<<30)-2+2-1+(1<<30)=201

为什么他们的值会是这样?我们接着看源码,然后回过头再对照表格 自然就明了了。

操作方法

RLock

用于读操作抢占锁,它不应该被递归调用。

func (rw *RWMutex) RLock() {
 if atomic.AddInt32(&rw.readerCount, 1) < 0 {
  // A writer is pending, wait for it.
  runtime_SemacquireMutex(&rw.readerSem, false0)
 }
}

执行RLock,若rw.readerCount加1小于0则说明存在写操作持有锁,则将当前的读操作阻塞到rw.readerSem上。

RUnlock

RUnlock一次只能解除一个Rlock操作,并不会影响其他的读操作。如果没有执行RLock,执行RUnlock会panic throw("sync: RUnlock of unlocked RWMutex")

func (rw *RWMutex) RUnlock() {
 if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 {
  // Outlined slow-path to allow the fast-path to be inlined
  rw.rUnlockSlow(r)
 }
}

RUnlock首先判断r=rw.readerCount-1

  • r>=0 表示释放读锁成功

  • r<0表示存在写操作持有锁,进入slow-path

func (rw *RWMutex) rUnlockSlow(r int32) {
  //不存在RLock操作 不能执行RUnlock
 if r+1 == 0 || r+1 == -rwmutexMaxReaders {
  race.Enable()
  throw("sync: RUnlock of unlocked RWMutex")
 }
 // A writer is pending.
 if atomic.AddInt32(&rw.readerWait, -1) == 0 {//读操作执行完毕 唤醒等待的写操作
  //最后一个读操作解锁 唤醒写操作
  runtime_Semrelease(&rw.writerSem, false1)
 }
}

因为初始状态下sync.RWMutex是未上锁状态,rw.readerCount初始为0,或者在无读操作加锁的情况下,写操作加锁rw.readerCount会被置为const rwmutexMaxReaders = 1 << 30,因此

r+1 == 0 || r+1 == -rwmutexMaxReaders表明当前无读操作持有锁,而直接执行RUnlock会panic。

尝试进行rw.readerWait-1操作,然后判断若rw.readerWait==0则表明写操作抢占锁之前的读操作都已经处理完毕,此时可以唤醒被阻塞在rw.writerSem上的写操作了。

Lock

针对写操作时尝试获取锁,如果当前锁被读操作或写操作持有,则阻塞等待直到锁可用。

func (rw *RWMutex) Lock() {
 // 首先解决于其他写操作的竞争问题
 rw.w.Lock()
 // 告诉读操作,这里存在阻塞的写操作
 r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders
 // 如果仍存存在读操作持有锁 则阻塞等待
 if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 {
  runtime_SemacquireMutex(&rw.writerSem, false0)
 }
}

大致步骤:

  1. Lock首先调用了rw.w.Lock()来解决多个写操作并发请求的竞争问题:如果存在多个写操作,只有一个写操作会获取到rw.w锁接着尝试剩余的操作,其他的写操作会被阻塞在rw.w上。
  2. 调用atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders),这里结合RLock()中的atomic.AddInt32(&rw.readerCount, 1) < 0则将读操作阻塞在rw.readerSem上,以此来让读操作感知到当前是否存在阻塞的写操作。

Unlock

sync.Mutex一样,一个锁定的sync.RWMutex跟特定的goroutine没有任何关联。一个goroutine可能RLockLock)一个sync.RWMutex,然后另一个goroutine可以RUnlockUnlock)掉这个锁状态。

// 这里也是不允许针对一个未执行Lock的rw执行Unlock操作的
func (rw *RWMutex) Unlock() {
 // 通知读操作,这里没有激活的写操作了
 r := atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders)
 if r >= rwmutexMaxReaders {
  race.Enable()
  throw("sync: Unlock of unlocked RWMutex")
 }
 // Unblock blocked readers, if any.
 for i := 0; i < int(r); i++ {
  runtime_Semrelease(&rw.readerSem, false0)
 }
 // Allow other writers to proceed.
 rw.w.Unlock()
}

基本逻辑:

  1. 通过atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders)恢复rw.readerCount,即通知读操作这里没有激活的写操作,意味着这个时候读操作可以有机会竞争锁了,即使仍存在阻塞在rw.w上的写操作,这里应该是防止读操作会因为写操作过多被饿死。
  2. 判断是否是没Lock的情况下执行了Unlock
  3. 依次唤醒阻塞在rw.readerSem上的读操作
  4. rw.w.Unlock意味着阻塞在rw.w上的其他写操作可以接着抢占锁了。

关于递归读锁定问题

细心的童鞋可能会发现sync.RWMutex是禁止递归读锁定的,官方是这么说的

If a goroutine holds a RWMutex for reading and another goroutine might call Lock, no goroutine should expect to be able to acquire a read lock until the initial read lock is released. In particular, this prohibits recursive read locking. This is to ensure that the lock eventually becomes available; a blocked Lock call excludes new readers from acquiring the lock.

大概意思是说,如果我们持有一个sync.RWMutex的读锁时,可能会有另一个写操作尝试获取锁,因为前面的读锁未释放则这个写操作只能阻塞等待。不幸的是,这个读操作干完活并不释放读锁,而是继续递归调用读操作获取锁,但是这个时候获取读锁的时候发现前面有阻塞的写锁请求,则这个读操作请求只能阻塞等待前面的写操作完事儿。最早的读操作又等待当前的读操作完事儿去释放锁,完美的一个贪吃蛇构成的一个死锁的场景就出现啦。

795fe31b617a220dead659a9f7eff572.webp

recursive rlock

举个栗子吧:

我们在斐波那契数列递归函数里,递归获取读锁,然后中途我们来个写锁请求,看看啥结果

package main

import (
 "sync"
 "time"
)

var m sync.RWMutex

func fibonacci(num int) int {
 if num < 2 {
  return 1
 }
 m.RLock()
 defer m.RUnlock()
 time.Sleep(time.Millisecond * 100)
 return fibonacci(num-1) + fibonacci(num-2)
}
func main() {
 done := make(chan int)
 go func() {
  m.Lock()
  time.Sleep(time.Millisecond * 200)
  m.Unlock()
  done <- 1
 }()
 fibonacci(5)
 <-done
}

输出结果是那么熟悉的味道。

fatal error: all goroutines are asleep - deadlock!

总结

sync.RWMutex提供了比sync.Mutex更加细粒度的锁控制,将读锁和写锁做了分离,本来逻辑会比较复杂,但是它是基于sync.Mutex所以整体逻辑就变得比较简单,可能readerCountreaderWait咋一看有点儿懵,不过仔细看看不难,通过readerCount的值来达到读写锁通信的目的 设计还是很巧妙的,受益匪浅。




如果阅读过程中发现本文存疑或错误的地方,可以关注公众号留言。如果觉得还可以 帮忙点个在看😁






浏览 37
点赞
评论
收藏
分享

手机扫一扫分享

举报
评论
图片
表情
推荐
点赞
评论
收藏
分享

手机扫一扫分享

举报