一文教你學(xué)會(huì)Go中singleflight的使用
寫(xiě)作背景
緩存在項(xiàng)目中使用應(yīng)該是非常頻繁的,提到緩存只要了解過(guò) singleflight ,基本都會(huì)用于緩存實(shí)現(xiàn)的一部分吧?但 singleflight 要用好也不容易。
名稱(chēng)解釋
singleflight 來(lái)源于準(zhǔn)官方庫(kù)(也可以說(shuō)官方擴(kuò)展庫(kù))golang.org/x/sync/singleflight 包中。它的作用是避免同一個(gè) key 對(duì)下游發(fā)起多次請(qǐng)求,降低下游流量。
源碼剖析
3 個(gè)結(jié)構(gòu)體
Group 是 singleflight 的核心,代表一個(gè)組,用于執(zhí)行具有重復(fù)抑制的工作單元。
type Group struct {
mu sync.Mutex
m map[string]*call
}
mu 是保護(hù) m 字段的互斥鎖,確保對(duì)調(diào)用信息的訪問(wèn)是線(xiàn)程安全的。m 是一個(gè) map,鍵是函數(shù)的唯一標(biāo)識(shí)符,值是 call 結(jié)構(gòu)體,代表一次函數(shù)調(diào)用的信息,包括函數(shù)的返回值和錯(cuò)誤。
call 代表一次函數(shù)調(diào)用的信息,把函數(shù)的調(diào)用結(jié)果封裝到 call 中
type call struct {
wg sync.WaitGroup
// 這些字段在 WaitGroup 完成之前只被寫(xiě)入一次,并且在 WaitGroup 完成之后只被讀取
val interface{} // 函數(shù)調(diào)用的返回值
err error // 函數(shù)調(diào)用可能出現(xiàn)的錯(cuò)誤
dups int // 相同 key 調(diào)用次數(shù)
chans []chan<- Result // 結(jié)果通道列表,僅調(diào)用 DoChan() 方法時(shí)返回
}
Result 結(jié)構(gòu)體用于保存 DoChan() 方法的執(zhí)行結(jié)果,以便將結(jié)果傳遞給通道。
type Result struct {
Val interface{}
Err error
Shared bool
}4 個(gè)方法
Group 主要提供了 3 個(gè)公開(kāi)方法和 1 個(gè)非公開(kāi)方法。
Do() 方法,相同的 key 對(duì)應(yīng)的 fn 函數(shù)只會(huì)調(diào)用一次。返回值 v 調(diào)用 fn() 方法返回的結(jié)果;err 調(diào)用 fn() 返回的 err;shared:表示在多次調(diào)用的結(jié)果是否共享。
func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool) {
g.mu.Lock()
if g.m == nil {
g.m = make(map[string]*call)
}
if c, ok := g.m[key]; ok {
c.dups++
g.mu.Unlock()
c.wg.Wait()
if e, ok := c.err.(*panicError); ok {
panic(e)
} else if c.err == errGoexit {
runtime.Goexit()
}
return c.val, c.err, true
}
c := new(call)
c.wg.Add(1)
g.m[key] = c
g.mu.Unlock()
g.doCall(c, key, fn)
return c.val, c.err, c.dups > 0
}
源碼比較簡(jiǎn)單,如果 key 對(duì)應(yīng)的 fn 函數(shù)已被調(diào)用,則等待 fn 函數(shù)調(diào)用完成直接返回結(jié)果。如果 fn 未被調(diào)用,new(call) 存入 m 中,執(zhí)行 doCal() 方法。
doCall() 方法,調(diào)用 key 對(duì)應(yīng)的 fn 方法。
func (g *Group) doCall(c *call, key string, fn func() (interface{}, error)) {
normalReturn := false
recovered := false
defer func() {
if !normalReturn && !recovered {
c.err = errGoexit
}
g.mu.Lock()
defer g.mu.Unlock()
c.wg.Done()
if g.m[key] == c {
delete(g.m, key)
}
if e, ok := c.err.(*panicError); ok {
if len(c.chans) > 0 {
go panic(e)
select {}
} else {
panic(e)
}
} else if c.err == errGoexit {
} else {
for _, ch := range c.chans {
ch <- Result{c.val, c.err, c.dups > 0}
}
}
}()
func() {
defer func() {
if !normalReturn {
if r := recover(); r != nil {
c.err = newPanicError(r)
}
}
}()
c.val, c.err = fn()
normalReturn = true
}()
if !normalReturn {
recovered = true
}
}
doCall() 代碼比較簡(jiǎn)單,double defer 雙延遲機(jī)制區(qū)分 panic 和 runtime.Goexit。第二個(gè) defer 會(huì)先執(zhí)行調(diào)用 fn() 函數(shù),如果未正常返回將會(huì)補(bǔ)獲異常,并將堆棧信息存入 err 中。
第一個(gè) defer 先將 key 從 m 中移除,再就是異常處理,如果是 Goexit 正常退出,如果斷言是 panicError 將對(duì)外拋出 Panic。若正常退出將結(jié)果發(fā)送到 chans 通道列表中。
DoChan() 方法類(lèi)似于 Do() 方法,返回通道(chan),通過(guò)通道接收數(shù)據(jù)。另外通道不會(huì)被關(guān)閉。
func (g *Group) DoChan(key string, fn func() (interface{}, error)) <-chan Result {
ch := make(chan Result, 1)
g.mu.Lock()
if g.m == nil {
g.m = make(map[string]*call)
}
if c, ok := g.m[key]; ok {
c.dups++
c.chans = append(c.chans, ch)
g.mu.Unlock()
return ch
}
c := &call{chans: []chan<- Result{ch}}
c.wg.Add(1)
g.m[key] = c
g.mu.Unlock()
go g.doCall(c, key, fn)
return ch
}
Forget() 方法,可以理解為丟棄某一個(gè) key,后面該 key 會(huì)被立即調(diào)用,而不是等待先前的調(diào)用完成。
func (g *Group) Forget(key string) {
g.mu.Lock()
delete(g.m, key)
g.mu.Unlock()
}
經(jīng)典案例
緩存場(chǎng)景在大家的業(yè)務(wù)場(chǎng)景中應(yīng)該是被廣泛使用的,大部分的場(chǎng)景使用應(yīng)該都是下圖吧?

