詳解Go語(yǔ)言如何實(shí)現(xiàn)一個(gè)最簡(jiǎn)化的協(xié)程池
背景
學(xué)習(xí)完優(yōu)秀的協(xié)程池開(kāi)源項(xiàng)目ants之后,想要對(duì)協(xié)程池做了一個(gè)總結(jié),在ants的基礎(chǔ)上做了簡(jiǎn)化,保留了一個(gè)協(xié)程池的核心功能,旨在幫助協(xié)程池的理解和使用。
為什么要用協(xié)程池
- 降低資源開(kāi)銷(xiāo):協(xié)程池可以重復(fù)使用協(xié)程,減少創(chuàng)建、銷(xiāo)毀協(xié)程的開(kāi)銷(xiāo),從而提高系統(tǒng)性能。
- 提高響應(yīng)速度:接收任務(wù)后可以立即執(zhí)行,無(wú)需等待創(chuàng)建協(xié)程時(shí)間。
- 增強(qiáng)可管理型:可以對(duì)協(xié)程進(jìn)行集中調(diào)度,統(tǒng)一管理,方便調(diào)優(yōu)。
實(shí)現(xiàn)協(xié)程池都需要實(shí)現(xiàn)哪些功能
思考一下,如果如果讓你實(shí)現(xiàn)一個(gè)協(xié)程池,有哪些必要的核心功能呢?
- 協(xié)程池需要提供一個(gè)對(duì)外接收任務(wù)提交的接口
- 如何創(chuàng)建一個(gè)協(xié)程,創(chuàng)建好的協(xié)程存放在哪里?
- 協(xié)程池中的協(xié)程在沒(méi)有task時(shí),是如何存在的?
- 如何為使用者提交的task分配一個(gè)協(xié)程?
- 協(xié)程執(zhí)行完task后,如何返還到協(xié)程池中?
協(xié)程池內(nèi)部需要維護(hù)一個(gè)裝有一個(gè)個(gè)協(xié)程的隊(duì)列,用于存放管理的協(xié)程,為了拓展功能方便,我們把每個(gè)協(xié)程都封裝一個(gè)worker,這個(gè)worker隊(duì)列需要具備幾個(gè)核心功能:
協(xié)程池整體的架構(gòu)
帶著這些如何實(shí)現(xiàn)一個(gè)簡(jiǎn)單協(xié)程池必要核心功能的問(wèn)題,我們來(lái)看下,一個(gè)協(xié)程池的核心流程,用圖來(lái)表示就是:
從圖上可以看出協(xié)程池主要包括3個(gè)組件:
協(xié)程池(gorutine-pool) :它是整個(gè)協(xié)程池的入口和主體,內(nèi)部持有一個(gè)協(xié)程隊(duì)列,用于存放、調(diào)度worker。
協(xié)程隊(duì)列(worker-queue) :持有協(xié)程池維護(hù)的所有的協(xié)程,為了拓展方便,將協(xié)程封裝成worker,一個(gè)worker對(duì)應(yīng)一個(gè)協(xié)程。
worker:每個(gè)worker對(duì)應(yīng)一個(gè)協(xié)程,它能夠運(yùn)行一個(gè)任務(wù),通常是個(gè)函數(shù),是真正干活的地方。
主要流程:
當(dāng)一個(gè)使用者一個(gè)task提交后,協(xié)程池從workerQueue中獲取一個(gè)可用的worker負(fù)責(zé)執(zhí)行此task,如果worker隊(duì)列中沒(méi)有可用的worker,并且worker的數(shù)量還沒(méi)有達(dá)到隊(duì)列設(shè)置最大數(shù)量,可以新建一個(gè)worker補(bǔ)充到隊(duì)列中,worker執(zhí)行完任務(wù)后,還需要能夠?qū)⒆约悍颠€到workder隊(duì)列中,才能達(dá)到復(fù)用的目的
三個(gè)組件的實(shí)現(xiàn)
分別來(lái)看下三個(gè)組件是如何實(shí)現(xiàn)協(xié)程池的
gorutine-pool實(shí)現(xiàn)
// Pool pool負(fù)責(zé)worker的調(diào)度 type Pool struct { //pool最大最大線程數(shù)量 cap int32 //當(dāng)前運(yùn)行的worker數(shù)量 running int32 //worker隊(duì)列 workers workerQueue //控制并發(fā)訪問(wèn)臨界資源 lock sync.Mutex }
pool結(jié)構(gòu)體中cap
和running
兩個(gè)屬性用來(lái)管理協(xié)程池的數(shù)量,workers
存放創(chuàng)建的協(xié)程,lock
控制并發(fā)訪問(wèn)臨界資源。
從上述的架構(gòu)圖中可以看出,pool需要對(duì)外提供接收task的方法,以及兩個(gè)內(nèi)部從workerQueue獲取worker、返還worker到workerQueue的方法。
Submit
// Submit 提交任務(wù) func (p *Pool) Submit(f func()) error { if worker := p.retrieveWorker(); worker != nil { worker.inputFunc(f) return nil } return ErrPoolOverload }
Submit()是給調(diào)用者提交task任務(wù)的方法,它的入?yún)⑹且粋€(gè)函數(shù),這個(gè)函數(shù)就是協(xié)程池使用者想讓協(xié)程池執(zhí)行的內(nèi)容。協(xié)程池pool會(huì)嘗試為這個(gè)task分配一個(gè)worker來(lái)處理task,但是,如果協(xié)程池的worker都被占用,并且有數(shù)量限制無(wú)法再創(chuàng)建新的worker,pool也無(wú)能為力,這里會(huì)返回給調(diào)用者一個(gè)"過(guò)載"的異常,當(dāng)然這里可以拓展其它的拒絕策略。
retrieveWorker()
//從workerQueue中獲取一個(gè)worker func (p *Pool) retrieveWorker() worker { p.lock.Lock() w := p.workers.detach() if w != nil { p.lock.Unlock() } else { //沒(méi)有拿到可用的worker //如果容量還沒(méi)耗盡,再創(chuàng)建一個(gè)worker if p.running < p.cap { w = &goWorker{ pool: p, task: make(chan func()), } w.run() } p.lock.Unlock() } return w }
retrieveWorker()
是從pool中的workerQueue中獲取一個(gè)worker具體實(shí)現(xiàn)。獲取worker是一個(gè)并發(fā)操作,這里使用鎖控制并發(fā)。調(diào)用workers.detach()
從workerQueue中獲取worker,如果沒(méi)有拿到可用的worker,這時(shí)候還需要看看目前pool中現(xiàn)存活的worker數(shù)量是否已經(jīng)達(dá)到上限,未達(dá)上限,則可以創(chuàng)建新的worker加入到pool中。
revertWorker()
// 執(zhí)行完任務(wù)的worker返還到workerQueue func (p *Pool) revertWorker(w *goWorker) bool { defer func() { p.lock.Unlock() }() p.lock.Lock() //判斷容量,如果協(xié)程存活數(shù)量大于容量,銷(xiāo)毀 if p.running < p.cap { p.workers.insert(w) return true } return false }
返還當(dāng)前worker到pool是在worker執(zhí)行完task之后,返回時(shí)需要判斷當(dāng)前存活的worker數(shù)量是否到達(dá)pool的上限,已達(dá)上限則返回失敗。另外,由于running
屬性存在并發(fā)訪問(wèn)的問(wèn)題,返還操作也需要加鎖。
workerQueue實(shí)現(xiàn)
為了提高拓展性,我們將workerQueue抽象成接口,也就是說(shuō)可以有多種協(xié)程隊(duì)列實(shí)現(xiàn)來(lái)適配更多的使用場(chǎng)景
workerQueue接口
// 定義一個(gè)協(xié)程隊(duì)列的接口 type workerQueue interface { //隊(duì)列長(zhǎng)度 len() int //插入worker insert(w worker) //分派一個(gè)worker detach() worker }
插入insert()
和分配detach()
是實(shí)現(xiàn)協(xié)程隊(duì)列的核心方法。這里我們以底層基于“棧”思想的結(jié)構(gòu)作為workerQueue的默認(rèn)實(shí)現(xiàn),也即后進(jìn)入隊(duì)列的協(xié)程優(yōu)先被分配使用。
// 底層構(gòu)造一個(gè)棧類型的隊(duì)列來(lái)管理多個(gè)worker type workerStack struct { items []worker }
我們使用數(shù)組來(lái)存放worker,用數(shù)組來(lái)模擬先進(jìn)后出的協(xié)程隊(duì)列
// 新創(chuàng)建一個(gè)worker func (ws *workerStack) insert(w worker) { ws.items = append(ws.items, w) }
workerStack的insert()
實(shí)現(xiàn)很簡(jiǎn)單,直接在數(shù)組尾巴追加一個(gè)worker
// 分配一個(gè)worker func (ws *workerStack) detach() worker { l := ws.len() if l == 0 { return nil } w := ws.items[l-1] ws.items[l-1] = nil ws.items = ws.items[:l-1] return w }
detach()
負(fù)責(zé)從數(shù)組中獲取一個(gè)可用的空閑worker,每次獲取時(shí)取用的是數(shù)組的最后一個(gè)元素,也就是協(xié)程隊(duì)列末尾的worker優(yōu)先被分配出去了。
注意這里將下標(biāo)l-1
位置的對(duì)象置為nil,可以防止內(nèi)存泄露
worker實(shí)現(xiàn)
type worker interface { workId() string run() //接收函數(shù)執(zhí)行任務(wù) inputFunc(func()) } type goWorker struct { workerId string //需要持有自己所屬的 Pool 因?yàn)橐退M(jìn)行交互 pool *Pool task chan func() }
這里同樣了為了拓展,將worker抽象成了一個(gè)接口,goWorker是它的一個(gè)默認(rèn)實(shí)現(xiàn),worker最核心的工作就是等待著task到來(lái),接到task后執(zhí)行,task具體來(lái)說(shuō)就是一個(gè)函數(shù)。這里其實(shí)是一個(gè)很簡(jiǎn)單的生產(chǎn)者/消費(fèi)者模型,我們想到使用管道來(lái)實(shí)現(xiàn)生產(chǎn)消費(fèi)模型,定義一個(gè)函數(shù)類型的管道,你或許要問(wèn)為什么使用管道,還有別的方式可以實(shí)現(xiàn)這個(gè)功能么?不急,我們來(lái)看看worker要實(shí)現(xiàn)什么功能:
- 創(chuàng)建出來(lái)的worker,未必馬上就有task分配過(guò)來(lái)執(zhí)行,它肯定要把自己“阻塞”住,隨時(shí)等待task到來(lái)
- task傳遞過(guò)來(lái)終歸需要一個(gè)介質(zhì)
鑒于這個(gè)場(chǎng)景,使用管道式非常合適的,管道內(nèi)沒(méi)有元素時(shí),worker阻塞等待,當(dāng)管道內(nèi)有task進(jìn)來(lái)時(shí),worker被喚醒,從管道中取出task進(jìn)行處理。當(dāng)然,我們使用一個(gè)死循環(huán),不斷自旋的從一個(gè)容器中讀取task,也能達(dá)到同樣的目的,但卻沒(méi)有使用管道合適、優(yōu)雅!
func (g *goWorker) run() { go func() { defer func() { atomic.AddInt32(&g.pool.running, -1) }() //running+1 atomic.AddInt32(&g.pool.running, 1) for { select { case f := <-g.task: if f == nil { return } //執(zhí)行提交的任務(wù) f() } //worker返還到queue中 if ok := g.pool.revertWorker(g); !ok { return } } }() } func (g *goWorker) inputFunc(f func()) { g.task <- f }
run()
是worker的核心方法,worker通常被創(chuàng)建后就會(huì)調(diào)用run()
,一起來(lái)看下主要做了那些內(nèi)容:
- 使用select 監(jiān)聽(tīng)task管道,阻塞當(dāng)前協(xié)程,回到管道內(nèi)有task進(jìn)入
- 監(jiān)聽(tīng)到task,則會(huì)執(zhí)行task的內(nèi)容
- task執(zhí)行完畢后,會(huì)調(diào)用
poo.revertWorker()
將當(dāng)前worker返還到協(xié)程池中待用,當(dāng)然這里未必會(huì)返成功。
很容易忽略的一個(gè)點(diǎn)是:為什么要新啟動(dòng)一個(gè)協(xié)程來(lái)完成以上工作?因?yàn)閣orker中沒(méi)有task時(shí),要阻塞等待任務(wù),如果不是在一個(gè)新的協(xié)程中,整個(gè)程序都阻塞在第一個(gè)worker的run()
中,所謂協(xié)程池,就是指每個(gè)worker對(duì)應(yīng)的這個(gè)協(xié)程。
另外,pool維護(hù)一個(gè)running
屬性來(lái)表示存活的worker數(shù)量,當(dāng)調(diào)用run()
方法后,表示worker是可用的了,running
值+1。如果worker返回協(xié)程池失敗,run()
執(zhí)行完畢,worker對(duì)應(yīng)的協(xié)程被系統(tǒng)銷(xiāo)毀,表示當(dāng)前worker生命周期結(jié)束了,對(duì)應(yīng)的寫(xiě)成會(huì)將running
值-1。由于多個(gè)worker并發(fā)修改running
值,使用了atomic.AddInt32
控制臨界資源的修改。
至此,實(shí)現(xiàn)一個(gè)簡(jiǎn)單協(xié)程池的核心的功能都已經(jīng)完成,和ants相比,這是一個(gè)相當(dāng)精簡(jiǎn)的協(xié)程池,旨在幫助我們加深對(duì)協(xié)程池、線程池這類組件模型的理解,離真正可用有段距離。ants中更完備的功能,比如:ants實(shí)現(xiàn)了定期清理空閑worker,以及對(duì)鎖的優(yōu)化、worker的池化等等,感興趣的可以看看ants,短小精悍的開(kāi)源項(xiàng)目!
到此這篇關(guān)于詳解Go語(yǔ)言如何實(shí)現(xiàn)一個(gè)最簡(jiǎn)化的協(xié)程池的文章就介紹到這了,更多相關(guān)Go協(xié)程池內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
- golang協(xié)程池設(shè)計(jì)詳解
- golang協(xié)程池模擬實(shí)現(xiàn)群發(fā)郵件功能
- GO實(shí)現(xiàn)協(xié)程池管理的方法
- Golang協(xié)程池gopool設(shè)計(jì)與實(shí)現(xiàn)
- Go簡(jiǎn)單實(shí)現(xiàn)協(xié)程池的實(shí)現(xiàn)示例
- Golang協(xié)程池的實(shí)現(xiàn)與應(yīng)用
- Go高級(jí)特性探究之協(xié)程池詳解
- Golang線程池與協(xié)程池的使用
- golang實(shí)現(xiàn)協(xié)程池的方法示例
- go協(xié)程池實(shí)現(xiàn)原理小結(jié)
相關(guān)文章
Golang打印復(fù)雜結(jié)構(gòu)體兩種方法詳解
在?Golang?語(yǔ)言開(kāi)發(fā)中,我們經(jīng)常會(huì)使用結(jié)構(gòu)體類型,如果我們使用的結(jié)構(gòu)體類型的變量包含指針類型的字段,我們?cè)谟涗浫罩镜臅r(shí)候,指針類型的字段的值是指針地址,將會(huì)給我們?debug?代碼造成不便2022-10-10gin自定義中間件解決requestBody不可重讀(請(qǐng)求體取值)
這篇文章主要介紹了gin自定義中間件解決requestBody不可重讀,確保控制器能夠獲取請(qǐng)求體值,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2023-10-10golang使用map支持高并發(fā)的方法(1000萬(wàn)次操作14ms)
這篇文章主要介紹了golang使用map支持高并發(fā)的方法(1000萬(wàn)次操作14ms),本文給大家詳細(xì)講解,對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2022-11-11使用Gorm操作Oracle數(shù)據(jù)庫(kù)踩坑記錄
gorm是目前用得最多的go語(yǔ)言orm庫(kù),本文主要介紹了使用Gorm操作Oracle數(shù)據(jù)庫(kù)踩坑記錄,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2022-06-06Golang定制化zap日志庫(kù)使用過(guò)程分析
Zap是我個(gè)人比較喜歡的日志庫(kù),是uber開(kāi)源的,有較好的性能,在項(xiàng)目開(kāi)發(fā)中,經(jīng)常需要把程序運(yùn)行過(guò)程中各種信息記錄下來(lái),有了詳細(xì)的日志有助于問(wèn)題排查和功能優(yōu)化,但如何選擇和使用性能好功能強(qiáng)大的日志庫(kù),這個(gè)就需要我們從多角度考慮2023-03-03