Go channel實(shí)現(xiàn)原理分析
channel
單純地將函數(shù)并發(fā)執(zhí)行是沒(méi)有意義的。函數(shù)與函數(shù)間需要交換數(shù)據(jù)才能體現(xiàn)并發(fā)執(zhí)行函數(shù)的意義。
雖然可以使用共享內(nèi)存進(jìn)行數(shù)據(jù)交換,但是共享內(nèi)存在不同的goroutine中容易發(fā)生競(jìng)態(tài)問(wèn)題。為了保證數(shù)據(jù)交換的正確性,必須使用互斥量對(duì)內(nèi)存進(jìn)行加鎖,這種做法勢(shì)必造成性能問(wèn)題。
Go語(yǔ)言的并發(fā)模型是CSP(Communicating Sequential Processes),提倡通過(guò)通信共享內(nèi)存而不是通過(guò)共享內(nèi)存而實(shí)現(xiàn)通信。
如果說(shuō)goroutine是Go程序并發(fā)的執(zhí)行體,channel就是它們之間的連接。channel是可以讓一個(gè)goroutine發(fā)送特定值到另一個(gè)goroutine的通信機(jī)制。
Go 語(yǔ)言中的通道(channel)是一種特殊的類型。通道像一個(gè)傳送帶或者隊(duì)列,總是遵循先入先出(First In First Out)的規(guī)則,保證收發(fā)數(shù)據(jù)的順序。每一個(gè)通道都是一個(gè)具體類型的導(dǎo)管,也就是聲明channel的時(shí)候需要為其指定元素類型。
channel(管道)底層是一個(gè)環(huán)形隊(duì)列(先進(jìn)先出),send(插入)和recv(取走)從同一個(gè)位置沿同一個(gè)方向順序執(zhí)行。sendx表示最后一次插入元素的位置,recvx表示最后一次取走元素的位置。
var ch chan int //管道的聲明 ch = make(chan int, 8) //管道的初始化,環(huán)形隊(duì)列里可容納8個(gè)int ch <- 1 //往管道里寫入(send)數(shù)據(jù) ch <- 2 ch <- 3 ch <- 4 ch <- 5 v := <-ch //從管道里取走(recv)數(shù)據(jù) fmt.Println(v) v = <-ch fmt.Println(v)
channel類型
channel是一種類型,一種引用類型。聲明通道類型的格式如下:
var 變量 chan 元素類型
舉幾個(gè)例子:
var ch1 chan int // 聲明一個(gè)傳遞整型的通道 var ch2 chan bool // 聲明一個(gè)傳遞布爾型的通道 var ch3 chan []int // 聲明一個(gè)傳遞int切片的通道
創(chuàng)建channel
通道是引用類型,通道類型的空值是nil。
var ch chan int fmt.Println(ch) // <nil>
聲明的通道后需要使用make函數(shù)初始化之后才能使用。
創(chuàng)建channel的格式如下:
make(chan 元素類型, [緩沖大小])
channel的緩沖大小是可選的。
舉幾個(gè)例子:
ch4 := make(chan int) ch5 := make(chan bool) ch6 := make(chan []int)
channel操作
通道有發(fā)送(send)、接收(receive)和關(guān)閉(close)三種操作。
發(fā)送和接收都使用<-符號(hào)。
現(xiàn)在我們先使用以下語(yǔ)句定義一個(gè)通道:
ch := make(chan int)
發(fā)送
將一個(gè)值發(fā)送到通道中。
ch <- 10 // 把10發(fā)送到ch中
接收
從一個(gè)通道中接收值。
x := <- ch // 從ch中接收值并賦值給變量x <-ch // 從ch中接收值,忽略結(jié)果
關(guān)閉
我們通過(guò)調(diào)用內(nèi)置的close函數(shù)來(lái)關(guān)閉通道。
close(ch)
關(guān)于關(guān)閉通道需要注意的事情是,只有在通知接收方goroutine所有的數(shù)據(jù)都發(fā)送完畢的時(shí)候才需要關(guān)閉通道。通道是可以被垃圾回收機(jī)制回收的,它和關(guān)閉文件是不一樣的,在結(jié)束操作之后關(guān)閉文件是必須要做的,但關(guān)閉通道不是必須的。
關(guān)閉后的通道有以下特點(diǎn):
- 對(duì)一個(gè)關(guān)閉的通道再發(fā)送值就會(huì)導(dǎo)致panic。
- 對(duì)一個(gè)關(guān)閉的通道進(jìn)行接收會(huì)一直獲取值直到通道為空。
- 對(duì)一個(gè)關(guān)閉的并且沒(méi)有值的通道執(zhí)行接收操作會(huì)得到對(duì)應(yīng)類型的零值。
- 關(guān)閉一個(gè)已經(jīng)關(guān)閉的通道會(huì)導(dǎo)致panic。
無(wú)緩沖的通道
無(wú)緩沖的通道又稱為阻塞的通道。我們來(lái)看一下下面的代碼:
func main() { ch := make(chan int) ch <- 10 fmt.Println("發(fā)送成功") }
上面這段代碼能夠通過(guò)編譯,但是執(zhí)行的時(shí)候會(huì)出現(xiàn)以下錯(cuò)誤:
fatal error: all goroutines are asleep - deadlock!
goroutine 1 [chan send]:
main.main()
.../src/github.com/pprof/studygo/day06/channel02/main.go:8 +0x54
為什么會(huì)出現(xiàn)deadlock錯(cuò)誤呢?
因?yàn)槲覀兪褂胏h := make(chan int)創(chuàng)建的是無(wú)緩沖的通道,無(wú)緩沖的通道只有在有人接收值的時(shí)候才能發(fā)送值。就像你住的小區(qū)沒(méi)有快遞柜和代收點(diǎn),快遞員給你打電話必須要把這個(gè)物品送到你的手中,簡(jiǎn)單來(lái)說(shuō)就是無(wú)緩沖的通道必須有接收才能發(fā)送。
上面的代碼會(huì)阻塞在ch <- 10這一行代碼形成死鎖,那如何解決這個(gè)問(wèn)題呢?
一種方法是啟用一個(gè)goroutine去接收值,例如:
func recv(c chan int) { ret := <-c fmt.Println("接收成功", ret) } func main() { ch := make(chan int) go recv(ch) // 啟用goroutine從通道接收值 ch <- 10 fmt.Println("發(fā)送成功") }
無(wú)緩沖通道上的發(fā)送操作會(huì)阻塞,直到另一個(gè)goroutine在該通道上執(zhí)行接收操作,這時(shí)值才能發(fā)送成功,兩個(gè)goroutine將繼續(xù)執(zhí)行。相反,如果接收操作先執(zhí)行,接收方的goroutine將阻塞,直到另一個(gè)goroutine在該通道上發(fā)送一個(gè)值。
使用無(wú)緩沖通道進(jìn)行通信將導(dǎo)致發(fā)送和接收的goroutine同步化。因此,無(wú)緩沖通道也被稱為同步通道。
有緩沖的通道
解決上面問(wèn)題的方法還有一種就是使用有緩沖區(qū)的通道。
我們可以在使用make函數(shù)初始化通道的時(shí)候?yàn)槠渲付ㄍǖ赖娜萘浚纾?/p>
func main() { ch := make(chan int, 1) // 創(chuàng)建一個(gè)容量為1的有緩沖區(qū)通道 ch <- 10 fmt.Println("發(fā)送成功") }
只要通道的容量大于零,那么該通道就是有緩沖的通道,通道的容量表示通道中能存放元素的數(shù)量。就像你小區(qū)的快遞柜只有那么個(gè)多格子,格子滿了就裝不下了,就阻塞了,等到別人取走一個(gè)快遞員就能往里面放一個(gè)。
我們可以使用內(nèi)置的len函數(shù)獲取通道內(nèi)元素的數(shù)量,使用cap函數(shù)獲取通道的容量,雖然我們很少會(huì)這么做。
close()
可以通過(guò)內(nèi)置的close()函數(shù)關(guān)閉channel(如果你的管道不往里存值或者取值的時(shí)候一定記得關(guān)閉管道)
package main import "fmt" func main() { c := make(chan int) go func() { for i := 0; i < 5; i++ { c <- i } close(c) }() for { if data, ok := <-c; ok { fmt.Println(data) } else { break } } fmt.Println("main結(jié)束") }
如何優(yōu)雅的從通道循環(huán)取值
當(dāng)通過(guò)通道發(fā)送有限的數(shù)據(jù)時(shí),我們可以通過(guò)close函數(shù)關(guān)閉通道來(lái)告知從該通道接收值的goroutine停止等待。當(dāng)通道被關(guān)閉時(shí),往該通道發(fā)送值會(huì)引發(fā)panic,從該通道里接收的值一直都是類型零值。那如何判斷一個(gè)通道是否被關(guān)閉了呢?
我們來(lái)看下面這個(gè)例子:
// channel 練習(xí) func main() { ch1 := make(chan int) ch2 := make(chan int) // 開啟goroutine將0~100的數(shù)發(fā)送到ch1中 go func() { for i := 0; i < 100; i++ { ch1 <- i } close(ch1) }() // 開啟goroutine從ch1中接收值,并將該值的平方發(fā)送到ch2中 go func() { for { i, ok := <-ch1 // 通道關(guān)閉后再取值ok=false if !ok { break } ch2 <- i * i } close(ch2) }() // 在主goroutine中從ch2中接收值打印 for i := range ch2 { // 通道關(guān)閉后會(huì)退出for range循環(huán) fmt.Println(i) } }
從上面的例子中我們看到有兩種方式在接收值的時(shí)候判斷通道是否被關(guān)閉,我們通常使用的是for range的方式。
單向通道
有的時(shí)候我們會(huì)將通道作為參數(shù)在多個(gè)任務(wù)函數(shù)間傳遞,很多時(shí)候我們?cè)诓煌娜蝿?wù)函數(shù)中使用通道都會(huì)對(duì)其進(jìn)行限制,比如限制通道在函數(shù)中只能發(fā)送或只能接收。
Go語(yǔ)言中提供了單向通道來(lái)處理這種情況。例如,我們把上面的例子改造如下:
func counter(out chan<- int) { for i := 0; i < 100; i++ { out <- i } close(out) } func squarer(out chan<- int, in <-chan int) { for i := range in { out <- i * i } close(out) } func printer(in <-chan int) { for i := range in { fmt.Println(i) } } func main() { ch1 := make(chan int) ch2 := make(chan int) go counter(ch1) go squarer(ch2, ch1) printer(ch2) }
其中,
1.chan<- int是一個(gè)只能發(fā)送的通道,可以發(fā)送但是不能接收;
2.<-chan int是一個(gè)只能接收的通道,可以接收但是不能發(fā)送。
在函數(shù)傳參及任何賦值操作中將雙向通道轉(zhuǎn)換為單向通道是可以的,但反過(guò)來(lái)是不可以的。
read_only := make (<-chan int) //定義只讀的channel write_only := make (chan<- int) //定義只寫的channel
定義只讀和只寫的channel意義不大,一般用于在參數(shù)傳遞中。
//只能向channel里寫數(shù)據(jù) func send(c chan<- int) { c <- 1 } //只能取channel中的數(shù)據(jù) func recv(c <-chan int) { _ = <-c } //返回一個(gè)只讀channel func (c *Context) Done() <-chan struct{} { return nil }
通道遍歷
可以通過(guò)for range的方式遍歷管道,遍歷前必須先關(guān)閉管道,禁止再寫入元素。
close(ch) //遍歷前必須先關(guān)閉管道,禁止再寫入元素 //遍歷管道里剩下的元素 for ele := range ch { fmt.Println(ele) }
slice、map和channel是go語(yǔ)言里的3種引用類型,都可以通過(guò)make函數(shù)來(lái)進(jìn)行初始化(申請(qǐng)內(nèi)存分配)。因?yàn)樗鼈兌及粋€(gè)指向底層數(shù)據(jù)結(jié)構(gòu)的指針,所以稱之為“引用”類型。引用類型未初始化時(shí)都是nil,可以對(duì)它們執(zhí)行l(wèi)en()函數(shù),返回0。
異步通道
異步管道
asynChann := make(chan int, 8)
channel底層維護(hù)一個(gè)環(huán)形隊(duì)列(先進(jìn)先出),make初始化時(shí)指定隊(duì)列的長(zhǎng)度。隊(duì)列滿時(shí),寫阻塞;隊(duì)列空時(shí),讀阻塞。sendx指向下一次寫入的位置, recvx指向下一次讀取的位置。 recvq維護(hù)因讀管道而被阻塞的協(xié)程,sendq維護(hù)因?qū)懝艿蓝蛔枞膮f(xié)程。
同步管道可以認(rèn)為隊(duì)列容量為0,當(dāng)讀協(xié)程和寫協(xié)程同時(shí)就緒時(shí)它們才會(huì)彼此幫對(duì)方解除阻塞。
syncChann := make(chan int)
channel僅作為協(xié)程間同步的工具,不需要傳遞具體的數(shù)據(jù),管道類型可以用struct{}。空結(jié)構(gòu)體變量的內(nèi)存占用為0,因此struct{}類型的管道比bool類型的管道還要省內(nèi)存。
sc := make(chan struct{}) sc <- struct{}{}
關(guān)于channel的死鎖與阻塞
- Channel滿了,就阻塞寫;Channel空了,就阻塞讀。
- 阻塞之后會(huì)交出cpu,去執(zhí)行其他協(xié)程,希望其他協(xié)程能幫自己解除阻塞。
- 如果阻塞發(fā)生在main協(xié)程里,并且沒(méi)有其他子協(xié)程可以執(zhí)行,那就可以確定“希望永遠(yuǎn)等不來(lái)”,自已把自己殺掉,報(bào)一個(gè)fatal error:deadlock出來(lái)。
- 如果阻塞發(fā)生在子協(xié)程里,就不會(huì)發(fā)生死鎖,因?yàn)橹辽賛ain協(xié)程是一個(gè)值得等待的“希望”,會(huì)一直等(阻塞)下去。
package main import ( "fmt" "time" ) func main() { ch := make(chan struct{}, 1) ch <- struct{}{} //有1個(gè)緩沖可以用,無(wú)需阻塞,可以立即執(zhí)行 go func() { //子協(xié)程1 time.Sleep(5 * time.Second) //sleep一個(gè)很長(zhǎng)的時(shí)間 <-ch //如果把本行代碼注釋掉,main協(xié)程5秒鐘后會(huì)報(bào)fatal error fmt.Println("sub routine 1 over") }() ch <- struct{}{} //由于子協(xié)程1已經(jīng)啟動(dòng),寄希望于子協(xié)程1幫自己解除阻塞,所以會(huì)一直等子協(xié)程1執(zhí)行結(jié)束。如果子協(xié)程1執(zhí)行結(jié)束后沒(méi)幫自己解除阻塞,則希望完全破滅,報(bào)出deadlock fmt.Println("send to channel in main routine") go func() { //子協(xié)程2 time.Sleep(2 * time.Second) ch <- struct{}{} //channel已滿,子協(xié)程2會(huì)一直阻塞在這一行 fmt.Println("sub routine 2 over") }() time.Sleep(3 * time.Second) fmt.Println("main routine exit") }
send to channel in main routine
sub routine 1 over
main routine exit
關(guān)閉channel
- 只有當(dāng)管道關(guān)閉時(shí),才能通過(guò)range遍歷管道里的數(shù)據(jù),否則會(huì)發(fā)生fatal error。
- 管道關(guān)閉后讀操作會(huì)立即返回,如果緩沖已空會(huì)返回“0值”。
- ele, ok := <-ch ok==true代表ele是管道里的真實(shí)數(shù)據(jù)。
- 向已關(guān)閉的管道里send數(shù)據(jù)會(huì)發(fā)生panic。
- 不能重復(fù)關(guān)閉管道,不能關(guān)閉值為nil的管道,否則都會(huì)panic。
package main import ( "fmt" "time" ) var cloch = make(chan int, 1) var cloch2 = make(chan int, 1) func traverseChannel() { for ele := range cloch { fmt.Printf("receive %d\n", ele) } fmt.Println() } func traverseChannel2() { for { if ele, ok := <-cloch2; ok { //ok==true代表管道還沒(méi)有close fmt.Printf("receive %d\n", ele) } else { //管道關(guān)閉后,讀操作會(huì)立即返回“0值” fmt.Printf("channel have been closed, receive %d\n", ele) break } } } func main() { cloch <- 1 close(cloch) traverseChannel() //如果不close就直接通過(guò)range遍歷管道,會(huì)發(fā)生fatal error: all goroutines are asleep - deadlock! fmt.Println("==================") go traverseChannel2() cloch2 <- 1 close(cloch2) time.Sleep(10 * time.Millisecond) }
channel在并發(fā)編程中有多種玩法,經(jīng)常用channel來(lái)實(shí)現(xiàn)協(xié)程間的同步。
package main import ( "fmt" "time" ) func upstream(ch chan struct{}) { time.Sleep(15 * time.Millisecond) fmt.Println("一個(gè)上游協(xié)程執(zhí)行結(jié)束") ch <- struct{}{} } func downstream(ch chan struct{}) { <-ch fmt.Println("下游協(xié)程開始執(zhí)行") } func main() { upstreamNum := 4 //上游協(xié)程的數(shù)量 downstreamNum := 5 //下游協(xié)程的數(shù)量 upstreamCh := make(chan struct{}, upstreamNum) downstreamCh := make(chan struct{}, downstreamNum) //啟動(dòng)上游協(xié)程和下游協(xié)程,實(shí)際下游協(xié)程會(huì)先阻塞 for i := 0; i < upstreamNum; i++ { go upstream(upstreamCh) } for i := 0; i < downstreamNum; i++ { go downstream(downstreamCh) } //同步點(diǎn) for i := 0; i < upstreamNum; i++ { <-upstreamCh } //通過(guò)管道讓下游協(xié)程開始執(zhí)行 for i := 0; i < downstreamNum; i++ { downstreamCh <- struct{}{} } time.Sleep(10 * time.Millisecond) //等下游協(xié)程執(zhí)行結(jié)束 }
通道總結(jié)
channel常見(jiàn)的異??偨Y(jié),如下圖:
注意:關(guān)閉已經(jīng)關(guān)閉的channel也會(huì)引發(fā)panic。
到此這篇關(guān)于Go channel實(shí)現(xiàn)原理分析的文章就介紹到這了,更多相關(guān)Go channel內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Golang限流器time/rate設(shè)計(jì)與實(shí)現(xiàn)詳解
在?Golang?庫(kù)中官方給我們提供了限流器的實(shí)現(xiàn)golang.org/x/time/rate,它是基于令牌桶算法(Token?Bucket)設(shè)計(jì)實(shí)現(xiàn)的,下面我們就來(lái)看看他的具體使用吧2024-03-03go 下載非標(biāo)準(zhǔn)庫(kù)包(部份包被墻了)到本地使用的方法
今天小編就為大家分享一篇go 下載非標(biāo)準(zhǔn)庫(kù)包(部份包被墻了)到本地使用的方法,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧2019-06-06Go語(yǔ)言開發(fā)技巧必知的小細(xì)節(jié)提升效率
這篇文章主要介紹了Go語(yǔ)言開發(fā)技巧必知的小細(xì)節(jié)提升效率,分享幾個(gè)你可能不知道的Go語(yǔ)言小細(xì)節(jié),希望能幫助大家更好地學(xué)習(xí)這門語(yǔ)言2024-01-01go開源Hugo站點(diǎn)構(gòu)建三步曲之集結(jié)渲染
這篇文章主要為大家介紹了go開源Hugo站點(diǎn)構(gòu)建三步曲之集結(jié)渲染詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2023-02-02深入解析Go語(yǔ)言的io.ioutil標(biāo)準(zhǔn)庫(kù)使用
這篇文章主要介紹了Go語(yǔ)言的io.ioutil標(biāo)準(zhǔn)庫(kù)使用,是Golang入門學(xué)習(xí)中的基礎(chǔ)知識(shí),需要的朋友可以參考下2015-10-10golang socket斷點(diǎn)續(xù)傳大文件的實(shí)現(xiàn)方法
今天小編就為大家分享一篇golang socket斷點(diǎn)續(xù)傳大文件的實(shí)現(xiàn)方法,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧2019-07-07在Colaboratory上運(yùn)行Go程序的詳細(xì)過(guò)程
這篇文章主要介紹了在Colaboratory上運(yùn)行Go程序,本文給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2021-08-08