從單體應(yīng)用到微服務(wù)化,調(diào)用下游服務(wù)一般如下圖吧?

假設(shè)緩存 Miss 所有流量會(huì)瞬間打到數(shù)據(jù)庫(kù),或者所有流量都會(huì)打到 server2,如果學(xué)習(xí)過(guò) singleflight 的同學(xué),肯定會(huì)把它用在 reids->db 或 server->server2 之間,包括我也是。如下圖(只舉數(shù)據(jù)庫(kù)案例)。

在使用 singleflight 之前你先確定下你的業(yè)務(wù)場(chǎng)景,key 相同的情況多嗎?(可以統(tǒng)計(jì)一些數(shù)據(jù),我們業(yè)務(wù)場(chǎng)景同一個(gè) key 多次調(diào)用下游概率是比較高的)如果 key 相同的情況比較少,singleflight 對(duì)你的幫助可能不大。
上面列舉 2 種方案。
1、 singleflight 介于 redis 和 db 之間,redis 是內(nèi)存緩存 qps 高、響應(yīng)也快。大部分情況不會(huì)成為瓶頸,但數(shù)據(jù)庫(kù)就不一樣了,所以這種方案可以防止緩存被擊穿流量打到數(shù)據(jù)庫(kù)。
2、 singleflight 介于 server 和 redis 之間,網(wǎng)上挺多推薦這種用法的,有必要用此方案嗎?大家可以思考下,文章末尾我給出我的想法。
我更傾向方案一。代碼如下:
func TestSingleFlight(t *testing.T) {
var (
n = 10
k = "12344556"
wg = sync.WaitGroup{}
sf singleflight.Group
)
for i := 0; i < n; i++ {
go func() {
wg.Add(1)
defer wg.Done()
r, err, shared := sf.Do(k, func() (interface{}, error) {
return get(k)
})
if err != nil {
panic(err)
}
fmt.Printf("r=%v,shared=%v\n", r, shared)
}()
}
wg.Wait()
}
func get(key string) (interface{}, error) {
time.Sleep(time.Microsecond) // todo 模擬業(yè)務(wù)處理
return key, nil
}
輸出結(jié)果如下
=== RUN TestSingleFlight
r=12344556,shared=true
r=12344556,shared=true
r=12344556,shared=true
r=12344556,shared=true
r=12344556,shared=true
r=12344556,shared=false
r=12344556,shared=true
r=12344556,shared=false
r=12344556,shared=true
r=12344556,shared=true
--- PASS: TestSingleFlight (0.00s)
PASS
打印結(jié)果中為 true 都代表 調(diào)用 get() 函數(shù)返回結(jié)果被共享。get 函數(shù)調(diào)用明顯降低了。
這種寫(xiě)法在函數(shù)正常返回情況下是能拿到正確的結(jié)果,如果下游返回異常了呢?(業(yè)務(wù)上遇過(guò)下游返回3-4s的拉低業(yè)務(wù)處理速度)因?yàn)?nbsp;Do() 方法是以阻塞的方式來(lái)控制對(duì)下游的調(diào)用的,如果某一個(gè)請(qǐng)求被阻塞了,同一個(gè) key 后面的請(qǐng)求都會(huì)被阻塞。
假設(shè)有一場(chǎng)景(SOP),消費(fèi) kafka 消息處理業(yè)務(wù)邏輯,業(yè)務(wù)高峰期某一時(shí)間段生產(chǎn)消息量為 100 w,單 pod 消費(fèi)速度 500/s ,請(qǐng)求下游用 singleflight 控制對(duì)下游(三方接口)的并發(fā)量,假設(shè)下游某一次請(qǐng)求耗時(shí) 2s。這時(shí)會(huì)有幾個(gè)問(wèn)題:
1、若某一個(gè) key 被阻塞后續(xù)該 key 大量請(qǐng)求被阻塞,若這批請(qǐng)求失敗從而導(dǎo)致消息處理失敗,如果對(duì)消息重試會(huì)加劇業(yè)務(wù)下游壓力。
2、單 pod 消費(fèi)速度從 500/s,降低到個(gè)位數(shù),消費(fèi)時(shí)間拉長(zhǎng),消息堆積(如果消息堆積對(duì)實(shí)時(shí)性要求場(chǎng)景影響視頻很大的)。
造成這個(gè)問(wèn)題主要原因如下:
singleflight 是同步阻塞且缺乏超時(shí)控制機(jī)制,若某一個(gè) key 阻塞后面次 key 都會(huì)被阻塞并且等待第一次結(jié)束。
singleflight 雖然能降低對(duì)下游的請(qǐng)求量,但在某些場(chǎng)景失敗的情況也增加了。
我們有辦法給 singleflight 加一個(gè)超時(shí)時(shí)間嗎?答案是肯定有的
下面這段代碼 singleflight 沒(méi)有增加超時(shí)控制
var (
offset int32 = 0
)
func TestSingleFlight(t *testing.T) {
var (
n int32 = 1000
k = "12344556"
wg = sync.WaitGroup{}
sf singleflight.Group
failCnt int32 = 0
)
for i := 0; i < int(n); i++ {
go func() {
wg.Add(1)
defer wg.Done()
_, err, _ := sf.Do(k, func() (interface{}, error) {
return get(k)
})
if err != nil {
atomic.AddInt32(&failCnt, 1)
return
}
}()
}
wg.Wait()
fmt.Printf("總請(qǐng)求數(shù)=%d,請(qǐng)求成功率=%d,請(qǐng)求失敗率=%d", n, n-failCnt, failCnt)
}
func get(key string) (interface{}, error) {
var err error
if atomic.AddInt32(&offset, 1) == 3 { // 假設(shè)偏移量 offset == 3 執(zhí)行耗時(shí)長(zhǎng),超時(shí)失敗了
time.Sleep(time.Microsecond * 500)
err = fmt.Errorf("耗時(shí)長(zhǎng)")
}
return key, err
}
結(jié)果輸出如下
=== RUN TestSingleFlight
總請(qǐng)求數(shù)=1000,請(qǐng)求成功率=792,請(qǐng)求失敗率=208--- PASS: TestSingleFlight (0.00s)
PASS
singleflight 增加超時(shí)控制代碼如下
func TestSingleFlight(t *testing.T) {
var (
n int32 = 1000
k = "12344556"
wg = sync.WaitGroup{}
sf singleflight.Group
failCnt int32 = 0
)
for i := 0; i < int(n); i++ {
go func() {
wg.Add(1)
defer wg.Done()
_, err, _ := sf.Do(k, func() (interface{}, error) {
ctx, _ := context.WithTimeout(context.TODO(), time.Microsecond*30)
go func(_ctx context.Context) {
<-_ctx.Done()
sf.Forget(k)
}(ctx)
return get(k)
})
if err != nil {
atomic.AddInt32(&failCnt, 1)
return
}
}()
}
wg.Wait()
fmt.Printf("總請(qǐng)求數(shù)=%d,請(qǐng)求成功率=%d,請(qǐng)求失敗率=%d", n, n-failCnt, failCnt)
}
利用 context.WithTimeout() 方法控制超時(shí),并且調(diào)用 Forget() 方法移除超時(shí) key 結(jié)果輸出如下
=== RUN TestSingleFlight
總請(qǐng)求數(shù)=1000,請(qǐng)求成功率=992,請(qǐng)求失敗率=8--- PASS: TestSingleFlight (0.00s)
PASS
成功率提高了失敗率明顯降低了。
下面我用 DoChan() 函數(shù)實(shí)現(xiàn)
var (
offset int32 = 0
)
func TestSingleFlight(t *testing.T) {
var (
n int32 = 1000 // n 越大,效果越明顯
k = "12344556"
wg = sync.WaitGroup{}
sf singleflight.Group
successCnt int32 = 0
)
for i := 0; i < int(n); i++ {
go func() {
wg.Add(1)
defer wg.Done()
ch := sf.DoChan(k, func() (interface{}, error) {
return get(k)
})
ctx, _ := context.WithTimeout(context.TODO(), time.Microsecond*100)
select {
case <-ctx.Done():
sf.Forget(k)
return
case ret := <-ch:
if ret.Err != nil {
return
}
atomic.AddInt32(&successCnt, 1)
}
}()
}
wg.Wait()
fmt.Printf("總請(qǐng)求數(shù)=%d,請(qǐng)求成功率=%d,請(qǐng)求失敗率=%d", n, successCnt, n-successCnt)
}
func get(key string) (interface{}, error) {
var err error
if atomic.AddInt32(&offset, 1) == 3 { // 假設(shè)偏移量 offset == 3 執(zhí)行耗時(shí)長(zhǎng),超時(shí)失敗了
time.Sleep(time.Microsecond * 400)
err = fmt.Errorf("耗時(shí)長(zhǎng)")
}
return key, err
}
大家自行驗(yàn)證
總結(jié)
1、singleflight 使用得當(dāng)確實(shí)能有效降低下游流量,我也推薦大家使用,但一定要注意同步阻塞問(wèn)題,防止下游長(zhǎng)耗時(shí)造成業(yè)務(wù)異常或高延遲,一定要做好正確性與降低業(yè)務(wù)下游流量權(quán)衡。
2、上面我留了一個(gè)問(wèn)題,singleflight 有必要放在 server 應(yīng)用和 redis 之間嗎?我認(rèn)為沒(méi)必要,redis 是內(nèi)存數(shù)據(jù)庫(kù),響應(yīng)快,高 qps 本身不會(huì)是瓶頸,保護(hù) redis 沒(méi)有意義。另外 singleflight 用途是防止 redis 擊穿流量打到數(shù)據(jù)庫(kù),如果你業(yè)務(wù) qps 非常高并且對(duì)數(shù)據(jù)實(shí)時(shí)性要求高,為啥不通過(guò)其他手段把數(shù)據(jù)庫(kù)數(shù)據(jù)刷新到 redis 中?比如數(shù)據(jù)創(chuàng)建同步寫(xiě)入 redis、或通過(guò) binlog 寫(xiě)入。
到此這篇關(guān)于一文教你學(xué)會(huì)Go中singleflight的使用的文章就介紹到這了,更多相關(guān)Go singleflight內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
GoLang?socket網(wǎng)絡(luò)編程傳輸數(shù)據(jù)包時(shí)進(jìn)行長(zhǎng)度校驗(yàn)的方法
在GoLang?socket網(wǎng)絡(luò)編程中,為了確保數(shù)據(jù)交互的穩(wěn)定性和安全性,通常會(huì)通過(guò)傳輸數(shù)據(jù)的長(zhǎng)度進(jìn)行校驗(yàn),發(fā)送端首先發(fā)送數(shù)據(jù)長(zhǎng)度,然后發(fā)送數(shù)據(jù)本體,接收端則根據(jù)接收到的數(shù)據(jù)長(zhǎng)度和數(shù)據(jù)本體進(jìn)行比較,以此來(lái)確認(rèn)數(shù)據(jù)是否傳輸成功2024-11-11
golang 并發(fā)安全Map以及分段鎖的實(shí)現(xiàn)方法
這篇文章主要介紹了golang 并發(fā)安全Map以及分段鎖的實(shí)現(xiàn)方法,小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧2019-03-03
Go?gRPC進(jìn)階教程服務(wù)超時(shí)設(shè)置
這篇文章主要為大家介紹了Go?gRPC進(jìn)階,gRPC請(qǐng)求的超時(shí)時(shí)間設(shè)置,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2022-06-06
Go?chassis云原生微服務(wù)開(kāi)發(fā)框架應(yīng)用編程實(shí)戰(zhàn)
這篇文章主要為大家介紹了Go?chassis云原生微服務(wù)開(kāi)發(fā)框架應(yīng)用編程實(shí)戰(zhàn)示例詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2022-08-08
GO語(yǔ)言求100以?xún)?nèi)的素?cái)?shù)
這篇文章主要介紹了GO語(yǔ)言求100以?xún)?nèi)的素?cái)?shù),主要通過(guò)篩選法來(lái)實(shí)現(xiàn),涉及GO語(yǔ)言基本的循環(huán)與函數(shù)調(diào)用方法,需要的朋友可以參考下2014-12-12
golang將多路復(fù)異步io轉(zhuǎn)成阻塞io的方法詳解
常見(jiàn)的IO模型有阻塞、非阻塞、IO多路復(fù)用,異,下面這篇文章主要給大家介紹了關(guān)于golang將多路復(fù)異步io轉(zhuǎn)成阻塞io的方法,文中給出了詳細(xì)的示例代碼,需要的朋友可以參考借鑒,下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧。2017-09-09
Go中時(shí)間與時(shí)區(qū)問(wèn)題的深入講解
go語(yǔ)言中如果不設(shè)置指定的時(shí)區(qū),通過(guò)time.Now()獲取到的就是本地時(shí)區(qū),下面這篇文章主要給大家介紹了關(guān)于Go中時(shí)間與時(shí)區(qū)問(wèn)題的相關(guān)資料,需要的朋友可以參考下2021-12-12

