Go語言讀寫鎖RWMutex的源碼分析
前言
在前面兩篇文章中 初見 Go Mutex 、Go Mutex 源碼詳解,我們學(xué)習(xí)了 Go語言 中的 Mutex
,它是一把互斥鎖,每次只允許一個 goroutine
進(jìn)入臨界區(qū),可以保證臨界區(qū)資源的狀態(tài)正確性。但是有的情況下,并不是所有 goroutine
都會修改臨界區(qū)狀態(tài),可能只是讀取臨界區(qū)的數(shù)據(jù),如果此時還是需要每個 goroutine
拿到鎖依次進(jìn)入的話,效率就有些低下了。例如房間里面有一幅畫,有人想修改,有人只是想看一下,完全可以放要看的一部分人進(jìn)去,等他們看完再讓修改的人進(jìn)去修改,這樣既提高了效率,也保證了臨界區(qū)資源的安全??春托薷?,對應(yīng)的就是讀和寫,本篇文章我們就一起來學(xué)習(xí)下 Go語言 中的讀寫鎖sync.RWMutex
。
說明:本文中的示例,均是基于Go1.17 64位機(jī)器
RWMutex 總覽
RWMutex
是一個讀/寫互斥鎖,在某一時刻只能由任意數(shù)量的 reader
持有 或者 一個 writer 持有。也就是說,要么放行任意數(shù)量的 reader,多個 reader 可以并行讀;要么放行一個 writer,多個 writer 需要串行寫。
RWMutex 對外暴露的方法有五個:
- RLock():讀操作獲取鎖,如果鎖已經(jīng)被
writer
占用,會一直阻塞直到writer
釋放鎖;否則直接獲得鎖; - RUnlock():讀操作完畢之后釋放鎖;
- Lock():寫操作獲取鎖,如果鎖已經(jīng)被
reader
或者writer
占用,會一直阻塞直到獲取到鎖;否則直接獲得鎖; - Unlock():寫操作完畢之后釋放鎖;
- RLocker():返回讀操作的
Locker
對象,該對象的Lock()
方法對應(yīng)RWMutex
的RLock()
,Unlock()
方法對應(yīng)RWMutex
的RUnlock()
方法。
一旦涉及到多個 reader 和 writer ,就需要考慮優(yōu)先級問題,是 reader 優(yōu)先還是 writer 優(yōu)先:
- reader優(yōu)先:只要有 reader 要進(jìn)行讀操作,writer 就一直等待,直到?jīng)]有 reader 到來。這種方式做到了讀操作的并發(fā),但是如果 reader 持續(xù)到來,會導(dǎo)致 writer 饑餓,一直不能進(jìn)行寫操作;
- writer優(yōu)先:只要有 writer 要進(jìn)行寫操作,reader 就一直等待,直到?jīng)]有 writer 到來。這種方式提高了寫操作的優(yōu)先級,但是如果 writer 持續(xù)到來,會導(dǎo)致 reader 饑餓,一直不能進(jìn)行讀操作;
- 沒有優(yōu)先級:按照先來先到的順序,沒有誰比誰更優(yōu)先,這種相對來說會更公平。
我們先來看下 RWMutex 的運(yùn)行機(jī)制,就可以知道它的優(yōu)先級是什么了。
可以想象 RWMutex 有兩個隊伍,一個是包含 所有reader 和你獲得準(zhǔn)入權(quán)writer 的 隊列A,一個是還沒有獲得準(zhǔn)入權(quán) writer 的 隊列B。
- 隊列 A 最多只允許有 一個writer,如果有其他 writer,需要在 隊列B 等待;
- 當(dāng)一個 writer 到了 隊列A 后,只允許它 之前的reader 執(zhí)行讀操作,新來的 reader 需要在 隊列A 后面排隊;
- 當(dāng)前面的 reader 執(zhí)行完讀操作之后,writer 執(zhí)行寫操作;
- writer 執(zhí)行完寫操作后,讓 后面的reader 執(zhí)行讀操作,再喚醒隊列B 的一個 writer 到 隊列A 后面排隊。
初始時刻 隊列A 中 writer W1
前面有三個 reader,后面有兩個 reader,隊列B中有兩個 writer
RWMutex運(yùn)行示例:初始時刻
并發(fā)讀 多個 reader 可以同時獲取到讀鎖,進(jìn)入臨界區(qū)進(jìn)行讀操作;writer W1
在 隊列A 中等待,同時又來了兩個 reader,直接在 隊列A 后面排隊
RWMutex運(yùn)行示例:并發(fā)讀
寫操作 W1
前面所有的 reader 完成后,W1
獲得鎖,進(jìn)入臨界區(qū)操作
RWMutex運(yùn)行示例:寫操作
獲得準(zhǔn)入權(quán) W1
完成寫操作退出,先讓后面排隊的 reader 進(jìn)行讀操作,然后從 隊列B 中喚醒 W2
到 隊列A 排隊。W2
從 隊列B 到 隊列A 的過程中,R8
先到了 隊列A,因此 R8
可以執(zhí)行讀操作。R9
、R10
、R11
在 W2
之后到的,所以在后面排隊;新來的 W4
直接在隊列B 排隊。
RWMutex運(yùn)行示例:獲得準(zhǔn)入權(quán)
從上面的示例可以看出,RWMutex
可以看作是沒有優(yōu)先級,按照先來先到的順序去執(zhí)行,只不過是 多個reader 可以 并行 去執(zhí)行罷了。
深入源碼
數(shù)據(jù)結(jié)構(gòu)
type?RWMutex?struct?{ ?w???????????Mutex??//?控制?writer?在?隊列B?排隊 ?writerSem???uint32?//?寫信號量,用于等待前面的?reader?完成讀操作 ?readerSem???uint32?//?讀信號量,用于等待前面的?writer?完成寫操作 ?readerCount?int32??//?reader?的總數(shù)量,同時也指示是否有?writer?在隊列A?中等待 ?readerWait??int32??//?隊列A?中?writer?前面?reader?的數(shù)量 } //?允許最大的?reader?數(shù)量 const?rwmutexMaxReaders?=?1?<<?30
上述中的幾個變量,比較特殊的是 readerCount
,不僅表示當(dāng)前 所有reader
的數(shù)量,同時表示是否有 writer
在隊列A中等待。當(dāng) readerCount
變?yōu)?nbsp;負(fù)數(shù)
時,就代表有 writer 在隊列A 中等待了。
- 當(dāng)有 writer 進(jìn)入 隊列A 后,會將
readerCount
變?yōu)樨?fù)數(shù),即readerCount = readerCount - rwmutexMaxReaders
,同時利用readerWait
變量記錄它前面有多少個 reader; - 如果有新來的 reader,發(fā)現(xiàn)
readerCount
是負(fù)數(shù),就會直接去后面排隊; - writer 前面的 reader 在釋放鎖時,會將
readerCount
和readerWait
都減一,當(dāng) readerWait==0 時,表示 writer 前面的所有 reader 都執(zhí)行完了,可以讓 writer 執(zhí)行寫操作了; - writer 執(zhí)行寫操作完畢后,會將 readerCount 再變回正數(shù),
readerCount = readerCount + rwmutexMaxReaders
。
舉例:假設(shè)當(dāng)前有兩個 reader,readerCount = 2;允許最大的reader 數(shù)量為 10
- 當(dāng) writer 進(jìn)入隊列A 時,readerCount = readerCount - rwmutexMaxReaders = -8,readerWait = readerCount = 2
- 如果再來 3 個reader,readerCount = readerCount + 3 = -5
- 獲得讀鎖的兩個reader 執(zhí)行完后,readerCount = readerCount - 2 = -7,readerWait = readerWait-2 =0,writer 獲得鎖
- writer 執(zhí)行完后,readerCount = readerCount + rwmutexMaxReaders = 3,當(dāng)前有 3個 reader
RLock()
reader 執(zhí)行讀操作之前,需要調(diào)用 RLock() 獲取鎖
func?(rw?*RWMutex)?RLock()?{ ? ??//?reader?加鎖,將?readerCount?加一,表示多了個?reader ??if?atomic.AddInt32(&rw.readerCount,?1)?<?0?{ ?? ????//?如果?readerCount<0,說明有?writer?在自己前面等待,排隊等待讀信號量 ??runtime_SemacquireMutex(&rw.readerSem,?false,?0) ?} }
RUnlock()
reader 執(zhí)行完讀操作后,調(diào)用 RUnlock() 釋放鎖
func?(rw?*RWMutex)?RUnlock()?{ ??//?reader?釋放鎖,將?readerCount?減一,表示少了個?reader ?if?r?:=?atomic.AddInt32(&rw.readerCount,?-1);?r?<?0?{ ?? ????//?如果readerCount<0,說明有?writer?在自己后面等待,看是否要讓?writer?運(yùn)行 ??rw.rUnlockSlow(r) ?} }
func?(rw?*RWMutex)?rUnlockSlow(r?int32)?{ ??//?將?readerWait?減一,表示前面的?reader?少了一個 ?if?atomic.AddInt32(&rw.readerWait,?-1)?==?0?{ ???? ??//?如果?readerWait?變?yōu)榱?,那么自己就是最后一個完成的?reader ????//?釋放寫信號量,讓 writer 運(yùn)行 ??runtime_Semrelease(&rw.writerSem,?false,?1) ?} }
Lock()
writer 執(zhí)行寫操作之前,調(diào)用 Lock() 獲取鎖
func?(rw?*RWMutex)?Lock()?{ ???//?利用互斥鎖,如果前面有 writer,那么就需要等待互斥鎖,即在隊列B 中排隊等待;如果沒有,可以直接進(jìn)入?隊列A 排隊 ???rw.w.Lock() ??//?atomic.AddInt32(&rw.readerCount,?-rwmutexMaxReaders)?將?readerCount?變成了負(fù)數(shù) ??//?再加?rwmutexMaxReaders,相當(dāng)于?r?=?readerCount,r?就是?writer?前面的?reader?數(shù)量 ???r?:=?atomic.AddInt32(&rw.readerCount,?-rwmutexMaxReaders)?+?rwmutexMaxReaders ??? ???//?如果?r!=?0?,表示自己前面有?reader,那么令?readerWait?=?r,要等前面的?reader?運(yùn)行完 ???if?r?!=?0?&&?atomic.AddInt32(&rw.readerWait,?r)?!=?0?{ ??????runtime_SemacquireMutex(&rw.writerSem,?false,?0) ???} }
Lock()
和 RUnlock()
是會并發(fā)進(jìn)行的:
- 如果 Lock() 將 readerCount 變?yōu)樨?fù)數(shù)后,假設(shè) r=3,表示加入的那一刻前面有三個 reader,還沒有賦值 readerWait CPU 就被強(qiáng)占了,readerWait = 0;
- 假設(shè)此時三個 reader 的 RUnlock() 會進(jìn)入到 rUnlockSlow() 邏輯,每個 reader 都將 readerWait 減一, readerWait 會變成負(fù)數(shù),此時不符合喚醒 writer 的條件;
- 三個 reader 運(yùn)行完之后,此時 readerWait = -3, Lock() 運(yùn)行到 atomic.AddInt32(&rw.readerWait, r) = -3+3 =0,也不會休眠,直接獲取到鎖,因為前面的 reader 都運(yùn)行完了。
這就是為什么 rUnlockSlow()
要判斷 atomic.AddInt32(&rw.readerWait, -1) == 0
以及 Lock()
要判斷 atomic.AddInt32(&rw.readerWait, r) != 0
的原因。
Unlock()
writer 執(zhí)行寫操作之后,調(diào)用 Lock() 釋放鎖
func?(rw?*RWMutex)?Unlock()?{ ??//?將?readerCount?變?yōu)檎龜?shù),表示當(dāng)前沒有?writer?在隊列A?等待了 ?r?:=?atomic.AddInt32(&rw.readerCount,?rwmutexMaxReaders) ??//?將自己后面等待的?reader?喚醒,可以進(jìn)行讀操作了 ?for?i?:=?0;?i?<?int(r);?i++?{ ??runtime_Semrelease(&rw.readerSem,?false,?0) ?} ? ??//?釋放互斥鎖,如果隊列B有writer,相當(dāng)于喚醒一個來隊列A 排隊 ?rw.w.Unlock() }
writer 對 readerCount 一加一減,不會改變整體狀態(tài),只是用正負(fù)來表示是否有 writer 在等待。當(dāng)然,如果在 writer 將 readerCount變?yōu)樨?fù)數(shù)后,來了很多 reader,將 readerCount 變?yōu)榱苏龜?shù),此時reader 在 writer 沒有釋放鎖的時候就獲取到鎖了,是有問題的。但是 rwmutexMaxReaders 非常大,可以不考慮這個問題。
常見問題
不可復(fù)制
和 Mutex 一樣,RWMutex 也是不可復(fù)制。不能復(fù)制的原因和互斥鎖一樣。一旦讀寫鎖被使用,它的字段就會記錄它當(dāng)前的一些狀態(tài)。這個時候你去復(fù)制這把鎖,就會把它的狀態(tài)也給復(fù)制過來。但是,原來的鎖在釋放的時候,并不會修改你復(fù)制出來的這個讀寫鎖,這就會導(dǎo)致復(fù)制出來的讀寫鎖的狀態(tài)不對,可能永遠(yuǎn)無法釋放鎖。
不可重入
不可重入的原因是,獲得鎖之后,還沒釋放鎖,又申請鎖,這樣有可能造成死鎖。比如 reader A 獲取到了讀鎖,writer B 等待 reader A 釋放鎖,reader 還沒釋放鎖又申請了一把鎖,但是這把鎖申請不成功,他需要等待 writer B。這就形成了一個循環(huán)等待的死鎖。
加鎖和釋放鎖一定要成對出現(xiàn),不能忘記釋放鎖,也不能解鎖一個未加鎖的鎖。
實戰(zhàn)一下
Go 中的 map 是不支持 并發(fā)寫的,我們可以利用 讀寫鎖 RWMutex 來實現(xiàn)并發(fā)安全的 map。在讀多寫少的情況下,使用 RWMutex 要比 Mutex 性能高。
package?main import?( ?"fmt" ?"math/rand" ?"sync" ?"time" ) type?ConcurrentMap?struct?{ ?m?????sync.RWMutex ?items?map[string]interface{} } func?(c?*ConcurrentMap)?Add(key?string,?value?interface{})?{ ?c.m.Lock() ?defer?c.m.Unlock() ?c.items[key]?=?value } func?(c?*ConcurrentMap)?Remove(key?string)?{ ?c.m.Lock() ?defer?c.m.Unlock() ?delete(c.items,?key) } func?(c?*ConcurrentMap)?Get(key?string)?interface{}?{ ?c.m.RLock() ?defer?c.m.RUnlock() ?return?c.items[key] } func?NewConcurrentMap()?ConcurrentMap?{ ?return?ConcurrentMap{ ??items:?make(map[string]interface{}), ?} } func?main()?{ ?m?:=?NewConcurrentMap() ?var?wait?sync.WaitGroup ?wait.Add(10000) ?for?i?:=?0;?i?<?10000;?i++?{ ??key?:=?fmt.Sprintf("%d",?rand.Intn(10)) ??value?:=?fmt.Sprintf("%d",?rand.Intn(100)) ??if?i%100?==?0?{ ???go?func()?{ ????defer?wait.Done() ????m.Add(key,?value) ???}() ??}?else?{ ???go?func()?{ ????defer?wait.Done() ????fmt.Println(m.Get(key)) ????time.Sleep(time.Millisecond?*?10) ???}() ??} ?} ?wait.Wait() }
以上就是Go語言讀寫鎖RWMutex的源碼分析的詳細(xì)內(nèi)容,更多關(guān)于Go語言讀寫鎖RWMutex的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章

golang調(diào)用shell命令(實時輸出,終止)

go數(shù)據(jù)結(jié)構(gòu)和算法BitMap原理及實現(xiàn)示例

jenkins配置golang?代碼工程自動發(fā)布的實現(xiàn)方法