go?sync?Waitgroup數(shù)據(jù)結(jié)構(gòu)實現(xiàn)基本操作詳解
WaitGroup 示例
本文基于 Go 1.19。
go 里面的 WaitGroup 是非常常見的一種并發(fā)控制方式,它可以讓我們的代碼等待一組 goroutine 的結(jié)束。 比如在主協(xié)程中等待幾個子協(xié)程去做一些耗時的操作,如發(fā)起幾個 HTTP 請求,然后等待它們的結(jié)果。
下面的代碼展示了一個 goroutine 等待另外 2 個 goroutine 結(jié)束的例子:
func TestWaitgroup(t *testing.T) {
var wg sync.WaitGroup
// 計數(shù)器 +2
wg.Add(2)
go func() {
sendHttpRequest("https://baidu.com")
// 計數(shù)器 -1
wg.Done()
}()
go func() {
sendHttpRequest("https://baidu.com")
// 計數(shù)器 -1
wg.Done()
}()
// 阻塞。計數(shù)器為 0 的時候,Wait 返回
wg.Wait()
}
// 發(fā)起 HTTP GET 請求
func sendHttpRequest(url string) (string, error) {
method := "GET"
client := &http.Client{}
req, err := http.NewRequest(method, url, nil)
if err != nil {
return "", err
}
res, err := client.Do(req)
if err != nil {
return "", err
}
defer res.Body.Close()
body, err := io.ReadAll(res.Body)
if err != nil {
return "", err
}
return string(body), err
}
在這個例子中,我們做了如下事情:
- 定義了一個
WaitGroup對象wg,調(diào)用wg.Add(2)將其計數(shù)器+2。 - 啟動兩個新的 goroutine,在這兩個 goroutine 中,使用
sendHttpRequest函數(shù)發(fā)起了一個 HTTP 請求。 - 在 HTTP 請求返回之后,調(diào)用
wg.Done將計數(shù)器-1。 - 在函數(shù)的最后,我們調(diào)用了
wg.Wait,這個方法會阻塞,直到WaitGroup的計數(shù)器的值為 0 才會解除阻塞狀態(tài)。
WaitGroup 基本原理
WaitGroup 內(nèi)部通過一個計數(shù)器來統(tǒng)計有多少協(xié)程被等待。這個計數(shù)器的值在我們啟動 goroutine 之前先寫入(使用 Add 方法), 然后在 goroutine 結(jié)束的時候,將這個計數(shù)器減 1(使用 Done 方法)。除此之外,在啟動這些 goroutine 的協(xié)程中, 會調(diào)用 Wait 來進行等待,在 Wait 調(diào)用的地方會阻塞,直到 WaitGroup 內(nèi)部的計數(shù)器減到 0。 也就實現(xiàn)了等待一組 goroutine 的目的
背景知識
在操作系統(tǒng)中,有多種實現(xiàn)進程/線程間同步的方式,如:test_and_set、compare_and_swap、互斥鎖等。 除此之外,還有一種是信號量,它的功能類似于互斥鎖,但是它能提供更為高級的方法,以便進程能夠同步活動。
信號量
一個信號量(semaphore)S是一個整型變量,它除了初始化外只能通過兩個標準的原子操作:wait() 和 signal() 來訪問。 操作 wait() 最初稱為 P(荷蘭語 proberen,測試);操作 signal() 最初稱為 V(荷蘭語 verhogen,增加),可按如下來定義 wait():
PV 原語。
wait(S) {
while (S <= 0)
; // 忙等待
S--;
}
可按如下來定義 signal():
signal(S) {
S++;
}
在 wait() 和 signal() 操作中,信號量整數(shù)值的修改應不可分割地執(zhí)行。也就是說,當一個進程修改信號量值時,沒有其他進程能夠同時修改同一信號量的值。
簡單來說,信號量實現(xiàn)的功能是:
- 當信號量>0 時,表示資源可用,則
wait會對信號量執(zhí)行減 1 操作。 - 當信號量<=0 時,表示資源暫時不可用,獲取信號量時,當前的進程/線程會阻塞,直到信號量為正時被喚醒。
WaitGroup 中的信號量
在 WaitGroup 中,使用了信號量來實現(xiàn) goroutine 的阻塞以及喚醒:
- 在調(diào)用
Wait的地方,goroutine 會陷入阻塞,直到信號量大于等于 0 的時候解除阻塞狀態(tài),得以繼續(xù)執(zhí)行。 - 在調(diào)用
Done的時候,如果WaitGroup內(nèi)的等待協(xié)程的計數(shù)器減到 0 的時候,信號量會進行遞增,這樣那些阻塞的協(xié)程會進行執(zhí)行下去。
WaitGroup 數(shù)據(jù)結(jié)構(gòu)
type WaitGroup struct {
noCopy noCopy
// 高 32 位為計數(shù)器,低 32 位為等待者數(shù)量
state atomic.Uint64
sema uint32
}
noCopy
我們發(fā)現(xiàn),WaitGroup 中有一個字段 noCopy,顧名思義,它的目的是防止復制。 這個字段在運行時是沒有什么影響的,但是我們通過 go vet 可以發(fā)現(xiàn)我們對 WaitGroup 的復制。 為什么不能復制呢?因為一旦復制,WaitGroup 內(nèi)的計數(shù)器就不再準確了,比如下面這個例子:
func test(wg sync.WaitGroup) {
wg.Done()
}
func TestWaitGroup(t *testing.T) {
var wg sync.WaitGroup
wg.Add(1)
test(wg)
wg.Wait()
}
go 里面的函數(shù)參數(shù)傳遞是值傳遞。調(diào)用 test(wg) 的時候?qū)?WaitGroup 復制了一份。
在這個例子中,程序會永遠阻塞下去,因為 test 中調(diào)用 wg.Done() 的時候,只是將 WaitGroup 副本的計數(shù)器減去了 1, 而 TestWaitGroup 里面的 WaitGroup 的計數(shù)器并沒有發(fā)生改變,因此 Wait 會永遠阻塞。
我們?nèi)绻枰獙?WaitGroup 作為參數(shù),請傳遞指針:
func test(wg *sync.WaitGroup) {
wg.Done()
}
傳遞指針之后,我們在 test 中調(diào)用 wg.Done() 修改的就是 TestWaitGroup 里面同一個 WaitGroup。 從而,Wait 方法可以正常返回。
state
WaitGroup 里面的 state 是一個 64 位的 atomic.Uint64 類型,它的高 32 位用來保存 counter(也就是上面說的計數(shù)器),低 32 位用來保存 waiter(也就是阻塞在 Wait 上的 goroutine 數(shù)量。)

