Source Code
A RWMutex is a reader/writer mutual exclusion lock. The lock can be held by an arbitrary number of readers or a single writer. The zero value for a RWMutex is an unlocked mutex.
A RWMutex must not be copied after first use.
If a goroutine holds a RWMutex for reading and another goroutine might call Lock, no goroutine should expect to be able to acquire a read lock until the initial read lock is released. In particular, this prohibits recursive read locking. This is to ensure that the lock eventually becomes available; a blocked Lock call excludes new readers from acquiring the lock.
type RWMutex struct {
w Mutex // held if there are pending writers
writerSem uint32 // semaphore for writers to wait for completing readers
// 这个 semaphore 在作用是:当ongoing reader 完成了,waiting writer可以被通知到,并且解除blocked,以进行writing
// 调用 runtime_Semrelease(&rw.writerSem) 可以unblock blocked writer (by wakeup it via semaphore),这时候调用了 runtime_SemacquireMutex(&rw.writerSem) 的地方就不会继续被blocked了
// 调用 runtime_SemacquireMutex(&rw.writerSem) 的地方会被 blocked,直到 runtime_Semrelease(&rw.writerSem) 被调用
readerSem uint32 // semaphore for readers to wait for completing writers
// 这个 semaphore 在作用是:当ongoing writing 完成了,waiting reader可以被通知到,并且解除blocked,以进行reading
// 调用 runtime_Semrelease(&rw.readerSem) 可以 Unblock blocked readers (by wakeup them via semaphore),这时候调用了 runtime_SemacquireMutex(&rw.readerSem) 的地方就不会继续被blocked了
// 调用 runtime_SemacquireMutex(&rw.readerSem) 的地方会被 block,直到 runtime_Semrelease(&rw.readerSem) 被调用
readerCount int32 // number of pending readers
// if readerCount < 0, means a writing is ongoing (thus all readers have to wait, until the writing is done)
// if readerCount == 0, means no ongoing writing and no onging reading
// if readerCount > 0, mean has onging reading, but no ongoing writing
readerWait int32 // number of departing readers
}
这意味着:
- 当有writer进来时,
- 如果这时候没有ongoing reader,writer就直接write
- 如果这时候有ongoing reader,writer就一直等待(被阻塞在
runtime_SemacquireMutex(&rw.writerSem
),直到
- 当有reader进来时,
- 如果这时候没有ongoing writer,reader就直接read
- 如果这时候有ongoing writer,reader一直等待(被阻塞在
untime_SemacquireMutex(&rw.readerSem)
),直到writer完成(writer完成后,会调用runtime_Semrelease(&rw.readerSem)
来通知所有reader)
Read
RLock locks rw for reading.
It should not be used for recursive read locking; a blocked Lock call excludes new readers from acquiring the lock. See the documentation on the RWMutex type.
func (rw *RWMutex) RLock() {
...
if atomic.AddInt32(&rw.readerCount, 1) < 0 {
// enter here meaning that a writing is ongoing (thus all readers have to wait, until the writing is done)
// A writer is writing, thus this reader has to wait for the writer
// meaning this reader will be blocked here until the writer finishes writing and in turn will call runtime_Semrelease(&rw.readerSem) so as to wakeup mutilple blocked readers
runtime_SemacquireMutex(&rw.readerSem, false, 0)
}
...
}
RUnlock undoes a single RLock call; it does not affect other simultaneous readers. It is a run-time error if rw is not locked for reading on entry to RUnlock.
func (rw *RWMutex) RUnlock() {
...
if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 { // r < 0 说明这时候有Ongoing writer
// 我们一般假设一个read操作非常快,而如果一个read操作非常慢,慢到这时候已经有writer在等待了,就会进入 rUnlockSlow
// Outlined slow-path to allow the fast-path to be inlined
rw.rUnlockSlow(r)
}
...
}
// 我们一般假设一个read操作非常快,而如果一个read操作非常慢,慢到这时候已经有writer在等待了,就会进入 rUnlockSlow
func (rw *RWMutex) rUnlockSlow(r int32) {
if r+1 == 0 || r+1 == -rwmutexMaxReaders { // r+1 == 0 意味着在当前没有ongoing writer的时候,调用了 RUnlock(),而且在此之前并没有调用RLock();r+1 == -rwmutexMaxReaders意味着当前有ongoing writer,而调用RUnlock()的时候,没有调用RLock()
// 在这两种情况中任何一种情况下,都会panic
race.Enable()
throw("sync: RUnlock of unlocked RWMutex")
}
// A writer is pending.
if atomic.AddInt32(&rw.readerWait, -1) == 0 { // 如果 atomic.AddInt32(&rw.readerWait, -1) == 0,意味着这个reader是最后一个还在reader的reader,因而这时候调用 runtime_Semrelease(&rw.writerSem) 来唤醒在因在等待而被blocked的writer
runtime_Semrelease(&rw.writerSem, false, 1)
}
// 如果 atomic.AddInt32(&rw.readerWait, -1) != 0,意味着这个reader不是最后一个还在reader的reader,因而这时候不需要调用 runtime_Semrelease(&rw.writerSem) 去唤醒在因在等待而被blocked的writer
}
Write
Lock
locks rw for writing. If the lock is already locked for reading or writing, Lock
blocks until the lock is available.
func (rw *RWMutex) Lock() {
...
// 获取 lock,以block其他正在尝试执行 Write 操作的 G
rw.w.Lock()
// Announce to readers there is a pending writer.
r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders
// Wait for active readers until all readings are done
// 判断当前有没有 Reader 或者在等待的 Reader
// 1. r != 0 表示当前有 Reader,r 等于当前正在reading的 reader的个数
// 2. r == 0 表示当前没有 ongoing Reader
// atomic.AddInt32(&rw.readerWait, r) != 0 意味着没有 waiting readers
if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 { // 当当前有 Reader,且有 waiting readers,获取semaphore以阻塞这些reader
runtime_SemacquireMutex(&rw.writerSem, false, 0)
}
...
}
Unlock unlocks rw for writing. It is a run-time error if rw is not locked for writing on entry to Unlock.
As with Mutexes, a locked RWMutex is not associated with a particular goroutine. One goroutine may RLock (Lock) a RWMutex and then arrange for another goroutine to RUnlock (Unlock) it.
func (rw *RWMutex) Unlock() {
...
// Announce to readers there is no active writer.
r := atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders)
if r >= rwmutexMaxReaders {
race.Enable()
throw("sync: Unlock of unlocked RWMutex")
}
// Unblock blocked readers (by wakeup them via semaphore), if any
for i := 0; i < int(r); i++ {
runtime_Semrelease(&rw.readerSem, false, 0)
}
// Allow other writers to proceed.
rw.w.Unlock()
...
}
场景
RR 场景
在两次读的时候,对 readerCount
加1,且 readerCount
永远不会小于 0,因此两次读都不会被阻塞。
RW 场景
两种情况:
- writer拿到锁的时候,reader已经释放rlock了
- writer拿到锁的时候,reader还没有释放rlock
情况1
- 在读的时候,
readerCount
会变成1 - 读完了,这时候释放rlock,而且
readerCount
变成0 - 在要开始写的时候,
- 执行
r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders
,这时候 r会等于0,意味着所有的reader都释放了rlock
- 执行
情况2
- 在读的时候
- 获取rlock
readerCount
会变成1
- 在要开始写的时候,
- 获取lock
- 执行
r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders
,这时候如果 r不等于0,且atomic.AddInt32(&rw.readerWait, r) != 0
,意味着仍有reader未释放rlock - 执行
runtime_SemacquireMutex(&rw.writerSem)
进行阻塞,直到所有reader都释放rlock的时候,writer被唤醒,并继续执行
- Reader执行
r := atomic.AddInt32(&rw.readerCount, -1)
,这时候r会<0(因为有waiting writer) - 执行
rw.rUnlockSlow(r)
- 执行
atomic.AddInt32(&rw.readerWait, -1)
,这时候这个函数的返回值会等于0, - 因而执行
runtime_Semrelease(&rw.writerSem
- 执行
- writer从
runtime_SemacquireMutex(&rw.writerSem)
处被唤醒,最终writer成功拿到lock
WR场景
- Writer lock mutex
- writer执行
atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders
- 判断readerCount原始的值等于0
- 我们假设其等于0
- reader来了之后,执行
atomic.AddInt32(&rw.readerCount, 1)
,其返回值 < 0 - 因而执行
runtime_SemacquireMutex(&rw.readerSem)
- 因而被blocked在readerSem的信号量上,等待被writer唤醒
WW 场景
- 两 writer 执行
rw.w.Lock()
看谁先拿到锁,没拿到的就会被阻塞
Benchmark
https://geektutu.com/post/hpg-mutex.html