Golang并發(fā)控制之errgroup使用詳解
errgroup
是 Go 官方庫 x 中提供的一個非常實用的工具,用于并發(fā)執(zhí)行多個 goroutine,并且方便的處理錯誤。
我們知道,Go 標準庫中有個 sync.WaitGroup
可以用來并發(fā)執(zhí)行多個 goroutine,errgroup
就是在其基礎上實現(xiàn)了 errgroup.Group
。不過,errgroup.Group
和 sync.WaitGroup
在功能上是有區(qū)別的,盡管它們都用于管理 goroutine 的同步。
errgroup 優(yōu)勢
與 sync.WaitGroup
相比,以下是設計 errgroup.Group
的原因和優(yōu)勢:
錯誤處理:
sync.WaitGroup
只負責等待 goroutine 完成,不處理 goroutine 的返回值或錯誤。errgroup.Group
雖然目前也不能直接處理 goroutine 的返回值,但在 goroutine 返回錯誤時,可以立即取消其他正在運行的 goroutine,并在Wait
方法中返回第一個非nil
的錯誤。
上下文取消:
errgroup
可以與 context.Context
配合使用,支持在某個 goroutine 出現(xiàn)錯誤時自動取消其他 goroutine,這樣可以更好地控制資源,避免不必要的工作。
簡化并發(fā)編程:
使用 errgroup
可以減少錯誤處理的樣板代碼,開發(fā)者不需要手動管理錯誤狀態(tài)和同步邏輯,使得并發(fā)編程更簡單、更易于維護。
限制并發(fā)數(shù)量:
errgroup
提供了便捷的接口來限制并發(fā) goroutine 的數(shù)量,避免過載,而 sync.WaitGroup
沒有這樣的功能。
以上,errgroup
為處理并發(fā)任務提供了更強大的錯誤管理和控制機制,因此在許多并發(fā)場景下是更優(yōu)的選擇。
隨著本文接下來的深入講解,你就能深刻體會到上面所說的優(yōu)勢了。
sync.WaitGroup 使用示例
在介紹 errgroup.Group
前,我們還是先來一起回顧下 sync.WaitGroup
的用法。
示例如下:
package main import ( "fmt" "net/http" "sync" ) func main() { var urls = []string{ "http://www.golang.org/", "http://www.google.com/", "http://www.somestupidname.com/", // 這是一個錯誤的 URL,會導致任務失敗 } var err error var wg sync.WaitGroup // 零值可用,不必顯式初始化 for _, url := range urls { wg.Add(1) // 增加 WaitGroup 計數(shù)器 // 啟動一個 goroutine 來獲取 URL go func() { defer wg.Done() // 當 goroutine 完成時遞減 WaitGroup 計數(shù)器 resp, e := http.Get(url) if e != nil { // 發(fā)生錯誤返回,并記錄該錯誤 err = e return } defer resp.Body.Close() fmt.Printf("fetch url %s status %s\n", url, resp.Status) }() } // 等待所有 goroutine 執(zhí)行完成 wg.Wait() if err != nil { // err 會記錄最后一個錯誤 fmt.Printf("Error: %s\n", err) } }
示例中,我們使用 sync.WaitGroup
來啟動 3 個 goroutine 并發(fā)訪問 3 個不同的 URL
,并在成功時打印響應狀態(tài)碼,或失敗時記錄錯誤信息。
執(zhí)行示例代碼,得到如下輸出:
$ go run waitgroup/main.go
fetch url http://www.google.com/ status 200 OK
fetch url http://www.golang.org/ status 200 OK
Error: Get "http://www.somestupidname.com/": dial tcp: lookup www.somestupidname.com: no such host
我們獲取了兩個成功的響應,并打印了一條錯誤信息。
根據(jù)示例,我們可以抽象出 sync.WaitGroup
最典型的慣用法:
var wg sync.WaitGroup for ... { wg.Add(1) go func() { defer wg.Done() // do something }() } wg.Wait()
errgroup.Group 使用示例
其實 errgroup.Group
的使用套路與 sync.WaitGroup
非常類似。
基本使用
errgroup
基本使用套路如下:
- 導入
errgroup
包。 - 創(chuàng)建一個
errgroup.Group
實例。 - 使用
Group.Go
方法啟動多個并發(fā)任務。 - 使用
Group.Wait
方法等待所有 goroutine 完成或有一個返回錯誤。
將前文中的 sync.WaitGroup
程序示例使用 errgroup.Group
重寫為如下示例:
package main import ( "fmt" "net/http" "golang.org/x/sync/errgroup" ) func main() { var urls = []string{ "http://www.golang.org/", "http://www.google.com/", "http://www.somestupidname.com/", // 這是一個錯誤的 URL,會導致任務失敗 } // 使用 errgroup 創(chuàng)建一個新的 goroutine 組 var g errgroup.Group // 零值可用,不必顯式初始化 for _, url := range urls { // 使用 errgroup 啟動一個 goroutine 來獲取 URL g.Go(func() error { resp, err := http.Get(url) if err != nil { return err // 發(fā)生錯誤,返回該錯誤 } defer resp.Body.Close() fmt.Printf("fetch url %s status %s\n", url, resp.Status) return nil // 返回 nil 表示成功 }) } // 等待所有 goroutine 完成并返回第一個錯誤(如果有) if err := g.Wait(); err != nil { fmt.Printf("Error: %s\n", err) } }
可以發(fā)現(xiàn),這段程序與 sync.WaitGroup
示例很像,根據(jù)代碼中的注釋,很容易看懂。
執(zhí)行示例代碼,得到如下輸出:
$ go run examples/main.go
fetch url http://www.google.com/ status 200 OK
fetch url http://www.golang.org/ status 200 OK
Error: Get "http://www.somestupidname.com/": dial tcp: lookup www.somestupidname.com: no such host
輸出結果也沒什么變化。
上下文取消
errgroup
提供了 errgroup.WithContext
可以附加取消功能,在任意一個 goroutine 返回錯誤時,可以立即取消其他正在運行的 goroutine,并在 Wait
方法中返回第一個非 nil
的錯誤。
示例如下:
package main import ( "context" "fmt" "net/http" "sync" "golang.org/x/sync/errgroup" ) func main() { var urls = []string{ "http://www.golang.org/", "http://www.google.com/", "http://www.somestupidname.com/", // 這是一個錯誤的 URL,會導致任務失敗 } // 創(chuàng)建一個帶有 context 的 errgroup // 任何一個 goroutine 返回非 nil 的錯誤,或 Wait() 等待所有 goroutine 完成后,context 都會被取消 g, ctx := errgroup.WithContext(context.Background()) // 創(chuàng)建一個 map 來保存結果 var result sync.Map for _, url := range urls { // 使用 errgroup 啟動一個 goroutine 來獲取 URL g.Go(func() error { req, err := http.NewRequestWithContext(ctx, "GET", url, nil) if err != nil { return err // 發(fā)生錯誤,返回該錯誤 } // 發(fā)起請求 resp, err := http.DefaultClient.Do(req) if err != nil { return err // 發(fā)生錯誤,返回該錯誤 } defer resp.Body.Close() // 保存每個 URL 的響應狀態(tài)碼 result.Store(url, resp.Status) return nil // 返回 nil 表示成功 }) } // 等待所有 goroutine 完成并返回第一個錯誤(如果有) if err := g.Wait(); err != nil { fmt.Println("Error: ", err) } // 所有 goroutine 都執(zhí)行完成,遍歷并打印成功的結果 result.Range(func(key, value any) bool { fmt.Printf("fetch url %s status %s\n", key, value) return true }) }
執(zhí)行示例代碼,得到如下輸出:
$ go run examples/withcontext/main.go
Error: Get "http://www.somestupidname.com/": dial tcp: lookup www.somestupidname.com: no such host
fetch url http://www.google.com/ status 200 OK
由測試結果來看,對于 [http://www.google.com/](http://www.google.com/)
的請求可以接收到成功響應,由于對 [http://www.somestupidname.com/](http://www.somestupidname.com/)
請求報錯,程序來不及等待 [http://www.golang.org/](http://www.golang.org/)
響應,就被取消了。
其實我們大致可以猜測到,取消功能應該是通過 context.cancelCtx
來實現(xiàn)的,我們暫且不必深究,稍后探索源碼就能驗證我們的猜想了。
限制并發(fā)數(shù)量
errgroup
提供了 errgroup.SetLimit
可以限制并發(fā)執(zhí)行的 goroutine 數(shù)量。
示例如下:
package main import ( "fmt" "time" "golang.org/x/sync/errgroup" ) func main() { // 創(chuàng)建一個 errgroup.Group var g errgroup.Group // 設置最大并發(fā)限制為 3 g.SetLimit(3) // 啟動 10 個 goroutine for i := 1; i <= 10; i++ { g.Go(func() error { // 打印正在運行的 goroutine fmt.Printf("Goroutine %d is starting\n", i) time.Sleep(2 * time.Second) // 模擬任務耗時 fmt.Printf("Goroutine %d is done\n", i) return nil }) } // 等待所有 goroutine 完成 if err := g.Wait(); err != nil { fmt.Printf("Encountered an error: %v\n", err) } fmt.Println("All goroutines complete.") }
使用 g.SetLimit(3)
可以限制最大并發(fā)為 3 個 goroutine。
執(zhí)行示例代碼,得到如下輸出:
$ go run examples/setlimit/main.go
Goroutine 3 is starting
Goroutine 1 is starting
Goroutine 2 is starting
Goroutine 2 is done
Goroutine 1 is done
Goroutine 5 is starting
Goroutine 3 is done
Goroutine 6 is starting
Goroutine 4 is starting
Goroutine 6 is done
Goroutine 5 is done
Goroutine 8 is starting
Goroutine 4 is done
Goroutine 7 is starting
Goroutine 9 is starting
Goroutine 9 is done
Goroutine 8 is done
Goroutine 10 is starting
Goroutine 7 is done
Goroutine 10 is done
All goroutines complete.
根據(jù)輸出可以發(fā)現(xiàn),雖然我們通過 for
循環(huán)啟動了 10 個 goroutine,但程序執(zhí)行時最多只允許同時啟動 3 個 goroutine,當這 3 個 goroutine 中有某個執(zhí)行完成并退出,才會有新的 goroutine 被啟動。
嘗試啟動
errgroup
還提供了 errgroup.TryGo
可以嘗試啟動一個任務,它返回一個 bool
值,標識任務是否啟動成功,true
表示成功,false
表示失敗。
errgroup.TryGo
需要搭配 errgroup.SetLimit
一同使用,因為如果不限制并發(fā)數(shù)量,那么 errgroup.TryGo
始終返回 true
,當達到最大并發(fā)數(shù)量限制時,errgroup.TryGo
返回 false
。
示例如下:
package main import ( "fmt" "time" "golang.org/x/sync/errgroup" ) func main() { // 創(chuàng)建一個 errgroup.Group var g errgroup.Group // 設置最大并發(fā)限制為 3 g.SetLimit(3) // 啟動 10 個 goroutine for i := 1; i <= 10; i++ { if g.TryGo(func() error { // 打印正在運行的 goroutine fmt.Printf("Goroutine %d is starting\n", i) time.Sleep(2 * time.Second) // 模擬工作 fmt.Printf("Goroutine %d is done\n", i) return nil }) { // 如果成功啟動,打印提示 fmt.Printf("Goroutine %d started successfully\n", i) } else { // 如果達到并發(fā)限制,打印提示 fmt.Printf("Goroutine %d could not start (limit reached)\n", i) } } // 等待所有 goroutine 完成 if err := g.Wait(); err != nil { fmt.Printf("Encountered an error: %v\n", err) } fmt.Println("All goroutines complete.") }
使用 g.SetLimit(3)
限制最大并發(fā)為 3 個 goroutine,調用 g.TryGo
如果啟動任務成功,打印 Goroutine {i} started successfully
提示信息;啟動任務失敗,則打印 Goroutine {i} could not start (limit reached)
提示信息。
執(zhí)行示例代碼,得到如下輸出:
$ go run examples/trygo/main.go
Goroutine 1 started successfully
Goroutine 1 is starting
Goroutine 2 is starting
Goroutine 2 started successfully
Goroutine 3 started successfully
Goroutine 4 could not start (limit reached)
Goroutine 5 could not start (limit reached)
Goroutine 6 could not start (limit reached)
Goroutine 7 could not start (limit reached)
Goroutine 8 could not start (limit reached)
Goroutine 9 could not start (limit reached)
Goroutine 10 could not start (limit reached)
Goroutine 3 is starting
Goroutine 2 is done
Goroutine 3 is done
Goroutine 1 is done
All goroutines complete.
因為限制最大并發(fā)數(shù)量為 3,所以前面 3 個 goroutine 啟動成功,并且正常執(zhí)行完成,其他幾個 goroutine 全部執(zhí)行失敗。
以上就是 errgroup
的全部用法了,更多使用場景你可以在實踐中去嘗試和感悟。
源碼解讀
接下來,我們一起閱讀下 errgroup
源碼,以此來加深對 errgroup
的理解。
errgroup
源碼非常少,僅有 3 個文件。這 3 個文件源碼內容分別如下:
主邏輯代碼:
https://github.com/golang/sync/blob/v0.8.0/errgroup/errgroup.go
// Copyright 2016 The Go Authors. All rights reserved. // Use of this source code is governed by a BSD-style // license that can be found in the LICENSE file. // Package errgroup provides synchronization, error propagation, and Context // cancelation for groups of goroutines working on subtasks of a common task. // // [errgroup.Group] is related to [sync.WaitGroup] but adds handling of tasks // returning errors. package errgroup import ( "context" "fmt" "sync" ) type token struct{} // A Group is a collection of goroutines working on subtasks that are part of // the same overall task. // // A zero Group is valid, has no limit on the number of active goroutines, // and does not cancel on error. type Group struct { cancel func(error) wg sync.WaitGroup sem chan token errOnce sync.Once err error } func (g *Group) done() { if g.sem != nil { <-g.sem } g.wg.Done() } // WithContext returns a new Group and an associated Context derived from ctx. // // The derived Context is canceled the first time a function passed to Go // returns a non-nil error or the first time Wait returns, whichever occurs // first. func WithContext(ctx context.Context) (*Group, context.Context) { ctx, cancel := withCancelCause(ctx) return &Group{cancel: cancel}, ctx } // Wait blocks until all function calls from the Go method have returned, then // returns the first non-nil error (if any) from them. func (g *Group) Wait() error { g.wg.Wait() if g.cancel != nil { g.cancel(g.err) } return g.err } // Go calls the given function in a new goroutine. // It blocks until the new goroutine can be added without the number of // active goroutines in the group exceeding the configured limit. // // The first call to return a non-nil error cancels the group's context, if the // group was created by calling WithContext. The error will be returned by Wait. func (g *Group) Go(f func() error) { if g.sem != nil { g.sem <- token{} } g.wg.Add(1) go func() { defer g.done() if err := f(); err != nil { g.errOnce.Do(func() { g.err = err if g.cancel != nil { g.cancel(g.err) } }) } }() } // TryGo calls the given function in a new goroutine only if the number of // active goroutines in the group is currently below the configured limit. // // The return value reports whether the goroutine was started. func (g *Group) TryGo(f func() error) bool { if g.sem != nil { select { case g.sem <- token{}: // Note: this allows barging iff channels in general allow barging. default: return false } } g.wg.Add(1) go func() { defer g.done() if err := f(); err != nil { g.errOnce.Do(func() { g.err = err if g.cancel != nil { g.cancel(g.err) } }) } }() return true } // SetLimit limits the number of active goroutines in this group to at most n. // A negative value indicates no limit. // // Any subsequent call to the Go method will block until it can add an active // goroutine without exceeding the configured limit. // // The limit must not be modified while any goroutines in the group are active. func (g *Group) SetLimit(n int) { if n < 0 { g.sem = nil return } if len(g.sem) != 0 { panic(fmt.Errorf("errgroup: modify limit while %v goroutines in the group are still active", len(g.sem))) } g.sem = make(chan token, n) }
為 Go 1.20 及更高版本提供的 withCancelCause
函數(shù)實現(xiàn):
https://github.com/golang/sync/blob/v0.8.0/errgroup/go120.go
// Copyright 2023 The Go Authors. All rights reserved. // Use of this source code is governed by a BSD-style // license that can be found in the LICENSE file. //go:build go1.20 package errgroup import "context" func withCancelCause(parent context.Context) (context.Context, func(error)) { return context.WithCancelCause(parent) }
為低于 Go 1.20 版本提供的 withCancelCause
函數(shù)實現(xiàn):
https://github.com/golang/sync/blob/v0.8.0/errgroup/pre_go120.go
// Copyright 2023 The Go Authors. All rights reserved. // Use of this source code is governed by a BSD-style // license that can be found in the LICENSE file. //go:build !go1.20 package errgroup import "context" func withCancelCause(parent context.Context) (context.Context, func(error)) { ctx, cancel := context.WithCancel(parent) return ctx, func(error) { cancel() } }
可以看到,errgroup
全部源碼加起來也不到 100 行,可謂短小精悍。
現(xiàn)在我們來分析下 errgroup
源碼。
根據(jù)包注釋我們可以知道,errgroup
包提供了同步、錯誤傳播和上下文取消功能,用于一組 goroutines 處理共同任務的子任務。errgroup.Group
與 sync.WaitGroup
相關,增加了處理任務返回錯誤的能力。
為了提供以上功能,首先 errgroup
定義了 token
和 Group
兩個結構體:
// 定義一個空結構體類型 token,會作為信號進行傳遞,用于控制并發(fā)數(shù) type token struct{} // Group 是一組協(xié)程的集合,這些協(xié)程處理同一整體任務的子任務 // // 零值 Group 是有效的,對活動協(xié)程的數(shù)量沒有限制,并且不會在出錯時取消 type Group struct { cancel func(error) // 取消函數(shù),就是 context.CancelCauseFunc 類型 wg sync.WaitGroup // 內部使用了 sync.WaitGroup sem chan token // 信號 channel,可以控制協(xié)程并發(fā)數(shù)量 errOnce sync.Once // 確保錯誤僅處理一次 err error // 記錄子協(xié)程集中返回的第一個錯誤 }
token
被定義為空結構體,用來傳遞信號,這也是 Go 中空結構體的慣用法。
NOTE:
你可以在我的另一篇文章《Go 中空結構體慣用法,我?guī)湍憧偨Y全了!》中查看空結構體的更多用法。
Group
是 errgroup
包提供的唯一公開結構體,其關聯(lián)的方法承載了所有功能。
cancel
屬性為一個函數(shù),上下文取消時會被調用,其實就是 context.CancelCauseFunc
類型,調用 errgroup.WithContext
時被賦值。
wg
屬性即為 sync.WaitGroup
,承擔并發(fā)控制的主邏輯,errgroup.Go
和 errgroup.TryGo
內部并發(fā)控制邏輯都會代理給 sync.WaitGroup
。
sem
屬性是 token
類型的 channel
,用于限制并發(fā)數(shù)量,調用 errgroup.SetLimit
是被賦值。
err
會記錄所有 goroutine 中出現(xiàn)的第一個錯誤,由errOnce
確保錯誤錯誤僅處理一次,所以后面再出現(xiàn)更多的錯誤都會被忽略。
接下來我們先看 errgroup.SetLimit
方法定義:
// SetLimit 限制該 Group 中活動的協(xié)程數(shù)量最多為 n,負值表示沒有限制 // // 任何后續(xù)對 Go 方法的調用都將阻塞,直到可以在不超過限額的情況下添加活動協(xié)程 // // 在 Group 中存在任何活動的協(xié)程時,限制不得修改 func (g *Group) SetLimit(n int) { // 傳進來的 n 就是 channel 長度,以此來限制協(xié)程的并發(fā)數(shù)量 if n < 0 { // 這里檢查如果小于 0 則不限制協(xié)程并發(fā)數(shù)量。此外,也不要將其設置為 0,會產生死鎖 g.sem = nil return } if len(g.sem) != 0 { // 如果存在活動的協(xié)程,調用此方法將產生 panic panic(fmt.Errorf("errgroup: modify limit while %v goroutines in the group are still active", len(g.sem))) } g.sem = make(chan token, n) }
errgroup.SetLimit
方法可以限制并發(fā)屬性,其內部邏輯很簡單,不過要注意在調用 errgroup.Go
或 errgroup.TryGo
方法前調用 errgroup.SetLimit
,以防程序出現(xiàn) panic
。
然后看下主邏輯 errgroup.Go
方法實現(xiàn):
// Go 會在新的協(xié)程中調用給定的函數(shù) // 它會阻塞,直到可以在不超過配置的活躍協(xié)程數(shù)量限制的情況下添加新的協(xié)程 // // 首次返回非 nil 錯誤的調用會取消該 Group 的上下文(context),如果該 context 是通過調用 WithContext 創(chuàng)建的,該錯誤將由 Wait 返回 func (g *Group) Go(f func() error) { if g.sem != nil { // 這個是限制并發(fā)數(shù)的信號通道 g.sem <- token{} // 如果超過了配置的活躍協(xié)程數(shù)量限制,向 channel 發(fā)送 token 會阻塞 } g.wg.Add(1) // 轉發(fā)給 sync.WaitGroup.Add(1),將活動協(xié)程數(shù)加一 go func() { defer g.done() // 當一個協(xié)程完成時,調用此方法,內部會將調用轉發(fā)給 sync.WaitGroup.Done() if err := f(); err != nil { // f() 就是我們要執(zhí)行的任務 g.errOnce.Do(func() { // 僅執(zhí)行一次,即只處理一次錯誤,所以會記錄第一個非 nil 的錯誤,與協(xié)程啟動順序無關 g.err = err // 記錄錯誤 if g.cancel != nil { // 如果 cancel 不為 nil,則調用取消函數(shù),并設置 cause g.cancel(g.err) } }) } }() }
首先會檢測是否使用 errgroup.SetLimit
方法設置了并發(fā)限制,如果有限制,則使用 channel
來控制并發(fā)數(shù)量。
否則執(zhí)行主邏輯,其實就是 sync.WaitGroup
的套路代碼。
在 defer
中調用了 g.done()
,done
方法定義如下:
// 當一個協(xié)程完成時,調用此方法 func (g *Group) done() { // 如果設置了最大并發(fā)數(shù),則 sem 不為 nil,從 channel 中消費一個 token,表示一個協(xié)程已完成 if g.sem != nil { <-g.sem } g.wg.Done() // 轉發(fā)給 sync.WaitGroup.Done(),將活動協(xié)程數(shù)減一 }
另外,如果某個任務返回了錯誤,則通過 errOnce
確保錯誤只被處理一次,處理方式就是先記錄錯誤,然后調用 cancel
方法。
cancel
實際上是在 errgroup.WithContext
方法中賦值的:
// WithContext 返回一個新的 Group 和一個從 ctx 派生的關聯(lián) Context // // 派生的 Context 會在傳遞給 Go 的函數(shù)首次返回非 nil 錯誤或 Wait 首次返回時被取消,以先發(fā)生者為準。 func WithContext(ctx context.Context) (*Group, context.Context) { ctx, cancel := withCancelCause(ctx) return &Group{cancel: cancel}, ctx }
這里的 withCancelCause
有兩種實現(xiàn)。
如果 Go 版本大于等于 1.20,提供的 withCancelCause
函數(shù)實現(xiàn)如下:
// 構建約束標識了這個文件是 Go 1.20 版本被加入的 //go:build go1.20 package errgroup import "context" // 代理到 context.WithCancelCause func withCancelCause(parent context.Context) (context.Context, func(error)) { return context.WithCancelCause(parent) }
如果 Go 版本小于 1.20,提供的 withCancelCause
函數(shù)實現(xiàn)如下:
//go:build !go1.20 package errgroup import "context" func withCancelCause(parent context.Context) (context.Context, func(error)) { ctx, cancel := context.WithCancel(parent) return ctx, func(error) { cancel() } }
因為 context.WithCancelCause
方法是在 Go 1.20 版本加入的,你可以在 Go 1.20 Release Notes 中找到,你也可以在這個 Commit: 93782cc 中看到 withCancelCause
函數(shù)變更記錄。
調用 errgroup.Go
方法啟動任務后,我們會調用 errgroup.Wait
等待所有任務完成,其實現(xiàn)如下:
// Wait 會阻塞,直到來自 Go 方法的所有函數(shù)調用返回,然后返回它們中的第一個非 nil 錯誤(如果有的話) func (g *Group) Wait() error { g.wg.Wait() // 轉發(fā)給 sync.WaitGroup.Wait(),等待所有協(xié)程執(zhí)行完成 if g.cancel != nil { // 如果 cancel 不為 nil,則調用取消函數(shù),并設置 cause g.cancel(g.err) } return g.err // 返回錯誤 }
所以,最終 errgroup.Wait
返回的錯誤其實就是 errgroup.Go
方法中記錄的第一個錯誤。
現(xiàn)在,我們還剩下最后一個方法 errgroup.TryGo
的源碼沒有分析,我把源碼貼在下面,并寫上了詳細的注釋:
// TryGo 僅在 Group 中活動的協(xié)程數(shù)量低于限額時,才在新的協(xié)程中調用給定的函數(shù) // // 返回值標識協(xié)程是否啟動 func (g *Group) TryGo(f func() error) bool { if g.sem != nil { // 如果設置了最大并發(fā)數(shù) select { case g.sem <- token{}: // 可以向 channel 寫入 token,說明沒有達到限額,可以啟動協(xié)程 // Note: this allows barging iff channels in general allow barging. default: // 如果超過了配置的活躍協(xié)程數(shù)量限制,會走到這個 case return false } } // 接下來的代碼與 Go 中的邏輯相同 g.wg.Add(1) go func() { defer g.done() if err := f(); err != nil { g.errOnce.Do(func() { g.err = err if g.cancel != nil { g.cancel(g.err) } }) } }() return true }
主邏輯與 errgroup.Go
方法一樣,不同的是 errgroup.Go
方法如果達到并發(fā)限額會阻塞,而 errgroup.TryGo
方法在達到并發(fā)限額時直接返回 false
。
其實 <font style="color:rgb(31, 35, 40);">errgroup.TryGo</font>
和 <font style="color:rgb(31, 35, 40);">errgroup.SetLimit</font>
兩個方法是后添加的功能,你可以在 issues/27837 中看到討論記錄。
至此,errgroup
源碼就都解讀完成了。
總結
errgroup
是官方為我們提供的擴展庫,在 sync.WaitGroup
基礎上,增加了處理任務返回錯誤的能力。提供了同步、錯誤傳播和上下文取消功能,用于一組 goroutines 處理共同任務的子任務。
errgroup.WithContext
方法可以附加取消功能,在任意一個 goroutine 返回錯誤時,立即取消其他正在運行的 goroutine,并在 Wait
方法中返回第一個非 nil
的錯誤。
errgroup.SetLimit
方法可以限制并發(fā)執(zhí)行的 goroutine 數(shù)量。
errgroup.TryGo
可以嘗試啟動一個任務,返回值標識啟動成功或失敗。
errgroup
源碼設計精妙,值得借鑒。
以上就是Golang并發(fā)控制之errgroup使用詳解的詳細內容,更多關于Golang errgroup的資料請關注腳本之家其它相關文章!
相關文章
golang利用函數(shù)閉包實現(xiàn)簡單的中間件
中間件設計模式是一種常見的軟件設計模式,它在許多編程語言和框架中被廣泛應用,這篇文章主要為大家介紹一下golang利用函數(shù)閉包實現(xiàn)一個簡單的中間件,感興趣的可以了解下2023-10-10