Go語言協(xié)程池的實現(xiàn)示例
Go語言雖然有著高效的GMP調度模型,理論上支持成千上萬的goroutine,但是goroutine過多,對調度,gc以及系統(tǒng)內存都會造成壓力,這樣會使我們的服務性能不升反降。常用做法可以用池化技術,構造一個協(xié)程池,把進程中的協(xié)程控制在一定的數(shù)量,防止系統(tǒng)中goroutine過多,影響服務性能。
協(xié)程池模型
協(xié)程池簡單理解就是有一個池子一樣的東西,里面裝有固定數(shù)量的goroutine,當有一個任務到來的時候,會將這個任務交給池子里的一個空閑的goroutine去處理,如果池子里沒有空閑的goroutine了,任務就會阻塞等待。所以協(xié)程池有三個角色Worker,Task,Pool。
屬性定義
- Worker:用于執(zhí)行任務的goroutine
- Task: 具體的任務
- Pool: 池子
下面看一下各個角色的定義:
Task定義
Task有一個函數(shù)成員,表示這個task具體的執(zhí)行邏輯:
type Task struct {
f func() error // 具體的執(zhí)行邏輯
}Pool定義
Pool有兩個成員,Capacity表示池子里的worker的數(shù)量,即工作的goroutine的數(shù)量,JobCh表示任務隊列用于存放任務,goroutine從這個JobCh獲取任務執(zhí)行任務邏輯:
type Pool struct {
RunningWorkers int64 // 運行著的worker數(shù)量
Capacity int64 // 協(xié)程池worker容量---goroutine數(shù)量
JobCh chan *Task // 用于worker取任務
sync.Mutex
}Worker 定義
// p為Pool對象指針
for task := range p.JobCh {
do ...
}執(zhí)行任務單元,簡單理解就是干活的goroutine,這個worker其實只做一件事情,就是不斷的從任務隊列里面取任務執(zhí)行,而worker的數(shù)量就是協(xié)程池里協(xié)程的數(shù)量,由Pool的參數(shù)指定。
方法定義
NewTask用于創(chuàng)建一個任務,參數(shù)是一個函數(shù),返回值是一個Task類型。
func NewTask(funcArg func() error) *Task
NewPool返回一個協(xié)程數(shù)量固定為Capacity協(xié)程池對象指針,其任務隊列的長度為taskNum。
func NewPool(Capacity int, taskNum int) *Pool
接下來主要介紹協(xié)程池的各個方法:
AddTask方法是往協(xié)程池添加任務,如果當前運行著的worker數(shù)量小于協(xié)程池worker容量,則立即啟動一個協(xié)程worker來處理任務,否則將任務添加到任務隊列。
func (p *Pool) AddTask(task *Task)
Run方法將協(xié)程池跑起來,啟動一個worker來處理任務。
func (p *Pool) Run()
協(xié)程池處理任務流程圖:

協(xié)程池實現(xiàn):
package main
import (
"fmt"
"sync"
"sync/atomic"
"time"
)
type Task struct {
f func() error // 具體的任務邏輯
}
func NewTask(funcArg func() error) *Task {
return &Task{
f: funcArg,
}
}
type Pool struct {
RunningWorkers int64 // 運行著的worker數(shù)量
Capacity int64 // 協(xié)程池worker容量
JobCh chan *Task // 用于worker取任務
sync.Mutex
}
func NewPool(capacity int64, taskNum int) *Pool {
return &Pool{
Capacity: capacity,
JobCh: make(chan *Task, taskNum),
}
}
func (p *Pool) GetCap() int64 {
return p.Capacity
}
func (p *Pool) incRunning() { // runningWorkers + 1
atomic.AddInt64(&p.RunningWorkers, 1)
}
func (p *Pool) decRunning() { // runningWorkers - 1
atomic.AddInt64(&p.RunningWorkers, -1)
}
func (p *Pool) GetRunningWorkers() int64 {
return atomic.LoadInt64(&p.RunningWorkers)
}
func (p *Pool) run() {
p.incRunning()
go func() {
defer func() {
p.decRunning()
}()
for task := range p.JobCh {
task.f()
}
}()
}
// AddTask 往協(xié)程池添加任務
func (p *Pool) AddTask(task *Task) {
// 加鎖防止啟動多個 worker
p.Lock()
defer p.Unlock()
if p.GetRunningWorkers() < p.GetCap() { // 如果任務池滿, 則不再創(chuàng)建 worker
// 創(chuàng)建啟動一個 worker
p.run()
}
// 將任務推入隊列, 等待消費
p.JobCh <- task
}
func main() {
// 創(chuàng)建任務池
pool := NewPool(3, 10)
for i := 0; i < 20; i++ {
// 任務放入池中
pool.AddTask(NewTask(func() error {
fmt.Printf("I am Task\n")
return nil
}))
}
time.Sleep(1e9) // 等待執(zhí)行
}運行結果:
I am Task
I am Task
I am Task
I am Task
I am Task
I am Task
I am Task
I am Task
I am Task
I am Task
I am Task
I am Task
I am Task
I am Task
I am Task
I am Task
I am Task
I am Task
I am Task
I am Task
程序創(chuàng)建了一個Worker數(shù)量為3,任務隊列長度為10的協(xié)程池,往里面添加了20個任務,可以看到輸出,一直只有3個worker在做任務,起到了控制goroutine數(shù)量的作用。
到此這篇關于Go語言協(xié)程池的實現(xiàn)示例的文章就介紹到這了,更多相關Go語言協(xié)程池內容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!
相關文章
go select編譯期的優(yōu)化處理邏輯使用場景分析
select 是 Go 中的一個控制結構,類似于用于通信的 switch 語句。每個 case 必須是一個通信操作,要么是發(fā)送要么是接收。接下來通過本文給大家介紹go select編譯期的優(yōu)化處理邏輯使用場景分析,感興趣的朋友一起看看吧2021-06-06
Go語言時間管理利器之深入解析time模塊的實戰(zhàn)技巧
本文深入解析了Go語言標準庫中的time模塊,揭示了其高效用法和實用技巧,通過學習time模塊的三大核心類型(Time、Duration、Timer/Ticker)以及高頻使用場景,開發(fā)者可以更好地處理時間相關的任務,感興趣的朋友一起看看吧2025-03-03
Golang?Gin解析JSON請求數(shù)據(jù)避免出現(xiàn)EOF錯誤
這篇文章主要為大家介紹了Golang?Gin?優(yōu)雅地解析JSON請求數(shù)據(jù),避免ShouldBindBodyWith出現(xiàn)EOF錯誤的源碼分析,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪2022-04-04

