GoLang中的timer定時器實現原理分析
// NewTimer creates a new Timer that will send // the current time on its channel after at least duration d. func NewTimer(d Duration) *Timer { c := make(chan Time, 1) t := &Timer{ C: c, r: runtimeTimer{ when: when(d), f: sendTime, arg: c, }, } startTimer(&t.r) return t } // The Timer type represents a single event. // When the Timer expires, the current time will be sent on C, // unless the Timer was created by AfterFunc. // A Timer must be created with NewTimer or AfterFunc. type Timer struct { C <-chan Time r runtimeTimer }
func NewTicker(d Duration) *Ticker { if d <= 0 { panic(errors.New("non-positive interval for NewTicker")) } // Give the channel a 1-element time buffer. // If the client falls behind while reading, we drop ticks // on the floor until the client catches up. c := make(chan Time, 1) t := &Ticker{ C: c, r: runtimeTimer{ when: when(d), period: int64(d), f: sendTime, arg: c, }, } startTimer(&t.r) return t } type Ticker struct { C <-chan Time // The channel on which the ticks are delivered. r runtimeTimer }
ticker 跟 timer 的初始化過程差不多,但是 ticker 比 timer 多了一個 period 參數,意為間隔的意思。
// Interface to timers implemented in package runtime. // Must be in sync with ../runtime/time.go:/^type timer type runtimeTimer struct { pp uintptr when int64 //觸發(fā)時間 period int64 //執(zhí)行周期性任務的時間間隔 f func(any, uintptr) // 執(zhí)行的回調函數,NOTE: must not be closure arg any //執(zhí)行任務的參數 seq uintptr //回調函數的參數,該參數僅在 netpoll 的應用場景下使用 nextwhen int64 //如果是周期性任務,下次執(zhí)行任務時間 status uint32 //狀態(tài) } // sendTime does a non-blocking send of the current time on c. func sendTime(c any, seq uintptr) { select { case c.(chan Time) <- Now(): default: } }
sendTime 采用非阻塞的形式,意為,不管是否存在接收方,此定時器一旦到時間了就要觸發(fā)掉。
// runtime/runtime2.go type p struct { ..... // The when field of the first entry on the timer heap. // This is updated using atomic functions. // This is 0 if the timer heap is empty. // 堆頂元素什么時候執(zhí)行 timer0When uint64 // The earliest known nextwhen field of a timer with // timerModifiedEarlier status. Because the timer may have been // modified again, there need not be any timer with this value. // This is updated using atomic functions. // This is 0 if there are no timerModifiedEarlier timers. // 如果有timer修改為更早執(zhí)行時間了,將會將執(zhí)行時間更新到更早時間 timerModifiedEarliest uint64 // Lock for timers. We normally access the timers while running // on this P, but the scheduler can also do it from a different P. // 操作timer的互斥鎖 timersLock mutex // Actions to take at some time. This is used to implement the // standard library's time package. // Must hold timersLock to access. //該p 上的所有timer,必須加鎖去操作這個字段,因為不同的p 操作這個字段會有競爭關系 timers []*timer // Number of timers in P's heap. // Modified using atomic instructions. //p 堆上所有的timer數 numTimers uint32 // Number of timerDeleted timers in P's heap. // Modified using atomic instructions. //被標記為刪除的timer,要么是我們調用stop,要么是timer 自己觸發(fā)后過期導致的刪除 deletedTimers uint32 } // runtime/time.go type timer struct { // If this timer is on a heap, which P's heap it is on. // puintptr rather than *p to match uintptr in the versions // of this struct defined in other packages. pp puintptr // Timer wakes up at when, and then at when+period, ... (period > 0 only) // each time calling f(arg, now) in the timer goroutine, so f must be // a well-behaved function and not block. // // when must be positive on an active timer. when int64 period int64 f func(any, uintptr) arg any seq uintptr // What to set the when field to in timerModifiedXX status. nextwhen int64 // The status field holds one of the values below. status uint32 } // startTimer adds t to the timer heap. //go:linkname startTimer time.startTimer func startTimer(t *timer) { if raceenabled { racerelease(unsafe.Pointer(t)) } addtimer(t) } // stopTimer stops a timer. // It reports whether t was stopped before being run. //go:linkname stopTimer time.stopTimer func stopTimer(t *timer) bool { return deltimer(t) } // addtimer adds a timer to the current P. // This should only be called with a newly created timer. // That avoids the risk of changing the when field of a timer in some P's heap, // which could cause the heap to become unsorted. func addtimer(t *timer) { // when must be positive. A negative value will cause runtimer to // overflow during its delta calculation and never expire other runtime // timers. Zero will cause checkTimers to fail to notice the timer. if t.when <= 0 { throw("timer when must be positive") } if t.period < 0 { throw("timer period must be non-negative") } if t.status != timerNoStatus { throw("addtimer called with initialized timer") } t.status = timerWaiting when := t.when // Disable preemption while using pp to avoid changing another P's heap. // 如果M在此之后被別的P搶占了,那么后續(xù)操作的就是別的P上的timers,這是不允許的 mp := acquirem() pp := getg().m.p.ptr() lock(&pp.timersLock) cleantimers(pp) // 清理掉已經過期的timer,以提高添加和刪除timer的效率。 doaddtimer(pp, t) // 執(zhí)行添加操作 unlock(&pp.timersLock) // 調用 wakeNetPoller 方法,喚醒網絡輪詢器,檢查計時器被喚醒的時間(when)是 // 否在當前輪詢預期運行的時間(pollerPollUntil)內,若是喚醒。 // 有的定時器是伴隨著網絡輪訓器的,比如設置的 i/o timeout // This can have a spurious wakeup but should never miss a wakeup // 寧愿出現錯誤的喚醒,也不能漏掉一個喚醒 wakeNetPoller(when) releasem(mp) } // 將0位置的timer與下面的子節(jié)點比較,如果比子節(jié)點大則下移。子節(jié)點i*4 + 1,i*4 + 2,i*4 + 3,i*4 + 4 siftdownTimer(pp.timers, 0) // 將i位置的timer與上面的父節(jié)點比較,如果比父節(jié)點小則上移。父節(jié)點是(i - 1) / 4 siftupTimer(pp.timers, i)
timer
存儲在P
中的 timers []*timer
成員屬性上。timers看起來是一個切片,但是它是按照runtimeTimer.when
這個數值排序的小頂堆四叉樹,觸發(fā)時間越早越排在前面。
整體來講就是父節(jié)點一定比其子節(jié)點小,子節(jié)點之間沒有任何關系和大小的要求。
關于acquirem
和releasem
//go:nosplit func acquirem() *m { _g_ := getg() _g_.m.locks++ return _g_.m } //go:nosplit func releasem(mp *m) { _g_ := getg() mp.locks-- if mp.locks == 0 && _g_.preempt { // restore the preemption request in case we've cleared it in newstack _g_.stackguard0 = stackPreempt } }
acquirem函數獲取當前M,并禁止M被搶占,因為M被搶占時的判斷如下
//C:\Go\src\runtime\preempt.go +287 func canPreemptM(mp *m) bool { return mp.locks == 0 && mp.mallocing == 0 && mp.preemptoff == "" && mp.p.ptr().status == _Prunning }
- 運行時沒有禁止搶占(
m.locks == 0
) - 運行時沒有在執(zhí)行內存分配(
m.mallocing == 0
) - 運行時沒有關閉搶占機制(
m.preemptoff == ""
) - M 與 P 綁定且沒有進入系統調用(
p.status == _Prunning
)
timers的觸發(fā)
// runtime/proc.go func checkTimers(pp *p, now int64) (rnow, pollUntil int64, ran bool) // runtime/time.go func runtimer(pp *p, now int64) int64 func runOneTimer(pp *p, t *timer, now int64)
runtime/time.go
文件中提供了checkTimers/runtimer/runOneTimer
三個方法。checkTimers方法中,如果當前p的timers長度不為0,就不斷地調用runtimers。runtimes會根據堆頂的timer的狀態(tài)判斷其能否執(zhí)行,如果可以執(zhí)行就調用runOneTimer實際執(zhí)行。
觸發(fā)定時器的途徑有兩個
- 通過調度器在調度時進行計時器的觸發(fā),findrunnable, schedule, stealWork。
- 通過系統監(jiān)控檢查并觸發(fā)計時器(到期未執(zhí)行),sysmon。
調度器的觸發(fā)一共分兩種情況,一種是在調度循環(huán)的時候調用 checkTimers 方法進行計時器的觸發(fā)。另外一種是當前處理器 P 沒有可執(zhí)行的 Timer,且沒有可執(zhí)行的 G。那么按照調度模型,就會去竊取其他計時器和 G。
即使是通過每次調度器調度和竊取的時候觸發(fā),但畢竟是具有一定的隨機和不確定性,因此系統監(jiān)控觸發(fā)依然是一個兜底保障,在 Go 語言中 runtime.sysmon 方法承擔了這一個責任,存在觸發(fā)計時器的邏輯,在每次進行系統監(jiān)控時,都會在流程上調用 timeSleepUntil 方法去獲取下一個計時器應觸發(fā)的時間,以及保存該計時器已打開的計時器堆的 P。
在獲取完畢后會馬上檢查當前是否存在 GC,若是正在 STW 則獲取調度互斥鎖。若發(fā)現下一個計時器的觸發(fā)時間已經過去,則重新調用 timeSleepUntil 獲取下一個計時器的時間和相應 P 的地址。檢查 sched.sysmonlock 所花費的時間是否超過 50μs。若是,則有可能前面所獲取的下一個計時器觸發(fā)時間已過期,因此重新調用 timeSleepUntil 方法再次獲取。如果發(fā)現超過 10ms 的時間沒有進行 netpoll 網絡輪詢,則主動調用 netpoll 方法觸發(fā)輪詢。同時如果存在不可搶占的處理器 P,則調用 startm 方法來運行那些應該運行,但沒有在運行的計時器。
到此這篇關于GoLang中的timer定時器實現原理分析的文章就介紹到這了,更多相關Go timer定時器內容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!
相關文章
GO?CountMinSketch計數器(布隆過濾器思想的近似計數器)
這篇文章主要介紹了GO?CountMinSketch計數器(布隆過濾器思想的近似計數器),文章圍繞主題展開詳細的內容介紹,具有一定的參考價值,需要的朋友可以參考一下2022-09-09詳解Golang time包中的time.Duration類型
在日常開發(fā)過程中,會頻繁遇到對時間進行操作的場景,使用 Golang 中的 time 包可以很方便地實現對時間的相關操作,本文講解一下 time 包中的 time.Duration 類型,需要的朋友可以參考下2023-07-07