亚洲乱码中文字幕综合,中国熟女仑乱hd,亚洲精品乱拍国产一区二区三区,一本大道卡一卡二卡三乱码全集资源,又粗又黄又硬又爽的免费视频

Golang限流庫與漏桶和令牌桶的使用介紹

 更新時間:2023年03月30日 10:56:59   作者:捶捶自己  
這篇文章主要介紹了golang限流庫以及漏桶與令牌桶的實現(xiàn)原理,文中通過示例代碼介紹的非常詳細,對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)吧

RateLimit 限流中間件

為什么需要限流中間件

在大數(shù)據(jù)量高并發(fā)訪問時,經(jīng)常會出現(xiàn)服務(wù)或接口面對大量的請求而導(dǎo)致數(shù)據(jù)庫崩潰的情況,甚至引發(fā)連鎖反映導(dǎo)致整個系統(tǒng)崩潰?;蛘哂腥藧阂夤艟W(wǎng)站,大量的無用請求出現(xiàn)會導(dǎo)致緩存穿透的情況出現(xiàn)。使用限流中間件可以在短時間內(nèi)對請求進行限制數(shù)量,起到降級的作用,從而保障了網(wǎng)站的安全性。

應(yīng)對大量并發(fā)請求的策略

  • 使用消息中間件進行統(tǒng)一限制(降速)
  • 使用限流方案將多余請求返回(限流)
  • 升級服務(wù)器
  • 負載均衡升級
  • 等等

可以看出在代碼已經(jīng)無法提升的情況下,只能去提升硬件水平?;蛘吒膭蛹軜?gòu)再加一層!也可以使用消息中間件統(tǒng)一處理。而結(jié)合看來,限流方案是一種既不需要大幅改動也不需要高額開銷的策略。

常見的限流方案

  • 令牌桶算法
  • 漏桶算法
  • 滑動窗口算法
  • 等等

這里主要根據(jù)golang的庫介紹令牌桶和漏桶的實現(xiàn)原理以及在實際項目中如何應(yīng)用。

漏桶

引入ratelimit庫

go get -u go.uber.org/ratelimit

庫函數(shù)源代碼

// New returns a Limiter that will limit to the given RPS.
func New(rate int, opts ...Option) Limiter {
	return newAtomicBased(rate, opts...)
}
// newAtomicBased returns a new atomic based limiter.
func newAtomicBased(rate int, opts ...Option) *atomicLimiter {
	// TODO consider moving config building to the implementation
	// independent code.
	config := buildConfig(opts)
	perRequest := config.per / time.Duration(rate)
	l := &atomicLimiter{
		perRequest: perRequest,
		maxSlack:   -1 * time.Duration(config.slack) * perRequest,
		clock:      config.clock,
	}
	initialState := state{
		last:     time.Time{},
		sleepFor: 0,
	}
	atomic.StorePointer(&l.state, unsafe.Pointer(&initialState))
	return l
}

該函數(shù)使用了函數(shù)選項模式對多個結(jié)構(gòu)體對象進行初始化

首先根據(jù)傳入的值來初始化一個桶結(jié)構(gòu)體 rateint傳參 (time.Duration(rate)單位為納秒 = 1/1e9秒)

初始化過程中包括了

  • 每一滴水需要的時間 perquest = config.per / time.Duration(rate)
  • maxSlack 寬松度(寬松度為負值)-1 * time.Duration(config.slack) * perRequest 松緊度是用來規(guī)范等待時間的
// Clock is the minimum necessary interface to instantiate a rate limiter with
// a clock or mock clock, compatible with clocks created using
// github.com/andres-erbsen/clock.
type Clock interface {
   Now() time.Time
   Sleep(time.Duration)
}

同時還需要結(jié)構(gòu)體Clock來記錄當(dāng)前請求的時間now和此刻的請求所需要花費等待的時間sleep

type state struct {
   last     time.Time
   sleepFor time.Duration
}

state 主要用來記錄上次執(zhí)行的時間以及當(dāng)前執(zhí)行請求需要花費等待的時間(作為中間狀態(tài)記錄)

最重要的Take邏輯

