Background
悲观锁和乐观并发控制机制
悲观锁是一种悲观思想,它总认为最坏的情况可能会出现,它认为数据很可能会被其他人所修改,不管读还是写,悲观锁在执行操作之前都先上锁。
对读对写都需要加锁导致性能低,所以悲观锁用的机会不多。但是在多写的情况下,还是有机会使用悲观锁的,因为乐观并发控制机制遇到写不一致的情况下会一直重试,会浪费更多的时间。
乐观并发控制机制的思想与悲观锁的思想相反,它总认为资源和数据不会被别人所修改,所以读取不会上锁,但是乐观并发控制机制在进行写入操作的时候会判断当前数据是否被修改过。
乐观并发控制机制的实现方案主要包含CAS和版本号机制。乐观并发控制机制适用于多读的场景,可以提高吞吐量。
CAS
CAS即Compare And Swap(比较与交换),是一种有名的无锁算法。即不使用锁的情况下实现多线程之间的变量同步,也就是在没有线程被阻塞的情况下实现变量的同步,所以也叫非阻塞同步。CAS涉及三个关系:指向内存一块区域的指针V、旧值A和将要写入的新值B。
总结来说:
- CAS 算法:会执行一个原子操作
- 在这个原子操作中,先检查当前value是否等于current,如果相等,则意味着value没被其他线程修改过,更新并返回true。如果不相等,compareAndSet则会返回false,然后循环继续尝试更新。
CAS实现的乐观并发控制会带来潜在的ABA问题。
CAS 的权衡
在轻度到中度的争用情况下,非阻塞算法的性能会超越阻塞算法,因为 CAS 的多数时间都在第一次尝试时就成功,而发生争用时的开销也不涉及线程挂起和上下文切换,只多了几个循环迭代。没有争用的 CAS 要比没有争用的锁便宜得多(这句话肯定是真的,因为没有争用的锁涉及 CAS 加上额外的处理),而争用的 CAS 比争用的锁获取涉及更短的延迟。
在高度争用的情况下(即有多个线程不断争用一个内存位置的时候),基于锁的算法开始提供比非阻塞算法更好的吞吐率,因为当线程阻塞时,它就会停止争用,耐心地等候轮到自己,从而避免了进一步争用。但是,这么高的争用程度并不常见,因为多数时候,线程会把线程本地的计算与争用共享数据的操作分开,从而给其他线程使用共享数据的机会。
版本号机制
版本号机制是通过一个版本号version来实现版本控制。
自旋锁(Spin Lock)
之前介绍的CAS就是自旋锁的一种具体实现。同一时刻只能有一个线程获取到锁,没有获取到锁的线程通常有两种处理方式:
- 一直循环等待判断该资源是否已经释放锁,这种锁叫做自旋锁,它不用将线程阻塞起来(NON-BLOCKING);
- 把自己阻塞起来,等待重新调度请求,这种是互斥锁。
自旋锁的原理比较简单,如果持有锁的线程能在短时间内释放锁资源,那么那些等待竞争锁的线程就不需要做内核态和用户态之间的切换进入阻塞状态,它们只需要等一等(自旋),等到持有锁的线程释放锁之后即可获取,这样就避免了用户进程和内核切换的消耗。
但是如果长时间上锁的话,自旋锁会非常耗费性能,它阻止了其他线程的运行和调度。线程持有锁的时间越长,则持有该锁的线程将被OS调度程序中断的风险越大。如果发生中断情况,那么其他线程将保持旋转状态(反复尝试获取锁),而持有该锁的线程并不打算释放锁,这样导致的是结果是无限期推迟,直到持有锁的线程可以完成并释放它为止。
解决上面这种情况一个很好的方式是给自旋锁设定一个自旋时间,等时间一到立即释放自旋锁。自旋锁的目的是占着CPU资源不进行释放,等到获取锁立即进行处理。
信号量 Semaphore
信号量是 Edsger Dijkstra 发明的数据结构,在解决多种同步问题时很有用。其本质是一个整数,并关联两个操作:
- 申请
acquire
(也称为wait
、decrement
或P
操作) - 释放
release
(也称signal
、increment
或V
操作)
acquire
操作将信号量减 1,
- 如果结果值为负则线程阻塞,且直到其他线程进行了信号量累加为正数才能恢复。
- 如结果为正数,线程则继续执行。
release
操作将信号量加 1,如存在被阻塞的线程,此时他们中的一个线程将解除阻塞。
Go 运行时提供的runtime_SemacquireMutex
和runtime_Semrelease
函数可用来实现sync.RWMutex
互斥锁。
Self Implementation
V1
package main
import "sync/atomic"
type MyMutex struct {
state int32
}
func xadd(val *int32, delta int32) (new int32) {
for {
old := *val
if atomic.CompareAndSwapInt32(val, old, old+delta) {
return old + delta
}
}
panic("unreached")
}
func (m *MyMutex) Lock() {
if xadd(&m.state, 1) == 1 {
return
}
}
func (m *MyMutex) Unlock() {
if xadd(&m.state, -1) == 0 {
return
}
}
首先Mutex的结构非常简单,只包括status一个标志位。
- status表示当前是否已经有goroutine占用了锁,如果key==0则说明当前mutex未被占用,如果key==1则说明mutex已被占用。
这个实现非常简单:
- 优点在于如果一个锁被占用的时间非常非常短,那么其他想在这时获取锁的 entity 只需要自旋很短很短的时间,就能够获取到锁,而且这个 entity 对应的线程做内核态和用户态之间切换的概率也非常低(因为只自旋很短很短的时间)。最终总的效率非常高
- 但是潜在的问题也非常明显,假设如果一个 entity 会长时间占用一个锁,这时候有100个 entity 同时想获取锁,这时候这100个entity 会一直自旋,这自然会消耗CPU,而且更关键的是消耗的这些CPU时间片没有任何的意义
在这里,获取锁的对象被描述为entity,而不是goroutine,原因是因为有可能会发生这种情况:一个获取到锁的goroutine A可以pass mutex的指针给另一个goroutine B,并由goroutine B 来解锁。
因此,mutex并不一定会关联到特定的一个goroutine,
Definition
mutex对象仅有两个数值字段,分为为state(存储状态)和sema(用于计算休眠goroutine数量的信号量)。
// A Mutex is a mutual exclusion lock.
// The zero value for a Mutex is an unlocked mutex.
//
// A Mutex must not be copied after first use.
type Mutex struct {
state int32
sema uint32
}
state
的含义(在默认情况下,互斥锁的所有状态位都是 0):
最低三位分别表示 mutexLocked、mutexWoken 和 mutexStarving,剩下的位置用来表示当前有多少个 Goroutine 等待互斥锁的释放
- mutexLocked:第0个 bit,标记这个
mutex
是否已被某个goroutine所拥有。0表示未锁定,1表示已锁定 - mutexWoken:第1个 bit,表示是否已经从正常模式中被唤醒,0表示无事件,1表示mutex已被解除锁定,可以唤醒等待其它goroutine
- mutexStarving:第2个bit,标记当前的互斥锁是否进入了饥饿状态
- waitersCount — 当前互斥锁上等待的 goroutine 个数,0表示没有等待者
Mutex的状态机比较复杂,使用一个int32来表示:
const (
mutexLocked = 1 << iota // mutex is locked
mutexWoken //2
mutexStarving //4
mutexWaiterShift = iota //3
)
32 3 2 1 0
| | | | |
| | | | |
v-----------------------------------------------v-------------v-------------v-------------+
| | | | v
| waitersCount |mutexStarving| mutexWoken | mutexLocked |
| | | | |
+-----------------------------------------------+-------------+-------------+-------------+
同时,尝试获取锁的goroutine也有状态,有可能它是新来的goroutine,也有可能是被唤醒的goroutine, 可能是处于正常状态的goroutine, 也有可能是处于饥饿状态的goroutine。
为了保证锁的公平性,设计上互斥锁有两种模式:正常模式(normal)和饥饿模式(starvation)。
正常模式:
- 所有等待锁的goroutine按照FIFO顺序等待。被唤醒的goroutine不会直接拥有锁,而是会和新请求锁的goroutine一起竞争锁的拥有权(ownership)。新请求锁的goroutine具有优势:它正在CPU上执行,而且可能有好几个,所以刚被唤醒的goroutine有很大可能在锁竞争中失败。在这种情况下,这个被唤醒的goroutine会加入到等待队列的前面。如果一个正在等待的goroutine超过1ms后仍没有获取锁,那么它将会把锁转变为饥饿模式(starvation)。
饥饿模式(starvation):
- 锁的所有权将从unlock的gorutine直接交给等待队列中的第一个gorutine等待者。新来的goroutine将不会尝试去获得锁,即使锁看起来是unlock状态,也不会去尝试自旋操作(spin),而是被放在等待队列的尾部。
- 如果一个等待的goroutine获取了锁,并且满足一以下其中的任何一个条件:1. 它是队列中的最后一个;2. 它等待的时长小于1ms。则会将锁的状态转换为正常状态。
- 正常状态有很好的性能表现,饥饿模式也是非常重要的,因为它能阻止尾部延迟的现象。
加锁
// Lock locks m.
// If the lock is already in use, the calling goroutine
// blocks until the mutex is available.
func (m *Mutex) Lock() {
// Fast path: grab unlocked mutex.
// 如果mutext的state没有被锁,也没有等待/唤醒的goroutine, 锁处于正常状态,那么获得锁,返回.
// 比如锁第一次被goroutine请求时,就是这种状态。或者锁处于空闲的时候,也是这种状态。
if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
if race.Enabled {
race.Acquire(unsafe.Pointer(m))
}
return
}
// Slow path (outlined so that the fast path can be inlined)
m.lockSlow()
}
可以看到,这里用了CAS(compare-and-swap),CAS实现是基于乐观并发控制思想(optimistic concurrent control)。
slowLock
的获取锁流程有两种模式: 饥饿模式 和 正常模式,被starving
标识。
func (m *Mutex) lockSlow() {
// 标记本goroutine的等待时间
var waitStartTime int64
// 本goroutine是否已经处于饥饿状态
starving := false
// 本goroutine是否已唤醒
awoke := false
// 自旋次数
iter := 0
// 记录锁当前的状态
old := m.state
for {
// Don't spin in starvation mode, ownership is handed off to waiters
// so we won't be able to acquire the mutex anyway.
if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
// Active spinning makes sense.
// Try to set mutexWoken flag to inform Unlock
// to not wake other blocked goroutines.
if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
awoke = true
}
runtime_doSpin()
iter++
old = m.state
continue
}
new := old
// Don't try to acquire starving mutex, new arriving goroutines must queue.
if old&mutexStarving == 0 {
new |= mutexLocked
}
if old&(mutexLocked|mutexStarving) != 0 {
new += 1 << mutexWaiterShift
}
// The current goroutine switches mutex to starvation mode.
// But if the mutex is currently unlocked, don't do the switch.
// Unlock expects that starving mutex has waiters, which will not
// be true in this case.
if starving && old&mutexLocked != 0 {
new |= mutexStarving
}
if awoke {
// The goroutine has been woken from sleep,
// so we need to reset the flag in either case.
if new&mutexWoken == 0 {
throw("sync: inconsistent mutex state")
}
new &^= mutexWoken
}
if atomic.CompareAndSwapInt32(&m.state, old, new) {
if old&(mutexLocked|mutexStarving) == 0 {
break // locked the mutex with CAS
}
// If we were already waiting before, queue at the front of the queue.
queueLifo := waitStartTime != 0
if waitStartTime == 0 {
waitStartTime = runtime_nanotime()
}
runtime_SemacquireMutex(&m.sema, queueLifo, 1)
starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
old = m.state
if old&mutexStarving != 0 {
// If this goroutine was woken and mutex is in starvation mode,
// ownership was handed off to us but mutex is in somewhat
// inconsistent state: mutexLocked is not set and we are still
// accounted as waiter. Fix that.
if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
throw("sync: inconsistent mutex state")
}
delta := int32(mutexLocked - 1<<mutexWaiterShift)
if !starving || old>>mutexWaiterShift == 1 {
// Exit starvation mode.
// Critical to do it here and consider wait time.
// Starvation mode is so inefficient, that two goroutines
// can go lock-step infinitely once they switch mutex
// to starvation mode.
delta -= mutexStarving
}
atomic.AddInt32(&m.state, delta)
break
}
awoke = true
iter = 0
} else {
old = m.state
}
}
if race.Enabled {
race.Acquire(unsafe.Pointer(m))
}
}
解锁
// Unlock unlocks m.
// It is a run-time error if m is not locked on entry to Unlock.
//
// A locked Mutex is not associated with a particular goroutine.
// It is allowed for one goroutine to lock a Mutex and then
// arrange for another goroutine to unlock it.
func (m *Mutex) Unlock() {
if race.Enabled {
_ = m.state
race.Release(unsafe.Pointer(m))
}
// Fast path: drop lock bit.
new := atomic.AddInt32(&m.state, -mutexLocked)
if new != 0 {
// Outlined slow path to allow inlining the fast path.
// To hide unlockSlow during tracing we skip one extra frame when tracing GoUnblock.
m.unlockSlow(new)
}
}
func (m *Mutex) unlockSlow(new int32) {
if (new+mutexLocked)&mutexLocked == 0 {
throw("sync: unlock of unlocked mutex")
}
if new&mutexStarving == 0 {
old := new
for {
// If there are no waiters or a goroutine has already
// been woken or grabbed the lock, no need to wake anyone.
// In starvation mode ownership is directly handed off from unlocking
// goroutine to the next waiter. We are not part of this chain,
// since we did not observe mutexStarving when we unlocked the mutex above.
// So get off the way.
if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
return
}
// Grab the right to wake someone.
new = (old - 1<<mutexWaiterShift) | mutexWoken
if atomic.CompareAndSwapInt32(&m.state, old, new) {
runtime_Semrelease(&m.sema, false, 1)
return
}
old = m.state
}
} else {
// Starving mode: handoff mutex ownership to the next waiter, and yield
// our time slice so that the next waiter can start to run immediately.
// Note: mutexLocked is not set, the waiter will set it after wakeup.
// But mutex is still considered locked if mutexStarving is set,
// so new coming goroutines won't acquire it.
runtime_Semrelease(&m.sema, true, 1)
}
}
Reference
- https://tour.golang.org/concurrency/9
- https://gobyexample.com/mutexes
- https://golang.org/pkg/sync/#Mutex.Lock
- https://segmentfault.com/a/1190000000506960?utm_source=sf-similar-article
- https://segmentfault.com/a/1190000023874384
- https://colobu.com/2018/12/18/dive-into-sync-mutex/
- https://fivezh.github.io/2019/04/09/sync_mutex_translation/
- https://en.wikipedia.org/wiki/Semaphore_(programming)