【Golang】源码 - sync - RWMutex

Posted by 西维蜀黍 on 2021-05-30, Last Modified on 2021-09-21

Source Code

A RWMutex is a reader/writer mutual exclusion lock. The lock can be held by an arbitrary number of readers or a single writer. The zero value for a RWMutex is an unlocked mutex.

A RWMutex must not be copied after first use.

If a goroutine holds a RWMutex for reading and another goroutine might call Lock, no goroutine should expect to be able to acquire a read lock until the initial read lock is released. In particular, this prohibits recursive read locking. This is to ensure that the lock eventually becomes available; a blocked Lock call excludes new readers from acquiring the lock.

type RWMutex struct {
	w           Mutex  // held if there are pending writers
	writerSem   uint32 // semaphore for writers to wait for completing readers
  // 这个 semaphore 在作用是:当ongoing reader 完成了,waiting writer可以被通知到,并且解除blocked,以进行writing
  // 调用 runtime_Semrelease(&rw.writerSem) 可以unblock blocked writer (by wakeup it via semaphore),这时候调用了 runtime_SemacquireMutex(&rw.writerSem) 的地方就不会继续被blocked了
  // 调用 runtime_SemacquireMutex(&rw.writerSem) 的地方会被 blocked,直到 runtime_Semrelease(&rw.writerSem) 被调用
  
	readerSem   uint32 // semaphore for readers to wait for completing writers
  // 这个 semaphore 在作用是:当ongoing writing 完成了,waiting reader可以被通知到,并且解除blocked,以进行reading
  // 调用 runtime_Semrelease(&rw.readerSem) 可以 Unblock blocked readers (by wakeup them via semaphore),这时候调用了 runtime_SemacquireMutex(&rw.readerSem) 的地方就不会继续被blocked了
  // 调用 runtime_SemacquireMutex(&rw.readerSem) 的地方会被 block,直到 runtime_Semrelease(&rw.readerSem) 被调用
	readerCount int32  // number of pending readers
  // if readerCount < 0, means a writing is ongoing (thus all readers have to wait, until the writing is done)
  // if readerCount == 0, means no ongoing writing and no onging reading
  // if readerCount > 0, mean has onging reading, but no ongoing writing
  
	readerWait  int32  // number of departing readers
}