// Take blocks to ensure that the time spent between multiple
// Take calls is on average time.Second/rate.
func (t *atomicLimiter) Take() time.Time {
   var (
      newState state
      taken    bool
      interval time.Duration
   )
   for !taken {
      now := t.clock.Now()
      previousStatePointer := atomic.LoadPointer(&t.state)
      oldState := (*state)(previousStatePointer)
      newState = state{
         last:     now,
         sleepFor: oldState.sleepFor,
      }
      // If this is our first request, then we allow it.
      if oldState.last.IsZero() {
         taken = atomic.CompareAndSwapPointer(&t.state, previousStatePointer, unsafe.Pointer(&newState))
         continue
      }
      // 計算是否需要進行等待取水操作
      newState.sleepFor += t.perRequest(每兩滴水之間的間隔時間) - now.Sub(oldState.last)(當(dāng)前時間與上次取水時間的間隔)
       // 如果等待取水時間特別小,就需要松緊度進行維護
      if newState.sleepFor < t.maxSlack {
         newState.sleepFor = t.maxSlack
      }
       // 如果等待時間大于0,就進行更新
      if newState.sleepFor > 0 {
         newState.last = newState.last.Add(newState.sleepFor)
         interval, newState.sleepFor = newState.sleepFor, 0
      }
      taken = atomic.CompareAndSwapPointer(&t.state, previousStatePointer, unsafe.Pointer(&newState))
   }
   t.clock.Sleep(interval)
   // 最后返回需要等待的時間
    return newState.last
}

實現(xiàn)一個Take方法

  • 該Take方法會進行原子性操作(可以理解為加鎖和解鎖),在大量并發(fā)請求下仍可以保證正常使用。
  • 記錄下當(dāng)前的時間 now := t.clock.Now()
  • oldState.last.IsZero() 判斷是不是第一次取水,如果是就直接將state結(jié)構(gòu)體中的值進行返回。而這個結(jié)構(gòu)體中初始化了上次執(zhí)行時間,如果是第一次取水就作為當(dāng)前時間直接傳參。
  • 如果 newState.sleepFor 非常小,就會出現(xiàn)問題,因此需要借助寬松度,一旦這個最小值比寬松度小,就用寬松度對取水時間進行維護。
  • 如果newState.sleepFor > 0 就直接更新結(jié)構(gòu)體中上次執(zhí)行時間newState.last = newState.last.Add(newState.sleepFor)并記錄需要等待的時間interval, newState.sleepFor = newState.sleepFor, 0。
  • 如果允許取水和等待操作,那就說明沒有發(fā)生并發(fā)競爭的情況,就模擬睡眠時間t.clock.Sleep(interval)。然后將取水的目標時間進行返回,由服務(wù)端代碼來判斷是否打回響應(yīng)或者等待該時間后繼續(xù)響應(yīng)。

t.clock.Sleep(interval)

func (c *clock) Sleep(d time.Duration) {<!--{C}%3C!%2D%2D%20%2D%2D%3E--> time.Sleep(d) }

實際上在一個請求來的時候,限流器就會進行睡眠對應(yīng)的時間,并在睡眠后將最新取水時間返回。

實際應(yīng)用(使用Gin框架)

func ratelimit1() func(ctx *gin.Context) {
	r1 := rate1.New(100)
	return func(ctx *gin.Context) {
		now := time.Now()
		//  Take 返回的是一個 time.Duration的時間
		if r1.Take().Sub(now) > 0 {
			// 返回的時間比當(dāng)前的時間還大,說明需要進行等待
			// 如果需要等待, 就 time.Sleep(r1.Take().Sub(now())) 然后放行
			// 如果不需要等待請求時間,就直接進行Abort 然后返回
			response(ctx, http.StatusRequestTimeout, "rate1 limit...")
			fmt.Println("rate1 limit...")
			ctx.Abort()
			return
		}
		// 放行
		ctx.Next()
	}
}

這里你可以進行選擇是否返回。因為Take一定會執(zhí)行sleep函數(shù),所以當(dāng)執(zhí)行take結(jié)束后表示當(dāng)前請求已經(jīng)接到了水。當(dāng)前演示使用第一種情況。

  • 如果你的業(yè)務(wù)要求響應(yīng)不允許進行等待。那么可以在該請求接完水之后然后,如上例。
  • 如果你的業(yè)務(wù)允許響應(yīng)等待,那么該請求等待對應(yīng)的接水時間后進行下一步。具體代碼就是將if中的內(nèi)容直接忽略。(建議使用)

測試代碼

這里定義了一個響應(yīng)函數(shù)和一個handler函數(shù)方便測試

func response(c *gin.Context, code int, info any) {
   c.JSON(code, info)
}
func pingHandler(c *gin.Context) {
   response(c, 200, "ping ok~")
}

