Go語言七篇入門教程四通道及Goroutine
1. 前言
在go社區(qū)有這樣一句話
不要通過共享內(nèi)存來通信,而是通過通信來共享內(nèi)存。
go官方是建議使用管道通信的方式來進行并發(fā)。
通道 是用于協(xié)程間交流的通信載體。嚴格地來說,通道就是數(shù)據(jù)傳輸?shù)墓艿?,?shù)據(jù)通過這根管道被 “傳入” 或被 “讀出”。 因此協(xié)程可以發(fā)送數(shù)據(jù)到通道中,而另一個協(xié)程可以從該通道中讀取數(shù)據(jù)。
在這里就要引入一個新名詞:協(xié)程
將線程再細分為多個協(xié)程,比如說是一條流水線上的多人協(xié)作。那么就可以減少各個線程內(nèi)部的等待時間。
2. 通道簡介
Go 提供一個 chan 關(guān)鍵詞去創(chuàng)建一個通道。一個通道只能傳入一種類型的數(shù)據(jù),其他的數(shù)據(jù)類型不允許被傳輸。
將線程再分成更細的協(xié)程,使得中間等待時候更少,提高效率!
2.1 聲明
package main import "fmt" func main(){ var channel chan int //聲明了一個可以傳入 int 類型數(shù)據(jù)的通道 channel 。 fmt.Println(channel) //程序會打印nil, 因為通道的 0 值是 nil。 }
一個 nil 通道是沒有用的。你不能向它傳遞數(shù)據(jù)或者讀取數(shù)據(jù)。
因此,我們必須使用 make 函數(shù)器創(chuàng)建一個可以使用的通道。
package main import "fmt" func main(){ channel := make(chan int) //聲明了一個可以傳入 int 類型數(shù)據(jù)的通道 channel 。 fmt.Println(channel) //程序會打印channel的地址。 0xc0000180c0 }
它是一個指針內(nèi)存地址。通道變量默認是一個指針。多數(shù)情況下,當你想要和一個協(xié)程溝通的時候,你可以給函數(shù)或者方法傳遞一個通道作為參數(shù)。當從協(xié)程接收到通道參數(shù)后,你不需要再對其進行解引用就可以從通道接收或者發(fā)送數(shù)據(jù)。
2.1 讀寫
Go 語言提供一個非常簡潔的左箭頭語法 <-
去從通道讀寫數(shù)據(jù)。
有變量接受管道值
channel <- data
上面的代碼意味著我們想要把 data 數(shù)據(jù)推入到通道 channel 中,注意看箭頭的指向。
它表明是從 data數(shù)據(jù) to到 通道 channel。
因此我們可以當作我們正在把 data 推入到通道 channel。
無變量接受管道值
<- data
這個語句不會把數(shù)據(jù)傳輸給任何變量,但是仍然是一個有效的語句。
上面的通道操作默認是阻塞的。
在以前的課程中,我們知道可以使用 time.Sleep 去阻塞一個通道。通道操作本質(zhì)上是阻塞的。當一些數(shù)據(jù)被寫入通道,對應(yīng)的協(xié)程將阻塞直到有其他的協(xié)程可以從此通道接收數(shù)據(jù)。
通道操作會通知調(diào)度器去調(diào)度其他的協(xié)程,這就是為什么程序不會一直阻塞在一個協(xié)程。通道的這些特性在不同的協(xié)程溝通的時候非常有用,它避免了我們使用鎖或者一些 hack 手段去達到阻塞協(xié)程的目的。
2.3 通道詳解
2.3.1 例子
package main import "fmt" func Rush(c chan string) { fmt.Println("Hello "+ <-c + "!") // 聲明一個函數(shù) greet, 這個函數(shù)的參數(shù) c 是一個 string 類型的通道。 // 在這個函數(shù)中,我們從通道 c 中接收數(shù)據(jù)并打印到控制臺上。 } func main(){ fmt.Println("Main Start") // main 函數(shù)的第一個語句是打印 main start 到控制臺。 channel := make(chan string) // 在 main 函數(shù)中使用 make 函數(shù)創(chuàng)建一個 string 類型的通道賦值給 ‘ channel ' 變量 go Rush(channel) // 把 channel 通道傳遞給 greet 函數(shù)并用 go 關(guān)鍵詞以協(xié)程方式運行它。 // 此時,程序有兩個協(xié)程并且正在調(diào)度運行的是 main goroutine 主函數(shù) channel <- "DEMO" // 給通道 channel 傳入一個數(shù)據(jù) DEMO. // 此時主線程將阻塞直到有協(xié)程接收這個數(shù)據(jù). Go 的調(diào)度器開始調(diào)度 greet 協(xié)程接收通道 channel 的數(shù)據(jù) fmt.Println("Main Stop") // 然后主線程激活并且執(zhí)行后面的語句,打印 main stopped } /* Main Start Hello DEMO! Main Stop */
2.3.2 死鎖
當通道讀寫數(shù)據(jù)時,所在協(xié)程會阻塞并且調(diào)度控制權(quán)會轉(zhuǎn)移到其他未阻塞的協(xié)程。
如果當前協(xié)程正在從一個沒有任何值的通道中讀取數(shù)據(jù),那么當前協(xié)程會阻塞并且等待其他協(xié)程往此通道寫入值。
因此,讀操作將被阻塞。類似的,如果你發(fā)送數(shù)據(jù)到一個通道,它將阻塞當前協(xié)程直到有其他協(xié)程從通道中讀取數(shù)據(jù)。此時寫操作將阻塞 。
下面是一個主線程在進行通道操作的時候造成死鎖的例子
package main import "fmt" func main() { fmt.Println("main start") // main 函數(shù)的第一個語句是打印 main start 到控制臺。 channel := make(chan string) // 在 main 函數(shù)中使用 make 函數(shù)創(chuàng)建一個 string 類型的通道賦值給 ‘ channel ' 變量 channel <- "GoLang" // 給通道 channel 傳入一個數(shù)據(jù) DEMO. // 此時主線程將阻塞直到有協(xié)程接收這個數(shù)據(jù). Go 的調(diào)度器開始調(diào)度協(xié)程接收通道 channel 的數(shù)據(jù) // 但是由于沒有協(xié)程接受,沒有協(xié)程是可被調(diào)度的。所有協(xié)程都進入休眠狀態(tài),即是主程序阻塞了。 fmt.Println("main stop") } /* 報錯 main start fatal error: all goroutines are asleep - deadlock! //所有協(xié)程都進入休眠狀態(tài),死鎖 goroutine 1 [chan send]: main.main() */
2.3.3 關(guān)閉通道
package main import "fmt" func RushChan(c chan string) { <- c fmt.Println("1") <- c fmt.Println("2") } func main() { fmt.Println("main start") c := make(chan string, 1) go RushChan(c) c <- "Demo1" close(c) /* 不能向一個關(guān)了的channel發(fā)信息 main start panic: send on closed channel */ c <- "Demo2" //close(c) /* close 放這里的話可以 main start 1 2 Main Stop */ fmt.Println("Main Stop") }
第一個操作 c <- "Demo2"
將阻塞協(xié)程直到有其他協(xié)程從此通道中讀取數(shù)據(jù),因此 greet 會被調(diào)度器調(diào)度執(zhí)行。
第一個操作 <-c
是非阻塞的 因為現(xiàn)在通道c
有數(shù)據(jù)可讀。
第二個操作 <-c
將被阻塞因為通道c
已經(jīng)沒數(shù)據(jù)可讀.
此時main
協(xié)程將被激活并且程序執(zhí)行close(c)
關(guān)閉通道操作。
2.3.4 緩沖區(qū)
c := make(chan Type, n)
當緩沖區(qū)參數(shù)不是 0 的時候。協(xié)程將不會阻塞除非緩沖區(qū)被填滿。
當緩沖區(qū)滿了之后,想要再往緩沖區(qū)發(fā)送數(shù)據(jù)只有等到有其他協(xié)程從緩沖區(qū)接收數(shù)據(jù), 此時的發(fā)送協(xié)程是阻塞的。
有一點需要注意, 讀緩沖區(qū)的操作是渴望式讀取,意味著一旦讀操作開始它將讀取緩沖區(qū)所有數(shù)據(jù),直到緩沖區(qū)為空。
原理上來說讀操作的協(xié)程將不會阻塞直到緩沖區(qū)為空。
package main import "fmt" func RushChan(c chan string) { for { val ,_ := <-c fmt.Println(val) } } func main() { fmt.Println("Main Start") c := make(chan string, 1) go RushChan(c) c <- "Demo1" //結(jié)果1 //c <- "Demo2" //結(jié)果2 fmt.Println("Main Stop") } /* 結(jié)果1: Main Start Main Stop */ /* 結(jié)果2: Main Start Join Mike Main Stop */
由于這是一個緩沖的通道,當我只有c <- Demo1
的時候,這里面只是滿了,但是是不會阻塞的。所以子協(xié)程接受到了這個數(shù)據(jù)Demo1
,但是由于是非阻塞,所以主線程沒有被阻塞,并沒有等子協(xié)程完成就結(jié)束了,結(jié)果1就是這樣出現(xiàn)了。
當加多一個c <- Demo2
的時候,這時就要等緩沖區(qū)空了,也就是等有協(xié)程把Demo1
讀取,所以就會導(dǎo)致主線程阻塞,此時的結(jié)果就是結(jié)果2了。
package main import "fmt" func RushChan(c chan string) { for { val ,_ := <-c fmt.Println(val) } } func main() { c := make(chan int,3) c <- 1 c <- 2 c <- 3 close(c) for elem := range c { fmt.Println(elem) } }
這里雖然關(guān)閉了通道,但是其實數(shù)據(jù)不僅在通道里面,數(shù)據(jù)還在緩沖區(qū)中的,我們依然可以讀取到這個數(shù)據(jù)。
2.3.5 通道的長度和容量
和切片類似,一個緩沖通道也有長度和容量。
通道的長度是其內(nèi)部緩沖隊列未讀的數(shù)據(jù)量,而通道的容量是緩沖區(qū)可最大盛放的數(shù)據(jù)量。
我們可以使用 len 函數(shù)去計算通道的長度,使用 cap 函數(shù)去獲得通道的容量。和切片用法神似
package main import "fmt" func RushChan(c chan string) { for { val ,_ := <-c fmt.Println(val) } } func main() { c := make(chan int,3) c <- 1 c <- 2 fmt.Println("長度: ",len(c)) fmt.Println(<-c) fmt.Println("長度: ",len(c)) fmt.Println(<-c) fmt.Println("長度: ",len(c)) fmt.Println("容量: ",cap(c)) } /* 結(jié)果: 長度: 2 1 長度: 1 2 長度: 0 容量: 3 */
這個 c 通道容量為 3,但只盛放了 2 個數(shù)據(jù)。Go 就不用去阻塞主線程去調(diào)度其他協(xié)程。你也可以在主線程中去讀取這些數(shù)據(jù),因為雖然通道沒有放滿,也不會阻止你去從通道讀取數(shù)據(jù)。
2.3.6 單向通道
目前為止,我們已經(jīng)學習到可以雙向傳遞數(shù)據(jù)的通道,或者說,我們可以對通道做讀操作和寫操作。但是事實上我們也可以創(chuàng)建單向通道。比如只讀通道只允許讀操作,只寫通道只允許寫操作。
單向通道也可以使用 make 函數(shù)創(chuàng)建,不過需要額外加一個箭頭語法。
roc := make(<-chan int) soc := make(chan<- int)
在上面的程序中, roc 是一個只讀通道,<- 在 chan 關(guān)鍵詞前面。 soc is 只寫通道,<- 在 chan 關(guān)鍵詞后面。 他們也算不同的數(shù)據(jù)類型。
但是單向通道有什么作用呢 ?
使用單向通道可以 提高程序的類型安全性, 使得程序不容易出錯。
但是假如你在一個協(xié)程中只需要讀操作某通道,但是在主線程中卻需要讀寫操作這個通道該怎么辦呢?
幸運的是 Go 提供了一個簡單的語法去把雙向通道轉(zhuǎn)化為單向通道。
package main import "fmt" func greet(roc <-chan string) { fmt.Println("Hello " + <-roc ,"!") } func main() { fmt.Println("Main Start") c := make(chan string) go greet(c) c <- "Demo" fmt.Println("Main Stop") } /* 結(jié)果 Main Start Hello Demo ! Main Stop */
我們修改 greet 協(xié)程函數(shù),把參數(shù) c 類型從雙向通道改成單向接收通道。
現(xiàn)在我們只能從通道中讀取數(shù)據(jù),通道上的任何寫入操作將會發(fā)生錯誤:
“invalid operation: roc <- “Temp” (send to receive-only type <-chan string)”.
2.3.7 Select
select 和 switch 很像,它不需要輸入?yún)?shù),并且僅僅被使用在通道操作上。
Select 語句被用來執(zhí)行多個通道操作的一個和其附帶的 case 塊代碼。
原理
讓我們來看下面的例子,討論下其執(zhí)行原理
package main import ( "fmt" "time" ) var start time.Time func init() { start = time.Now() } func service1(c chan string) { time.Sleep(3 * time.Second) c <- "Hello from service 1" } func service2(c chan string) { time.Sleep(5 * time.Second) c <- "Hello from service 2" } func main() { fmt.Println("main start", time.Since(start)) chan1 := make(chan string) chan2 := make(chan string) go service1(chan1) go service2(chan2) select { case res := <-chan1: fmt.Println("Response form service 1", res, time.Since(start)) case res := <-chan2: fmt.Println("Response form service 2", res, time.Since(start)) } fmt.Println("main stop ",time.Since(start)) } /* 結(jié)果: main start 0s Response form service 1 Hello from service 1 3.0018445s main stop 3.0019815s */
從上面的程序來看,我們知道 select 語句和 switch 很像,不同點是用通道讀寫操作代替了布爾操作。通道將被阻塞,除非它有默認的 default 塊 (之后將介紹)。一旦某個 case 條件執(zhí)行,它將不阻塞。
所以一個 case 條件什么時候執(zhí)行呢 ?
如果所有的 case 語句(通道操作)被阻塞,那么 select 語句將阻塞直到這些 case 條件的一個不阻塞(通道操作),case 塊執(zhí)行。
如果有多個 case 塊(通道操作)都沒有阻塞,那么運行時將隨機選擇一個不阻塞的 case 塊立即執(zhí)行。
為了演示上面的程序,我們開啟兩個協(xié)程并傳入對應(yīng)的通道變量。然后我們寫一個帶有兩個 case 操作的 select 語句。 一個 case 操作從 chan1 讀數(shù)據(jù),另外一個從 chan2 讀數(shù)據(jù)。這兩個通道都是無緩沖的 , 讀操作將被阻塞 。所以 select 語句將阻塞。因此 select 將等待,直到有 case 語句不阻塞。
- 當程序執(zhí)行到
select
語句后,主線程將阻塞并開始調(diào)度service1
和service2
協(xié)程。service1
休眠 3 秒 后未阻塞的把數(shù)據(jù)寫入通道chan1
與其類似,service2
等待 5 秒 后未阻塞的把數(shù)據(jù)寫入通道chan2
- 因為
service1
比service2
早一步執(zhí)行完畢,case 1
將首先調(diào)度執(zhí)行,其他的cases
塊 (這里指case 2
) 將被忽略。 一旦case
塊執(zhí)行完畢,main
線程將開始繼續(xù)執(zhí)行。
所以并沒有輸出case2的結(jié)果
上述程序真實模擬了一個數(shù)百萬請求的服務(wù)器負載均衡的例子,它從多個有效服務(wù)中返回其中一個響應(yīng)。
使用協(xié)程,通道和 select 語句,我們可以向多個服務(wù)器請求數(shù)據(jù)并獲取其中最快響應(yīng)的那個。
為了模擬上面哪個 case 塊率先返回數(shù)據(jù),我們可以直接去掉 Sleep 函數(shù)調(diào)用。
package main import ( "fmt" "time" ) var start time.Time func init() { start = time.Now() } func service1(c chan string) { c <- "Hello from service 1" } func service2(c chan string) { c <- "Hello from service 2" } func main() { fmt.Println("main start", time.Since(start)) chan1 := make(chan string) chan2 := make(chan string) go service1(chan1) go service2(chan2) select { case res := <-chan1: fmt.Println("Response form service 1", res, time.Since(start)) case res := <-chan2: fmt.Println("Response form service 2", res, time.Since(start)) } fmt.Println("main stop ",time.Since(start)) }
結(jié)果一:
main start 0s
Response form service 1 Hello from service 1 539.3µs
main stop 539.3µs
結(jié)果二:
main start 0s
Response form service 2 Hello from service 2 0s
main stop 0s
結(jié)果一共有2個不同的結(jié)果
為了證明當所有 case 塊都是非阻塞的時候,golang 會隨機選擇一個代碼塊執(zhí)行打印 response,我們使用緩沖通道來改造程序。
package main import ( "fmt" "time" ) var start time.Time func init() { start = time.Now() } func service1(c chan string) { c <- "Hello from service 1" } func service2(c chan string) { c <- "Hello from service 2" } func main() { fmt.Println("main start", time.Since(start)) chan1 := make(chan string,2) chan2 := make(chan string,2) chan1 <- "Value 1" chan1 <- "Value 2" chan2 <- "Value 1" chan2 <- "Value 2" select { case res := <-chan1: fmt.Println("Response form service 1", res, time.Since(start)) case res := <-chan2: fmt.Println("Response form service 2", res, time.Since(start)) } fmt.Println("main stop ",time.Since(start)) }
上述的程序的結(jié)果是有不同的
結(jié)果一:
main start 0s
Response form service 1 Value 1 496.2µs
main stop 496.2µs
結(jié)果二:
main start 0s
Response form service 2 Value 1 0s
main stop 0s
在上面的程序中,兩個通道在其緩沖區(qū)中都有兩個值。因為我們向容量為 2 的緩沖區(qū)通道分別發(fā)送了兩個值,所以這些通道發(fā)送操作不會阻塞并且會執(zhí)行下面的 select 塊。 select 塊中的所有 case 操作都不會阻塞,因為每個通道中都有兩個值,而我們的 case 操作只需要取出其中一個值。因此,go 運行時會隨機選擇一個 case 操作并執(zhí)行其中的代碼。
2.3.8 default case 塊
像 switch
一樣, select
語句也有 default case
塊。default case
塊 是非阻塞的,不僅如此, default case 塊可以使 select 語句永不阻塞,這意味著, 任何通道的 發(fā)送 和 接收 操作 (不管是緩沖或者非緩沖) 都不會阻塞當前線程。
如果有 case
塊的通道操作是非阻塞,那么 select
會執(zhí)行其case
塊。如果沒有那么 select
將默認執(zhí)行 default
塊.
package main import ( "fmt" "time" ) var start time.Time func init() { start = time.Now() } func service1(c chan string) { c <- "Hello from service 1" } func service2(c chan string) { c <- "Hello from service 2" } func main() { fmt.Println("main start", time.Since(start)) chan1 := make(chan string) chan2 := make(chan string) go service1(chan1) go service2(chan2) select { case res := <-chan1: fmt.Println("Response form service 1", res, time.Since(start)) case res := <-chan2: fmt.Println("Response form service 2", res, time.Since(start)) default: fmt.Println("No Response received",time.Since(start)) } fmt.Println("main stop ",time.Since(start)) } /* 結(jié)果: main start 0s No Response received 0s main stop 0s */
- 在上面的程序中,因為通道是非緩沖的,case 塊的通道操作都是阻塞的,所有 default 塊將被執(zhí)行。
- 如果上面的 select 語句沒有 default 塊,select 將阻塞,沒有 response 會被打印出來,知道通道變成非阻塞。
- 如果帶有 default, select 將是非阻塞的,調(diào)度器將不會從主線程轉(zhuǎn)而調(diào)度其他協(xié)程。
- 但是我們可以使用 time.Sleep 改變這一點。 通過這種方式,主線程將把調(diào)度權(quán)轉(zhuǎn)移到其他協(xié)程,在其他協(xié)程執(zhí)行完畢后,調(diào)度權(quán)從新回到主線程手里。
- 當主線程重新執(zhí)行的時候,通道里面已經(jīng)有值了,case 操作將不會阻塞。
package main import ( "fmt" "time" ) var start time.Time func init() { start = time.Now() } func service1(c chan string) { fmt.Println("service1 start") c <- "Hello from service 1" } func service2(c chan string) { fmt.Println("service2 start") c <- "Hello from service 2" } func main() { fmt.Println("main start", time.Since(start)) chan1 := make(chan string) chan2 := make(chan string) go service1(chan1) go service2(chan2) time.Sleep(3*time.Second) select { case res := <-chan1: fmt.Println("Response form service 1", res, time.Since(start)) case res := <-chan2: fmt.Println("Response form service 2", res, time.Since(start)) default: fmt.Println("No Response received",time.Since(start)) } fmt.Println("main stop ",time.Since(start)) } /* 結(jié)果不唯一。 main start 0s service2 start service1 start Response form service 1 Hello from service 1 3.0006729s main stop 3.0006729s */
2.3.9 空 select
和 for{} 這樣的空循環(huán)很像,空 select{} 語法也是有效的。但是有一點必須要說明。
我們知道 select 將被阻塞除非有 case 塊沒有阻塞。因為 select{} 沒有 case 非阻塞語句,主線程將阻塞并可能會導(dǎo)致死鎖。
package main import "fmt" func service() { fmt.Println("Hello from service") } func main() { fmt.Println("main started") go service() select {} fmt.Println("main stop") } /* 結(jié)果 main started Hello from service fatal error: all goroutines are asleep - deadlock! goroutine 1 [select (no cases)]: */
在上面的程序中我們知道 select 將阻塞 main 線程,調(diào)度器將會調(diào)度 service 這個協(xié)程。在 service 執(zhí)行完畢后,調(diào)度器會再去調(diào)度其他可用的協(xié)程,但是此時已經(jīng)沒有可用的協(xié)程,主線程也正在阻塞,所以最后的結(jié)果就是發(fā)生死鎖.
2.3.10 Deadlock
default
塊在通道操作阻塞的時候是非常有用的,他可以避免死鎖。 同時由于 default
塊的非阻塞特性,Go 可以避免在其他協(xié)程阻塞的時候去調(diào)度其他協(xié)程,從而避免死鎖。
通道的發(fā)送操作也類似,, default 可以在其他協(xié)程不能被調(diào)度的時候被執(zhí)行,從而避免死鎖。
2.3.11 nil通道
2.4 多協(xié)程協(xié)同工作
寫兩個協(xié)程,一個用來計算數(shù)字的平方,另一個用來計算數(shù)字的立方。
package main import "fmt" func square(c chan int) { fmt.Println("[square] reading") num := <-c c <- num * num } func cube(c chan int) { fmt.Println("[cube] reading") num := <-c c <- num * num * num } func main() { fmt.Println("[main] main started") squareChan := make(chan int) cubeChan := make(chan int) go square(squareChan) go cube(cubeChan) testNum := 3 fmt.Println("[main] send testNum to squareChan") squareChan <- testNum fmt.Println("[main] resuming") fmt.Println("[main] send testNum to cubeChane") cubeChan <- testNum fmt.Println("[main] resuming") fmt.Println("[main] reading from channels") squareVal,cubeVal := <-squareChan, <-cubeChan sum := squareVal + cubeVal fmt.Println("[main] sum of square and cube of",testNum," is",sum) fmt.Println("[main] main stop") } /* 結(jié)果: [main] main started [main] send testNum to squareChan [cube] reading [square] reading [main] resuming [main] send testNum to cubeChane [main] resuming [main] reading from channels [main] sum of square and cube of 3 is 36 [main] main stop */
流程:
創(chuàng)建兩個函數(shù) square
和 cube
作為協(xié)程運行。
兩個函數(shù)都有一個 int
類型通道參數(shù)c
,從 c
中讀取數(shù)據(jù)到變量num
,最后把計算的數(shù)據(jù)再寫入到通道 c
中。
在主線程中使用 make
函數(shù)創(chuàng)建兩個 int
類型通道 squareChan and cubeChan
然后分別運行square
和cube
協(xié)程。因為調(diào)度權(quán)還在主線程,所以執(zhí)行testNumb
賦值為 3。
然后我們把數(shù)據(jù)放入通道 squareChan
。主線程將阻塞直到通道的數(shù)據(jù)被讀取。 一旦通道的數(shù)據(jù)被讀取,主線程將繼續(xù)執(zhí)行。
在主線程中我們試圖從這兩個通道中讀取數(shù)據(jù),此時線程可能阻塞直到有數(shù)據(jù)寫入到通道。這里我們使用:=
語法來接收多個通道的值。
一旦這些協(xié)程把數(shù)據(jù)寫入到通道,主線程將阻塞。當數(shù)據(jù)被寫入通道中,主線程將繼續(xù)執(zhí)行,最后我們計算出數(shù)字的總和并打印到控制臺。
2.5 WaitGroup
有一種業(yè)務(wù)場景是你需要知道所有的協(xié)程是否已執(zhí)行完成他們的任務(wù)。這個和只需要隨機選擇一個條件為true
的 select
不同,他需要你滿足所有的條件都是 true
才可以激活主線程繼續(xù)執(zhí)行。 這里的條件指的是非阻塞的通道操作。
2.5.1 簡介
WaitGroup
是一個帶著計數(shù)器的結(jié)構(gòu)體,這個計數(shù)器可以追蹤到有多少協(xié)程創(chuàng)建,有多少協(xié)程完成了其工作。當計數(shù)器為 0 的時候說明所有協(xié)程都完成了其工作。
package main import ( "fmt" "sync" "time" ) func service(wg *sync.WaitGroup, instance int) { time.Sleep(2 * time.Second) fmt.Println("Service called on instance",instance) wg.Done() //協(xié)程數(shù)-1 } func main() { fmt.Println("main started") var wg sync.WaitGroup for i:=1;i<= 3; i++{ wg.Add(1) go service(&wg,i) } wg.Wait()//阻塞 fmt.Println("main stop") } /* 結(jié)果:(結(jié)果是不唯一的,一共有3!次可能的結(jié)果) main started Service called on instance 2 Service called on instance 1 Service called on instance 3 main stop */
在上面的程序中,我們創(chuàng)建了一個sync.WaitGroup
類型的空結(jié)構(gòu)體 (帶著 0 值字段) wg 。 WaitGroup
結(jié)構(gòu)體有一些像 noCopy
, state1
和 sema
這樣的內(nèi)部字段。 這個結(jié)構(gòu)體也有三個公開方法: Add, Wait 和 Done.
Add
方法的參數(shù)是一個變量名叫delta
的int
類型參數(shù),主要用來內(nèi)部計數(shù)。 內(nèi)部計數(shù)器默認值為 0. 它用于記錄多少個協(xié)程在運行。- 當
WaitGroup
創(chuàng)建后,計數(shù)器值為 0,我們可以通過給Add
方法傳int
類型值來增加它的數(shù)量。 記住, 當協(xié)程建立后,計數(shù)器的值不會自動遞增 ,因此需要我們手動遞增它。 Wait
方法用來阻塞當前協(xié)程。一旦計數(shù)器為 0, 協(xié)程將恢復(fù)運行。 因此,我們需要一個方法去降低計數(shù)器的值。Done
方法可以降低計數(shù)器的值。他不接受任何參數(shù),因此,它每執(zhí)行一次計數(shù)器就減 1。
上面的例子中,我們在創(chuàng)建 wg
變量后,運行了三次 for
循環(huán),每次運行的時候我們創(chuàng)建一個協(xié)程并給計數(shù)器加 1
。
這意味著現(xiàn)在我們有三個協(xié)程在等待運行并且 WaitGroup
的計數(shù)器值為 3
。注意我們傳給協(xié)程函數(shù)的是一個指針,這是因為一旦在協(xié)程內(nèi)部工作完成后,我們需要通過調(diào)用Done
方法去降低計數(shù)器的值。
如果 wg
通過值復(fù)制方式傳過去, 因為傳遞的是一個拷貝,主線程中的 wg
將不會得到修改。
在 for 循環(huán)執(zhí)行完成后,我們通過調(diào)用 wg.Wait()
去阻塞當前主線程,并把調(diào)度權(quán)讓給其他協(xié)程,直到計數(shù)器值為 0 之后,主線程才會被再次調(diào)度。
我們在另外三個協(xié)程中通過Done
方法把計數(shù)器值降為 0,此時主線程將再次被調(diào)度并開始執(zhí)行之后的代碼。
2.5.2工作池
顧名思義,一個工作池
并發(fā)執(zhí)行某項工作的協(xié)程集合。 在上面,我們已經(jīng)用到的多個協(xié)程執(zhí)行一個任務(wù),但是他們并沒有執(zhí)行特定的工作,只是 sleep
了一下。 如果你向協(xié)程中傳一個通道,他們可以去完成一些工作,變成一個工作池。
所以工作池其實就是維護了多個工作協(xié)程,這些協(xié)程的功能是可以收到任務(wù),執(zhí)行任務(wù)并返回結(jié)果。他們完成任務(wù)后我們就可以收到結(jié)果。這些協(xié)程使用相同的通道來達到自己的目的。
package main import ( "fmt" "time" ) func sqrWorker(tasks <-chan int, results chan <-int, instance int) { for num := range tasks { time.Sleep(time.Millisecond) //阻塞 fmt.Printf("[worker %v ] Sending result by worker %v \n",instance,instance) results <- num*num } } func main() { fmt.Println("main started") tasks := make(chan int,10) results := make(chan int,10) for i:=0;i<3;i++{ go sqrWorker(tasks,results,i) } for i := 0; i < 5; i++ { tasks <- i*2 } fmt.Println("[main] write 5 tasks") close(tasks) for i := 0; i < 5; i++ { result := <-results fmt.Println("[main] Result" , i , ":", result) } fmt.Println("main stop") } /* //結(jié)果之一 [main] write 5 tasks [worker 0 ] Sending result by worker 0 [worker 1 ] Sending result by worker 1 [worker 2 ] Sending result by worker 2 [main] Result 0 : 4 [main] Result 1 : 16 [main] Result 2 : 0 [worker 1 ] Sending result by worker 1 [main] Result 3 : 64 [worker 0 ] Sending result by worker 0 [main] Result 4 : 36 main stop */
sqrWorker
是一個帶有 tasks
通道,results
通道 和 id
三個參數(shù)的協(xié)程函數(shù)。這個協(xié)程函數(shù)的任務(wù)是把從 tasks
通道接收到的數(shù)字的平方發(fā)送到 results
通道。
在主函數(shù)中,我們創(chuàng)建了兩個帶緩沖區(qū),容量為 10 的通道tasks and result
。因此在緩沖區(qū)被充滿之前,任何操作都是非阻塞的。所以有時候設(shè)置一個大點的緩沖區(qū)是個好辦法。
然后我們循環(huán)創(chuàng)建多個 sqrWorker
協(xié)程,并傳入 tasks
通道, results
通道 和 id
三個參數(shù),用來傳遞和獲取協(xié)程執(zhí)行前后的數(shù)據(jù)。
接著我們向 tasks
非阻塞通道放入 5 個任務(wù)數(shù)據(jù)。
因為我們已經(jīng)向任務(wù)通道放入的數(shù)據(jù),所以我們可以關(guān)閉它,雖然這個操作不是必須的,但是如果以后運行中出現(xiàn)錯誤的話可以防止通道 range
帶來的死鎖問題。
然后我們開啟循環(huán) 5 次從 results
通道接收數(shù)據(jù),因為目前通道緩沖區(qū)沒有數(shù)據(jù),所以通道讀取操作造成主線程阻塞,調(diào)度器將調(diào)度工作池的協(xié)程,直到有數(shù)據(jù)添加到 results
通道。
當前我們有 3 個work
協(xié)程在工作,我們使用了 sleep
操作來模擬阻塞操作,所以調(diào)度器在某一個阻塞的時候會去調(diào)用其他的 work
協(xié)程,當某個 work
協(xié)程 sleep
完成后會把計算數(shù)字的平方的結(jié)果數(shù)據(jù)放入 results
緩沖無阻塞通道。
當 3 個協(xié)程依次交替把 task
通道的任務(wù)都完成后,for range
循環(huán)將完成,并且因為之前我們已經(jīng)關(guān)閉了任務(wù)通道,所以協(xié)程也不會發(fā)生死鎖。調(diào)度器將繼續(xù)返回調(diào)度主線程。
有時候所有的工作協(xié)程可能都在阻塞,此時調(diào)度器將去調(diào)度主線程,直到 results
通道再次為空。
當所有 work
協(xié)程都完成任務(wù)退出后,主線程將繼續(xù)拿到調(diào)度權(quán)并打印 results
通道剩下的數(shù)據(jù),繼續(xù)之后代碼的執(zhí)行。
2.5.3 Mutex
互斥是 Go 中一個簡單的概念。在我解釋它之前,先要明白什么是競態(tài)條件。 goroutines 都有自己的獨立的調(diào)用棧,因此他們之間不分享任何數(shù)據(jù)。但是有一種情況是數(shù)據(jù)存放在堆上,并且被多個 goroutines 使用。 多個 goroutines 試圖去操作一個內(nèi)存區(qū)域的數(shù)據(jù)會造成意想不到的后果.
package main import ( "fmt" "sync" ) var i int func worker(wg *sync.WaitGroup) { i = i+1 wg.Done() } func main() { fmt.Println("main started") var wg sync.WaitGroup for i:=0;i<1000;i++{ wg.Add(1) go worker(&wg) } wg.Wait() fmt.Println("main stop",i) } /* 結(jié)果是不同的??! main started main stop 985 */
i = i + 1 這個計算有 3 步
(1) 得到 i 的值
(2) 給 i 的值加 1
(3) 更新 i 的值
這里發(fā)生很多事情,因為go是協(xié)程,這三步里面不一定都是同時順序執(zhí)行的。有可能A是順利執(zhí)行,使得i=2
,但是B是讀取的是A沒更新的之前的i
也就是1
,所以就是結(jié)果會小于等于1000的,
除非一個協(xié)程阻塞,否則其他協(xié)程是沒有機會獲得調(diào)度的。那么 i = i + 1 也沒有阻塞,為什么 Go 的調(diào)度器會去調(diào)度其他協(xié)程呢?
在任何情況下,都不應(yīng)該依賴 Go 的調(diào)度算法,而應(yīng)該實現(xiàn)自己的邏輯來同步不同的 goroutine.
實現(xiàn)方法之一就是使用我們上面提到的互斥鎖?;コ怄i是一個編程概念,它保證了在同一時間只能有一個線程或者協(xié)程去操作同一個數(shù)據(jù)。當一個協(xié)程想要操作數(shù)據(jù)的時候,必須獲取該數(shù)據(jù)的一個鎖,操作完成后必須釋放鎖,如果沒有獲取到該數(shù)據(jù)的鎖,那么就不能操作這個數(shù)據(jù)。
在 Go 中,互斥數(shù)據(jù)結(jié)構(gòu) ( map) 由 sync 包提供。在 Go 中,多協(xié)程去操作一個值都可能會引起競態(tài)條件。我們需要在操作數(shù)據(jù)之前使用 mutex.Lock() 去鎖定它,一旦我們完成操作,比如上面提到的 i = i + 1, 我們就可以使用 mutext.Unlock() 方法解鎖。
如果在鎖定的時候,有一個協(xié)程想要讀寫 i 的值,那么此協(xié)程將阻塞 直到前面的協(xié)程完成操作并解鎖數(shù)據(jù)。因此在某一時刻有且僅有一個協(xié)程可以操作數(shù)據(jù),從而避免競態(tài)條件。記住,任何鎖之間的變量在解鎖之前對于其他協(xié)程都不是可用的。
讓我們使用互斥鎖修改上面的例子
package main import ( "fmt" "sync" ) var i int func worker(wg *sync.WaitGroup,m *sync.Mutex) { m.Lock() i = i+1 m.Unlock() wg.Done() } func main() { fmt.Println("main started") var wg sync.WaitGroup var m sync.Mutex for i:=0;i<1000;i++{ wg.Add(1) go worker(&wg,&m) } wg.Wait() fmt.Println("main stop",i) } /*結(jié)果 main started main stop 1000 */
在上面的程序中,我們創(chuàng)建了一個互斥鎖變量 m
,并把它的指針傳遞給所有已創(chuàng)建的協(xié)程。
在協(xié)程內(nèi)部,當我們要開始操作 i
變量的時候,我們先通過 m.Lock()
獲得鎖,操作完成后我們使用 m.Unlock()
釋放鎖。
互斥鎖可以幫助我們解決競態(tài)條件。 但首要規(guī)則是避免 goroutine
之間共享資源。
所以官方建議不要共享內(nèi)存并發(fā),而是通過管道通信的方式并發(fā)。
3. 結(jié)語
后部分go并發(fā)知識是參考作者summar的go并發(fā)以及書上的知識點,非常感謝作者的翻譯工作,使得我能更好的理解go的channel并發(fā)機制!鏈接點這里channel
隨著業(yè)務(wù)的不斷擴大,并發(fā)能更好的發(fā)揮服務(wù)器的性能。
以上就是Go語言七篇入門教程四通道及Goroutine的詳細內(nèi)容,更多關(guān)于Go語言通道及Goroutine的資料請關(guān)注腳本之家其它相關(guān)文章!
如何學習Go
如果你是小白,你可以這樣學習Go語言~
七篇入門Go語言
第一篇:Go簡介初識
第二篇:程序結(jié)構(gòu)&&數(shù)據(jù)類型的介紹
第三篇:函數(shù)方法接口的介紹
第五篇:文件及包的操作與處理
第六篇:網(wǎng)絡(luò)編程
第七篇:GC垃圾回收三色標記
相關(guān)文章
關(guān)于Golang中range指針數(shù)據(jù)的坑詳解
這篇文章主要給大家介紹了關(guān)于Golang中range指針數(shù)據(jù)的坑的相關(guān)資料,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧2019-02-02go并發(fā)編程sync.Cond使用場景及實現(xiàn)原理
這篇文章主要為大家介紹了go并發(fā)編程sync.Cond使用場景及實現(xiàn)原理詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪2022-08-08Golang中的Slice與數(shù)組及區(qū)別詳解
數(shù)組是一種具有固定長度的基本數(shù)據(jù)結(jié)構(gòu),在golang中與C語言一樣數(shù)組一旦創(chuàng)建了它的長度就不允許改變,數(shù)組的空余位置用0填補,不允許數(shù)組越界。今天小編通過實例代碼操作給大家詳細介紹lang中的Slice與數(shù)組的相關(guān)知識,一起看看吧2020-02-02