Golang sync包中errgroup的使用詳解
1、初識 errgroup
WaitGroup 主要用于控制任務組下的并發(fā)子任務。它的具體做法就是,子任務 goroutine 執(zhí)行前通過 Add 方法添加任務數(shù)目,子任務 goroutine 結束時調(diào)用 Done 標記已完成任務數(shù),主任務 goroutine 通過 Wait 方法等待所有的任務完成后才能執(zhí)行后續(xù)邏輯。
package main import ( "net/http" "sync" ) func main() { var wg sync.WaitGroup var urls = []string{ "http://www.golang.org/", "http://www.baidu.com/", "http://www.bokeyuan12111.com/", } for _, url := range urls { wg.Add(1) go func(url string) { defer wg.Done() resp, err := http.Get(url) if err != nil { return } resp.Body.Close() }(url) } wg.Wait() }
在以上示例代碼中,我們通過三個 goroutine 去并發(fā)的請求 url,直到所有的子任務 goroutine 均完成訪問,主任務 goroutine 下的 wg.Wait 才會停止阻塞。
但在實際的項目代碼中,子任務 goroutine 的執(zhí)行并不總是順風順水,它們也許會產(chǎn)生 error。而 WaitGroup 并沒有告訴我們在子 goroutine 發(fā)生錯誤時,如何將其拋給主任務 groutine。
這個時候可以考慮使用 errgroup
package main import ( "fmt" "net/http" "golang.org/x/sync/errgroup" ) func main() { var urls = []string{ "http://www.golang.org/", "http://www.baidu.com/", "http://www.bokeyuan12111.com/", } g := new(errgroup.Group) for _, url := range urls { url := url g.Go(func() error { resp, err := http.Get(url) if err != nil { fmt.Println(err) return err } fmt.Printf("get [%s] success: [%d] \n", url, resp.StatusCode) return resp.Body.Close() }) } if err := g.Wait(); err != nil { fmt.Println(err) } else { fmt.Println("All success!") } }
結果如下:
get [http://www.baidu.com/] success: [200]
Get "http://www.bokeyuan12111.com/": dial tcp: lookup www.bokeyuan12111.com: no such host
Get "http://www.golang.org/": dial tcp 142.251.42.241:80: i/o timeout
Get "http://www.bokeyuan12111.com/": dial tcp: lookup www.bokeyuan12111.com: no such host
可以看到,執(zhí)行獲取www.bokeyuan12111.com和www.golang.org兩個 url 的子 groutine 均發(fā)生了錯誤,在主任務 goroutine 中成功捕獲到了第一個錯誤信息。
除了 擁有 WaitGroup 的控制能力 和 錯誤傳播 的功能之外,errgroup 還有最重要的 context 反向傳播機制,我們來看一下它的設計。
2、errgroup 源碼解析
errgroup 的設計非常精練,全部代碼如下
type Group struct { cancel func() wg sync.WaitGroup errOnce sync.Once err error } func WithContext(ctx context.Context) (*Group, context.Context) { ctx, cancel := context.WithCancel(ctx) return &Group{cancel: cancel}, ctx } func (g *Group) Wait() error { g.wg.Wait() if g.cancel != nil { g.cancel() } return g.err } func (g *Group) Go(f func() error) { g.wg.Add(1) go func() { defer g.wg.Done() if err := f(); err != nil { g.errOnce.Do(func() { g.err = err if g.cancel != nil { g.cancel() } }) } }() }
可以看到,errgroup 的實現(xiàn)依靠于結構體 Group,它通過封裝 sync.WaitGroup,繼承了 WaitGroup 的特性,在 Go() 方法中新起一個子任務 goroutine,并在 Wait() 方法中通過 sync.WaitGroup 的 Wait 進行阻塞等待。
同時 Group 利用 sync.Once 保證了它有且僅會保留第一個子 goroutine 錯誤。
最后,Group 通過嵌入 context.WithCancel 方法產(chǎn)生的 cancel 函數(shù),能夠在子 goroutine 發(fā)生錯誤時,及時通過調(diào)用 cancle 函數(shù),將 Context 的取消信號及時傳播出去。當然,這一特性需要用戶代碼的配合。
3、errgroup 上下文取消
在 errgroup 的文檔(https://pkg.go.dev/golang.org/x/sync@v0.0.0-20210220032951-036812b2e83c/errgroup#example-Group-Pipeline)中,它基于 Go 官方文檔的 pipeline( https://blog.golang.org/pipelines) ,實現(xiàn)了一個任務組 goroutine 中上下文取消(Context cancelation)演示的示例。但該 Demo 的前提知識略多,本文這里基于其思想,提供一個易于理解的使用示例。
package main import ( "context" "fmt" "golang.org/x/sync/errgroup" ) func main() { g, ctx := errgroup.WithContext(context.Background()) dataChan := make(chan int, 20) // 數(shù)據(jù)生產(chǎn)端任務子 goroutine g.Go(func() error { defer close(dataChan) for i := 1; ; i++ { if i == 10 { return fmt.Errorf("data 10 is wrong") } dataChan <- i fmt.Println(fmt.Sprintf("sending %d", i)) } }) // 數(shù)據(jù)消費端任務子 goroutine for i := 0; i < 3; i++ { g.Go(func() error { for j := 1; ; j++ { select { case <-ctx.Done(): return ctx.Err() case number := <-dataChan: fmt.Println(fmt.Sprintf("receiving %d", number)) } } }) } // 主任務 goroutine 等待 pipeline 結束數(shù)據(jù)流 err := g.Wait() if err != nil { fmt.Println(err) } fmt.Println("main goroutine done!") }
在以上示例中,我們模擬了一個數(shù)據(jù)傳送管道。在數(shù)據(jù)的生產(chǎn)與消費任務集中,有四個子任務 goroutine:一個生產(chǎn)數(shù)據(jù)的 goroutine,三個消費數(shù)據(jù)的 goroutine。當數(shù)據(jù)生產(chǎn)方存在錯誤數(shù)據(jù)時(數(shù)據(jù)等于 10 ),我們停止數(shù)據(jù)的生產(chǎn)與消費,并將錯誤拋出,回到 main goroutine 的執(zhí)行邏輯中。
可以看到,因為 errgroup 中的 Context cancle 函數(shù)的嵌入,我們在子任務 goroutine 中也能反向控制任務上下文。
程序的某一次運行,輸出結果如下:
sending 1
sending 2
sending 3
sending 4
sending 5
sending 6
sending 7
sending 8
sending 9
receiving 1
receiving 3
receiving 2
receiving 4
data 10 is wrong
main goroutine done!
4、總結
errgroup 是 Go 官方的并發(fā)原語補充庫,相對于標準庫中提供的原語而言,顯得沒那么核心。這里總結一下 errgroup 的特性。
- 繼承了 WaitGroup 的功能
- 錯誤傳播:能夠返回任務組中發(fā)生的第一個錯誤,但有且僅能返回該錯誤
- context 信號傳播:如果子任務 goroutine 中有循環(huán)邏輯,則可以添加 ctx.Done 邏輯,此時通過 context 的取消信號,提前結束子任務執(zhí)行。
到此這篇關于Golang sync包中errgroup的使用詳解的文章就介紹到這了,更多相關Golang errgroup內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!
相關文章
詳解Go語言strconv與其他基本數(shù)據(jù)類型轉(zhuǎn)換函數(shù)的使用
這篇文章將以 string 類型為中心,通過 strconv 標準庫,介紹其與其他基本數(shù)據(jù)類型相互轉(zhuǎn)換的函數(shù)。文中的示例代碼講解詳細,感興趣的可以了解一下2022-12-12Golang實現(xiàn)協(xié)程超時控制的方式總結
我們知道,go協(xié)程如果不做好處理,很容易造成內(nèi)存泄漏,所以對goroutine做超時控制,才能有效避免這種情況發(fā)生,本文為大家整理了兩個常見的Golang超時控制方法,需要的可以收藏一下2023-05-05GO 使用Webhook 實現(xiàn)github 自動化部署的方法
這篇文章主要介紹了GO 使用Webhook 實現(xiàn)github 自動化部署的方法,本文通過實例代碼給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下2020-05-05