Go使用TimerController解決timer過多的問題
背景
- 在Go里面我們實(shí)現(xiàn)超時(shí)需要起一個(gè)goroutine才能實(shí)現(xiàn),但是當(dāng)我有大量的任務(wù)需要做超時(shí)控制就需要起大量的goroutine,實(shí)際上是一種開銷和負(fù)擔(dān)!
- 有些時(shí)候需要注冊(cè)一些Timer也是有需要起大量的 goroutine才能實(shí)現(xiàn),比如我要異步定期刷新一個(gè)配置,異步的監(jiān)聽啥東西,此時(shí)簡(jiǎn)單做法就是使用大量的 goroutine + timer/sleep實(shí)現(xiàn)!
解決思路
多路復(fù)用,實(shí)際上Go底層也是一種多路復(fù)用的思想去實(shí)現(xiàn)的timer,但是它是底層的timer,我們需要解決的問題就過多的timer問題!
我們的思路是實(shí)現(xiàn)一個(gè) TimerController 可以幫助我們管理很多個(gè)timer,并且可以開銷做到最低!因此使用一個(gè) 小頂堆 + Timer調(diào)度器即可實(shí)現(xiàn)!
實(shí)現(xiàn)
小頂堆(最小堆)
使用Go自帶的 container/heap
實(shí)現(xiàn) 小頂堆
import ( "container/heap" ) type HeapItem[T any] interface { Less(HeapItem[T]) bool GetValue() T } // 參考 IntHeap type heapQueue[T any] []HeapItem[T] func (h heapQueue[T]) Len() int { return len(h) } func (h heapQueue[T]) Less(i, j int) bool { return h[i].Less(h[j]) } func (h heapQueue[T]) Swap(i, j int) { h[i], h[j] = h[j], h[i] } func (h *heapQueue[T]) Push(x any) { // Push and Pop use pointer receivers because they modify the slice's length, // not just its contents. *h = append(*h, x.(HeapItem[T])) } func (h *heapQueue[T]) Pop() any { old := *h n := len(old) x := old[n-1] *h = old[0 : n-1] return x } type HeapQueue[T any] struct { queue heapQueue[T] } func (h *HeapQueue[T]) ptr() *heapQueue[T] { return &h.queue } // NewHeapQueue 非并發(fā)安全 func NewHeapQueue[T any](items ...HeapItem[T]) *HeapQueue[T] { queue := make(heapQueue[T], len(items)) for index, item := range items { queue[index] = item } heap.Init(&queue) return &HeapQueue[T]{queue: queue} } func (h *HeapQueue[T]) Push(item HeapItem[T]) { heap.Push(h.ptr(), item) } func (h *HeapQueue[T]) Pop() (T, bool ) { if h.ptr().Len() == 0 { var Nil T return Nil, false } return heap.Pop(h.ptr()).(HeapItem[T]).GetValue(), true } // Peek 方法用于返回堆頂元素而不移除它 func (h *HeapQueue[T]) Peek() (T, bool ) { if h.ptr().Len() > 0 { return h.queue[0].GetValue(), true } var Nil T return Nil, false } func (h *HeapQueue[T]) Len() int { return h.ptr().Len() }
調(diào)度器
type Timer struct { Timeout time.Time Name string NotifyFunc func() } func (t *Timer) GetCurTimeout() time.Duration { return t.Timeout.Sub(time.Now()) } // Notify todo support async notify func (t *Timer) Notify() { if t.NotifyFunc != nil { t.NotifyFunc() } } func (t *Timer) IsExpired() bool { return t.Timeout.Before(time.Now()) } func (t *Timer) Less(v HeapItem[*Timer]) bool { return t.Timeout.Before(v.GetValue().Timeout) } func (t *Timer) GetValue() *Timer { return t } type TimerController struct { timers chan *Timer minHeap *HeapQueue[*Timer] closeOnce sync.Once close chan struct{} } func (t *TimerController) AddTimer(timer *Timer) bool { if timer == nil { return false } select { case <-t.close: return false default: t.timers <- timer return true } } func (t *TimerController) Close() { t.closeOnce.Do(func() { close(t.close) }) } func NewTimerController(bufferSize int) *TimerController { return &TimerController{ timers: make(chan *Timer, bufferSize), minHeap: NewHeapQueue[*Timer](), close: make(chan struct{}), } } func (t *TimerController) Start() { go t._start() } func (t *TimerController) _start() { const defaultTimeout = time.Hour * 24 var ( curMinTimer *Timer timeout = time.NewTimer(defaultTimeout) ) for { select { case <-t.close: close(t.timers) timeout.Stop() return case timer := <-t.timers: t.minHeap.Push(timer) curMinTimer, _ = t.minHeap.Peek() timeout.Reset(curMinTimer.GetCurTimeout()) //fmt.Printf("timeout.Reset-1 name: %s, timeout: %s\n", curMinTimer.Name, curMinTimer.GetCurTimeout()) case <-timeout.C: if curMinTimer != nil { curMinTimer.Notify() curMinTimer = nil t.minHeap.Pop() } curMinTimer, _ = t.minHeap.Peek() if curMinTimer == nil { timeout.Reset(defaultTimeout) continue } timeout.Reset(curMinTimer.GetCurTimeout()) //fmt.Printf("timeout.Reset-2 name: %s, timeout: %s\n", curMinTimer.Name, curMinTimer.GetCurTimeout()) } } }
測(cè)試
func TestTimerController(t *testing.T) { controller := NewTimerController(1024) controller.Start() defer controller.Close() now := time.Now() arrs := make([]string, 0) NewTimer := func(num int) *Timer { return &Timer{Timeout: now.Add(time.Duration(num) * time.Millisecond), Name: strconv.Itoa(num), NotifyFunc: func() { arrs = append(arrs, strconv.Itoa(num)) }} } // 這里亂序的注冊(cè)了8個(gè)timer controller.AddTimer(NewTimer(5)) controller.AddTimer(NewTimer(6)) controller.AddTimer(NewTimer(3)) controller.AddTimer(NewTimer(4)) controller.AddTimer(NewTimer(7)) controller.AddTimer(NewTimer(8)) controller.AddTimer(NewTimer(1)) controller.AddTimer(NewTimer(2)) time.Sleep(time.Second * 1) t.Logf("%#v\n", arrs) // 最終我們可以獲取到 順序執(zhí)行的! assert.Equal(t, arrs, []string{"1", "2", "3", "4", "5", "6", "7", "8"}) } func TestTimerController_Stable(t *testing.T) { controller := NewTimerController(1024) controller.Start() defer controller.Close() now := time.Now() arrs := make(map[string]bool, 0) NewTimer := func(num int, name string) *Timer { return &Timer{Timeout: now.Add(time.Duration(num) * time.Millisecond), Name: name, NotifyFunc: func() { arrs[name] = true }} } // 我們重復(fù)注冊(cè)了相同實(shí)現(xiàn)執(zhí)行的 timer,那么預(yù)期是每次執(zhí)行的結(jié)果和注冊(cè)順序一致 controller.AddTimer(NewTimer(2, "1")) controller.AddTimer(NewTimer(2, "2")) controller.AddTimer(NewTimer(2, "3")) controller.AddTimer(NewTimer(2, "4")) controller.AddTimer(NewTimer(2, "5")) time.Sleep(time.Second * 1) t.Logf("%#v\n", arrs) assert.Equal(t, arrs, map[string]bool{"1": true, "2": true, "3": true, "4": true, "5": true}) }
以上就是Go使用TimerController解決timer過多的問題的詳細(xì)內(nèi)容,更多關(guān)于Go TimerController解決timer過多的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
go語言yaml轉(zhuǎn)map、map遍歷的實(shí)現(xiàn)
本文主要介紹了go語言yaml轉(zhuǎn)map、map遍歷的實(shí)現(xiàn),文中通過示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2021-09-09從Context到go設(shè)計(jì)理念輕松上手教程
這篇文章主要為大家介紹了從Context到go設(shè)計(jì)理念輕松上手教程詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2022-09-09Go使用Gin+mysql實(shí)現(xiàn)增刪改查的詳細(xì)實(shí)例
golang本身沒有提供連接mysql的驅(qū)動(dòng),但是定義了標(biāo)準(zhǔn)接口供第三方開發(fā)驅(qū)動(dòng),下面這篇文章主要給大家介紹了關(guān)于Go使用Gin+mysql實(shí)現(xiàn)增刪改查的相關(guān)資料,需要的朋友可以參考下2022-12-12Go結(jié)合MQTT實(shí)現(xiàn)通信的示例代碼
本文主要介紹了Go結(jié)合MQTT實(shí)現(xiàn)通信的示例代碼,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2023-05-05Go項(xiàng)目在linux服務(wù)器的部署詳細(xì)步驟
在今天的軟件開發(fā)中,使用Linux作為操作系統(tǒng)的比例越來越高,而Golang語言則因?yàn)槠涓咝?、?jiǎn)潔和并發(fā)性能等特點(diǎn),也被越來越多的開發(fā)者所青睞,這篇文章主要給大家介紹了關(guān)于Go項(xiàng)目在linux服務(wù)器的部署詳細(xì)步驟,需要的朋友可以參考下2023-09-09