Golang Mutex互斥鎖深入理解
引言
Golang的并發(fā)編程令人著迷,使用輕量的協(xié)程、基于CSP的channel、簡(jiǎn)單的go func()就可以開(kāi)始并發(fā)編程,在并發(fā)編程中,往往離不開(kāi)鎖的概念。
本文介紹了常用的同步原語(yǔ) sync.Mutex
,同時(shí)從源碼剖析它的結(jié)構(gòu)與實(shí)現(xiàn)原理,最后簡(jiǎn)單介紹了mutex在日常使用中可能遇到的問(wèn)題,希望大家讀有所獲。
Mutex結(jié)構(gòu)
Mutex運(yùn)行時(shí)數(shù)據(jù)結(jié)構(gòu)位于sync/mutex.go
包
type Mutex struct { state int32 sema uint32 }
其中state
表示當(dāng)前互斥鎖的狀態(tài),sema
表示 控制鎖狀態(tài)的信號(hào)量.
互斥鎖的狀態(tài)定義在常量中:
const ( mutexLocked = 1 << iota // 1 ,處于鎖定狀態(tài); 2^0 mutexWoken // 2 ;從正常模式被從喚醒; 2^1 mutexStarving // 4 ;處于饑餓狀態(tài); 2^2 mutexWaiterShift = iota // 3 ;獲得互斥鎖上等待的Goroutine個(gè)數(shù)需要左移的位數(shù): 1 << mutexWaiterShift starvationThresholdNs = 1e6 // 鎖進(jìn)入饑餓狀態(tài)的等待時(shí)間 )
0即其他狀態(tài)。
sema
是一個(gè)組合,低三位分別表示鎖的三種狀態(tài),高29位表示正在等待互斥鎖釋放的gorountine個(gè)數(shù),和Java表示線程池狀態(tài)那部分有點(diǎn)類(lèi)似
一個(gè)mutex對(duì)象僅占用8個(gè)字節(jié),讓人不禁感嘆其設(shè)計(jì)的巧妙
饑餓模式和正常模式
正常模式
在正常模式下,等待的協(xié)程會(huì)按照先進(jìn)先出的順序得到鎖 在正常模式下,剛被喚醒的goroutine與新創(chuàng)建的goroutine競(jìng)爭(zhēng)時(shí),大概率無(wú)法獲得鎖。
饑餓模式
為了避免正常模式下,goroutine被“餓死”的情況,go在1.19版本引入了饑餓模式,保證了Mutex的公平性
在饑餓模式中,互斥鎖會(huì)直接交給等待隊(duì)列最前面的goroutine。新的goroutine 在該狀態(tài)下不能獲取鎖、也不會(huì)進(jìn)入自旋狀態(tài),它們只會(huì)在隊(duì)列的末尾等待。
狀態(tài)的切換
在正常模式下,一旦Goroutine超過(guò)1ms沒(méi)有獲取到鎖,它就會(huì)將當(dāng)前互斥鎖切換饑餓模式
如果一個(gè)goroutine 獲得了互斥鎖并且它在隊(duì)列的末尾或者它等待的時(shí)間少于 1ms,那么當(dāng)前的互斥鎖就會(huì)切換回正常模式。
加鎖和解鎖
加鎖
func (m *Mutex) Lock() { // Fast path: grab unlocked mutex. if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) { return } // 原注釋?zhuān)?Slow path (outlined so that the fast path can be inlined) // 將 m.lockSlow() }
可以看到,當(dāng)前互斥鎖的狀態(tài)為0時(shí),嘗試將當(dāng)前鎖狀態(tài)設(shè)置為更新鎖定狀態(tài),且這些操作是原子的。
若當(dāng)前狀態(tài)不為0,則進(jìn)入lockSlow
方法
先定義了幾個(gè)參數(shù)
var waitStartTime int64 starving := false // awoke := false iter := 0 old := m.state
隨后進(jìn)入一個(gè)很大的for循環(huán),讓我們來(lái)逐步分析
自旋
for { // 1 && 2 if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) { // 3. if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 && atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) { awoke = true } runtime_doSpin() iter++ old = m.state continue }
old&(mutexLocked|mutexStarving) == mutexLocked
當(dāng)且僅當(dāng)當(dāng)前鎖狀態(tài)為mutexLocked時(shí),表達(dá)式為true
runtime_canSpin(iter)
是否滿(mǎn)足自旋條件
- 運(yùn)行在擁有多個(gè)CPU的機(jī)器上;
- 當(dāng)前Goroutine為了獲取該鎖進(jìn)入自旋的次數(shù)小于四次;
- 當(dāng)前機(jī)器上至少存在一個(gè)正在運(yùn)行的處理器 P,并且處理的運(yùn)行隊(duì)列為空;
如果當(dāng)前狀態(tài)下自旋是合理的,將awoke
置為true,同時(shí)設(shè)置鎖狀態(tài)為mutexWoken
,進(jìn)入自旋邏輯
runtime_doSpin()
會(huì)執(zhí)行30次PAUSE
指令,并且僅占用CPU資源 代碼位于:runtime\asm_amd64.s +567
//go:linkname sync_runtime_doSpin sync.runtime_doSpin //go:nosplit func sync_runtime_doSpin() { procyield(active_spin_cnt) }
TEXT runtime·procyield(SB),NOSPLIT,$0-0 MOVL cycles+0(FP), AX again: PAUSE SUBL $1, AX JNZ again RET
計(jì)算鎖的新?tīng)顟B(tài)
停止了自旋后,
new := old // 1. if old&mutexStarving == 0 { new |= mutexLocked } // 2. if old&(mutexLocked|mutexStarving) != 0 { new += 1 << mutexWaiterShift } // 3 && 4. if starving && old&mutexLocked != 0 { new |= mutexStarving } // 5. if awoke { if new&mutexWoken == 0 { throw("sync: inconsistent mutex state") } new &^= mutexWoken }
old&mutexStarving == 0
表明原來(lái)不是饑餓模式。如果是饑餓模式的話(huà),其他goroutine不會(huì)執(zhí)行接下來(lái)的代碼,直接進(jìn)入等待隊(duì)列隊(duì)尾- 如果原來(lái)是
mutexLocked
或者mutexStarving
模式,waiterCounts數(shù)加一 - 如果被標(biāo)記為饑餓狀態(tài),且鎖狀態(tài)為
mutexLocked
的話(huà),設(shè)置鎖的新?tīng)顟B(tài)為饑餓狀態(tài)。 - 被標(biāo)記為饑餓狀態(tài)的前提是
被喚醒過(guò)且搶鎖失敗
- 計(jì)算新?tīng)顟B(tài)
更新鎖狀態(tài)
// 1. if atomic.CompareAndSwapInt32(&m.state, old, new) { if old&(mutexLocked|mutexStarving) == 0 { break // locked the mutex with CAS } // 2. queueLifo := waitStartTime != 0 if waitStartTime == 0 { waitStartTime = runtime_nanotime() } // 3. runtime_SemacquireMutex(&m.sema, queueLifo, 1) // 4. starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs old = m.state // 5. if old&mutexStarving != 0 { / if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 { throw("sync: inconsistent mutex state") } delta := int32(mutexLocked - 1<<mutexWaiterShift) if !starving || old>>mutexWaiterShift == 1 { delta -= mutexStarving } atomic.AddInt32(&m.state, delta) break } awoke = true iter = 0 } else { old = m.state } }
- 嘗試將鎖狀態(tài)設(shè)置為new 。這里設(shè)置成功不代表上鎖成功,有可能new不為
mutexLocked
或者是waiterCount數(shù)量的改變 waitStartTime
不為0 說(shuō)明當(dāng)前goroutine已經(jīng)等待過(guò)了,將當(dāng)前goroutine放到等待隊(duì)列的隊(duì)頭- 走到這里,會(huì)調(diào)用
runtime_SemacquireMutex
方法使當(dāng)前協(xié)程阻塞,runtime_SemacquireMutex
方法中會(huì)不斷嘗試獲得鎖,并會(huì)陷入休眠 等待信號(hào)量釋放。 - 當(dāng)前協(xié)程可以獲得信號(hào)量,從
runtime_SemacquireMutex
方法中返回。此時(shí)協(xié)程會(huì)去更新starving
標(biāo)志位:如果當(dāng)前starving
標(biāo)志位為true或者等待時(shí)間超過(guò)starvationThresholdNs
,將starving
置為true
之后會(huì)按照饑餓模式與正常模式,走不同的邏輯
- - 在正常模式下,這段代碼會(huì)設(shè)置喚醒和饑餓標(biāo)記、重置迭代次數(shù)并重新執(zhí)行獲取鎖的循環(huán);
- - 在饑餓模式下,當(dāng)前 Goroutine 會(huì)獲得互斥鎖,如果等待隊(duì)列中只存在當(dāng)前 Goroutine,互斥鎖還會(huì)從饑餓模式中退出;
解鎖
func (m *Mutex) Unlock() { // 1. new := atomic.AddInt32(&m.state, -mutexLocked) if new != 0 { // 2. m.unlockSlow(new) } }
- 將鎖狀態(tài)的值增加 -mutexLocked 。如果新?tīng)顟B(tài)不等于0,進(jìn)入
unlockSlow
方法
func (m *Mutex) unlockSlow(new int32) { // 1. if (new+mutexLocked)&mutexLocked == 0 { throw("sync: unlock of unlocked mutex") } if new&mutexStarving == 0 { old := new for { // 2. if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 { return } // 2.1. new = (old - 1<<mutexWaiterShift) | mutexWoken if atomic.CompareAndSwapInt32(&m.state, old, new) { // 2.2. runtime_Semrelease(&m.sema, false, 1) return } old = m.state } } else { // 3. runtime_Semrelease(&m.sema, true, 1) } }
1.new+mutexLocked
代表將鎖置為1,如果兩個(gè)狀態(tài)& 不為0,則說(shuō)明重復(fù)解鎖.如果重復(fù)解鎖則拋出panic
2. 如果等待者數(shù)量等于0,或者鎖的狀態(tài)已經(jīng)變?yōu)閙utexWoken、mutexStarving、mutexStarving,則直接返回
- 將waiterCount數(shù)量-1,嘗試選擇一個(gè)goroutine喚醒
- 嘗試更新鎖狀態(tài),如果更新鎖狀態(tài)成功,則喚醒隊(duì)尾的一個(gè)gorountine
3. 如果不滿(mǎn)足 2
的判斷條件,則進(jìn)入饑餓模式,同時(shí)交出鎖的使用權(quán)
可能遇到的問(wèn)題
鎖拷貝
mu1 := &sync.Mutex{} mu1.Lock() mu2 := mu1 mu2.Unlock()
此時(shí)mu2
能夠正常解鎖,那么我們?cè)僭囋嚱怄imu1
呢
mu1 := &sync.Mutex{} mu1.Lock() mu2 := mu1 mu2.Unlock() mu1.Unlock()
可以看到發(fā)生了error
panic導(dǎo)致沒(méi)有unlock
當(dāng)lock()之后,可能由于代碼問(wèn)題導(dǎo)致程序發(fā)生了panic,那么mutex無(wú)法被及時(shí)unlock(),由于其他協(xié)程還在等待鎖,此時(shí)可能觸發(fā)死鎖
func TestWithLock() { nums := 100 wg := &sync.WaitGroup{} safeSlice := SafeSlice{ s: []int{}, lock: new(sync.RWMutex), } i := 0 for idx := 0; idx < nums; idx++ { // 并行nums個(gè)協(xié)程做append wg.Add(1) go func() { defer func() { if r := recover(); r != nil { log.Println("recover") } wg.Done() }() safeSlice.lock.Lock() safeSlice.s = append(safeSlice.s, i) if i == 98{ panic("123") } i++ safeSlice.lock.Unlock() }() } wg.Wait() log.Println(len(safeSlice.s)) }
修改:
func TestWithLock() { nums := 100 wg := &sync.WaitGroup{} safeSlice := SafeSlice{ s: []int{}, lock: new(sync.RWMutex), } i := 0 for idx := 0; idx < nums; idx++ { // 并行nums個(gè)協(xié)程做append wg.Add(1) go func() { defer func() { if r := recover(); r != nil { } safeSlice.lock.Unlock() wg.Done() }() safeSlice.lock.Lock() safeSlice.s = append(safeSlice.s, i) if i == 98{ panic("123") } i++ }() } wg.Wait() log.Println(len(safeSlice.s)) }
以上就是Golang Mutex互斥鎖深入理解的詳細(xì)內(nèi)容,更多關(guān)于Golang Mutex互斥鎖的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
golang將切片或數(shù)組根據(jù)某個(gè)字段進(jìn)行分組操作
這篇文章主要介紹了golang將切片或數(shù)組根據(jù)某個(gè)字段進(jìn)行分組操作,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧2020-12-12使用Go語(yǔ)言構(gòu)建高效的二叉搜索樹(shù)聯(lián)系簿
樹(shù)是一種重要的數(shù)據(jù)結(jié)構(gòu),而二叉搜索樹(shù)(BST)則是樹(shù)的一種常見(jiàn)形式,在本文中,我們將學(xué)習(xí)如何構(gòu)建一個(gè)高效的二叉搜索樹(shù)聯(lián)系簿,感興趣的可以了解下2024-01-01Golang中結(jié)構(gòu)體映射mapstructure庫(kù)深入詳解
mapstructure用于將通用的map[string]interface{}解碼到對(duì)應(yīng)的 Go 結(jié)構(gòu)體中,或者執(zhí)行相反的操作。很多時(shí)候,解析來(lái)自多種源頭的數(shù)據(jù)流時(shí),我們一般事先并不知道他們對(duì)應(yīng)的具體類(lèi)型。只有讀取到一些字段之后才能做出判斷2023-01-01go語(yǔ)言中切片的長(zhǎng)度和容量的區(qū)別
這篇文章主要介紹了go語(yǔ)言中切片的長(zhǎng)度和容量的區(qū)別,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧2021-04-04go實(shí)現(xiàn)Redis讀寫(xiě)分離示例詳解
本篇文章將介紹Redis通信協(xié)議RESP,?而后在使用go來(lái)編寫(xiě)一個(gè)中間件,從而來(lái)完成Redis讀寫(xiě)分離,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2022-08-08