Go底層select的原理分析
背景
select多路復(fù)用在go的異步和并發(fā)控制場景中非常好用,對(duì)于無case和只有單個(gè)case的情況,編譯器在編譯的時(shí)候就會(huì)對(duì)其做優(yōu)化,無case就相當(dāng)于調(diào)用了一個(gè)阻塞函數(shù),單個(gè)case就相當(dāng)于對(duì)一個(gè)通道進(jìn)行讀寫操作,如果單個(gè)case中有default分支時(shí),就相當(dāng)于是一個(gè)if else邏輯,對(duì)于多個(gè)case的情況,是在運(yùn)行時(shí)調(diào)用selectgo函數(shù)決定的,接下來我們就來研究一下selectgo函數(shù)。
go版本
$ go version go version go1.21.4 windows/386
selectgo函數(shù)解釋
【1】函數(shù)參數(shù)解釋
selectgo函數(shù)位于:src/runtime/select.go中,定義如下:
//cas0:case數(shù)組地址,按照往通道寫數(shù)據(jù)在前,從通道讀數(shù)據(jù)在后的排列順序(編譯時(shí)編譯器優(yōu)化行為操作的) //nsends:往通道寫數(shù)據(jù)的case數(shù)量 //nrecvs:從通道讀數(shù)據(jù)的case數(shù)量 //block:是否阻塞 //返回值分別代表選中規(guī)定case位置和是否成功從通道接收數(shù)據(jù),如果選中的是default,第一個(gè)返回值就返回-1 func selectgo(cas0 *scase, order0 *uint16, pc0 *uintptr, nsends, nrecvs int, block bool) (int, bool)
select中每一個(gè)case都對(duì)應(yīng)一個(gè)scase結(jié)構(gòu),定義如下:
type scase struct { c *hchan //case對(duì)應(yīng)的讀或?qū)懲ǖ? elem unsafe.Pointer //指向要寫入元素或存放讀取元素的地址 }
【2】函數(shù)具體解釋
selectgo函數(shù)中會(huì)遍歷所有的case,為確保遍歷case的隨機(jī)性和安全性,有兩個(gè)關(guān)鍵的順序:pollorder和lockorder,不用關(guān)心其具體實(shí)現(xiàn),明白其的作用就行。
pollorder
:隨機(jī)的case順序,確保公平的處理每一個(gè)case。lockorder
:加鎖的case順序,確保并發(fā)安全。
計(jì)算出pollorder和lockorder順序之后,會(huì)根據(jù)這2個(gè)順序進(jìn)行遍歷分為了3步。
第一步:遍歷pollorder,選出準(zhǔn)備好的case
第一部分的代碼如下:
func selectgo(cas0 *scase, order0 *uint16, pc0 *uintptr, nsends, nrecvs int, block bool) (int, bool) { ... var casi int //準(zhǔn)備好的case位置 var cas *scase //case對(duì)象 var caseSuccess bool var caseReleaseTime int64 = -1 var recvOK bool //如果是從通道讀取數(shù)據(jù),是否讀取成功 for _, casei := range pollorder { //遍歷隨機(jī)順序的case casi = int(casei) //case的位置 cas = &scases[casi] //case對(duì)象 c = cas.c //case通道 if casi >= nsends { //前面講過,寫通道在前,讀通道在后,所以這里是讀通道case sg = c.sendq.dequeue() //取出往讀通道寫數(shù)據(jù)的協(xié)程隊(duì)列中的第一個(gè)協(xié)程 if sg != nil { //如果存在往通道寫數(shù)據(jù)的協(xié)程 goto recv //從往通道寫數(shù)據(jù)的協(xié)程中讀取數(shù)據(jù)并返回case位置和讀取結(jié)果 } if c.qcount > 0 { //如果緩沖區(qū)還有數(shù)據(jù) goto bufrecv //從緩沖區(qū)讀取數(shù)據(jù)并返回case位置和讀取結(jié)果 } if c.closed != 0 { //如果通道已關(guān)閉 goto rclose //釋放相關(guān)資源 } } else { //寫通道的case if raceenabled { racereadpc(c.raceaddr(), casePC(casi), chansendpc) } if c.closed != 0 { //如果通道已經(jīng)關(guān)閉 goto sclose //直接panic } sg = c.recvq.dequeue() //從正在往通道讀數(shù)據(jù)的協(xié)程隊(duì)列中取得第一個(gè) if sg != nil { //如果往通道讀數(shù)據(jù)的協(xié)程存在 goto send //發(fā)送數(shù)據(jù)到讀通道的協(xié)程 } if c.qcount < c.dataqsiz { //緩沖區(qū)還有位置 goto bufsend } } } if !block { //如果不阻塞,也就是帶default分支 selunlock(scases, lockorder) casi = -1 //case位置為-1 goto retc //直接返回,不用進(jìn)入下一步 } ... }
bufrecv標(biāo)簽:
bufrecv: recvOK = true //返回讀數(shù)據(jù)成功 qp = chanbuf(c, c.recvx) //緩沖區(qū)中要讀取數(shù)據(jù)的地址 if cas.elem != nil { typedmemmove(c.elemtype, cas.elem, qp) //將讀取的緩沖區(qū)數(shù)據(jù)拷貝到case中的elem位置 } typedmemclr(c.elemtype, qp) //清理緩沖區(qū)被讀的數(shù)據(jù) c.recvx++ //讀取緩沖區(qū)的位置+1 if c.recvx == c.dataqsiz { //下一個(gè)要讀取緩沖區(qū)的位置如果等于緩沖區(qū)大小就將下次要讀取的緩沖區(qū)位置置為0 c.recvx = 0 } c.qcount-- //緩沖區(qū)中元素個(gè)數(shù)-1 selunlock(scases, lockorder) goto retc
bufsend標(biāo)簽:
bufsend: typedmemmove(c.elemtype, chanbuf(c, c.sendx), cas.elem) //將case中要寫入的元素寫到緩沖區(qū) c.sendx++ //寫入緩沖區(qū)的位置+1 if c.sendx == c.dataqsiz { //如果下次要寫入緩沖區(qū)的位置等于緩沖區(qū)的大小就將緩沖區(qū)寫入位置置為開頭 c.sendx = 0 } c.qcount++ //緩沖區(qū)元素?cái)?shù)量+1 selunlock(scases, lockorder) goto retc
recv標(biāo)簽:
recv: recv(c, sg, cas.elem, func() { selunlock(scases, lockorder) }, 2) //從寫通道的協(xié)程讀取數(shù)據(jù) if debugSelect { print("syncrecv: cas0=", cas0, " c=", c, "\n") } recvOK = true //返回成功讀取 goto retc
rclose標(biāo)簽:
rclose: selunlock(scases, lockorder) recvOK = false //從通道中讀取數(shù)據(jù)失敗 if cas.elem != nil { typedmemclr(c.elemtype, cas.elem) //釋放case中元素的空間 } if raceenabled { raceacquire(c.raceaddr()) } goto retc
send標(biāo)簽:
send: send(c, sg, cas.elem, func() { selunlock(scases, lockorder) }, 2) //發(fā)送數(shù)據(jù)到往通道讀數(shù)據(jù)的協(xié)程 if debugSelect { print("syncsend: cas0=", cas0, " c=", c, "\n") } goto retc
retc標(biāo)簽:
retc: if caseReleaseTime > 0 { blockevent(caseReleaseTime-t0, 1) } return casi, recvOK //返回case位置和是否從通道成功讀取數(shù)據(jù)
sclose標(biāo)簽:
sclose: selunlock(scases, lockorder) panic(plainError("send on closed channel"))
上面就是selectgo函數(shù)第一部分的邏輯,第一部分就是遍歷一個(gè)隨機(jī)的case順序,如果有符合條件的case就返回case的位置并且返回讀數(shù)據(jù)的結(jié)果,如果沒有case符合條件但是有default分支就返回-1,如果沒default分支就進(jìn)入下一步。
第二步:將當(dāng)前goroutine放到所有case通道中對(duì)應(yīng)的收發(fā)隊(duì)列上
第二部分的代碼如下:
func selectgo(cas0 *scase, order0 *uint16, pc0 *uintptr, nsends, nrecvs int, block bool) (int, bool) { ... gp = getg() //獲取當(dāng)前協(xié)程 if gp.waiting != nil { throw("gp.waiting != nil") } nextp = &gp.waiting for _, casei := range lockorder { //按照對(duì)case加鎖的順序遍歷case casi = int(casei) //case的位置 cas = &scases[casi] //case對(duì)象 c = cas.c //case對(duì)象中的通道 sg := acquireSudog() //初始化一個(gè)協(xié)程等待結(jié)構(gòu) sg.g = gp //協(xié)程等待結(jié)構(gòu)綁定協(xié)程 sg.isSelect = true //表示該協(xié)程等待結(jié)構(gòu)與select操作相關(guān) sg.elem = cas.elem sg.releasetime = 0 if t0 != 0 { sg.releasetime = -1 } sg.c = c *nextp = sg nextp = &sg.waitlink if casi < nsends { //如果case上是往通道寫數(shù)據(jù),就將綁定當(dāng)前協(xié)程的等待對(duì)象插入當(dāng)前case通道的發(fā)送隊(duì)列中 c.sendq.enqueue(sg) } else { //如果case上是往通道讀數(shù)據(jù),就將綁定當(dāng)前協(xié)程的等待對(duì)象插入當(dāng)前case通道的接收隊(duì)列中 c.recvq.enqueue(sg) } } ... }
第二部分就是將當(dāng)前協(xié)程放到每個(gè)case中的通道對(duì)應(yīng)的收發(fā)隊(duì)列中去。
第三步:喚醒groutine
第三部分代碼如下:
func selectgo(cas0 *scase, order0 *uint16, pc0 *uintptr, nsends, nrecvs int, block bool) (int, bool) { ... sg = (*sudog)(gp.param) //被喚醒的協(xié)程等待結(jié)構(gòu) gp.param = nil casi = -1 //case位置 cas = nil //case對(duì)象 caseSuccess = false sglist = gp.waiting //lockorder順序的協(xié)程等待結(jié)構(gòu)隊(duì)列,這里是隊(duì)列中的第一個(gè)協(xié)程等待結(jié)構(gòu) for sg1 := gp.waiting; sg1 != nil; sg1 = sg1.waitlink { //清空協(xié)程等待結(jié)構(gòu)隊(duì)列中元素便于進(jìn)行垃圾回收 sg1.isSelect = false sg1.elem = nil sg1.c = nil } gp.waiting = nil for _, casei := range lockorder { //根據(jù)對(duì)case的加鎖順序進(jìn)行遍歷 k = &scases[casei] //當(dāng)前case if sg == sglist { //喚醒的協(xié)程等待結(jié)構(gòu)是當(dāng)前case的 casi = int(casei) //喚醒的case位置 cas = k //喚醒的case對(duì)象 caseSuccess = sglist.success //往通道讀取或?qū)憯?shù)據(jù)結(jié)果 if sglist.releasetime > 0 { caseReleaseTime = sglist.releasetime } } else { //喚醒的協(xié)程等待結(jié)構(gòu)不是當(dāng)前case的 c = k.c if int(casei) < nsends { //case為發(fā)送通道,就是釋放當(dāng)前case通道里sendq隊(duì)列的協(xié)程等待結(jié)構(gòu)對(duì)象 c.sendq.dequeueSudoG(sglist) } else { //case為讀取通道,就是釋放當(dāng)前case通道里recvq隊(duì)列的協(xié)程等待結(jié)構(gòu)對(duì)象 c.recvq.dequeueSudoG(sglist) } } sgnext = sglist.waitlink //下一個(gè)協(xié)程等待結(jié)構(gòu) sglist.waitlink = nil releaseSudog(sglist) //釋放上一個(gè)協(xié)程等待結(jié)構(gòu) sglist = sgnext } ... }
第三部分就是某一個(gè)case上的協(xié)程等待結(jié)構(gòu)被喚醒時(shí),會(huì)先執(zhí)行通道上對(duì)應(yīng)的收發(fā)操作, 然后去將所有case上的協(xié)程等待結(jié)構(gòu)釋放掉。
總結(jié)
select雖然使用起來簡單,但其實(shí)現(xiàn)邏輯還是比較復(fù)雜的,通過熟悉其實(shí)現(xiàn),我們能理解對(duì)多個(gè)通道進(jìn)行操作時(shí)候,可以為每一個(gè)通道創(chuàng)建一個(gè)協(xié)程去操作,這無疑增加了GC開銷,但是使用select采用了多路復(fù)用的思想,將一個(gè)協(xié)程綁定在多個(gè)協(xié)程等待對(duì)象上,而且對(duì)case使用了隨機(jī)順序,確保每一個(gè)case都能公平的被執(zhí)行。
以上為個(gè)人經(jīng)驗(yàn),希望能給大家一個(gè)參考,也希望大家多多支持腳本之家。
相關(guān)文章
Go設(shè)計(jì)模式之狀態(tài)模式講解和代碼示例
狀態(tài)是一種行為設(shè)計(jì)模式,?讓你能在一個(gè)對(duì)象的內(nèi)部狀態(tài)變化時(shí)改變其行為,該模式將與狀態(tài)相關(guān)的行為抽取到獨(dú)立的狀態(tài)類中,?讓原對(duì)象將工作委派給這些類的實(shí)例,?而不是自行進(jìn)行處理,本文將通過代碼示例給大家簡單的介紹一下Go狀態(tài)模式2023-08-08如何解析golang中Context在HTTP服務(wù)中的角色
這篇文章主要介紹了如何解析golang中Context在HTTP服務(wù)中的角色問題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2024-03-03基于Go語言實(shí)現(xiàn)一個(gè)并發(fā)下載器
這篇文章主要為大家詳細(xì)介紹了如何利用GO語言實(shí)現(xiàn)一個(gè)并發(fā)的文件下載器,可以在不重新啟動(dòng)整個(gè)下載的情況下處理錯(cuò)誤,感興趣的小伙伴可以了解一下2023-10-10Go基礎(chǔ)教程系列之defer、panic和recover詳解
這篇文章主要介紹了Go基礎(chǔ)教程系列之defer、panic和recover,需要的朋友可以參考下2022-04-04go語言中的udp協(xié)議及TCP通訊實(shí)現(xiàn)示例
這篇文章主要為大家介紹了go語言中的udp協(xié)議及TCP通訊的實(shí)現(xiàn)示例,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步早日升職加薪2022-04-04