Go并發(fā)編程sync.Cond的具體使用
簡(jiǎn)介
Go
標(biāo)準(zhǔn)庫(kù)提供 Cond
原語(yǔ)的目的是,為等待 / 通知場(chǎng)景下的并發(fā)問(wèn)題提供支持。Cond
通常應(yīng)用于等待某個(gè)條件的一組 goroutine
,等條件變?yōu)?true
的時(shí)候,其中一個(gè) goroutine
或者所有的 goroutine
都會(huì)被喚醒執(zhí)行。
Cond
是和某個(gè)條件相關(guān),這個(gè)條件需要一組 goroutine
協(xié)作共同完成,在條件還沒(méi)有滿足的時(shí)候,所有等待這個(gè)條件的 goroutine
都會(huì)被阻塞住,只有這一組 goroutine
通過(guò)協(xié)作達(dá)到了這個(gè)條件,等待的 goroutine 才可能繼續(xù)進(jìn)行下去。
這個(gè)條件可以是我們自定義的 true/false
邏輯表達(dá)式。
但是 Cond
使用的比較少,因?yàn)樵诖蟛糠謭?chǎng)景下是可以被 Channel
和 WaitGroup
來(lái)替換的。
詳細(xì)介紹
下面就是 Cond
的數(shù)據(jù)結(jié)構(gòu)和對(duì)外提供的方法,Cond
內(nèi)部維護(hù)了一個(gè)等待隊(duì)列和鎖實(shí)例。
type Cond struct { noCopy noCopy // 鎖 L Locker // 等待隊(duì)列 notify notifyList checker copyChecker } func NeWCond(l Locker) *Cond func (c *Cond) Broadcast() func (c *Cond) Signal() func (c *Cond) Wait()
NeWCond:
NeWCond
方法需要調(diào)用者傳入一個(gè)Locker
接口,這個(gè)接口就Lock/UnLock
方法,所以我們可以傳入一個(gè)sync.Metex
對(duì)象Signal:允許調(diào)用者喚醒一個(gè)等待當(dāng)前
Cond
的goroutine
。如果Cond
等待隊(duì)列中有一個(gè)或者多個(gè)等待的goroutine
,則從等待隊(duì)列中移除第一個(gè)goroutine
并把它喚醒Broadcast:允許調(diào)用者喚醒所有等待當(dāng)前
Cond
的goroutine
。如果 Cond 等待隊(duì)列中有一個(gè)或者多個(gè)等待的goroutine
,則清空所有等待的goroutine
,并全部喚醒Wait:會(huì)把調(diào)用者放入
Cond
的等待隊(duì)列中并阻塞,直到被Signal
或者Broadcast
的方法從等待隊(duì)列中移除并喚醒
案例:Redis連接池
可以看一下下面的代碼,使用了 Cond
實(shí)現(xiàn)一個(gè) Redis
的連接池,最關(guān)鍵的代碼就是在鏈表為空的時(shí)候需要調(diào)用 Cond
的 Wait
方法,將 gorutine
進(jìn)行阻塞。然后 goruntine
在使用完連接后,將連接返回池子后,需要通知其他阻塞的 goruntine
來(lái)獲取連接。
package main import ( "container/list" "fmt" "math/rand" "sync" "time" ) // 連接池 type Pool struct { lock sync.Mutex // 鎖 clients list.List // 連接 cond *sync.Cond // cond實(shí)例 close bool // 是否關(guān)閉 } // Redis Client type Client struct { id int32 } // 創(chuàng)建Redis Client func NewClient() *Client { return &Client{ id: rand.Int31n(100000), } } // 關(guān)閉Redis Client func (this *Client) Close() { fmt.Printf("Client:%d 正在關(guān)閉", this.id) } // 創(chuàng)建連接池 func NewPool(maxConnNum int) *Pool { pool := new(Pool) pool.cond = sync.NewCond(&pool.lock) // 創(chuàng)建連接 for i := 0; i < maxConnNum; i++ { client := NewClient() pool.clients.PushBack(client) } return pool } // 從池子中獲取連接 func (this *Pool) Pull() *Client { this.lock.Lock() defer this.lock.Unlock() // 已關(guān)閉 if this.close { fmt.Println("Pool is closed") return nil } // 如果連接池沒(méi)有連接 需要阻塞 for this.clients.Len() <= 0 { this.cond.Wait() } // 從鏈表中取出頭節(jié)點(diǎn),刪除并返回 ele := this.clients.Remove(this.clients.Front()) return ele.(*Client) } // 將連接放回池子 func (this *Pool) Push(client *Client) { this.lock.Lock() defer this.lock.Unlock() if this.close { fmt.Println("Pool is closed") return } // 向鏈表尾部插入一個(gè)連接 this.clients.PushBack(client) // 喚醒一個(gè)正在等待的goruntine this.cond.Signal() } // 關(guān)閉池子 func (this *Pool) Close() { this.lock.Lock() defer this.lock.Unlock() // 關(guān)閉連接 for e := this.clients.Front(); e != nil; e = e.Next() { client := e.Value.(*Client) client.Close() } // 重置數(shù)據(jù) this.close = true this.clients.Init() } func main() { var wg sync.WaitGroup pool := NewPool(3) for i := 1; i <= 10; i++ { wg.Add(1) go func(index int) { defer wg.Done() // 獲取一個(gè)連接 client := pool.Pull() fmt.Printf("Time:%s | 【goruntine#%d】獲取到client[%d]\n", time.Now().Format("15:04:05"), index, client.id) time.Sleep(time.Second * 5) fmt.Printf("Time:%s | 【goruntine#%d】使用完畢,將client[%d]放回池子\n", time.Now().Format("15:04:05"), index, client.id) // 將連接放回池子 pool.Push(client) }(i) } wg.Wait() }
運(yùn)行結(jié)果:
Time:15:10:25 | 【goruntine#7】獲取到client[31847]
Time:15:10:25 | 【goruntine#5】獲取到client[27887]
Time:15:10:25 | 【goruntine#10】獲取到client[98081]
Time:15:10:30 | 【goruntine#5】使用完畢,將client[27887]放回池子
Time:15:10:30 | 【goruntine#6】獲取到client[27887]
Time:15:10:30 | 【goruntine#10】使用完畢,將client[98081]放回池子
Time:15:10:30 | 【goruntine#7】使用完畢,將client[31847]放回池子
Time:15:10:30 | 【goruntine#1】獲取到client[31847]
Time:15:10:30 | 【goruntine#9】獲取到client[98081]
Time:15:10:35 | 【goruntine#6】使用完畢,將client[27887]放回池子
Time:15:10:35 | 【goruntine#3】獲取到client[27887]
Time:15:10:35 | 【goruntine#1】使用完畢,將client[31847]放回池子
Time:15:10:35 | 【goruntine#4】獲取到client[31847]
Time:15:10:35 | 【goruntine#9】使用完畢,將client[98081]放回池子
Time:15:10:35 | 【goruntine#2】獲取到client[98081]
Time:15:10:40 | 【goruntine#3】使用完畢,將client[27887]放回池子
Time:15:10:40 | 【goruntine#8】獲取到client[27887]
Time:15:10:40 | 【goruntine#2】使用完畢,將client[98081]放回池子
Time:15:10:40 | 【goruntine#4】使用完畢,將client[31847]放回池子
Time:15:10:45 | 【goruntine#8】使用完畢,將client[27887]放回池子
注意點(diǎn)
- 在調(diào)用
Wait
方法前,需要先加鎖,就像我上面例子中Pull
方法也是先加鎖
看一下源碼就知道了,因?yàn)?Wait
方法的執(zhí)行邏輯是先將 goruntine
添加到等待隊(duì)列中,然后釋放鎖,然后阻塞,等喚醒后,會(huì)繼續(xù)加鎖。如果在調(diào)用 Wait
前不加鎖,但是里面會(huì)解鎖,執(zhí)行的時(shí)候就會(huì)報(bào)錯(cuò)。
// // c.L.Lock() // for !condition() { // c.Wait() // } // ... make use of condition ... // c.L.Unlock() // func (c *Cond) Wait() { c.checker.check() // 添加到等待隊(duì)列 t := runtime_notifyListAdd(&c.notify) c.L.Unlock() // 阻塞 runtime_notifyListWait(&c.notify, t) c.L.Lock() }
- 還是
Wait
方法,在喚醒后需要繼續(xù)檢查Cond
條件
就拿上面的 redis
連接案例來(lái)進(jìn)行說(shuō)明吧,我這里是使用了 for
循環(huán)來(lái)進(jìn)行檢測(cè)。如果將 for
循環(huán)改成使用 if
,也就是只判斷一次,會(huì)有什么問(wèn)題?可以停下來(lái)先想想
上面說(shuō)了調(diào)用者也可以使用 Broadcast
方法來(lái)喚醒 goruntine
,如果使用的是 Broadcast
方法,所有的 goruntine
都會(huì)被喚醒,然后大家都去鏈表中去獲取 redis
連接了,就會(huì)出現(xiàn)部分 goruntine
拿不到連接,實(shí)際上沒(méi)有那么多連接可以獲取,因?yàn)槊看沃粫?huì)放回一個(gè)連接到池子中。
// 如果連接池沒(méi)有連接 需要阻塞 for this.clients.Len() <= 0 { this.cond.Wait() } // 獲取連接 ele := this.clients.Remove(this.clients.Front()) return ele.(*Client)
到此這篇關(guān)于Go并發(fā)編程sync.Cond的具體使用的文章就介紹到這了,更多相關(guān)Go sync.Cond內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Go語(yǔ)言驅(qū)動(dòng)低代碼應(yīng)用引擎工具Yao開(kāi)發(fā)管理系統(tǒng)
這篇文章主要為大家介紹了Go語(yǔ)言驅(qū)動(dòng)低代碼應(yīng)用引擎工具Yao開(kāi)發(fā)管理系統(tǒng)使用詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2022-06-06Go編寫定時(shí)器與定時(shí)任務(wù)詳解(附第三方庫(kù)gocron用法)
當(dāng)需要每天執(zhí)行定時(shí)任務(wù)的時(shí)候就需要定時(shí)器來(lái)處理了,周期任務(wù),倒計(jì)時(shí)任務(wù),定點(diǎn)任務(wù)等,下面這篇文章主要給大家介紹了關(guān)于Go編寫定時(shí)器與定時(shí)任務(wù)的相關(guān)資料,文中通過(guò)實(shí)例代碼介紹的非常詳細(xì),需要的朋友可以參考下2022-07-07Go檢查結(jié)構(gòu)體中是否存在某個(gè)字段及創(chuàng)建結(jié)構(gòu)體切片或映射
這篇文章主要為大家介紹了Go檢查結(jié)構(gòu)體中是否存在某個(gè)字段及創(chuàng)建結(jié)構(gòu)體切片或映射實(shí)現(xiàn)示例詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2024-01-01Go實(shí)現(xiàn)簡(jiǎn)易R(shí)PC框架的方法步驟
本文旨在講述 RPC 框架設(shè)計(jì)中的幾個(gè)核心問(wèn)題及其解決方法,小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧2019-03-03Golang中函數(shù)(Function)和方法(Method)的區(qū)別詳解
在Golang中,大家必然會(huì)頻繁使用到函數(shù)(Function)和方法(Method),但是有的同學(xué)可能并沒(méi)有注意過(guò)函數(shù)和方法的異同點(diǎn),函數(shù)和方法都是用來(lái)執(zhí)行特定任務(wù)的代碼塊,雖然很相似,但也有很大的區(qū)別,所以本文將詳細(xì)講解函數(shù)和方法的定義以及它們的異同點(diǎn)2023-07-07一文搞懂Go語(yǔ)言中defer關(guān)鍵字的使用
defer是golang中用的比較多的一個(gè)關(guān)鍵字,也是go面試題里經(jīng)常出現(xiàn)的問(wèn)題。今天就來(lái)整理一下關(guān)于defer的學(xué)習(xí)使用,希望對(duì)需要的朋友有所幫助2022-09-09