sema
WaitGroup 通過 sema 來記錄信號量:
runtime_Semrelease表示將信號量遞增(對應信號量中的signal操作)runtime_Semacquire表示將信號量遞減(對應信號量中的wait操作)
簡單來說,在調(diào)用 runtime_Semacquire 的時候 goroutine 會阻塞,而調(diào)用 runtime_Semrelease 會喚醒阻塞在同一個信號量上的 goroutine。
WaitGroup 的三個基本操作
Add: 這會將WaitGroup里面的counter加上一個整數(shù)(也就是傳遞給Add的函數(shù)參數(shù))。Done: 這會將WaitGroup里面的counter減去 1。Wait: 這會將WaitGroup里面的waiter加上 1,并且調(diào)用Wait的地方會阻塞。(有可能會有多個 goroutine 等待一個WaitGroup)
WaitGroup 的實現(xiàn)
Add 的實現(xiàn)
Add 做了下面兩件事:
- 將
delta加到state的高 32 位上 - 如果
counter為0了,并且waiter大于 0,表示所有被等待的 goroutine 都完成了,而還有在等待的 goroutine,這會喚醒那些阻塞在Wait上的 goroutine。
源碼實現(xiàn):
func (wg *WaitGroup) Add(delta int) {
// wg.state 的計數(shù)器加上 delta
//(加到 state 的高 32 上)
state := wg.state.Add(uint64(delta) << 32) // 高 32 位加上 delta
v := int32(state >> 32) // 高 32 位(counter)
w := uint32(state) // 低 32 位(waiter)
// 計數(shù)器不能為負數(shù)(加上 delta 之后不能為負數(shù),最小只能到 0)
if v < 0 {
panic("sync: negative WaitGroup counter")
}
// 正常使用情況下,是先調(diào)用 Add 再調(diào)用 Wait 的,這種情況下,w 是 0,v > 0
if w != 0 && delta > 0 && v == int32(delta) {
panic("sync: WaitGroup misuse: Add called concurrently with Wait")
}
// v > 0,計數(shù)器大于 0
// w == 0,沒有在 Wait 的協(xié)程
// 說明還沒有到喚醒 waiter 的時候
if v > 0 || w == 0 {
return
}
// Add 負數(shù)的時候,v 會減去對應的數(shù)值,減到最后 v 是 0。
// 計數(shù)器是 0,并且有等待的協(xié)程,現(xiàn)在要喚醒這些協(xié)程。
// 存在等待的協(xié)程時,goroutine 已將計數(shù)器設置為0。
// 現(xiàn)在不可能同時出現(xiàn)狀態(tài)突變:
// - Add 不能與 Wait 同時發(fā)生,
// - 如果看到計數(shù)器==0,則 Wait 不會增加等待的協(xié)程。
// 仍然要做一個廉價的健康檢查,以檢測 WaitGroup 的誤用。
if wg.state.Load() != state { // 不能在 Add 的同時調(diào)用 Wait
panic("sync: WaitGroup misuse: Add called concurrently with Wait")
}
// 將等待的協(xié)程數(shù)量設置為 0。
wg.state.Store(0)
for ; w != 0; w-- {
// signal,調(diào)用 Wait 的地方會解除阻塞
runtime_Semrelease(&wg.sema, false, 0) // goyield
}
}
Done 的實現(xiàn)
WaitGroup 里的 Done 其實只是對 Add 的調(diào)用,但是它的效果是,將計數(shù)器的值減去 1。 背后的含義是:一個被等待的協(xié)程執(zhí)行完畢了。
Wait 的實現(xiàn)
Wait 主要功能是阻塞當前的協(xié)程:
Wait會先判斷計數(shù)器是否為0,為0說明沒有任何需要等待的協(xié)程,那么就可以直接返回了。- 如果計數(shù)器還不是
0,說明有協(xié)程還沒執(zhí)行完,那么調(diào)用Wait的地方就需要被阻塞起來,等待所有的協(xié)程完成。
源碼實現(xiàn):
func (wg *WaitGroup) Wait() {
for {
// 獲取當前計數(shù)器
state := wg.state.Load()
// 計數(shù)器
v := int32(state >> 32)
// waiter 數(shù)量
w := uint32(state)
// v 為 0,不需要等待,直接返回
if v == 0 {
// 計數(shù)器是 0,不需要等待
return
}
// 增加 waiter 數(shù)量。
// 調(diào)用一次 Wait,waiter 數(shù)量會加 1。
if wg.state.CompareAndSwap(state, state+1) {
// 這會阻塞,直到 sema (信號量)大于 0
runtime_Semacquire(&wg.sema) // goparkunlock
// state 不等 0
// wait 還沒有返回又繼續(xù)使用了 WaitGroup
if wg.state.Load() != 0 {
panic("sync: WaitGroup is reused before previous Wait has returned")
}
// 解除阻塞狀態(tài)了,可以返回了
return
}
// 狀態(tài)沒有修改成功(state 沒有成功 +1),開始下一次嘗試。
}
}
總結(jié)
WaitGroup使用了信號量來實現(xiàn)了并發(fā)資源控制,sema字段表示信號量。- 使用
runtime_Semacquire會使得 goroutine 阻塞直到計數(shù)器減少至0,而使用runtime_Semrelease會使得信號量遞增,這等于是通知之前阻塞在信號量上的協(xié)程,告訴它們可以繼續(xù)執(zhí)行了。 WaitGroup作為參數(shù)傳遞的時候,需要傳遞指針作為參數(shù),否則在被調(diào)用函數(shù)內(nèi)對Add或者Done的調(diào)用,在caller里面調(diào)用的Wait會觀測不到。WaitGroup使用一個 64 位的數(shù)來保存計數(shù)器(高 32 位)和waiter(低 32 位,正在等待的協(xié)程的數(shù)量)。WaitGroup使用Add增加計數(shù)器,使用Done來將計數(shù)器減1,使用Wait來等待 goroutine。Wait會阻塞直到計數(shù)器減少到0。
以上就是go sync Waitgroup數(shù)據(jù)結(jié)構(gòu)實現(xiàn)基本操作詳解的詳細內(nèi)容,更多關于go sync Waitgroup數(shù)據(jù)結(jié)構(gòu)的資料請關注腳本之家其它相關文章!
相關文章
Golang哈希算法實現(xiàn)配置文件的監(jiān)控功能詳解
這篇文章主要介紹了Golang哈希算法實現(xiàn)配置文件的監(jiān)控功能,哈希和加密類似,唯一區(qū)別是哈希是單項的,即哈希后的數(shù)據(jù)無法解密,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習吧2023-03-03