執(zhí)行go test -run=Run -v先開啟一個web服務(wù)

func TestRun(t *testing.T) {
   r := gin.Default()
   r.GET("/ping1", ratelimit1(), pingHandler)
   r.GET("/ping2", ratelimit2(), helloHandler)
   _ = r.Run(":4399")
}

使用接口壓力測試工具go-wrk進行測試->tsliwowicz/go-wrk: go-wrk

golang install版本可以直接通過go install github.com/tsliwowicz/go-wrk@latest下載

使用幫助

   Usage: go-wrk <options> <url>
   Options:
    -H       Header to add to each request (you can define multiple -H flags) (Default )
    -M       HTTP method (Default GET)
    -T       Socket/request timeout in ms (Default 1000)
    -body    request body string or @filename (Default )
    -c       Number of goroutines to use (concurrent connections) (Default 10)
    -ca      CA file to verify peer against (SSL/TLS) (Default )
    -cert    CA certificate file to verify peer against (SSL/TLS) (Default )
    -d       Duration of test in seconds (Default 10)
    -f       Playback file name (Default <empty>)
    -help    Print help (Default false)
    -host    Host Header (Default )
    -http    Use HTTP/2 (Default true)
    -key     Private key file name (SSL/TLS (Default )
    -no-c    Disable Compression - Prevents sending the "Accept-Encoding: gzip" header (Default false)
    -no-ka   Disable KeepAlive - prevents re-use of TCP connections between different HTTP requests (Default false)
    -no-vr   Skip verifying SSL certificate of the server (Default false)
    -redir   Allow Redirects (Default false)
    -v       Print version details (Default false)

-t 8個線程 -c 400個連接 -n 模擬1k次請求 -d 替換-n 表示連接時間

輸入go-wrk -t=8 -c=400 -n=1000 http://127.0.0.1:4399/ping1

可以稍微等待一下水流積攢否則一個請求也不一定能夠響應(yīng)。

可以看出,89個請求全部返回。也就是說在一段請求高峰期,不會有請求進行響應(yīng)。因此我認為既然內(nèi)部已經(jīng)睡眠,那么就應(yīng)該對請求放行處理。限流器實現(xiàn)的比較純粹!

令牌桶

引入ratelimit

go get -u github.com/juju/ratelimit

初始化

// NewBucket returns a new token bucket that fills at the
// rate of one token every fillInterval, up to the given
// maximum capacity. Both arguments must be
// positive. The bucket is initially full.
func NewBucket(fillInterval time.Duration, capacity int64) *Bucket {
   return NewBucketWithClock(fillInterval, capacity, nil)
}
// NewBucketWithClock is identical to NewBucket but injects a testable clock
// interface.
func NewBucketWithClock(fillInterval time.Duration, capacity int64, clock Clock) *Bucket {
   return NewBucketWithQuantumAndClock(fillInterval, capacity, 1, clock)
}

進行Bucket桶的初始化。

/ NewBucketWithQuantumAndClock is like NewBucketWithQuantum, but
// also has a clock argument that allows clients to fake the passing
// of time. If clock is nil, the system clock will be used.
func NewBucketWithQuantumAndClock(fillInterval time.Duration, capacity, quantum int64, clock Clock) *Bucket {
   if clock == nil {
      clock = realClock{}
   }
    // 填充速率
   if fillInterval <= 0 {
      panic("token bucket fill interval is not > 0")
   }
    // 最大令牌容量
   if capacity <= 0 {
      panic("token bucket capacity is not > 0")
   }
    // 單次令牌生成量
   if quantum <= 0 {
      panic("token bucket quantum is not > 0")
   }
   return &Bucket{
      clock:           clock,
      startTime:       clock.Now(),
      latestTick:      0,
      fillInterval:    fillInterval,
      capacity:        capacity,
      quantum:         quantum,
      availableTokens: capacity,
   }
}

令牌桶初始化過程,初始化結(jié)構(gòu)體 fillInterval(填充速率) cap(最大令牌量) quannum(每次令牌生成量)。

如果三個變量有一個小于或者等于0的話直接進行報錯返回。在最開始就將當(dāng)前令牌數(shù)初始化為最大容量。

調(diào)用

// TakeAvailable takes up to count immediately available tokens from the
// bucket. It returns the number of tokens removed, or zero if there are
// no available tokens. It does not block.
func (tb *Bucket) TakeAvailable(count int64) int64 {
   tb.mu.Lock()
   defer tb.mu.Unlock()
   return tb.takeAvailable(tb.clock.Now(), count)
}

調(diào)用TakeAvailable函數(shù),傳入?yún)?shù)為需要取出的令牌數(shù)量,返回參數(shù)是實際能夠取出的令牌數(shù)量。

