亚洲乱码中文字幕综合,中国熟女仑乱hd,亚洲精品乱拍国产一区二区三区,一本大道卡一卡二卡三乱码全集资源,又粗又黄又硬又爽的免费视频

解決Golang并發(fā)工具Singleflight的問(wèn)題

 更新時(shí)間:2022年05月06日 14:52:06   作者:北戴河游泳  
前段時(shí)間在一個(gè)項(xiàng)目里使用到了分布式鎖進(jìn)行共享資源的訪(fǎng)問(wèn)限制,后來(lái)了解到Golang里還能夠使用singleflight對(duì)共享資源的訪(fǎng)問(wèn)做限制,于是利用空余時(shí)間了解,將知識(shí)沉淀下來(lái),并做分享

前言

前段時(shí)間在一個(gè)項(xiàng)目里使用到了分布式鎖進(jìn)行共享資源的訪(fǎng)問(wèn)限制,后來(lái)了解到Golang里還能夠使用singleflight對(duì)共享資源的訪(fǎng)問(wèn)做限制,于是利用空余時(shí)間了解,將知識(shí)沉淀下來(lái),并做分享

文章盡量用通俗的語(yǔ)言表達(dá)自己的理解,從入門(mén)demo開(kāi)始,結(jié)合源碼分析singleflight的重點(diǎn)方法,最后分享singleflight的實(shí)際使用方式與需要注意的“坑“。

定義

按照官方文檔的定義,singleflight 提供了一個(gè)重復(fù)的函數(shù)調(diào)用抑制機(jī)制

Package singleflight provides a duplicate function call suppression

用途

通俗的來(lái)說(shuō)就是 singleflight將相同的并發(fā)請(qǐng)求合并成一個(gè)請(qǐng)求,進(jìn)而減少對(duì)下層服務(wù)的壓力,通常用于解決緩存擊穿的問(wèn)題

  • 緩存擊穿是指: 在高并發(fā)的場(chǎng)景中,大量的request同時(shí)請(qǐng)求查詢(xún)一個(gè)共享資源(例如Redis緩存的key) ,如果這個(gè)共享資源正好過(guò)期失效了,就會(huì)導(dǎo)致大量相同的request都打到Redis下游的數(shù)據(jù)庫(kù),導(dǎo)致數(shù)據(jù)庫(kù)的負(fù)載上升。

簡(jiǎn)單Demo

var (
	sfKey1 = "key1"
	wg     *sync.WaitGroup
	sf     singleflight.Group
	nums   = 10
)
func getValueService(key string) { //service
   var val string
   wg = &sync.WaitGroup{}
   wg.Add(nums)
   for idx := 0; idx < nums; idx++ { // 模擬多協(xié)程同時(shí)請(qǐng)求
      go func(idx int) { // 注意for的一個(gè)小坑
         defer wg.Done()
         value, _ := getAndSetCacheNoChan(idx, key) //簡(jiǎn)化代碼,不處理error
         log.Printf("request %v get value: %v", idx, value)
         val = value
      }(idx)
   }
   wg.Wait()
   log.Println("val: ", val)
   return
}
// getValueBySingleflight 使用singleflight取cacheKey對(duì)應(yīng)的value值
func getValueBySingleflight(idx int, cacheKey string) (string, error) {
   log.Printf("idx %v into-cache...", idx)
   // 調(diào)用singleflight的Do()方法
   value, _, _ := sf.Do(cacheKey, func() (ret interface{}, err error) {
      log.Printf("idx %v is-setting-cache", idx)
      // 休眠0.1s以捕獲并發(fā)的相同請(qǐng)求
      time.Sleep(100 * time.Millisecond)
      log.Printf("idx %v set-cache-success!", idx)
      return "myValue", nil
   })
   return value.(string), nil
}

