Golang?手寫(xiě)一個(gè)簡(jiǎn)單的并發(fā)任務(wù)?manager
前言
今天也是偏實(shí)戰(zhàn)的內(nèi)容,作為一個(gè)并發(fā)復(fù)習(xí)課,很簡(jiǎn)單,我們來(lái)看看怎樣實(shí)現(xiàn)一個(gè)并發(fā)任務(wù) manager。
在微服務(wù)的場(chǎng)景下,我們有很多任務(wù)的執(zhí)行是沒(méi)有明確的先后順序的,比如一個(gè)接口同時(shí)要做到任務(wù) A 和 任務(wù) B,兩個(gè)任務(wù)分別拿到一些數(shù)據(jù),最后組裝裁剪后通過(guò)接口下發(fā)。
此時(shí),A 和 B 兩個(gè)任務(wù)沒(méi)有依賴關(guān)系,如果我們串行來(lái)執(zhí)行,會(huì)拖慢整個(gè)任務(wù)的執(zhí)行節(jié)奏,用并發(fā)的方式來(lái)優(yōu)化是一個(gè)方向。
那怎么實(shí)現(xiàn)呢?
errgroup
一個(gè)常見(jiàn)的想法是用 errgroup,我們之前也介紹過(guò) Golang errgroup 設(shè)計(jì)和原理解析。
今天我們不打算用這種實(shí)現(xiàn),希望用更加基礎(chǔ)的組件來(lái)引發(fā)思考,看看如何活用 sync 包提供的基礎(chǔ)能力。另外一點(diǎn)是 errgroup 也有他的缺陷,如果在啟動(dòng)的協(xié)程中沒(méi)有手動(dòng) recover,那么一旦在我們的任務(wù)中出現(xiàn) panic,整個(gè)程序就 crash 了。
這一點(diǎn)還是很有爭(zhēng)議的,很多開(kāi)發(fā)者認(rèn)為這是符合預(yù)期的,也有一些開(kāi)發(fā)者希望在 New 一個(gè) errgroup 的時(shí)候能夠提供 option 控制是否來(lái) recover。近期還有兩個(gè) issue 在進(jìn)行激烈的討論,目前看沒(méi)有定論。
感興趣的同學(xué)可以看下這兩個(gè) issue:
- x/sync/errgroup: why not recover the fn's err in errgroup #40484
- proposal: x/sync/errgroup: propagate panics and Goexits through Wait #53757
需求拆解
ok,我們來(lái)試著用 sync 包基礎(chǔ)能力來(lái)實(shí)現(xiàn)一個(gè)簡(jiǎn)單的并行任務(wù) manager。首先我們分析下需求。
- 一定要能做到并發(fā)執(zhí)行各個(gè)任務(wù),開(kāi)多個(gè)協(xié)程,而不是在一個(gè) main goroutine 里串行執(zhí)行各個(gè)任務(wù);
- 并發(fā)安全,我們當(dāng)然不希望出現(xiàn)數(shù)據(jù)異常,不希望并發(fā)執(zhí)行任務(wù)導(dǎo)致最后程序因?yàn)?runtime error 而掛掉;
- 如果多個(gè)任務(wù)都失敗,只返回一個(gè) error 即可;
- 能夠 recover from panic,不需要開(kāi)發(fā)者使用的時(shí)候再手動(dòng)去寫(xiě) recover 邏輯;
- 性能有保障。
并發(fā)執(zhí)行這一點(diǎn)我們可以借助 sync.WaitGroup 的能力,每次啟動(dòng)一個(gè)goroutine,WaitGroup 就加 1,在 defer 里完成 Done,啟動(dòng)所有 goroutine 之后,等著 Wait 返回結(jié)果即可。常規(guī)的能力復(fù)用。
需要額外處理的地方在于,怎么實(shí)現(xiàn)多個(gè)線程只有一個(gè) error 能賦值,以及 recover 的適配。
實(shí)戰(zhàn)代碼
我們理一下思路,看看代碼怎么寫(xiě)。
Job
首先一定需要定義一個(gè)通用的函數(shù)簽名,使得開(kāi)發(fā)者能夠傳入自己要執(zhí)行的并發(fā)任務(wù)。
type Job interface { Do(ctx context.Context, param interface{}) error Name() string }
JobManager
我們的 job manager 現(xiàn)階段可以簡(jiǎn)單實(shí)現(xiàn),只是一組 Job 的集合:
type JobManager []Job
錯(cuò)誤處理
要達(dá)到只有一個(gè) error 賦值,且不出現(xiàn) race condition,有兩個(gè)方案:
- sync.Mutex 加鎖;
- sync.Once 只執(zhí)行一次。
當(dāng)然,什么時(shí)候我們都可以用一把大鎖解決問(wèn)題,但它的性能不會(huì)很好,能用原子操作解決的盡量還是不要用 Mutex,這里參照 errgroup,我們可以用一個(gè) Once 對(duì)象來(lái)控制只賦值一次。
panic 恢復(fù)可以直接在 defer 里面 recover 即可,需要能帶出來(lái) stack trace,把它變成一個(gè) error 賦值
及時(shí)退出
有時(shí)候我們這個(gè)并發(fā)任務(wù)數(shù)量非常多,可能還沒(méi)創(chuàng)建完 goroutine,某個(gè)先創(chuàng)建的任務(wù)就已經(jīng)掛了,這時(shí)候需要有一個(gè)全局的信號(hào),終止后續(xù)的 goroutine 創(chuàng)建。這一點(diǎn)用原子操作就能實(shí)現(xiàn)。
完整代碼
把上面的分析落地,這樣我們就實(shí)現(xiàn)了一個(gè)帶上了 recover 能力,以及終止能力的的 errgroup。
package main import ( "context" "errors" "fmt" "sync" "sync/atomic" ) type Job interface { Do(ctx context.Context, param interface{}) error Name() string } type JobManager []Job func (mgr JobManager) Execute(ctx context.Context, param interface{}) error { var ( stop int32 = 0 err error wg sync.WaitGroup errOnce sync.Once ) for _, job := range mgr { if atomic.LoadInt32(&stop) > 0 { break } wg.Add(1) go func(j Job) { defer func() { wg.Done() if r := recover(); r != nil { errMsg := fmt.Sprintf("JobManager panic: job: %v, reason: %v", j.Name(), r) nerr := errors.New(errMsg) errOnce.Do(func() { if err == nil { err = nerr } }) atomic.AddInt32(&stop, 1) } }() nerr := j.Do(ctx, param) if nerr != nil { atomic.AddInt32(&stop, 1) errOnce.Do(func() { if err == nil { err = nerr } }) } }(job) } wg.Wait() return err }
使用方法也很簡(jiǎn)單:
var mgr = JobManager{ AJob, BJob, CJob, // 這里的各個(gè) Job 需要實(shí)現(xiàn)一開(kāi)始我們定義的接口 } err := mgr.Execute(ctx, param)
這里我們需要定義統(tǒng)一的 param interface{},建議是一個(gè)接口,各個(gè) Job 執(zhí)行完畢后如果有需要寫(xiě)入的數(shù)據(jù),可以調(diào)用 param 的 Setter 方法寫(xiě)入,最后直接拿 param 來(lái)做后續(xù)邏輯。
小結(jié)
今天我們用 sync.Once,以及 sync.WaitGroup 的能力實(shí)現(xiàn)了一個(gè)簡(jiǎn)易的并發(fā)任務(wù)調(diào)度器,希望能夠幫助大家回顧一下此前介紹的并發(fā)相關(guān)概念和用法。其實(shí)并發(fā)管理這一點(diǎn)很多時(shí)候我們會(huì)存在依賴,這時(shí)候可能需要將多個(gè) job 分層,或者梳理出來(lái)拓?fù)潢P(guān)系來(lái)執(zhí)行,我們今天只是簡(jiǎn)單入門(mén),復(fù)習(xí)一下相關(guān)知識(shí)。
建議大家回顧一下此前對(duì)于 once 以及 errgroup 的源碼解析,相信你會(huì)更能融會(huì)貫通。
到此這篇關(guān)于Golang 手寫(xiě)一個(gè)簡(jiǎn)單的并發(fā)任務(wù) manager的文章就介紹到這了,更多相關(guān)Golang manager內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
golang?xorm?自定義日志記錄器之使用zap實(shí)現(xiàn)日志輸出、切割日志(最新)
這篇文章主要介紹了golang?xorm?自定義日志記錄器,使用zap實(shí)現(xiàn)日志輸出、切割日志,包括連接postgresql數(shù)據(jù)庫(kù)的操作方法及?zap日志工具?,本文結(jié)合實(shí)例代碼給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2022-10-10golang 實(shí)現(xiàn)struct、json、map互相轉(zhuǎn)化
這篇文章主要介紹了golang 實(shí)現(xiàn)struct、json、map互相轉(zhuǎn)化,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧2020-12-12Go語(yǔ)言實(shí)現(xiàn)常見(jiàn)限流算法的示例代碼
計(jì)數(shù)器、滑動(dòng)窗口、漏斗算法、令牌桶算法是我們常見(jiàn)的幾個(gè)限流算法,本文將依次用Go語(yǔ)言實(shí)現(xiàn)這幾個(gè)限流算法,感興趣的可以了解一下2023-05-05Golang并發(fā)繞不開(kāi)的重要組件之Goroutine詳解
Goroutine、Channel、Context、Sync都是Golang并發(fā)編程中的幾個(gè)重要組件,這篇文中主要為大家介紹了Goroutine的相關(guān)知識(shí),需要的可以參考一下2023-06-06golang實(shí)現(xiàn)對(duì)JavaScript代碼混淆
在Go語(yǔ)言中,你可以使用一些工具來(lái)混淆JavaScript代碼,一個(gè)常用的工具是Terser,它可以用于壓縮和混淆JavaScript代碼,你可以通過(guò)Go語(yǔ)言的`os/exec`包來(lái)調(diào)用Terser工具,本文給通過(guò)一個(gè)簡(jiǎn)單的示例給大家介紹一下,感興趣的朋友可以參考下2024-01-01Redis?BloomFilter布隆過(guò)濾器原理與實(shí)現(xiàn)
你在開(kāi)發(fā)或者面試過(guò)程中,有沒(méi)有遇到過(guò)?海量數(shù)據(jù)需要查重,緩存穿透怎么避免等等這樣的問(wèn)題呢?下面這個(gè)東西超棒,好好了解下,面試過(guò)關(guān)斬將,凸顯你的不一樣2022-10-10golang基于websocket通信tcp keepalive研究記錄
這篇文章主要為大家介紹了golang基于websocket通信tcp keepalive研究記錄,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2022-06-06