內(nèi)部實現(xiàn)

// takeAvailable is the internal version of TakeAvailable - it takes the
// current time as an argument to enable easy testing.
func (tb *Bucket) takeAvailable(now time.Time, count int64) int64 {
   // 如果需要取出的令牌數(shù)小于等于零,那么就返回0個令牌
    if count <= 0 {
      return 0
   }
    // 根據(jù)時間對當(dāng)前桶中令牌數(shù)進行計算
   tb.adjustavailableTokens(tb.currentTick(now))
    // 計算之后的令牌總數(shù)小于等于0,說明當(dāng)前令牌不足取出,那么就直接返回0個令牌
   if tb.availableTokens <= 0 {
      return 0
   }
    // 如果當(dāng)前存儲的令牌數(shù)量多于請求數(shù)量,那么就返回取出令牌數(shù)
   if count > tb.availableTokens {
      count = tb.availableTokens
   }
    // 調(diào)整令牌數(shù)
   tb.availableTokens -= count
   return count
}

調(diào)整令牌

// adjustavailableTokens adjusts the current number of tokens
// available in the bucket at the given time, which must
// be in the future (positive) with respect to tb.latestTick.
func (tb *Bucket) adjustavailableTokens(tick int64) {
   lastTick := tb.latestTick
   tb.latestTick = tick
    // 如果當(dāng)前令牌數(shù)大于最大等于容量,直接返回最大容量
   if tb.availableTokens >= tb.capacity {
      return
   }
    // 當(dāng)前令牌數(shù) += (當(dāng)前時間 - 上次取出令牌數(shù)的時間) * quannum(每次生成令牌量)
   tb.availableTokens += (tick - lastTick) * tb.quantum
    // 如果當(dāng)前令牌數(shù)大于最大等于容量, 將當(dāng)前令牌數(shù) = 最大容量 然后返回 當(dāng)前令牌數(shù)
   if tb.availableTokens > tb.capacity {
      tb.availableTokens = tb.capacity
   }
   return
}

實現(xiàn)原理

加鎖 defer 解鎖

判斷count(想要取出的令牌數(shù)) 是否小于等于 0,如果是直接返回 0

調(diào)用函數(shù)adjustTokens 獲取可用的令牌數(shù)量,該函數(shù)實現(xiàn)原理:

  • 如果當(dāng)前令牌數(shù)大于最大等于容量,直接返回最大容量
  • 當(dāng)前令牌數(shù) += (當(dāng)前時間 - 上次取出令牌數(shù)的時間) * quannum(每次生成令牌量)
  • 如果當(dāng)前令牌數(shù)大于最大等于容量, 將當(dāng)前令牌數(shù) = 最大容量 然后返回 當(dāng)前令牌數(shù)

如果當(dāng)前可以取出的令牌數(shù)小于等于0 直接返回 0

如果當(dāng)前可以取出的令牌數(shù)小于當(dāng)前想要取出的令牌數(shù)(count) count = 當(dāng)前可以取出的令牌數(shù)

當(dāng)前的令牌數(shù) -= 取出的令牌數(shù)(count)

返回 count

額外介紹

take函數(shù),能夠返回等待時間和布爾值,允許欠賬,沒有令牌也可以取出。

func (tb *Bucket) Take(count int64) time.Duration

takeMaxDuration函數(shù),可以根據(jù)最大等待時間來進行判斷。

func (tb *Bucket) TakeMaxDuration(count int64, maxWait time.Duration) (time.Duration, bool)

因為他們內(nèi)部的實現(xiàn)都基于令牌調(diào)整,我這里不做過多介紹,如果感興趣可以自行研究一下。

測試

func ratelimit2() func(ctx *gin.Context) {
	// 生成速率 最大容量
	r2 := rate2.NewBucket(time.Second, 200)
	return func(ctx *gin.Context) {
		//r2.Take() // 允許欠賬,令牌不夠也可以接收請求
		if r2.TakeAvailable(1) == 1 {
			// 如果想要取出1個令牌并且能夠取出,就放行
			ctx.Next()
			return
		}
		response(ctx, http.StatusRequestTimeout, "rate2 limit...")
		ctx.Abort()
		return
	}
}