这意味着:

  • 当有writer进来时,
    • 如果这时候没有ongoing reader,writer就直接write
    • 如果这时候有ongoing reader,writer就一直等待(被阻塞在runtime_SemacquireMutex(&rw.writerSem),直到
  • 当有reader进来时,
    • 如果这时候没有ongoing writer,reader就直接read
    • 如果这时候有ongoing writer,reader一直等待(被阻塞在untime_SemacquireMutex(&rw.readerSem)),直到writer完成(writer完成后,会调用runtime_Semrelease(&rw.readerSem)来通知所有reader)

Read

RLock locks rw for reading.

It should not be used for recursive read locking; a blocked Lock call excludes new readers from acquiring the lock. See the documentation on the RWMutex type.

func (rw *RWMutex) RLock() {
	...
	if atomic.AddInt32(&rw.readerCount, 1) < 0 { 
    // enter here meaning that a writing is ongoing (thus all readers have to wait, until the writing is done)
    
		// A writer is writing, thus this reader has to wait for the writer
    // meaning this reader will be blocked here until the writer finishes writing and in turn will call runtime_Semrelease(&rw.readerSem) so as to wakeup mutilple blocked readers
		runtime_SemacquireMutex(&rw.readerSem, false, 0)
	}
	...
}

RUnlock undoes a single RLock call; it does not affect other simultaneous readers. It is a run-time error if rw is not locked for reading on entry to RUnlock.

func (rw *RWMutex) RUnlock() {
	...
	if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 { // r < 0 说明这时候有Ongoing writer
    // 我们一般假设一个read操作非常快,而如果一个read操作非常慢,慢到这时候已经有writer在等待了,就会进入 rUnlockSlow
		// Outlined slow-path to allow the fast-path to be inlined
		rw.rUnlockSlow(r)
	}
	...
}

// 我们一般假设一个read操作非常快,而如果一个read操作非常慢,慢到这时候已经有writer在等待了,就会进入 rUnlockSlow
func (rw *RWMutex) rUnlockSlow(r int32) {
	if r+1 == 0 || r+1 == -rwmutexMaxReaders { // r+1 == 0 意味着在当前没有ongoing writer的时候,调用了 RUnlock(),而且在此之前并没有调用RLock();r+1 == -rwmutexMaxReaders意味着当前有ongoing writer,而调用RUnlock()的时候,没有调用RLock()
    // 在这两种情况中任何一种情况下,都会panic
		race.Enable()
		throw("sync: RUnlock of unlocked RWMutex")
	}
	// A writer is pending.
	if atomic.AddInt32(&rw.readerWait, -1) == 0 { // 如果 atomic.AddInt32(&rw.readerWait, -1) == 0,意味着这个reader是最后一个还在reader的reader,因而这时候调用 runtime_Semrelease(&rw.writerSem) 来唤醒在因在等待而被blocked的writer
 
		runtime_Semrelease(&rw.writerSem, false, 1)
	}
  // 如果 atomic.AddInt32(&rw.readerWait, -1) != 0,意味着这个reader不是最后一个还在reader的reader,因而这时候不需要调用 runtime_Semrelease(&rw.writerSem) 去唤醒在因在等待而被blocked的writer
}

Write

Lock locks rw for writing. If the lock is already locked for reading or writing, Lock blocks until the lock is available.

func (rw *RWMutex) Lock() {
	...
  // 获取 lock,以block其他正在尝试执行 Write 操作的 G
	rw.w.Lock()
	// Announce to readers there is a pending writer.
	r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders
	// Wait for active readers until all readings are done
  // 判断当前有没有 Reader 或者在等待的 Reader
  //  1. r != 0 表示当前有 Reader,r 等于当前正在reading的 reader的个数
  //  2. r == 0 表示当前没有 ongoing Reader
  // atomic.AddInt32(&rw.readerWait, r) != 0 意味着没有 waiting readers
	if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 { // 当当前有 Reader,且有 waiting readers,获取semaphore以阻塞这些reader
		runtime_SemacquireMutex(&rw.writerSem, false, 0)
	}
	...
}

Unlock unlocks rw for writing. It is a run-time error if rw is not locked for writing on entry to Unlock.

As with Mutexes, a locked RWMutex is not associated with a particular goroutine. One goroutine may RLock (Lock) a RWMutex and then arrange for another goroutine to RUnlock (Unlock) it.

func (rw *RWMutex) Unlock() {
	...

	// Announce to readers there is no active writer.
	r := atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders)
	if r >= rwmutexMaxReaders {
		race.Enable()
		throw("sync: Unlock of unlocked RWMutex")
	}
  // Unblock blocked readers (by wakeup them via semaphore), if any 
	for i := 0; i < int(r); i++ {
		runtime_Semrelease(&rw.readerSem, false, 0)
	}
	// Allow other writers to proceed.
	rw.w.Unlock()
	...
}

场景

RR 场景

在两次读的时候,对 readerCount 加1,且 readerCount 永远不会小于 0,因此两次读都不会被阻塞。

RW 场景

两种情况:

  1. writer拿到锁的时候,reader已经释放rlock了
  2. writer拿到锁的时候,reader还没有释放rlock

情况1

  • 在读的时候,readerCount 会变成1
  • 读完了,这时候释放rlock,而且readerCount变成0
  • 在要开始写的时候,
    • 执行r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders,这时候 r会等于0,意味着所有的reader都释放了rlock

情况2

  • 在读的时候
    • 获取rlock
    • readerCount 会变成1
  • 在要开始写的时候,
    • 获取lock
    • 执行r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders,这时候如果 r不等于0,且atomic.AddInt32(&rw.readerWait, r) != 0,意味着仍有reader未释放rlock
    • 执行runtime_SemacquireMutex(&rw.writerSem)进行阻塞,直到所有reader都释放rlock的时候,writer被唤醒,并继续执行
  • Reader执行r := atomic.AddInt32(&rw.readerCount, -1),这时候r会<0(因为有waiting writer)
  • 执行rw.rUnlockSlow(r)
    • 执行atomic.AddInt32(&rw.readerWait, -1),这时候这个函数的返回值会等于0,
    • 因而执行runtime_Semrelease(&rw.writerSem
  • writer从runtime_SemacquireMutex(&rw.writerSem)处被唤醒,最终writer成功拿到lock

WR场景

  • Writer lock mutex
  • writer执行atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders
  • 判断readerCount原始的值等于0
  • 我们假设其等于0
  • reader来了之后,执行atomic.AddInt32(&rw.readerCount, 1),其返回值 < 0
  • 因而执行runtime_SemacquireMutex(&rw.readerSem)
  • 因而被blocked在readerSem的信号量上,等待被writer唤醒

WW 场景

  • 两 writer 执行rw.w.Lock()看谁先拿到锁,没拿到的就会被阻塞

Benchmark

https://geektutu.com/post/hpg-mutex.html

Reference