看看實(shí)際效果

  • 由結(jié)果圖可以看到,索引=8的協(xié)程第一個(gè)進(jìn)入了Do()方法,其他協(xié)程則阻塞住,等到idx=8的協(xié)程拿到執(zhí)行結(jié)果后,協(xié)程以亂序的形式返回執(zhí)行結(jié)果。
  • 相同key的情況下,singleflight將我們的多個(gè)請(qǐng)求合并成1個(gè)請(qǐng)求。由1個(gè)請(qǐng)求去執(zhí)行對(duì)共享資源的操作。

源碼分析

結(jié)構(gòu)

type (
   Group struct { // singleflight實(shí)體
      mu sync.Mutex       // 互斥鎖
      m  map[string]*call // 懶加載
   }
   call struct {
      wg sync.WaitGroup
      // 存儲(chǔ) 調(diào)用singleflight.Do()方法返回的結(jié)果
      val interface{}
      err error
      // 調(diào)用singleflight.Forget(key)時(shí)將對(duì)應(yīng)的key從Group.m中刪除
      forgotten bool
      // 通俗的理解成singleflight合并的并發(fā)請(qǐng)求數(shù)
      dups  int
      // 存儲(chǔ) 調(diào)用singleflight.DoChan()方法返回的結(jié)果
      chans []chan<- Result
   }
   
   Result struct {
      Val    interface{}
      Err    error
      Shared bool
   }
)

對(duì)外暴露的方法

func Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool)   
func DoChan(key string, fn func() (interface{}, error)) <-chan Result) 
// 將key從Group.m中刪除
func Forget(key string) 

DoChan()和Do()最大的區(qū)別是DoChan()屬于異步調(diào)用,返回一個(gè)channel,解決同步調(diào)用時(shí)的阻塞問(wèn)題

重點(diǎn)方法分析

Do

func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool) {
   g.mu.Lock() // 加互斥鎖
   if g.m == nil { // 懶加載map
      g.m = make(map[string]*call)
   }
   if c, ok := g.m[key]; ok { // 檢查相同的請(qǐng)求已經(jīng)是否進(jìn)入過(guò)singleflight
      c.dups++
      g.mu.Unlock()
      c.wg.Wait() // 調(diào)用waitGroup的wait()方法阻塞住本次調(diào)用,等待第一個(gè)進(jìn)入singleflight的請(qǐng)求執(zhí)行完畢拿到結(jié)果,將本次請(qǐng)求喚醒.
      if e, ok := c.err.(*panicError); ok { //如果調(diào)用完成,發(fā)生error ,將error上拋
         panic(e)
      } else if c.err == errGoexit {
         runtime.Goexit()
      }
      // 返回調(diào)用結(jié)果
      return c.val, c.err, true
   }
   c := new(call) // 相同的請(qǐng)求第一次進(jìn)入singleflight
   c.wg.Add(1)
   g.m[key] = c // new一個(gè)call實(shí)體,放入singleflight.call這個(gè)map
   g.mu.Unlock()
   g.doCall(c, key, fn) //實(shí)際執(zhí)行的函數(shù)
   return c.val, c.err, c.dups > 0
}

流程圖

由源碼可以分析出,最后實(shí)際執(zhí)行我們業(yè)務(wù)邏輯的函數(shù)其實(shí)是放到了doCall() 里,我們稍后分析這個(gè)函數(shù)

Forget

再簡(jiǎn)單看看Forget()函數(shù),很短.

func (g *Group) Forget(key string) {
   g.mu.Lock()
   if c, ok := g.m[key]; ok {
      c.forgotten = true // key的forgotten標(biāo)志位記為true
   }
   delete(g.m, key)  // Group.m中刪除對(duì)應(yīng)的key
   g.mu.Unlock()
}

doCall