由于壓測速度過于快速,在實際過程中可以根據(jù)調(diào)整令牌生成速率來進行具體限流!

小結(jié)

令牌桶可以允許自己判斷請求是否繼續(xù),不用進行睡眠。而漏桶需要進行睡眠,并沒有提供方法讓程序員進行判斷是否放行。

個人用令牌桶還是多的,也可能是我對漏桶源碼的解析有誤,沒有看到相關(guān)的點。

到此這篇關(guān)于Golang限流庫與漏桶和令牌桶的使用介紹的文章就介紹到這了,更多相關(guān)Golang限流庫內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • Golang工作池的使用實例講解

    Golang工作池的使用實例講解

    我們使用Go語言開發(fā)項目,常常會使用到goroutine;goroutine太多會造成系統(tǒng)占用過高或其他系統(tǒng)異常,我們可以將goroutine控制指定數(shù)量,且減少goroutine的創(chuàng)建,這就運用到Go工作池,下面就介紹和使用一下
    2023-02-02
  • Go語言學(xué)習(xí)之映射(map)的用法詳解

    Go語言學(xué)習(xí)之映射(map)的用法詳解

    Map是一種無序的鍵值對的集合。這篇文章主要為大家詳細介紹了Go語言中映射的用法,文中的示例代碼講解詳細,對我們學(xué)習(xí)Go語言有一定的幫助,需要的可以參考一下
    2022-04-04
  • 詳解Golang?ProtoBuf的基本語法總結(jié)

    詳解Golang?ProtoBuf的基本語法總結(jié)

    最近項目是采用微服務(wù)架構(gòu)開發(fā)的,各服務(wù)之間通過gPRC調(diào)用,基于ProtoBuf序列化協(xié)議進行數(shù)據(jù)通信,因此接觸學(xué)習(xí)了Protobuf,本文會對Protobuf的語法做下總結(jié),感興趣的小伙伴們可以參考借鑒,希望對大家能有所幫助
    2022-10-10
  • 如何用Go判斷元素是否在切片中

    如何用Go判斷元素是否在切片中

    切片(Slice)是一個擁有相同類型元素的可變長度的序列,下面這篇文章主要給大家介紹了關(guān)于如何用Go判斷元素是否在切片中的相關(guān)資料,文中通過實例代碼介紹的非常詳細,需要的朋友可以參考下
    2022-06-06
  • Go Java算法之累加數(shù)示例詳解

    Go Java算法之累加數(shù)示例詳解

    這篇文章主要為大家介紹了Go Java算法之累加數(shù)示例詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪
    2022-08-08
  • Go語言hello world實例

    Go語言hello world實例

    這篇文章主要介紹了Go語言hello world實例,本文先是給出了hello world的代碼實例,然后對一些知識點和技巧做了解釋,需要的朋友可以參考下
    2014-10-10
  • Go函數(shù)使用(函數(shù)定義、函數(shù)聲明、函數(shù)調(diào)用等)

    Go函數(shù)使用(函數(shù)定義、函數(shù)聲明、函數(shù)調(diào)用等)

    本文主要介紹了Go函數(shù)使用,包括函數(shù)定義、函數(shù)聲明、函數(shù)調(diào)用、可變參數(shù)函數(shù)、匿名函數(shù)、遞歸函數(shù)、高階函數(shù)等,感興趣的可以了解一下
    2023-11-11
  • Mango?Cache緩存管理庫TinyLFU源碼解析

    Mango?Cache緩存管理庫TinyLFU源碼解析

    這篇文章主要為大家介紹了Mango?Cache緩存管理庫TinyLFU源碼解析,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪
    2022-09-09
  • Golang使用反射的動態(tài)方法調(diào)用詳解

    Golang使用反射的動態(tài)方法調(diào)用詳解

    Go是一種靜態(tài)類型的語言,提供了大量的安全性和性能。這篇文章主要和大家介紹一下Golang使用反射的動態(tài)方法調(diào)用,感興趣的小伙伴可以了解一下
    2023-03-03
  • Goland中Protobuf的安裝、配置和使用

    Goland中Protobuf的安裝、配置和使用

    本文記錄了mac環(huán)境下protobuf的編譯安裝,并通過一個示例來演示proto自動生成go代碼,本文使用的mac?os?12.3系統(tǒng),不建議使用homebrew安裝,系統(tǒng)版本太高,會安裝報錯,所以自己下載新版壓縮包編譯構(gòu)建安裝
    2022-05-05

最新評論