詳解Go?sync?同步原語
Go 中不僅有 channel 這種 CSP 同步機制,還有 sync.Mutex、sync.WaitGroup 等比較原始的同步原語。使用它們,可以更靈活的控制數(shù)據(jù)同步和多協(xié)程并發(fā)。
- sync.Mutex
- sync.RWMutex
- sync.WaitGroup
- sync.Once
- sync.Cond
- sync.Map
在一個 goroutine 中,如果分配的內(nèi)存沒有被其他 goroutine 訪問,只在該 goroutine 中被使用,不存在資源競爭的問題。但如果同一塊內(nèi)存被多個 goroutine 同時訪問,就會不知道誰先訪問,也無法預料最后結(jié)果。這就產(chǎn)生了資源競爭,這塊內(nèi)存就是共享資源。channel 是并發(fā)安全的,內(nèi)部自加了鎖,但是很多變量或者資源沒有加鎖,就需要 sync 同步原語了。
eg. 啟動100個協(xié)程,讓 nSum 加10,期待的結(jié)果是1000。
package main import ( "fmt" "time" ) var nSum = 0 func add(i int) { nSum += i } func main() { for i := 0; i < 100; i++ { go add(10) } time.Sleep(time.Second) fmt.Println("sum=", nSum) }
運行完之后,輸出的結(jié)果可能是1000,也可能是990,或是980。
$ while true; do go run gosrc.go; done;
類似 go build、go run、go test,這種 Go 工具鏈命令,添加 -race 標識,幫助檢查 Go 語言代碼是否存在資源競爭。
$ go run -race gosrc.go
導致這種現(xiàn)象的原因是,資源 nSum 并不是并發(fā)安全的,因為同時會有多個協(xié)程執(zhí)行 nSum += i,產(chǎn)生不可預料的結(jié)果。所以需要確保同時只有一個協(xié)程執(zhí)行 nSum += i 操作,互斥鎖可以實現(xiàn)。
sync.Mutex
互斥鎖,是指在同一時刻只有一個協(xié)程執(zhí)行某段代碼,其他協(xié)程都要等待該協(xié)程執(zhí)行完畢后才能繼續(xù)執(zhí)行。
下面的實例中,聲明一個互斥鎖,然后修改 add 函數(shù),對 nSum += i 執(zhí)行加鎖保護,這樣這段代碼在并發(fā)的時候就安全了,可以得到正確的結(jié)果。
上面這段加鎖保護的代碼,稱為臨界區(qū)。在同步程序設計中,臨界區(qū)指的是一個訪問共享資源的程序片段,而這些共享資源又無法同時被多個協(xié)程訪問的特性。當一個協(xié)程獲得了鎖后,其他的協(xié)程只有等待鎖釋放,才能再去獲得鎖。鎖的 Lock 和 Unlock 方法總是成對的出現(xiàn)。
package main import ( "fmt" "sync" "time" ) var ( nSum int mutex sync.Mutex ) func add(i int) { mutex.Lock() defer mutex.Unlock() nSum += i } func main() { for i := 0; i < 100; i++ { go add(10) } time.Sleep(2*time.Second) fmt.Println("nSum=", nSum) }
運行結(jié)果如下,
$ count=0;while (($count < 10)); do go run gomutex.go;((count=$count+1)); done
sync.RWMutex
互斥鎖是完全互斥的,但是有很多實際的場景下是讀多寫少的,當我們并發(fā)的讀取一個資源不涉及資源修改的時候是沒有必要加鎖的,這種場景下使用讀寫鎖是更好的一種選擇。讀寫鎖在 Go 語言中使用 sync.RWMutex 類型。
讀寫鎖分為兩種:讀鎖和寫鎖。當一個 goroutine 獲取讀鎖之后,其他 goroutine 如果是獲取讀鎖會繼續(xù)獲得鎖,如果是獲取寫鎖就會等待;當一個 goroutine 獲取寫鎖之后,其他 goroutine 無論是獲取讀鎖還是寫鎖都會等待。
這里有一個性能問題,每次讀寫共享資源都要加鎖,性能低下,怎么解決?現(xiàn)在分析這個特殊的場景,會有以下三種情況,寫的時候不能同時讀(讀未提交),讀的時候不能同時寫(讀已提交),讀的時候可以同時讀(可重復讀)。
- 可能讀到臟數(shù)據(jù),臟讀
- 會產(chǎn)生不可預料的結(jié)果,幻讀
- 不管多少協(xié)程讀,都是并發(fā)安全的,可重復讀。
可以通過讀寫鎖提升性能,對比互斥鎖,讀寫鎖改動有兩個地方,
- 把鎖的聲明換成讀寫鎖 RWMutex
- 把讀取數(shù)據(jù)的代碼(函數(shù) readSum)換成讀鎖
這樣性能有很大提升,多個協(xié)程可以同時讀取數(shù)據(jù),不用相互等待。
sync.WaitGroup
用于最終完成的場景,關鍵點在于一定是等待所有協(xié)程都執(zhí)行完畢。
在前面的程序里邊,為了防止主函數(shù)返回,使用了 time.Sleep 語句強制程序睡眠,因為一旦 main goroutine 返回,函數(shù)就退出了。
但這里是有問題的。如果這100個協(xié)程在兩秒內(nèi)執(zhí)行完畢,main 函數(shù)本該提前返回,但是還是要等夠兩秒才能返回,存在性能問題。如果執(zhí)行超過2秒,函數(shù)返回,有些協(xié)程不會執(zhí)行,產(chǎn)生不可預知的結(jié)果。
有沒有辦法監(jiān)聽所有 goroutine 的執(zhí)行?一旦全部執(zhí)行完畢,程序馬上退出,既可以保證所有協(xié)程執(zhí)行完畢,又可以及時退出節(jié)省時間,提升性能。
通道 channel 可以實現(xiàn),但比較復雜。所以,Go 提供了 WaitGroup。對上面的例子代碼進行改造,分三步執(zhí)行,
- 聲明一個 WaitGroup,通過 Add 方法設置一個計數(shù)器的值,需要跟蹤多少協(xié)程就設置多少。
- 每個協(xié)程在執(zhí)行完畢的時候,一定要調(diào) Done 方法,讓計數(shù)器減1,告訴 WaitGroup 該協(xié)程已經(jīng)執(zhí)行完畢。
- 最后調(diào)用 Wait 方法,一直等待,直到計數(shù)器的值變?yōu)?,也就是所有跟蹤的協(xié)程執(zhí)行完畢了。
通過 WaitGroup 可以很好地跟蹤協(xié)程,在協(xié)程執(zhí)行完畢后,整個 main 函數(shù)才能執(zhí)行完畢。
package main import ( "fmt" "sync" ) var ( nSum int mutex sync.RWMutex ) func add(i int) { mutex.Lock() defer mutex.Unlock() nSum += i } func main() { var wg sync.WaitGroup wg.Add(100) for i := 0; i < 100; i++ { go func() { defer wg.Done() add(10) }() } wg.Wait() fmt.Println("nSum=", nSum) }
運行結(jié)果,會發(fā)現(xiàn)輸出執(zhí)行速度方面會清爽很多。
sync.WaitGroup適合協(xié)調(diào)多個goroutine共同做一件事情的場景。比如下載較大的文件時,為了加快下載速度,我們會使用多線程(協(xié)程)下載。假設使用10個協(xié)程,每個協(xié)程下載文件的1/10大小,只有10個協(xié)程都下載好了整個文件才算是下載好了。再比如流水線上,下個階段需要上個階段把所有數(shù)據(jù)準備好,10個協(xié)程準備數(shù)據(jù),等所有協(xié)程處理完后,統(tǒng)一進入下個階段繼續(xù)執(zhí)行.....
sync.Once
讓代碼只執(zhí)行一次,哪怕是在高并發(fā)的情況下,比如創(chuàng)建一個單例。
先看個例子
package main import ( "fmt" "sync" ) func main() { var once sync.Once onceBody := func() { fmt.Println("Only once") } done := make(chan bool) // 用于等待協(xié)程執(zhí)行完畢 for i := 0; i < 10; i++ { // 啟動 10 個協(xié)程 go func(n int) { fmt.Println(n) once.Do(onceBody) done<-true }(i) } for i := 0; i < 10; i++ { <-done } }
運行結(jié)果如下,
使用 WaitGroup 來保證子協(xié)程執(zhí)行完畢,也可以這樣寫,
package main import ( "fmt" "sync" ) func main() { var once sync.Once onceBody := func() { fmt.Println("Only once") } var wg sync.WaitGroup wg.Add(10) for i := 0; i < 10; i++ { go func(n int) { fmt.Println(n) once.Do(onceBody) wg.Done() }(i) } wg.Wait() }
sync.Cond
可以用做發(fā)令槍,關鍵點在于 goroutine 開始的時候是等待的。Cond 一聲令下,所有 goroutine 都開始執(zhí)行。sync.Cond 從字面意思看是條件變量,除此之外,還具有阻塞和喚醒協(xié)程的功能,所以可以在滿足一定條件的情況下喚醒協(xié)程。
sync.Cond有三個方法,
- Wait,阻塞當前協(xié)程,直到其他協(xié)程調(diào)用signal或broadcast來喚醒,使用時需要加鎖
- Signal,喚醒一個等待時間最長的協(xié)程
- Broadcast就是廣播,喚醒所有等待的協(xié)程
注意,在調(diào)用 Signal 或者 Broadcast 之前,一定要確保目標協(xié)程要處于等待 Wait 阻塞狀態(tài),不然會出現(xiàn)死鎖問題。和 java 里邊的 wait、notify、notifyall 類似。
package main import ( "fmt" "sync" "time" ) func main() { cond := sync.NewCond(&sync.Mutex{}) var wg sync.WaitGroup wg.Add(11) for i := 0; i < 10; i++ { go func(n int) { defer wg.Done() fmt.Println("ready", n) cond.L.Lock() cond.Wait() fmt.Println("go", n) cond.L.Unlock() }(i) } time.Sleep(time.Second) go func() { defer wg.Done() fmt.Println("beng beng...") // 發(fā)令槍響 cond.Broadcast() }() wg.Wait() }
運行結(jié)果如下,
sync.Map
Go 中的 map 類型是并發(fā)不安全的,在實際開發(fā)中,這種類型不能用在并發(fā)寫的場景,并發(fā)讀還是可以的。不過 slice 是并發(fā)安全的,有時候可以使用 slice 來代替 map,但需要迭代元素進行轉(zhuǎn)換。這時 sync.Map 也是一個不錯的選擇。
- Store,存儲一對 kv;
- Load,根據(jù) key 獲取對應的 value,并可以判斷 key 是否存在;
- LoadOrStore,如果 key 對應的 value 存在,則返回 value;否則存儲相應的value;
- Delete,刪除一對 kv;
- Range,循環(huán)迭代 sync.Map,效果與 for range 一樣。
到此這篇關于Go sync 同步原語的文章就介紹到這了,更多相關Go sync 同步原語內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!
相關文章
Go語言配置數(shù)據(jù)庫連接池的實現(xiàn)
本文內(nèi)容我們將解釋連接池背后是如何工作的,并探索如何配置數(shù)據(jù)庫能改變或優(yōu)化其性能。文中通過示例代碼介紹的非常詳細,具有一定的參考價值,感興趣的小伙伴們可以參考一下2021-12-12