func (g *Group) doCall(c *call, key string, fn func() (interface{}, error)) {
   normalReturn := false
   recovered := false
    //使用雙重defer來(lái)區(qū)分error的類(lèi)型: panic && runtime.error
   defer func() { 
      if !normalReturn && !recovered {
        // fn()發(fā)生了panic且fn()中的panic沒(méi)有被recover掉
        // errGoexit連接runtime.Goexit錯(cuò)誤
         c.err = errGoexit 
      }
      c.wg.Done()
      g.mu.Lock()
      defer g.mu.Unlock()
      if !c.forgotten { // 檢查key是否調(diào)用了Forget()
         delete(g.m, key)
      }
      if e, ok := c.err.(*panicError); ok {
         // 如果返回的是?panic?錯(cuò)誤,為了避免channel被永久阻塞,我們需要確保這個(gè)panic無(wú)法被recover
         if len(c.chans) > 0 {
            go panic(e)  // panic無(wú)法被恢復(fù)
            select {} // 阻塞本goroutinue.
         } else {
            panic(e)
         }
      } else {
         // 將結(jié)果正常地返回
         for _, ch := range c.chans {
            ch <- Result{c.val, c.err, c.dups > 0}
         }
      }
   }()
   func() {
      defer func() {
         if !normalReturn {
            // 表示fn()發(fā)生了panic()
            // 此時(shí)與panic相關(guān)的堆棧已經(jīng)被丟棄(調(diào)用的fn()) ,無(wú)法通過(guò)堆棧跟蹤去確定error類(lèi)型
            if r := recover(); r != nil {
               c.err = newPanicError(r) //new一個(gè)新的自定義panic err,往第一個(gè)defer拋
            }
         }
      }()
     // 執(zhí)行我們實(shí)際的業(yè)務(wù)邏輯,并將業(yè)務(wù)方法的返回值賦給singleflight.call
      c.val, c.err = fn()的val和err屬性
      // 如果fn()發(fā)生panic,normalReturn無(wú)法被賦值為true,而是進(jìn)入doCall()的第二個(gè)defer()
      normalReturn = true
   }()
   // 如果normalResult為false時(shí),表示fn()發(fā)生了panic
   // 但是執(zhí)行到了這一步,表示fn()中的panic被recover了
   if !normalReturn {
      recovered = true // recovered標(biāo)志位置為true
   }
}

由以上分析可以得出幾個(gè)重要的結(jié)論

  • singleflight主要使用sync.Mutex和sync.WaitGroup進(jìn)行并發(fā)控制.

  • 對(duì)于key相同的請(qǐng)求, singleflight只會(huì)處理的一個(gè)進(jìn)入的請(qǐng)求,后續(xù)的請(qǐng)求都會(huì)使用waitGroup.wait()將請(qǐng)求阻塞

  • 使用雙重defer()區(qū)分了panic和runtime.Goexit錯(cuò)誤,如果返回的是一個(gè)panic錯(cuò)誤,group.c.chans會(huì)發(fā)生阻塞,那么需要拋出這個(gè)panic且確保其無(wú)法被recover

實(shí)際使用

分享一段實(shí)際項(xiàng)目中使用singleflight結(jié)合本地緩存的代碼模版

