一文詳解go同步協(xié)程的必備工具WaitGroup
1. 簡(jiǎn)介
本文將介紹 Go 語(yǔ)言中的 WaitGroup 并發(fā)原語(yǔ),包括 WaitGroup 的基本使用方法、實(shí)現(xiàn)原理、使用注意事項(xiàng)以及常見的使用方式。能夠更好地理解和應(yīng)用 WaitGroup 來(lái)協(xié)調(diào)多個(gè) Goroutine 的執(zhí)行,提高 Go 并發(fā)編程的效率和穩(wěn)定性。
2. 基本使用
2.1 定義
WaitGroup
是Go語(yǔ)言標(biāo)準(zhǔn)庫(kù)中的一個(gè)結(jié)構(gòu)體,它提供了一種簡(jiǎn)單的機(jī)制,用于同步多個(gè)協(xié)程的執(zhí)行。適用于需要并發(fā)執(zhí)行多個(gè)任務(wù)并等待它們?nèi)客瓿珊蟛拍芾^續(xù)執(zhí)行后續(xù)操作的場(chǎng)景。
2.2 使用方式
首先主協(xié)程創(chuàng)建WaitGroup實(shí)例,然后在每個(gè)協(xié)程的開始處,調(diào)用Add(1)
方法,表示需要等待一個(gè)任務(wù)執(zhí)行完成,然后協(xié)程在任務(wù)執(zhí)行完成之后,調(diào)用Done
方法,表示任務(wù)已經(jīng)執(zhí)行完成了。
主協(xié)程中,需要調(diào)用Wait()
方法,等待所有協(xié)程完成任務(wù),示例如下:
func main(){ //首先主協(xié)程創(chuàng)建WaitGroup實(shí)例 var wg sync.WaitGroup // 開始時(shí)調(diào)用Add方法表示有個(gè)任務(wù)開始執(zhí)行 wg.Add(1) go func() { // 開始執(zhí)行... //完成之后,調(diào)用Done方法 wg.Done() }() // 調(diào)用Wait()方法,等待所有協(xié)程完成任務(wù) wg.Wait() // 執(zhí)行后續(xù)邏輯 }
2.3 使用例子
package main import ( "fmt" "sync" ) func main() { var wg sync.WaitGroup for i := 0; i < 5; i++ { wg.Add(1) go func(i int) { defer wg.Done() fmt.Printf("任務(wù)%d開始執(zhí)行\(zhòng)n", i) // 模擬協(xié)程任務(wù)執(zhí)行一段時(shí)間 time.Sleep(time.Duration(rand.Int() % 100)) // 線程任務(wù)執(zhí)行完成 fmt.Printf("任務(wù)%d執(zhí)行完畢\n", i) }(i) } fmt.Println("主協(xié)程開始等待所有任務(wù)執(zhí)行完成...") wg.Wait() fmt.Println("所有協(xié)程已經(jīng)執(zhí)行完畢...") }
在這個(gè)例子中,我們使用了sync.WaitGroup
來(lái)等待5個(gè)協(xié)程執(zhí)行完畢。在循環(huán)中,每創(chuàng)建一個(gè)任務(wù),我們調(diào)用一次wg.Add(1)
方法,然后啟動(dòng)一個(gè)協(xié)程去執(zhí)行任務(wù),當(dāng)協(xié)程完成任務(wù)后,調(diào)用wg.Done
方法,告知主協(xié)程任務(wù)已經(jīng)執(zhí)行完畢。然后主協(xié)程會(huì)在5個(gè)協(xié)程任務(wù)全部執(zhí)行完畢之后,才會(huì)繼續(xù)向下執(zhí)行。
3.實(shí)現(xiàn)原理
3.1 設(shè)計(jì)初衷
WaitGroup
的設(shè)計(jì)初衷就是為了等待一組操作完成后再執(zhí)行下一步操作,通常會(huì)在一組協(xié)程中使用。
3.2 基本原理
sync.WaitGroup
結(jié)構(gòu)體中的 state1
和 state2
字段是用于實(shí)現(xiàn) WaitGroup
功能的重要變量。
type WaitGroup struct { noCopy noCopy state1 uint64 state2 uint32 }
由于 WaitGroup
需要等待一組操作完成之后再執(zhí)行,因此需要等待所有操作完成之后才能繼續(xù)執(zhí)行。為了實(shí)現(xiàn)這個(gè)功能,WaitGroup 使用了一個(gè)計(jì)數(shù)器 counter
來(lái)記錄還有多少個(gè)操作沒(méi)有完成,如果 counter
的值為 0,則表示所有操作已經(jīng)完成。
同時(shí),WaitGroup
在所有任務(wù)都完成之后,需要喚醒所有處于等待的協(xié)程,此時(shí)需要知道有多少個(gè)協(xié)程處于等待狀態(tài)。為了實(shí)現(xiàn)這個(gè)功能,WaitGroup 使用了一個(gè)等待計(jì)數(shù)器 waiter
來(lái)記錄當(dāng)前有多少個(gè)協(xié)程正在等待操作完成。
這里WaitGroup
對(duì)于計(jì)數(shù)器和等待計(jì)數(shù)器的實(shí)現(xiàn),是通過(guò)一個(gè)64位無(wú)符號(hào)整數(shù)來(lái)實(shí)現(xiàn)的,也就是WaitGroup
結(jié)構(gòu)體中的state1,其中高32位保存了任務(wù)計(jì)數(shù)器counter
的值,低32位保存了等待計(jì)數(shù)器waiter
的值。當(dāng)我們創(chuàng)建一個(gè) WaitGroup
實(shí)例時(shí),該實(shí)例的任務(wù)計(jì)數(shù)器和等待計(jì)數(shù)器都被初始化為 0。
而且,等待協(xié)程需要等待所有任務(wù)完成之后才能繼續(xù)執(zhí)行,所以等待協(xié)程在任務(wù)未完成時(shí)會(huì)被阻塞,當(dāng)任務(wù)全部完成后,自動(dòng)被喚醒。WaitGroup
使用 state2
用于實(shí)現(xiàn)信號(hào)量機(jī)制。通過(guò)調(diào)用 runtime_Semacquire()
和 runtime_Semrelease()
函數(shù),可以在不阻塞線程的情況下進(jìn)行等待和通知操作。
3.3 代碼實(shí)現(xiàn)
3.3.1 Add方法
調(diào)用 Add()
方法增加/減小counter
的值,delta的值可以是正數(shù),也可以是負(fù)數(shù),下面是Add
方法的源碼實(shí)現(xiàn):
func (wg *WaitGroup) Add(delta int) { // delta 的值可以為負(fù)數(shù),Done方法便是通過(guò)Add(-1)來(lái)實(shí)現(xiàn)的 // statep: 為state1的地址 semap: 為state2的地址 statep, semap := wg.state() // 高32位的值 加上 delta,增加任務(wù)計(jì)數(shù)器的值 state := atomic.AddUint64(statep, uint64(delta)<<32) // v: 取高32位數(shù)據(jù),獲取到待完成任務(wù)數(shù) v := int32(state >> 32) // 取低32位數(shù)據(jù),獲取到等待線程的值 w := uint32(state) // v > 0: 說(shuō)明還有待完成的任務(wù)數(shù),此時(shí)不應(yīng)該喚醒等待協(xié)程 // w = 0: 說(shuō)明沒(méi)有協(xié)程在等待,此時(shí)可以直接退出 if v > 0 || w == 0 { return } // 此時(shí)v = 0,所有任務(wù)都完成了,喚醒等待協(xié)程 *statep = 0 for ; w != 0; w-- { runtime_Semrelease(semap, false, 0) } }
3.3.2 Done方法實(shí)現(xiàn)
調(diào)用 Done()
方法表示完成了一個(gè)任務(wù),通過(guò)調(diào)用Add
方法,delta
值為-1,減少任務(wù)計(jì)數(shù)器counter
的值,當(dāng)其歸為0時(shí),便自動(dòng)喚醒所有處于等待的協(xié)程。
// Done decrements the WaitGroup counter by one. func (wg *WaitGroup) Done() { wg.Add(-1) }
3.3.3 Wait方法實(shí)現(xiàn)
調(diào)用Wait
方法,等待任務(wù)執(zhí)行完成,增加等待計(jì)數(shù)器Waiter
的值:
func (wg *WaitGroup) Wait() { // statep: 為state1的地址 semap: 為state2的地址 statep, semap := wg.state() for { // 加載state1的值 state := atomic.LoadUint64(statep) // v: 取高32位數(shù)據(jù),獲取到待完成任務(wù)數(shù) v := int32(state >> 32) // 沒(méi)有任務(wù)待執(zhí)行,全部都完成了 if v == 0 { return } // 增加waiter計(jì)數(shù)器的值 if atomic.CompareAndSwapUint64(statep, state, state+1) { // 等待被喚醒 runtime_Semacquire(semap) return } } }
3.4 實(shí)現(xiàn)補(bǔ)充
Add
方法,Done
方法以及Wait
方法實(shí)現(xiàn)中,有一些異常場(chǎng)景的驗(yàn)證邏輯被我刪除掉了。當(dāng)出現(xiàn)異常場(chǎng)景時(shí),說(shuō)明用戶使用方式和WaitGroup
的設(shè)計(jì)初衷相違背了,此時(shí)WaitGroup
就會(huì)直接panic。
下面通過(guò)說(shuō)明使用的注意事項(xiàng),來(lái)間接介紹WaitGroup
的異常驗(yàn)證邏輯。
4.使用注意事項(xiàng)
4.1 Add方法和Done方法需要成對(duì)出現(xiàn)
下面是一個(gè)Add方法和Done方法沒(méi)有成對(duì)出現(xiàn)的例子,此時(shí)Add方法調(diào)多了,此時(shí)計(jì)數(shù)器永遠(yuǎn)大于0,Wait 方法會(huì)一直阻塞等待。
package main import ( "fmt" "sync" ) func main() { var wg sync.WaitGroup wg.Add(2) go func() { defer wg.Done() fmt.Println("Goroutine 1") }() go func() { fmt.Println("Goroutine 2") }() wg.Wait() fmt.Println("All goroutines finished") }
在上述代碼中,我們調(diào)用了wg.Add(2)
,但只調(diào)用了一次wg.Done()
。這會(huì)導(dǎo)致counter
的值大于0,因此調(diào)用wg.Wait()
會(huì)被永久阻塞,不會(huì)繼續(xù)向下繼續(xù)執(zhí)行。
還有另外一種情況時(shí)Done方法調(diào)用多了,此時(shí)任務(wù)計(jì)數(shù)器counter
的值為負(fù)數(shù),從WaitGroup
設(shè)計(jì)的語(yǔ)意來(lái)看,就是需要等待完成的任務(wù)數(shù)為負(fù)數(shù),這個(gè)不符合預(yù)期,此時(shí)將會(huì)直接panic
package main import ( "fmt" "sync" ) func main() { var wg sync.WaitGroup wg.Add(1) go func() { fmt.Println("Goroutine 1 started") wg.Done() // 第一次調(diào)用Done方法 wg.Done() // 第二次調(diào)用Done方法 fmt.Println("Goroutine 1 completed") }() wg.Wait() fmt.Println("All goroutines completed") }
在上面的例子中,我們啟動(dòng)了一個(gè)goroutine,第一次調(diào)用Add
方法,counter的值變?yōu)?,在第14行調(diào)用Done
,此時(shí)計(jì)數(shù)器的值變?yōu)?,此時(shí)等待中的goroutine將會(huì)被喚醒。在第15行又調(diào)用了一次Done
方法,當(dāng)counter減小為0時(shí),再次調(diào)用Done
方法會(huì)導(dǎo)致panic,因?yàn)榇藭r(shí)waitGroup
的計(jì)數(shù)器已經(jīng)為0,再次減少將導(dǎo)致負(fù)數(shù)計(jì)數(shù),這是不被允許的。
所以在調(diào)用Done方法時(shí),需要保證每次調(diào)用都與Add方法的調(diào)用一一對(duì)應(yīng),否則會(huì)導(dǎo)致程序出現(xiàn)錯(cuò)誤。
4.2 在所有任務(wù)都已經(jīng)添加之后,才調(diào)用Wait方法進(jìn)行等待
WaitGroup
的設(shè)計(jì)初衷就是為了等待一組操作完成后再執(zhí)行下一步操作。所以,如果在所有任務(wù)添加之前,便調(diào)用Wait
方法進(jìn)行等待,此時(shí)有可能會(huì)導(dǎo)致等待協(xié)程提前被喚醒,執(zhí)行下一步操作,而尚未添加的任務(wù)則不會(huì)被等待,這違反了WaitGroup的設(shè)計(jì)初衷,也不符合預(yù)期。下面是一個(gè)簡(jiǎn)單的例子:
package main import ( "fmt" "sync" "time" ) func main() { var wg sync.WaitGroup for i := 1; i <= 3; i++ { go func(id int) { wg.Add(1) defer wg.Done() fmt.Printf("Goroutine %d started\n", id) time.Sleep(time.Duration(id) * time.Second) fmt.Printf("Goroutine %d finished\n", id) }(i) } // 不等待所有任務(wù)添加,就開始等待 wg.Wait() fmt.Println("All goroutines finished") time.Sleep(10 * time.Second) }
代碼執(zhí)行結(jié)果如下,等待協(xié)程被提前喚醒,執(zhí)行之后的操作,而子任務(wù)在等待協(xié)程喚醒后才開始執(zhí)行:
All goroutines finished
Goroutine 1 started
Goroutine 3 started
Goroutine 2 started
Goroutine 1 finished
Goroutine 2 finished
Goroutine 3 finished
在這個(gè)例子中,我們創(chuàng)建了三個(gè)協(xié)程并打印出它們開始和結(jié)束的消息。但是,我們沒(méi)有在任務(wù)開始前調(diào)用Add
方法添加任務(wù),而是在任務(wù)開始之后再調(diào)用Add
方法添加任務(wù)。
這可能會(huì)導(dǎo)致某些任務(wù)未被加入到WaitGroup
中,等待協(xié)程就調(diào)用了wg.Wait
方法,這樣就會(huì)導(dǎo)致一些任務(wù)未被加入WaitGrou
,從而導(dǎo)致等待協(xié)程不會(huì)等待這些任務(wù)執(zhí)行完成。如果這種情況發(fā)生了,我們會(huì)看到"All goroutines finished"被輸出,但實(shí)際上有一些協(xié)程還沒(méi)有完成。
因此,我們應(yīng)該在所有任務(wù)添加完畢之后再調(diào)用Wait
方法,以保證等待的正確性。
5. WaitGroup常見使用場(chǎng)景
在函數(shù)或方法中使用,如果一個(gè)大任務(wù)可以拆分為多個(gè)獨(dú)立的子任務(wù),此時(shí)會(huì)將其進(jìn)行拆分,并使用多個(gè)協(xié)程來(lái)并發(fā)執(zhí)行這些任務(wù),提高執(zhí)行效率,同時(shí)使用WaitGroup
等待所有子任務(wù)執(zhí)行完成,完成協(xié)程間的同步。
下面來(lái)看go-redis中ClusterClient
結(jié)構(gòu)體中ForEachMaster
方法中對(duì)于WaitGroup
的使用。ForEachMaster
方法通常用于在 Redis 集群中執(zhí)行針對(duì)所有主節(jié)點(diǎn)的某種操作,例如在集群中添加或刪除鍵,或者執(zhí)行一些全局的診斷操作,具體執(zhí)行的操作由傳入?yún)?shù)fn
指定。
這里ForEachMaster
方法會(huì)對(duì)所有主節(jié)點(diǎn)執(zhí)行某種操作,這里的實(shí)現(xiàn)是對(duì)所有主節(jié)點(diǎn)執(zhí)行某種操作這個(gè)大任務(wù),拆分為多個(gè)獨(dú)立的子任務(wù),每個(gè)子任務(wù)完成對(duì)一個(gè)Master節(jié)點(diǎn)執(zhí)行指定操作,然后每個(gè)子任務(wù)啟動(dòng)一個(gè)協(xié)程去執(zhí)行,主協(xié)程使用WaitGroup
等待所有協(xié)程完成指定子任務(wù),ForEachMaster
也就完成了對(duì)所有主節(jié)點(diǎn)執(zhí)行某種操作的任務(wù)。具體實(shí)現(xiàn)如下:
func (c *ClusterClient) ForEachMaster( ctx context.Context, fn func(ctx context.Context, client *Client) error, ) error { // 重新加載集群狀態(tài),以確保狀態(tài)信息是最新的 state, err := c.state.ReloadOrGet(ctx) if err != nil { return err } var wg sync.WaitGroup // 用于協(xié)程間通信 errCh := make(chan error, 1) // 獲取到redis集群中所有的master節(jié)點(diǎn) for _, master := range state.Masters { // 啟動(dòng)一個(gè)協(xié)程來(lái)執(zhí)行該任務(wù) wg.Add(1) go func(node *clusterNode) { // 任務(wù)完成時(shí),調(diào)用Done告知WaitGroup任務(wù)已完成 defer wg.Done() err := fn(ctx, node.Client) if err != nil { select { case errCh <- err: default: } } }(master) } // 主協(xié)程等待所有任務(wù)完成 wg.Wait() return nil }
總結(jié)
本文介紹了 Go 語(yǔ)言中的 WaitGroup 并發(fā)原語(yǔ),它提供了一種簡(jiǎn)單且強(qiáng)大的機(jī)制來(lái)協(xié)調(diào)多個(gè) Goroutine 的執(zhí)行。我們首先學(xué)習(xí)了 WaitGroup 的基本使用方法,包括如何創(chuàng)建 WaitGroup、如何向計(jì)數(shù)器中添加值、如何等待所有 Goroutine 完成以及如何在 Goroutine 中通知 WaitGroup 完成。
接著,我們了解了 WaitGroup 的實(shí)現(xiàn)原理,包括計(jì)數(shù)器和等待計(jì)數(shù)器的實(shí)現(xiàn)。了解了實(shí)現(xiàn)原理之后,我們可以更好地理解 WaitGroup 的內(nèi)部機(jī)制以及如何更好地使用它來(lái)實(shí)現(xiàn)我們的需求。
在接下來(lái)的部分中,我們介紹了一些使用 WaitGroup 的注意事項(xiàng),以及常見的使用方式?;诖?,我們完成了對(duì)WaitGroup的介紹,更多關(guān)于go同步協(xié)程WaitGroup的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
Golang 操作 Kafka 如何設(shè)置消息的失效時(shí)間
在使用 Golang 操作 Kafka 時(shí),你可以使用 Sarama 庫(kù)來(lái)設(shè)置消息的失效時(shí)間,這篇文章主要介紹了Golang操作Kafka設(shè)置消息的失效時(shí)間,需要的朋友可以參考下2023-06-06深入理解Go高級(jí)并發(fā)模式編寫更高效可擴(kuò)展的應(yīng)用程序
Go對(duì)并發(fā)提供了強(qiáng)大的原生支持,本文討論Go的高級(jí)并發(fā)模式,理解這些并發(fā)模式,可以幫助我們編寫高效的Go應(yīng)用程序,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2024-02-02Golang定制化zap日志庫(kù)使用過(guò)程分析
Zap是我個(gè)人比較喜歡的日志庫(kù),是uber開源的,有較好的性能,在項(xiàng)目開發(fā)中,經(jīng)常需要把程序運(yùn)行過(guò)程中各種信息記錄下來(lái),有了詳細(xì)的日志有助于問(wèn)題排查和功能優(yōu)化,但如何選擇和使用性能好功能強(qiáng)大的日志庫(kù),這個(gè)就需要我們從多角度考慮2023-03-03Go語(yǔ)言實(shí)戰(zhàn)之實(shí)現(xiàn)均衡器功能
這篇文章主要為大家詳細(xì)介紹了如何利用Golang?實(shí)現(xiàn)一個(gè)簡(jiǎn)單的流浪均衡器,文中的示例代碼講解詳細(xì),感興趣的小伙伴可以跟隨小編一起學(xué)習(xí)一下2023-04-04golang?select?機(jī)制和超時(shí)問(wèn)題
golang 中的協(xié)程使用非常方便,但是協(xié)程什么時(shí)候結(jié)束是一個(gè)控制問(wèn)題,可以用 select 配合使用,這篇文章主要介紹了golang?select?機(jī)制和超時(shí)問(wèn)題,需要的朋友可以參考下2022-06-06