Go實(shí)現(xiàn)后臺(tái)任務(wù)調(diào)度系統(tǒng)的實(shí)例代碼
一、背景
平常我們?cè)陂_(kāi)發(fā)API的時(shí)候,前端傳遞過(guò)來(lái)的大批數(shù)據(jù)需要經(jīng)過(guò)后端處理,如果后端處理的速度快,前端響應(yīng)就快,反之則很慢,影響用戶(hù)體驗(yàn)。針對(duì)這種場(chǎng)景我們一般都是后臺(tái)異步處理,不需要前端等待所有的都執(zhí)行完才返回。為了解決這一問(wèn)題,需要我們自己實(shí)現(xiàn)后臺(tái)任務(wù)調(diào)度系統(tǒng)。
二、任務(wù)調(diào)度器實(shí)現(xiàn)
poll.go
package poller import ( "context" "fmt" "log" "sync" "time" ) type Poller struct { routineGroup *goroutineGroup // 并發(fā)控制 workerNum int // 記錄同時(shí)在運(yùn)行的最大goroutine數(shù) sync.Mutex ready chan struct{} // 某個(gè)goroutine已經(jīng)準(zhǔn)備好了 metric *metric // 統(tǒng)計(jì)當(dāng)前在運(yùn)行中的goroutine數(shù)量 } func NewPoller(workerNum int) *Poller { return &Poller{ routineGroup: newRoutineGroup(), workerNum: workerNum, ready: make(chan struct{}, 1), metric: newMetric(), } } // 調(diào)度器 func (p *Poller) schedule() { p.Lock() defer p.Unlock() if int(p.metric.BusyWorkers()) >= p.workerNum { return } select { case p.ready <- struct{}{}: // 只要滿(mǎn)足當(dāng)前goroutine數(shù)量小于最大goroutine數(shù)量 那么就通知poll去調(diào)度goroutine執(zhí)行任務(wù) default: } } func (p *Poller) Poll(ctx context.Context) error { for { // step01 p.schedule() // 調(diào)度 select { case <-p.ready: // goroutine準(zhǔn)備好之后 這里就會(huì)有消息 case <-ctx.Done(): return nil } LOOP: for { select { case <-ctx.Done(): break LOOP default: // step02 task, err := p.fetch(ctx) // 獲取任務(wù) if err != nil { log.Println("fetch task error:", err.Error()) break } fmt.Println(task) p.metric.IncBusyWorker() // 當(dāng)前正在運(yùn)行的goroutine+1 // step03 p.routineGroup.Run(func() { // 執(zhí)行任務(wù) if err := p.execute(ctx, task); err != nil { log.Println("execute task error:", err.Error()) } }) break LOOP } } } } func (p *Poller) fetch(ctx context.Context) (string, error) { time.Sleep(1000 * time.Millisecond) return "task", nil } func (p *Poller) execute(ctx context.Context, task string) error { defer func() { p.metric.DecBusyWorker() // 執(zhí)行完成之后 goroutine數(shù)量-1 p.schedule() // 重新調(diào)度下一個(gè)goroutine去執(zhí)行任務(wù) 這一步是必須的 }() return nil }
metric.go
package poller import "sync/atomic" type metric struct { busyWorkers uint64 } func newMetric() *metric { return &metric{} } func (m *metric) IncBusyWorker() uint64 { return atomic.AddUint64(&m.busyWorkers, 1) } func (m *metric) DecBusyWorker() uint64 { return atomic.AddUint64(&m.busyWorkers, ^uint64(0)) } func (m *metric) BusyWorkers() uint64 { return atomic.LoadUint64(&m.busyWorkers) }
goroutine_group.go
package poller import "sync" type goroutineGroup struct { waitGroup sync.WaitGroup } func newRoutineGroup() *goroutineGroup { return new(goroutineGroup) } func (g *goroutineGroup) Run(fn func()) { g.waitGroup.Add(1) go func() { defer g.waitGroup.Done() fn() }() } func (g *goroutineGroup) Wait() { g.waitGroup.Wait() }
三、測(cè)試
package main import ( "context" "fmt" "ta/poller" "go.uber.org/goleak" "testing" ) func TestMain(m *testing.M) { fmt.Println("start") goleak.VerifyTestMain(m) } func TestPoller(t *testing.T) { producer := poller.NewPoller(5) producer.Poll(context.Background()) }
結(jié)果:
四、總結(jié)
大家用別的方式也可以實(shí)現(xiàn),核心要點(diǎn)就是控制并發(fā)節(jié)奏,防止大量請(qǐng)求打到task service
,在這里起到核心作用的就是schedule
,它控制著整個(gè)任務(wù)系統(tǒng)的調(diào)度。同時(shí)還封裝了WaitGroup
,這在大多數(shù)開(kāi)源代碼中都比較常見(jiàn),大家可以去嘗試。另外就是test case
一定得跟上,防止goroutine
泄漏。
以上就是Go實(shí)現(xiàn)后臺(tái)任務(wù)調(diào)度系統(tǒng)的實(shí)例代碼的詳細(xì)內(nèi)容,更多關(guān)于Go后臺(tái)任務(wù)調(diào)度系統(tǒng)的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
通過(guò)案例詳細(xì)聊聊Go語(yǔ)言的變量與常量
在任何一門(mén)現(xiàn)代的高級(jí)語(yǔ)言中,變量和常量都是它非?;A(chǔ)的程序結(jié)構(gòu)的組成部分,下面這篇文章主要給大家介紹了關(guān)于如何通過(guò)案例詳細(xì)聊聊Go語(yǔ)言的變量與常量的相關(guān)資料,需要的朋友可以參考下2023-03-03Go?slice切片make生成append追加copy復(fù)制示例
這篇文章主要為大家介紹了Go使用make生成切片、使用append追加切片元素、使用copy復(fù)制切片使用示例詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2022-06-06一文帶你了解Golang中select的實(shí)現(xiàn)原理
select是go提供的一種跟并發(fā)相關(guān)的語(yǔ)法,非常有用。本文將介紹?Go?語(yǔ)言中的?select?的實(shí)現(xiàn)原理,包括?select?的結(jié)構(gòu)和常見(jiàn)問(wèn)題、編譯期間的多種優(yōu)化以及運(yùn)行時(shí)的執(zhí)行過(guò)程2023-02-02Golang?int函數(shù)使用實(shí)例全面教程
這篇文章主要為大家介紹了Golang?int函數(shù)使用實(shí)例全面教程詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2023-10-10golang優(yōu)化目錄遍歷的實(shí)現(xiàn)方法
對(duì)于go1.16的新變化,大家印象最深的可能是io包的大規(guī)模重構(gòu),但這個(gè)重構(gòu)實(shí)際上還引進(jìn)了一個(gè)優(yōu)化,這篇文章要說(shuō)的就是這個(gè)優(yōu)化,所以本將給大家介紹golang是如何優(yōu)化目錄遍歷的,需要的朋友可以參考下2024-08-08Go語(yǔ)言實(shí)現(xiàn)一個(gè)Http?Server框架(一)?http庫(kù)的使用
本文主要介紹用Go語(yǔ)言實(shí)現(xiàn)一個(gè)Http?Server框架中對(duì)http庫(kù)的基本使用說(shuō)明,文中有詳細(xì)的代碼示例,感興趣的同學(xué)可以借鑒一下2023-04-04Go并發(fā)編程結(jié)構(gòu)體多字段原子操作示例詳解
這篇文章主要為大家介紹了Go并發(fā)編程結(jié)構(gòu)體多字段原子操作示例詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2023-12-12