Golang信號(hào)量設(shè)計(jì)實(shí)現(xiàn)示例詳解
開篇
在我們此前的文章 Golang Mutex 原理解析 中曾提到過,Mutex 的底層結(jié)構(gòu)包含了兩個(gè)字段,state 和 sema:
type Mutex struct { state int32 sema uint32 }
- state 代表互斥鎖的狀態(tài),比如是否被鎖定;
- sema 表示信號(hào)量,協(xié)程阻塞會(huì)等待該信號(hào)量,解鎖的協(xié)程釋放信號(hào)量從而喚醒等待信號(hào)量的協(xié)程。
這個(gè) sema 就是 semaphore 信號(hào)量的意思。Golang 協(xié)程之間的搶鎖,實(shí)際上爭(zhēng)搶給Locked
賦值的權(quán)利,能給 Locked
置為1,就說明搶鎖成功。搶不到就阻塞等待 sema
信號(hào)量,一旦持有鎖的協(xié)程解鎖,那么等待的協(xié)程會(huì)依次被喚醒。
有意思的是,雖然 semaphore 在鎖的實(shí)現(xiàn)中起到了至關(guān)重要的作用,Golang 對(duì)信號(hào)量的實(shí)現(xiàn)卻是隱藏在 runtime 中,并沒有包含到標(biāo)準(zhǔn)庫里來,在 src 源碼中我們可以看到底層依賴的信號(hào)量相關(guān)函數(shù)。
// defined in package runtime // Semacquire waits until *s > 0 and then atomically decrements it. // It is intended as a simple sleep primitive for use by the synchronization // library and should not be used directly. func runtime_Semacquire(s *uint32) // Semrelease atomically increments *s and notifies a waiting goroutine // if one is blocked in Semacquire. // It is intended as a simple wakeup primitive for use by the synchronization // library and should not be used directly. // If handoff is true, pass count directly to the first waiter. // skipframes is the number of frames to omit during tracing, counting from // runtime_Semrelease's caller. func runtime_Semrelease(s *uint32, handoff bool, skipframes int)
- runtime_Semacquire:阻塞等待直到 s 大于 0,然后立刻將 s 減去 1【原子操作】;
- runtime_Semrelease:將 s 增加 1,然后通知一個(gè)阻塞在 runtime_Semacquire 的 goroutine【原子操作】。
兩個(gè)原子操作,一個(gè) acquire,一個(gè) release,其實(shí)就代表了對(duì)資源的獲取和釋放。Mutex 作為 sync 包的核心,支撐了 RWMutex,channel,singleflight 等多個(gè)并發(fā)控制的能力,而對(duì)信號(hào)量的管理又是 Mutex 的基礎(chǔ)。
雖然源碼看不到,但 Golang 其實(shí)在擴(kuò)展庫 golang.org/x/sync/semaphore
也提供了一套信號(hào)量的實(shí)現(xiàn),我們可以由此來參考一下,理解 semaphore 的實(shí)現(xiàn)思路。
信號(hào)量
在看源碼之前,我們先理清楚【信號(hào)量】設(shè)計(jì)背后的場(chǎng)景和原理。
信號(hào)量的概念是荷蘭計(jì)算機(jī)科學(xué)家 Edsger Dijkstra 在 1963 年左右提出來的,廣泛應(yīng)用在不同的操作系統(tǒng)中。在系統(tǒng)中,會(huì)給每一個(gè)進(jìn)程一個(gè)信號(hào)量,代表每個(gè)進(jìn)程目前的狀態(tài)。未得到控制權(quán)的進(jìn)程,會(huì)在特定的地方被迫停下來,等待可以繼續(xù)進(jìn)行的信號(hào)到來。
在 Mutex 依賴的信號(hào)量機(jī)制中我們可以看到,這里本質(zhì)就是依賴 sema 一個(gè) uint32 的變量 + 原子操作來實(shí)現(xiàn)并發(fā)控制能力。當(dāng) goroutine 完成對(duì)信號(hào)量等待時(shí),該變量 -1,當(dāng) goroutine 完成對(duì)信號(hào)量的釋放時(shí),該變量 +1。
如果一個(gè)新的 goroutine 發(fā)現(xiàn)信號(hào)量不大于 0,說明資源暫時(shí)沒有,就得阻塞等待。直到信號(hào)量 > 0,此時(shí)的語義是有新的資源,該goroutine就會(huì)結(jié)束等待,完成對(duì)信號(hào)量的 -1 并返回。注意我們上面有提到,runtime 支持的兩個(gè)方法都是原子性的,不用擔(dān)心兩個(gè)同時(shí)在等待的 goroutine 同時(shí)搶占同一份資源。
典型的信號(hào)量場(chǎng)景是【圖書館借書】。假設(shè)學(xué)校圖書館某熱門書籍現(xiàn)在只有 100 本存貨,但是上萬學(xué)生都想借閱,怎么辦?
直接買一萬本書是非常簡(jiǎn)單粗暴的解法,但資源有限,這不是長(zhǎng)久之計(jì)。
常見的解決方案很簡(jiǎn)單:學(xué)生們先登記,一個(gè)一個(gè)來。我們先給 100 個(gè)同學(xué)發(fā)出,剩下的你們繼續(xù)等,等到什么時(shí)候借書的同學(xué)看完了,把書還回來了,就給排隊(duì)等待的同學(xué)們發(fā)放。同時(shí),為了避免超發(fā),每發(fā)一個(gè),都需要在維護(hù)的記錄里將【余量】減去 1,每還回來一個(gè),就把【余量】加上 1。
runtime_Semacquire 就是排隊(duì)等待借書,runtime_Semrelease 就是看完了把書歸還給圖書館。
另外需要注意,雖然我們上面舉例的增加/減小的粒度都是 1,但這本質(zhì)上只是一種場(chǎng)景,事實(shí)上就算是圖書館借書,也完全有可能出現(xiàn)一個(gè)人同時(shí)借了兩本一模一樣的書。所以,信號(hào)量的設(shè)計(jì)需要支持 N 個(gè)資源的獲取和釋放。
所以,我們對(duì)于 acquire 和 release 兩種操作的語義如下:
- release: 將信號(hào)量增加 n【保證原子性】;
- acquire: 若信號(hào)量 < n,阻塞等待,直到信號(hào)量 >= n,此時(shí)將信號(hào)量的值減去 n【保證原子性】。
semaphore 擴(kuò)展庫實(shí)現(xiàn)
這里我們結(jié)合golang.org/x/sync/semaphore
源碼來看看怎樣設(shè)計(jì)出來我們上面提到的信號(hào)量結(jié)構(gòu)。
// NewWeighted creates a new weighted semaphore with the given // maximum combined weight for concurrent access. func NewWeighted(n int64) *Weighted { w := &Weighted{size: n} return w } // Weighted provides a way to bound concurrent access to a resource. // The callers can request access with a given weight. type Weighted struct { size int64 // 最大資源數(shù) cur int64 // 當(dāng)前已被使用的資源 mu sync.Mutex waiters list.List // 等待隊(duì)列 }
有意思的是,雖然包名是 semaphore,但是擴(kuò)展庫里真正給【信號(hào)量結(jié)構(gòu)體】定義的名稱是 Weighted。從上面的定義我們可以看到,傳入初始資源個(gè)數(shù) n(對(duì)應(yīng) size),就可以生成一個(gè) Weighted 信號(hào)量結(jié)構(gòu)。
Weighted 提供了三個(gè)方法來實(shí)現(xiàn)對(duì)信號(hào)量機(jī)制的支持:
- Acquire
對(duì)應(yīng)上面我們提到的 acquire 語義,注意我們提到過,抽象的來講,acquire 成功與否其實(shí)不太看返回值,而是只要獲取不了就一直阻塞,如果返回了,就意味著獲取到了。
但在 Golang 實(shí)現(xiàn)當(dāng)中,我們肯定不希望,如果發(fā)生了異常 case,導(dǎo)致一直阻塞在這里。所以你可以看到 Acquire 的入?yún)⒗镉袀€(gè) context.Context,借用 context 的上下文控制能力,你可以對(duì)此進(jìn)行 cancel, 可以設(shè)置 timeout 等待超時(shí),就能對(duì) acquire 行為進(jìn)行更多約束。
所以,acquire 之后我們?nèi)匀恍枰獧z查返回值 error,如果為 nil,代表正常獲取了資源。否則可能是 context 已經(jīng)不合法了。
- Release
跟上面提到的 release 語義完全一致,傳入你要釋放的資源數(shù) n,保證原子性地增加信號(hào)量。
- TryAcquire
這里其實(shí)跟 sync 包中的各類 TryXXX 函數(shù)定位很像。并發(fā)的機(jī)制中大都包含 fast path 和 slow path,比如首個(gè) goroutine 先來 acquire,那么一定是能拿到的,后續(xù)再來請(qǐng)求的 goroutine 由于慢了一步,只能走 slow path 進(jìn)行等待,自旋等操作。sync 包中絕大部分精華,都在于 slow path 的處理。fast path 大多是一個(gè)基于 atomic 包的原子操作,比如 CAS 就可以解決。
TryAcquire 跟 Acquire 的區(qū)別在于,雖然也是要資源,但是不等待。有了我就獲取,就減信號(hào)量,返回 trye。但是如果目前還沒有,我不會(huì)阻塞在這里,而是直接返回 false。
下面我們逐個(gè)方法看看,Weighted 是怎樣實(shí)現(xiàn)的。
Acquire
// Acquire acquires the semaphore with a weight of n, blocking until resources // are available or ctx is done. On success, returns nil. On failure, returns // ctx.Err() and leaves the semaphore unchanged. // // If ctx is already done, Acquire may still succeed without blocking. func (s *Weighted) Acquire(ctx context.Context, n int64) error { s.mu.Lock() if s.size-s.cur >= n && s.waiters.Len() == 0 { s.cur += n s.mu.Unlock() return nil } if n > s.size { // Don't make other Acquire calls block on one that's doomed to fail. s.mu.Unlock() <-ctx.Done() return ctx.Err() } ready := make(chan struct{}) w := waiter{n: n, ready: ready} elem := s.waiters.PushBack(w) s.mu.Unlock() select { case <-ctx.Done(): err := ctx.Err() s.mu.Lock() select { case <-ready: // Acquired the semaphore after we were canceled. Rather than trying to // fix up the queue, just pretend we didn't notice the cancelation. err = nil default: isFront := s.waiters.Front() == elem s.waiters.Remove(elem) // If we're at the front and there're extra tokens left, notify other waiters. if isFront && s.size > s.cur { s.notifyWaiters() } } s.mu.Unlock() return err case <-ready: return nil } }
在閱讀之前回憶一下上面 Weighted 結(jié)構(gòu)的定義,注意 Weighted 并沒有維護(hù)一個(gè)變量用來表示【當(dāng)前剩余的資源】,這一點(diǎn)是通過 size(初始化的時(shí)候設(shè)置,表示總資源數(shù))減去 cur(當(dāng)前已被使用的資源),二者作差得到的。
我們來拆解一下上面這段代碼:
第一步:這是常規(guī)意義上的 fast path
s.mu.Lock() if s.size-s.cur >= n && s.waiters.Len() == 0 { s.cur += n s.mu.Unlock() return nil }
- 先上鎖,保證并發(fā)安全;
- 校驗(yàn)如果 size - cur >= n,代表剩余的資源是足夠,同時(shí) waiters 這個(gè)等待隊(duì)列為空,代表沒有別的協(xié)程在等待;
- 此時(shí)就沒什么多想的,直接 cur 加上 n 即可,代表又消耗了 n 個(gè)資源,然后解鎖返回,很直接。
第二步:針對(duì)特定場(chǎng)景做提前剪枝
if n > s.size { // Don't make other Acquire calls block on one that's doomed to fail. s.mu.Unlock() <-ctx.Done() return ctx.Err() }
如果請(qǐng)求的資源數(shù)量,甚至都大于資源總數(shù)量了,說明這個(gè)協(xié)程心里沒數(shù)。。。。就算我現(xiàn)在把所有初始化的資源都拿回來,也喂不飽你呀?。?!那能怎么辦,我就不煩勞后面流程處理了,直接等你的 context 什么時(shí)候 Done,給你返回 context 的錯(cuò)誤就行了,同時(shí)先解個(gè)鎖,別耽誤別的 goroutine 拿資源。
第三步:資源是夠的,只是現(xiàn)在沒有,那就把當(dāng)前goroutine加到排隊(duì)的隊(duì)伍里
ready := make(chan struct{}) w := waiter{n: n, ready: ready} elem := s.waiters.PushBack(w) s.mu.Unlock()
這里 ready 結(jié)構(gòu)是個(gè)空結(jié)構(gòu)體的 channel,僅僅是為了實(shí)現(xiàn)協(xié)程間通信,通知什么時(shí)候資源 ready,建立一個(gè)屬于這個(gè) goroutine 的 waiter,然后塞到 Weighted 結(jié)構(gòu)的等待隊(duì)列 waiters 里。
搞定了以后直接解鎖,因?yàn)槟阋呀?jīng)來排隊(duì)了,手續(xù)處理完成,以后的路有別的通知機(jī)制保證,就沒必要在這兒拿著鎖阻塞新來的 goroutine 了,人家也得排隊(duì)。
第四步:排隊(duì)等待
select { case <-ctx.Done(): err := ctx.Err() s.mu.Lock() select { case <-ready: // Acquired the semaphore after we were canceled. Rather than trying to // fix up the queue, just pretend we didn't notice the cancelation. err = nil default: isFront := s.waiters.Front() == elem s.waiters.Remove(elem) // If we're at the front and there're extra tokens left, notify other waiters. if isFront && s.size > s.cur { s.notifyWaiters() } } s.mu.Unlock() return err case <-ready: return nil }
一個(gè) select 語句,只看兩種情況:1. 這個(gè) goroutine 的 context 超時(shí)了;2. 拿到了資源,皆大歡喜。
重點(diǎn)在于 ctx.Done 分支里 default 的處理。我們可以看到,如果是超時(shí)了,此時(shí)還沒拿到資源,首先會(huì)把當(dāng)前 goroutine 從 waiters 等待隊(duì)列里移除(合情合理,你既然因?yàn)樽约旱脑蜃霾涣酥鳎瑳]法繼續(xù)等待了,就別耽誤別人事了)。
然后接著判斷,若這個(gè) goroutine 同時(shí)也是排在最前的 goroutine,而且恰好現(xiàn)在有資源了,就趕緊通知隊(duì)里的 goroutine 們,伙計(jì)們,現(xiàn)在有資源了,趕緊來拿。我們來看看這個(gè) notifyWaiters 干了什么:
func (s *Weighted) notifyWaiters() { for { next := s.waiters.Front() if next == nil { break // No more waiters blocked. } w := next.Value.(waiter) if s.size-s.cur < w.n { // Not enough tokens for the next waiter. We could keep going (to try to // find a waiter with a smaller request), but under load that could cause // starvation for large requests; instead, we leave all remaining waiters // blocked. // // Consider a semaphore used as a read-write lock, with N tokens, N // readers, and one writer. Each reader can Acquire(1) to obtain a read // lock. The writer can Acquire(N) to obtain a write lock, excluding all // of the readers. If we allow the readers to jump ahead in the queue, // the writer will starve — there is always one token available for every // reader. break } s.cur += w.n s.waiters.Remove(next) close(w.ready) } }
其實(shí)很簡(jiǎn)單,遍歷 waiters 這個(gè)等待隊(duì)列,拿到排隊(duì)最前的 waiter,判斷資源夠不夠,如果夠了,增加 cur 變量,資源給你,然后把你從等待隊(duì)列里移出去,再 close ready 那個(gè)goroutine 就行,算是通知一下。
重點(diǎn)部分在于,如果資源不夠怎么辦?
想象一下現(xiàn)在的處境,Weighted 這個(gè) semaphore 的確有資源,而目前要處理的這個(gè) goroutine 的的確確就是排隊(duì)最靠前的,而且人家也沒獅子大開口,要比你總 size 還大的資源。但是,但是,好巧不巧,現(xiàn)在你要的數(shù)量,比我手上有的少。。。。
很無奈,那怎么辦呢?
無非兩種解法:
- 我先不管你,反正你要的不夠,我先看看你后面那個(gè) goroutine 人家夠不夠,雖然你現(xiàn)在是排位第一個(gè),但是也得繼續(xù)等著;
- 沒辦法,你排第一,需求我就得滿足,所以我們都繼續(xù)等,等啥時(shí)候資源夠了就給你。
擴(kuò)展庫實(shí)際選用的是第 2 種策略,即一定要滿足排在最前面的 goroutine,這里的考慮在注釋里有提到,如果直接繼續(xù)看后面的 goroutine 夠不夠,優(yōu)先滿足后面的,在一些情況下會(huì)餓死有大資源要求的 goroutine,設(shè)計(jì)上不希望這樣的情況發(fā)生。
簡(jiǎn)單說:要的多不是錯(cuò),既然你排第一,目前貨不多,那就大家一起阻塞等待,保障你的權(quán)利。
Release
// Release releases the semaphore with a weight of n. func (s *Weighted) Release(n int64) { s.mu.Lock() s.cur -= n if s.cur < 0 { s.mu.Unlock() panic("semaphore: released more than held") } s.notifyWaiters() s.mu.Unlock() }
Release 這里的實(shí)現(xiàn)非常簡(jiǎn)單,一把鎖保障不出現(xiàn)并發(fā),然后將 cur 減去 n 即可,說明此時(shí)又有 n 個(gè)資源回到了貨倉。然后和上面 Acquire 一樣,調(diào)用 notifyWaiters,叫排隊(duì)第一的哥們(哦不,是 goroutine)來領(lǐng)東西了。
TryAcquire
// TryAcquire acquires the semaphore with a weight of n without blocking. // On success, returns true. On failure, returns false and leaves the semaphore unchanged. func (s *Weighted) TryAcquire(n int64) bool { s.mu.Lock() success := s.size-s.cur >= n && s.waiters.Len() == 0 if success { s.cur += n } s.mu.Unlock() return success }
其實(shí)就是 Acquire 方法的 fast path,只是返回了個(gè) bool,標(biāo)識(shí)是否獲取成功。
總結(jié)
今天我們了解了擴(kuò)展庫 semaphore 對(duì)于信號(hào)量的封裝實(shí)現(xiàn),整體代碼加上注釋也才 100 多行,是非常好的學(xué)習(xí)材料,建議大家有空了對(duì)著源碼再過一遍。Acquire 和 Release 的實(shí)現(xiàn)都很符合直覺。
其實(shí),我們使用 buffered channel 其實(shí)也可以模擬出來 n 個(gè)信號(hào)量的效果,但就不具備 semaphore Weighted 這套實(shí)現(xiàn)里面,一次獲取多個(gè)資源的能力了。
以上就是Golang信號(hào)量設(shè)計(jì)實(shí)現(xiàn)示例詳解的詳細(xì)內(nèi)容,更多關(guān)于Go信號(hào)量設(shè)計(jì)的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
Golang WaitGroup實(shí)現(xiàn)原理解析
WaitGroup是Golang并發(fā)的兩種方式之一,一個(gè)是Channel,另一個(gè)是WaitGroup,下面這篇文章主要給大家介紹了關(guān)于golang基礎(chǔ)之waitgroup用法以及使用要點(diǎn)的相關(guān)資料,需要的朋友可以參考下2023-02-02剖析Go編寫的Socket服務(wù)器模塊解耦及基礎(chǔ)模塊的設(shè)計(jì)
這篇文章主要介紹了Go的Socket服務(wù)器模塊解耦及日志和定時(shí)任務(wù)的模塊設(shè)計(jì),舉了一些Go語言編寫的服務(wù)器模塊的例子,需要的朋友可以參考下2016-03-03golang使用OpenTelemetry實(shí)現(xiàn)跨服務(wù)全鏈路追蹤詳解
這篇文章主要為大家介紹了golang使用OpenTelemetry實(shí)現(xiàn)跨服務(wù)全鏈路追蹤詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2023-09-09golang?實(shí)現(xiàn)?pdf?轉(zhuǎn)高清晰度?jpeg的處理方法
這篇文章主要介紹了golang?實(shí)現(xiàn)?pdf?轉(zhuǎn)高清晰度?jpeg,下面主要介紹Golang 代碼使用方法及Golang PDF轉(zhuǎn)JPEG的詳細(xì)代碼,本文給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2022-10-10構(gòu)建Golang應(yīng)用最小Docker鏡像的實(shí)現(xiàn)
這篇文章主要介紹了構(gòu)建Golang應(yīng)用最小Docker鏡像的實(shí)現(xiàn),文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2020-05-05Golang基礎(chǔ)學(xué)習(xí)之map的示例詳解
哈希表是常見的數(shù)據(jù)結(jié)構(gòu),有的語言會(huì)將哈希稱作字典或者映射,在Go中,哈希就是常見的數(shù)據(jù)類型map,本文就來聊聊Golang中map的相關(guān)知識(shí)吧2023-03-03