golang協(xié)程池設(shè)計(jì)詳解
Why Pool
go自從出生就身帶“高并發(fā)”的標(biāo)簽,其并發(fā)編程就是由groutine實(shí)現(xiàn)的,因其消耗資源低,性能高效,開(kāi)發(fā)成本低的特性而被廣泛應(yīng)用到各種場(chǎng)景,例如服務(wù)端開(kāi)發(fā)中使用的HTTP服務(wù),在golang net/http包中,每一個(gè)被監(jiān)聽(tīng)到的tcp鏈接都是由一個(gè)groutine去完成處理其上下文的,由此使得其擁有極其優(yōu)秀的并發(fā)量吞吐量
for { // 監(jiān)聽(tīng)tcp rw, e := l.Accept() if e != nil { ....... } tempDelay = 0 c := srv.newConn(rw) c.setState(c.rwc, StateNew) // before Serve can return // 啟動(dòng)協(xié)程處理上下文 go c.serve(ctx) }
雖然創(chuàng)建一個(gè)groutine占用的內(nèi)存極小(大約2KB左右,線程通常2M左右),但是在實(shí)際生產(chǎn)環(huán)境無(wú)限制的開(kāi)啟協(xié)程顯然是不科學(xué)的,比如上圖的邏輯,如果來(lái)幾千萬(wàn)個(gè)請(qǐng)求就會(huì)開(kāi)啟幾千萬(wàn)個(gè)groutine,當(dāng)沒(méi)有更多內(nèi)存可用時(shí),go的調(diào)度器就會(huì)阻塞groutine最終導(dǎo)致內(nèi)存溢出乃至嚴(yán)重的崩潰,所以本文將通過(guò)實(shí)現(xiàn)一個(gè)簡(jiǎn)單的協(xié)程池,以及剖析幾個(gè)開(kāi)源的協(xié)程池源碼來(lái)探討一下對(duì)groutine的并發(fā)控制以及多路復(fù)用的設(shè)計(jì)和實(shí)現(xiàn)。
一個(gè)簡(jiǎn)單的協(xié)程池
過(guò)年前做過(guò)一波小需求,是將主播管理系統(tǒng)中信息不完整的主播找出來(lái)然后再到其相對(duì)應(yīng)的直播平臺(tái)爬取完整信息并補(bǔ)全,當(dāng)時(shí)考慮到每一個(gè)主播的數(shù)據(jù)都要訪問(wèn)一次直播平臺(tái)所以就用應(yīng)對(duì)每一個(gè)主播開(kāi)啟一個(gè)groutine去抓取數(shù)據(jù),雖然這個(gè)業(yè)務(wù)量還遠(yuǎn)遠(yuǎn)遠(yuǎn)遠(yuǎn)達(dá)不到能造成groutine性能瓶頸的地步,但是心里總是不舒服,于是放假回來(lái)后將其優(yōu)化成從協(xié)程池中控制groutine數(shù)量再開(kāi)啟爬蟲(chóng)進(jìn)行數(shù)據(jù)抓取。思路其實(shí)非常簡(jiǎn)單,用一個(gè)channel當(dāng)做任務(wù)隊(duì)列,初始化groutine池時(shí)確定好并發(fā)量,然后以設(shè)置好的并發(fā)量開(kāi)啟groutine同時(shí)讀取channel中的任務(wù)并執(zhí)行, 模型如下圖
實(shí)現(xiàn)
type SimplePool struct { wg sync.WaitGroup work chan func() //任務(wù)隊(duì)列 } func NewSimplePoll(workers int) *SimplePool { p := &SimplePool{ wg: sync.WaitGroup{}, work: make(chan func()), } p.wg.Add(workers) //根據(jù)指定的并發(fā)量去讀取管道并執(zhí)行 for i := 0; i < workers; i++ { go func() { defer func() { // 捕獲異常 防止waitGroup阻塞 if err := recover(); err != nil { fmt.Println(err) p.wg.Done() } }() // 從workChannel中取出任務(wù)執(zhí)行 for fn := range p.work { fn() } p.wg.Done() }() } return p } // 添加任務(wù) func (p *SimplePool) Add(fn func()) { p.work <- fn } // 執(zhí)行 func (p *SimplePool) Run() { close(p.work) p.wg.Wait() }
測(cè)試
測(cè)試設(shè)定為在并發(fā)數(shù)量為20的協(xié)程池中并發(fā)抓取一百個(gè)人的信息, 因?yàn)榇a包含較多業(yè)務(wù)邏輯所以sleep 1秒模擬爬蟲(chóng)過(guò)程,理論上執(zhí)行時(shí)間為5秒
func TestSimplePool(t *testing.T) { p := NewSimplePoll(20) for i := 0; i < 100; i++ { p.Add(parseTask(i)) } p.Run() } func parseTask(i int) func() { return func() { // 模擬抓取數(shù)據(jù)的過(guò)程 time.Sleep(time.Second * 1) fmt.Println("finish parse ", i) } }
這樣一來(lái)最簡(jiǎn)單的一個(gè)groutine池就完成了
go-playground/pool
上面的groutine池雖然簡(jiǎn)單,但是對(duì)于每一個(gè)并發(fā)任務(wù)的狀態(tài),pool的狀態(tài)缺少控制,所以又去看了一下go-playground/pool的源碼實(shí)現(xiàn),先從每一個(gè)需要執(zhí)行的任務(wù)入手,該庫(kù)中對(duì)并發(fā)單元做了如下的結(jié)構(gòu)體,可以看到除工作單元的值,錯(cuò)誤,執(zhí)行函數(shù)等,還用了三個(gè)分別表示,取消,取消中,寫 的三個(gè)并發(fā)安全的原子操作值來(lái)標(biāo)識(shí)其運(yùn)行狀態(tài)。
// 需要加入pool 中執(zhí)行的任務(wù) type WorkFunc func(wu WorkUnit) (interface{}, error) // 工作單元 type workUnit struct { value interface{} // 任務(wù)結(jié)果 err error // 任務(wù)的報(bào)錯(cuò) done chan struct{} // 通知任務(wù)完成 fn WorkFunc cancelled atomic.Value // 任務(wù)是否被取消 cancelling atomic.Value // 是否正在取消任務(wù) writing atomic.Value // 任務(wù)是否正在執(zhí)行 }
接下來(lái)看Pool的結(jié)構(gòu)
type limitedPool struct { workers uint // 并發(fā)量 work chan *workUnit // 任務(wù)channel cancel chan struct{} // 用于通知結(jié)束的channel closed bool // 是否關(guān)閉 m sync.RWMutex // 讀寫鎖,主要用來(lái)保證 closed值的并發(fā)安全 }
初始化groutine池, 以及啟動(dòng)設(shè)定好數(shù)量的groutine
// 初始化pool,設(shè)定并發(fā)量 func NewLimited(workers uint) Pool { if workers == 0 { panic("invalid workers '0'") } p := &limitedPool{ workers: workers, } p.initialize() return p } func (p *limitedPool) initialize() { p.work = make(chan *workUnit, p.workers*2) p.cancel = make(chan struct{}) p.closed = false for i := 0; i < int(p.workers); i++ { // 初始化并發(fā)單元 p.newWorker(p.work, p.cancel) } } // passing work and cancel channels to newWorker() to avoid any potential race condition // betweeen p.work read & write func (p *limitedPool) newWorker(work chan *workUnit, cancel chan struct{}) { go func(p *limitedPool) { var wu *workUnit defer func(p *limitedPool) { // 捕獲異常,結(jié)束掉異常的工作單元,并將其再次作為新的任務(wù)啟動(dòng) if err := recover(); err != nil { trace := make([]byte, 1<<16) n := runtime.Stack(trace, true) s := fmt.Sprintf(errRecovery, err, string(trace[:int(math.Min(float64(n), float64(7000)))])) iwu := wu iwu.err = &ErrRecovery{s: s} close(iwu.done) // need to fire up new worker to replace this one as this one is exiting p.newWorker(p.work, p.cancel) } }(p) var value interface{} var err error for { select { // workChannel中讀取任務(wù) case wu = <-work: // 防止channel 被關(guān)閉后讀取到零值 if wu == nil { continue } // 先判斷任務(wù)是否被取消 if wu.cancelled.Load() == nil { // 執(zhí)行任務(wù) value, err = wu.fn(wu) wu.writing.Store(struct{}{}) // 任務(wù)執(zhí)行完在寫入結(jié)果時(shí)需要再次檢查工作單元是否被取消,防止產(chǎn)生競(jìng)爭(zhēng)條件 if wu.cancelled.Load() == nil && wu.cancelling.Load() == nil { wu.value, wu.err = value, err close(wu.done) } } // pool是否被停止 case <-cancel: return } } }(p) }
往POOL中添加任務(wù),并檢查pool是否關(guān)閉
func (p *limitedPool) Queue(fn WorkFunc) WorkUnit { w := &workUnit{ done: make(chan struct{}), fn: fn, } go func() { p.m.RLock() if p.closed { w.err = &ErrPoolClosed{s: errClosed} if w.cancelled.Load() == nil { close(w.done) } p.m.RUnlock() return } // 將工作單元寫入workChannel, pool啟動(dòng)后將由上面newWorker函數(shù)中讀取執(zhí)行 p.work <- w p.m.RUnlock() }() return w }
在go-playground/pool包中, limitedPool的批量并發(fā)執(zhí)行還需要借助batch.go來(lái)完成
// batch contains all information for a batch run of WorkUnits type batch struct { pool Pool // 上面的limitedPool實(shí)現(xiàn)了Pool interface m sync.Mutex // 互斥鎖,用來(lái)判斷closed units []WorkUnit // 工作單元的slice, 這個(gè)主要用在不設(shè)并發(fā)限制的場(chǎng)景,這里忽略 results chan WorkUnit // 結(jié)果集,執(zhí)行完后的workUnit會(huì)更新其value,error,可以從結(jié)果集channel中讀取 done chan struct{} // 通知batch是否完成 closed bool wg *sync.WaitGroup }
// go-playground/pool 中有設(shè)置并發(fā)量和不設(shè)并發(fā)量的批量任務(wù),都實(shí)現(xiàn)Pool interface,初始化batch批量任務(wù)時(shí)會(huì)將之前創(chuàng)建好的Pool傳入newBatch func newBatch(p Pool) Batch { return &batch{ pool: p, units: make([]WorkUnit, 0, 4), // capacity it to 4 so it doesn't grow and allocate too many times. results: make(chan WorkUnit), done: make(chan struct{}), wg: new(sync.WaitGroup), } } // 往批量任務(wù)中添加workFunc任務(wù) func (b *batch) Queue(fn WorkFunc) { b.m.Lock() if b.closed { b.m.Unlock() return } //往上述的limitPool中添加workFunc wu := b.pool.Queue(fn) b.units = append(b.units, wu) // keeping a reference for cancellation purposes b.wg.Add(1) b.m.Unlock() // 執(zhí)行完后將workUnit寫入結(jié)果集channel go func(b *batch, wu WorkUnit) { wu.Wait() b.results <- wu b.wg.Done() }(b, wu) } // 通知批量任務(wù)不再接受新的workFunc, 如果添加完workFunc不執(zhí)行改方法的話將導(dǎo)致取結(jié)果集時(shí)done channel一直阻塞 func (b *batch) QueueComplete() { b.m.Lock() b.closed = true close(b.done) b.m.Unlock() } // 獲取批量任務(wù)結(jié)果集 func (b *batch) Results() <-chan WorkUnit { go func(b *batch) { <-b.done b.m.Lock() b.wg.Wait() b.m.Unlock() close(b.results) }(b) return b.results }
測(cè)試
func SendMail(int int) pool.WorkFunc { fn := func(wu pool.WorkUnit) (interface{}, error) { // sleep 1s 模擬發(fā)郵件過(guò)程 time.Sleep(time.Second * 1) // 模擬異常任務(wù)需要取消 if int == 17 { wu.Cancel() } if wu.IsCancelled() { return false, nil } fmt.Println("send to", int) return true, nil } return fn } func TestBatchWork(t *testing.T) { // 初始化groutine數(shù)量為20的pool p := pool.NewLimited(20) defer p.Close() batch := p.Batch() // 設(shè)置一個(gè)批量任務(wù)的過(guò)期超時(shí)時(shí)間 t := time.After(10 * time.Second) go func() { for i := 0; i < 100; i++ { batch.Queue(SendMail(i)) } batch.QueueComplete() }() // 因?yàn)?batch.Results 中要close results channel 所以不能將其放在LOOP中執(zhí)行 r := batch.Results() LOOP: for { select { case <-t: // 登臺(tái)超時(shí)通知 fmt.Println("recived timeout") break LOOP case email, ok := <-r: // 讀取結(jié)果集 if ok { if err := email.Error(); err != nil { fmt.Println("err", err.Error()) } fmt.Println(email.Value()) } else { fmt.Println("finish") break LOOP } } } }
接近理論值5s, 通知模擬被取消的work也正常取消
go-playground/pool在比起之前簡(jiǎn)單的協(xié)程池的基礎(chǔ)上, 對(duì)pool, worker的狀態(tài)有了很好的管理。但是,但是問(wèn)題來(lái)了,在第一個(gè)實(shí)現(xiàn)的簡(jiǎn)單groutine池和go-playground/pool中,都是先啟動(dòng)預(yù)定好的groutine來(lái)完成任務(wù)執(zhí)行,在并發(fā)量遠(yuǎn)小于任務(wù)量的情況下確實(shí)能夠做到groutine的復(fù)用,如果任務(wù)量不多則會(huì)導(dǎo)致任務(wù)分配到每個(gè)groutine不均勻,甚至可能出現(xiàn)啟動(dòng)的groutine根本不會(huì)執(zhí)行任務(wù)從而導(dǎo)致浪費(fèi),而且對(duì)于協(xié)程池也沒(méi)有動(dòng)態(tài)的擴(kuò)容和縮小。所以我又去看了一下ants的設(shè)計(jì)和實(shí)現(xiàn)。
ants
ants是一個(gè)受fasthttp啟發(fā)的高性能協(xié)程池, fasthttp號(hào)稱是比go原生的net/http快10倍,其快速高性能的原因之一就是采用了各種池化技術(shù)(這個(gè)日后再開(kāi)新坑去讀源碼), ants相比之前兩種協(xié)程池,其模型更像是之前接觸到的數(shù)據(jù)庫(kù)連接池,需要從空余的worker中取出一個(gè)來(lái)執(zhí)行任務(wù), 當(dāng)無(wú)可用空余worker的時(shí)候再去創(chuàng)建,而當(dāng)pool的容量達(dá)到上線之后,剩余的任務(wù)阻塞等待當(dāng)前進(jìn)行中的worker執(zhí)行完畢將worker放回pool, 直至pool中有空閑worker。 ants在內(nèi)存的管理上做得很好,除了定期清除過(guò)期worker(一定時(shí)間內(nèi)沒(méi)有分配到任務(wù)的worker),ants還實(shí)現(xiàn)了一種適用于大批量相同任務(wù)的pool, 這種pool與一個(gè)需要大批量重復(fù)執(zhí)行的函數(shù)鎖綁定,避免了調(diào)用方不停的創(chuàng)建,更加節(jié)省內(nèi)存。
先看一下ants的pool 結(jié)構(gòu)體 (pool.go)
type Pool struct { // 協(xié)程池的容量 (groutine數(shù)量的上限) capacity int32 // 正在執(zhí)行中的groutine running int32 // 過(guò)期清理間隔時(shí)間 expiryDuration time.Duration // 當(dāng)前可用空閑的groutine workers []*Worker // 表示pool是否關(guān)閉 release int32 // lock for synchronous operation. lock sync.Mutex // 用于控制pool等待獲取可用的groutine cond *sync.Cond // 確保pool只被關(guān)閉一次 once sync.Once // worker臨時(shí)對(duì)象池,在復(fù)用worker時(shí)減少新對(duì)象的創(chuàng)建并加速worker從pool中的獲取速度 workerCache sync.Pool // pool引發(fā)panic時(shí)的執(zhí)行函數(shù) PanicHandler func(interface{}) }
接下來(lái)看pool的工作單元 worker (worker.go)
type Worker struct { // worker 所屬的poo; pool *Pool // 任務(wù)隊(duì)列 task chan func() // 回收時(shí)間,即該worker的最后一次結(jié)束運(yùn)行的時(shí)間 recycleTime time.Time }
執(zhí)行worker的代碼 (worker.go)
func (w *Worker) run() { // pool中正在執(zhí)行的worker數(shù)+1 w.pool.incRunning() go func() { defer func() { if p := recover(); p != nil { //若worker因各種問(wèn)題引發(fā)panic, //pool中正在執(zhí)行的worker數(shù) -1, //如果設(shè)置了Pool中的PanicHandler,此時(shí)會(huì)被調(diào)用 w.pool.decRunning() if w.pool.PanicHandler != nil { w.pool.PanicHandler(p) } else { log.Printf("worker exits from a panic: %v", p) } } }() // worker 執(zhí)行任務(wù)隊(duì)列 for f := range w.task { //任務(wù)隊(duì)列中的函數(shù)全部被執(zhí)行完后, //pool中正在執(zhí)行的worker數(shù) -1, //將worker 放回對(duì)象池 if f == nil { w.pool.decRunning() w.pool.workerCache.Put(w) return } f() //worker 執(zhí)行完任務(wù)后放回Pool //使得其余正在阻塞的任務(wù)可以獲取worker w.pool.revertWorker(w) } }() }
了解了工作單元worker如何執(zhí)行任務(wù)以及與pool交互后,回到pool中查看其實(shí)現(xiàn), pool的核心就是取出可用worker提供給任務(wù)執(zhí)行 (pool.go)
// 向pool提交任務(wù) func (p *Pool) Submit(task func()) error { if 1 == atomic.LoadInt32(&p.release) { return ErrPoolClosed } // 獲取pool中的可用worker并向其任務(wù)隊(duì)列中寫入任務(wù) p.retrieveWorker().task <- task return nil } // **核心代碼** 獲取可用worker func (p *Pool) retrieveWorker() *Worker { var w *Worker p.lock.Lock() idleWorkers := p.workers n := len(idleWorkers) - 1 // 當(dāng)前pool中有可用worker, 取出(隊(duì)尾)worker并執(zhí)行 if n >= 0 { w = idleWorkers[n] idleWorkers[n] = nil p.workers = idleWorkers[:n] p.lock.Unlock() } else if p.Running() < p.Cap() { p.lock.Unlock() // 當(dāng)前pool中無(wú)空閑worker,且pool數(shù)量未達(dá)到上線 // pool會(huì)先從臨時(shí)對(duì)象池中尋找是否有已完成任務(wù)的worker, // 若臨時(shí)對(duì)象池中不存在,則重新創(chuàng)建一個(gè)worker并將其啟動(dòng) if cacheWorker := p.workerCache.Get(); cacheWorker != nil { w = cacheWorker.(*Worker) } else { w = &Worker{ pool: p, task: make(chan func(), workerChanCap), } } w.run() } else { // pool中沒(méi)有空余worker且達(dá)到并發(fā)上限 // 任務(wù)會(huì)阻塞等待當(dāng)前運(yùn)行的worker完成任務(wù)釋放會(huì)pool for { p.cond.Wait() // 等待通知, 暫時(shí)阻塞 l := len(p.workers) - 1 if l < 0 { continue } // 當(dāng)有可用worker釋放回pool之后, 取出 w = p.workers[l] p.workers[l] = nil p.workers = p.workers[:l] break } p.lock.Unlock() } return w } // 釋放worker回pool func (p *Pool) revertWorker(worker *Worker) { worker.recycleTime = time.Now() p.lock.Lock() p.workers = append(p.workers, worker) // 通知pool中已經(jīng)獲取鎖的groutine, 有一個(gè)worker已完成任務(wù) p.cond.Signal() p.lock.Unlock() }
在批量并發(fā)任務(wù)的執(zhí)行過(guò)程中, 如果有超過(guò)5納秒(ants中默認(rèn)worker過(guò)期時(shí)間為5ns)的worker未被分配新的任務(wù),則將其作為過(guò)期worker清理掉,從而保證pool中可用的worker都能發(fā)揮出最大的作用以及將任務(wù)分配得更均勻
(pool.go)
// 該函數(shù)會(huì)在pool初始化后在協(xié)程中啟動(dòng) func (p *Pool) periodicallyPurge() { // 創(chuàng)建一個(gè)5ns定時(shí)的心跳 heartbeat := time.NewTicker(p.expiryDuration) defer heartbeat.Stop() for range heartbeat.C { currentTime := time.Now() p.lock.Lock() idleWorkers := p.workers if len(idleWorkers) == 0 && p.Running() == 0 && atomic.LoadInt32(&p.release) == 1 { p.lock.Unlock() return } n := -1 for i, w := range idleWorkers { // 因?yàn)閜ool 的worker隊(duì)列是先進(jìn)后出的,所以正序遍歷可用worker時(shí)前面的往往里當(dāng)前時(shí)間越久 if currentTime.Sub(w.recycleTime) <= p.expiryDuration { break } // 如果worker最后一次運(yùn)行時(shí)間距現(xiàn)在超過(guò)5納秒,視為過(guò)期,worker收到nil, 執(zhí)行上述worker.go中 if n == nil 的操作 n = i w.task <- nil idleWorkers[i] = nil } if n > -1 { // 全部過(guò)期 if n >= len(idleWorkers)-1 { p.workers = idleWorkers[:0] } else { // 部分過(guò)期 p.workers = idleWorkers[n+1:] } } p.lock.Unlock() } }
測(cè)試
func TestAnts(t *testing.T) { wg := sync.WaitGroup{} pool, _ := ants.NewPool(20) defer pool.Release() for i := 0; i < 100; i++ { wg.Add(1) pool.Submit(sendMail(i, &wg)) } wg.Wait() } func sendMail(i int, wg *sync.WaitGroup) func() { return func() { time.Sleep(time.Second * 1) fmt.Println("send mail to ", i) wg.Done() } }
這里雖只簡(jiǎn)單的測(cè)試批量并發(fā)任務(wù)的場(chǎng)景, 如果大家有興趣可以去看看ants的壓力測(cè)試, ants的吞吐量能夠比原生groutine高出N倍,內(nèi)存節(jié)省10到20倍, 可謂是協(xié)程池中的神器。
借用ants作者的原話來(lái)說(shuō):
然而又有多少場(chǎng)景是單臺(tái)機(jī)器需要扛100w甚至1000w同步任務(wù)的?基本沒(méi)有??!結(jié)果就是造出了屠龍刀,可是世界上沒(méi)有龍??!也是無(wú)情…
Over
一口氣從簡(jiǎn)單到復(fù)雜總結(jié)了三個(gè)協(xié)程池的實(shí)現(xiàn),受益匪淺, 感謝各開(kāi)源庫(kù)的作者, 雖然世界上沒(méi)有龍,但是屠龍技是必須練的,因?yàn)樗拖翊婵?,不一定要全部都用了,但是一定不能沒(méi)有!
以上就是本文的全部?jī)?nèi)容,希望對(duì)大家的學(xué)習(xí)有所幫助,也希望大家多多支持腳本之家。
- golang協(xié)程池模擬實(shí)現(xiàn)群發(fā)郵件功能
- GO實(shí)現(xiàn)協(xié)程池管理的方法
- Golang協(xié)程池gopool設(shè)計(jì)與實(shí)現(xiàn)
- Go簡(jiǎn)單實(shí)現(xiàn)協(xié)程池的實(shí)現(xiàn)示例
- Golang協(xié)程池的實(shí)現(xiàn)與應(yīng)用
- Go高級(jí)特性探究之協(xié)程池詳解
- 詳解Go語(yǔ)言如何實(shí)現(xiàn)一個(gè)最簡(jiǎn)化的協(xié)程池
- Golang線程池與協(xié)程池的使用
- golang實(shí)現(xiàn)協(xié)程池的方法示例
- go協(xié)程池實(shí)現(xiàn)原理小結(jié)
相關(guān)文章
Go語(yǔ)言基礎(chǔ)語(yǔ)法之結(jié)構(gòu)體及方法詳解
結(jié)構(gòu)體類型可以用來(lái)保存不同類型的數(shù)據(jù),也可以通過(guò)方法的形式來(lái)聲明它的行為。本文將介紹go語(yǔ)言中的結(jié)構(gòu)體和方法,以及“繼承”的實(shí)現(xiàn)方法2021-09-09Golang高性能持久化解決方案BoltDB數(shù)據(jù)庫(kù)介紹
這篇文章主要為大家介紹了Golang高性能持久化解決方案BoltDB數(shù)據(jù)庫(kù)介紹,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步2021-11-11