func (s Service) getDataBySingleFlight(ctx  context.Context) (entity.List, error) {
    // 1. 從localCache查
    resData, err := local_cache.Get(ctx, key)
    if err != nil {
       log.Fatalln()
       return resData, err
    }
    if resData != nil {
       return resData, nil
    }
    // 2. localCache無(wú)數(shù)據(jù),從redis查
    resData, err = srv.rdsRepo.Get()
    if err != nil && err != redis.Nil {
       // redis錯(cuò)誤
       log.Fatalln()
       return resData, err
    } else if redis.Nil == err {
           // redis無(wú)數(shù)據(jù) ,查db
           resData, err, _ = singleFlight.Do(key, func() (interface{}, error) {
           // 構(gòu)建db查詢(xún)條件
          searchConn := entity.SearchInfo{}
           //  建議休眠0.1s 捕獲0.1s內(nèi)的重復(fù)請(qǐng)求
          time.Sleep(100 * time.Millisecond)
           // 4. 查db
          data, err := srv.dBRepo.GetByConn(ctx, searchConn)
          if err != nil {
             log.Fatalln()
             return data, err
          }
           // 5. 回寫(xiě)localCache && redisCache
          err = local_cache.Set(ctx, data)
          if err != nil {
             log.Fatalln()
          }
          err = srv.rdsRepo.Set(ctx, data)
          if err != nil {
             log.Fatalln()
          }
      // 返回db數(shù)據(jù),回寫(xiě)cache的error不上拋
      return data, nil
   })
   return resData, err
}
return resData, nil

弊端與解決方案

singleflight當(dāng)然不是解決問(wèn)題的銀彈,在使用的過(guò)程中有一些“坑”需要我們注意

  • Do()方法是一個(gè)同步調(diào)用的方法,無(wú)法處理下游服務(wù)調(diào)用的超時(shí)情況

解決方案:

使用singleflight的doChan()方法,在service層使用 channel+select 做超時(shí)控制.

func enterGetAndSetCacheWithChan(ctx context.Context, key string) (str string, err error) {
   tag := "enterGetAndSetCacheWithChan"
   sonCtx, _ := context.WithTimeout(ctx, 2 * time.Second)
   val := ""
   nums := 10 //協(xié)程數(shù)
   wg = &sync.WaitGroup{}
   wg.Add(nums)
   for idx := 0; idx < nums; idx++ {
      go func() {
         defer wg.Done()
         val, err = getAndSetCacheWithChan(sonCtx, idx, key)
         if err != nil {
            log.Printf("err:[%+v]", err)
            return
         }
         str = val
      }()
   }
   wg.Wait()
   log.Printf("tag:[%s] val:[%s]", tag, val)
   return
}
func getAndSetCacheWithChan(ctx context.Context, idx int, cacheKey string) (string, error) {
   tag := "getAndSetCacheWithChan"
   log.Printf("tag: %s ;idx %d into-cache...", tag, idx)
   ch := sf.DoChan(cacheKey, func() (ret interface{}, err error) { // do的入?yún)ey,可以直接使用緩存的key,這樣同一個(gè)緩存,只有一個(gè)協(xié)程會(huì)去讀DB
      log.Printf("idx %v is-setting-cache", idx)
      time.Sleep(100 * time.Millisecond)
      log.Printf("idx %v set-cache-success!", idx)
      return "myValue", nil
   })
   for { // 選擇 context + select 超時(shí)控制
      select {
      case <-ctx.Done():
         return "", errors.New("ctx-timeout") // 根據(jù)業(yè)務(wù)邏輯選擇上拋 error
      case data, _ := <-ch:
         return data.Val.(string), nil
      default:
      }
   }
}
  • 如果第一個(gè)請(qǐng)求失敗了,那么所有等待的請(qǐng)求都會(huì)返回同一個(gè)error

解決方案

根據(jù)實(shí)際情況,結(jié)合下游服務(wù)調(diào)用耗時(shí)與下游實(shí)際能支持的QPS等數(shù)據(jù),對(duì)key做定時(shí)Forget()。

go func() {
       time.Sleep(100 * time.Millisecond)
       g.Forget(key)
   }()

參考文章

singleflight雙重defer: developer.51cto.com/article/652…

到此這篇關(guān)于Golang并發(fā)工具-Singleflight的文章就介紹到這了,更多相關(guān)Golang并發(fā)Singleflight內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • Go語(yǔ)言學(xué)習(xí)之映射(map)的用法詳解

    Go語(yǔ)言學(xué)習(xí)之映射(map)的用法詳解

    Map是一種無(wú)序的鍵值對(duì)的集合。這篇文章主要為大家詳細(xì)介紹了Go語(yǔ)言中映射的用法,文中的示例代碼講解詳細(xì),對(duì)我們學(xué)習(xí)Go語(yǔ)言有一定的幫助,需要的可以參考一下
    2022-04-04
  • Golang使用Gin框架實(shí)現(xiàn)HTTP響應(yīng)格式統(tǒng)一處理

    Golang使用Gin框架實(shí)現(xiàn)HTTP響應(yīng)格式統(tǒng)一處理

    在gin框架中,我們可以定義一個(gè)中間件來(lái)處理統(tǒng)一的HTTP響應(yīng)格式,本文主要為大家介紹了具體是怎么定義實(shí)現(xiàn)這樣的中間件的,感興趣的小伙伴可以了解一下
    2023-07-07
  • Go讀取文件與寫(xiě)入文件的三種方法操作指南

    Go讀取文件與寫(xiě)入文件的三種方法操作指南

    在 Go 語(yǔ)言中也經(jīng)常會(huì)遇到操作文件的需求,下面這篇文章主要給大家介紹了關(guān)于Go讀取文件與寫(xiě)入文件的三種方法操作,文中通過(guò)代碼介紹的非常詳細(xì),需要的朋友可以參考下
    2022-09-09
  • go實(shí)現(xiàn)服務(wù)優(yōu)雅關(guān)閉的示例

    go實(shí)現(xiàn)服務(wù)優(yōu)雅關(guān)閉的示例

    本文主要介紹了go實(shí)現(xiàn)服務(wù)優(yōu)雅關(guān)閉的示例,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧
    2023-02-02
  • Go1.18都出泛型了速來(lái)圍觀(guān)

    Go1.18都出泛型了速來(lái)圍觀(guān)

    泛型允許程序員在強(qiáng)類(lèi)型程序設(shè)計(jì)語(yǔ)言中編寫(xiě)代碼時(shí)使用一些以后才指定的類(lèi)型,在實(shí)例化時(shí)作為參數(shù)指明這些類(lèi)型,本文通過(guò)例子給大家介紹下如何使用泛型,對(duì)Go1.18泛型相關(guān)知識(shí)感興趣的朋友一起看看吧
    2022-03-03
  • 使用Go中的Web3庫(kù)進(jìn)行區(qū)塊鏈開(kāi)發(fā)的案例

    使用Go中的Web3庫(kù)進(jìn)行區(qū)塊鏈開(kāi)發(fā)的案例

    區(qū)塊鏈作為一種分布式賬本技術(shù),在近年來(lái)取得了巨大的發(fā)展,而Golang作為一種高效、并發(fā)性強(qiáng)的編程語(yǔ)言,被廣泛用于區(qū)塊鏈開(kāi)發(fā)中,本文將介紹如何使用Golang中的Web3庫(kù)進(jìn)行區(qū)塊鏈開(kāi)發(fā),并提供一些實(shí)際案例,需要的朋友可以參考下
    2023-10-10
  • 深入理解Golang中WebSocket和WSS的支持

    深入理解Golang中WebSocket和WSS的支持

    本文主要介紹了深入理解Golang中WebSocket和WSS的支持,實(shí)現(xiàn)了Golang構(gòu)建WebSocket服務(wù)器和客戶(hù)端,并使用自簽名證書(shū)實(shí)現(xiàn)WSS的功能,具有一定的參考價(jià)值,感興趣的可以了解一下
    2024-03-03
  • golang 中的 nil的場(chǎng)景分析

    golang 中的 nil的場(chǎng)景分析

    這篇文章主要介紹了golang 中的 nil,本文通過(guò)多種場(chǎng)景分析給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下
    2021-03-03
  • Go1.16新特性embed打包靜態(tài)資源文件實(shí)現(xiàn)

    Go1.16新特性embed打包靜態(tài)資源文件實(shí)現(xiàn)

    這篇文章主要為大家介紹了Go?1.16新特性embed打包靜態(tài)資源文件的實(shí)現(xiàn)示例詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪
    2022-07-07
  • Go實(shí)現(xiàn)共享庫(kù)的方法

    Go實(shí)現(xiàn)共享庫(kù)的方法

    本文主要介紹了Go實(shí)現(xiàn)共享庫(kù)的方法,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧
    2023-02-02

最新評(píng)論