Go語言實現(xiàn)常見限流算法的示例代碼
用go語言嘗試計數(shù)器、滑動窗口、漏斗算法、令牌桶算法等算法
1. 計數(shù)器
計數(shù)器是一種最簡單限流算法,其原理就是:在一段時間間隔內(nèi),對請求進行計數(shù),與閥值進行比較判斷是否需要限流,一旦到了時間臨界點,將計數(shù)器清零。
- 可以在程序中設(shè)置一個變量
count
,當過來一個請求我就將這個數(shù)+1
,同時記錄請求時間。 - 當下一個請求來的時候判斷
count
的計數(shù)值是否超過設(shè)定的頻次,以及當前請求的時間和第一次請求時間是否在1
分鐘內(nèi)。 - 如果在
1
分鐘內(nèi)并且超過設(shè)定的頻次則證明請求過多,后面的請求就拒絕掉。 - 如果該請求與第一個請求的間隔時間大于計數(shù)周期,且
count
值還在限流范圍內(nèi),就重置count
。
常見使用redis。比如對每秒限流,可以每秒設(shè)置一個key(limit-2021-11-20-11:11:11、limit-2021-11-20-11:11:12)。首先獲取當前時間拼接一個如上的key。如果key存在且未超過某個閾值就自增,超過閾值就拒絕;如果key不存在就代表這新的一秒沒有請求則重置計數(shù)。
Go實現(xiàn)
type Counter struct { rate int // 計數(shù)周期內(nèi)最多允許的請求數(shù) begin time.Time // 計數(shù)開始時間 cycle time.Duration // 計數(shù)周期 count int // 計數(shù)周期內(nèi)累計收到的請求數(shù) lock sync.Mutex } func (l *Counter) Allow() bool { l.lock.Lock() defer l.lock.Unlock() if l.count == l.rate-1 { now := time.Now() if now.Sub(l.begin) >= l.cycle { //速度允許范圍內(nèi), 重置計數(shù)器 l.Reset(now) return true } else { return false } } else { //沒有達到速率限制,計數(shù)加1 l.count++ return true } } func (l *Counter) Set(r int, cycle time.Duration) { l.rate = r l.begin = time.Now() l.cycle = cycle l.count = 0 } func (l *Counter) Reset(t time.Time) { l.begin = t l.count = 0 } func main() { var lr Counter lr.Set(3, time.Second) // 1s內(nèi)最多請求3次 var wg sync.WaitGroup wg.Add(10) for i := 0; i < 10; i++ { go func(i int) { if lr.Allow() { log.Println("ok:", i) } else { log.Println("fail:", i) } wg.Done() }(i) time.Sleep(200 * time.Millisecond) } wg.Wait() } /* 每秒三個 2021/11/20 16:22:39 ok: 0 2021/11/20 16:22:39 ok: 1 2021/11/20 16:22:39 ok: 2 2021/11/20 16:22:39 fail: 3 2021/11/20 16:22:40 fail: 4 2021/11/20 16:22:40 ok: 5 2021/11/20 16:22:40 ok: 6 2021/11/20 16:22:40 ok: 7 2021/11/20 16:22:41 ok: 8 2021/11/20 16:22:41 fail: 9 */
缺點
如果有個需求對于某個接口 /query
每分鐘最多允許訪問 200 次,假設(shè)有個用戶在第 59 秒的最后幾毫秒瞬間發(fā)送 200 個請求,當 59 秒結(jié)束后 Counter
清零了,他在下一秒的時候又發(fā)送 200 個請求。那么在 1 秒鐘內(nèi)這個用戶發(fā)送了 2 倍的請求,這個是符合我們的設(shè)計邏輯的,這也是計數(shù)器方法的設(shè)計缺陷,系統(tǒng)可能會承受惡意用戶的大量請求,甚至擊穿系統(tǒng)。
2. 滑動窗口
滑動窗口是針對計數(shù)器存在的臨界點缺陷,所謂 滑動窗口(Sliding window) 是一種流量控制技術(shù),這個詞出現(xiàn)在 TCP
協(xié)議中。滑動窗口把固定時間片進行劃分,并且隨著時間的流逝,進行移動,固定數(shù)量的可以移動的格子,進行計數(shù)并判斷閥值。
算法思想
上圖中我們用紅色的虛線代表一個時間窗口(一分鐘
),每個時間窗口有 6
個格子,每個格子是 10
秒鐘。每過 10
秒鐘時間窗口向右移動一格,可以看紅色箭頭的方向。我們?yōu)槊總€格子都設(shè)置一個獨立的計數(shù)器 Counter
,假如一個請求在 0:45
訪問了那么我們將第五個格子的計數(shù)器 +1
(也是就是 0:40~0:50
),在判斷限流的時候需要把所有格子的計數(shù)加起來和設(shè)定的頻次進行比較即可。
那么滑動窗口如何解決我們上面遇到的問題呢?來看下面的圖:
當用戶在0:59
秒鐘發(fā)送了 200
個請求就會被第六個格子的計數(shù)器記錄 +200
,當下一秒的時候時間窗口向右移動了一個,此時計數(shù)器已經(jīng)記錄了該用戶發(fā)送的 200
個請求,所以再發(fā)送的話就會觸發(fā)限流,則拒絕新的請求。
其實計數(shù)器就是滑動窗口啊,只不過只有一個格子而已,所以想讓限流做的更精確只需要劃分更多的格子就可以了,為了更精確我們也不知道到底該設(shè)置多少個格子,格子的數(shù)量影響著滑動窗口算法的精度,依然有時間片的概念,無法根本解決臨界點問題。
適用場景
與令牌桶一樣,有應(yīng)對突發(fā)流量的能力
Go實現(xiàn)
主要就是實現(xiàn)sliding window算法??梢詤⒖糂ilibili開源的kratos框架里circuit breaker用循環(huán)列表保存time slot對象的實現(xiàn),他們這個實現(xiàn)的好處是不用頻繁的創(chuàng)建和銷毀time slot對象。下面給出一個簡單的基本實現(xiàn):
var winMu map[string]*sync.RWMutex func init() { winMu = make(map[string]*sync.RWMutex) } type timeSlot struct { timestamp time.Time // 這個timeSlot的時間起點 count int // 落在這個timeSlot內(nèi)的請求數(shù) } func countReq(win []*timeSlot) int { var count int for _, ts := range win { count += ts.count } return count } type SlidingWindowLimiter struct { SlotDuration time.Duration // time slot的長度 WinDuration time.Duration // sliding window的長度 numSlots int // window內(nèi)最多有多少個slot maxReq int // win duration內(nèi)允許的最大請求數(shù) windows map[string][]*timeSlot } func NewSliding(slotDuration time.Duration, winDuration time.Duration, maxReq int) *SlidingWindowLimiter { return &SlidingWindowLimiter{ SlotDuration: slotDuration, WinDuration: winDuration, numSlots: int(winDuration / slotDuration), windows: make(map[string][]*timeSlot), maxReq: maxReq, } } // 獲取user_id/ip的時間窗口 func (l *SlidingWindowLimiter) getWindow(uidOrIp string) []*timeSlot { win, ok := l.windows[uidOrIp] if !ok { win = make([]*timeSlot, 0, l.numSlots) } return win } func (l *SlidingWindowLimiter) storeWindow(uidOrIp string, win []*timeSlot) { l.windows[uidOrIp] = win } func (l *SlidingWindowLimiter) validate(uidOrIp string) bool { // 同一user_id/ip并發(fā)安全 mu, ok := winMu[uidOrIp] if !ok { var m sync.RWMutex mu = &m winMu[uidOrIp] = mu } mu.Lock() defer mu.Unlock() win := l.getWindow(uidOrIp) now := time.Now() // 已經(jīng)過期的time slot移出時間窗 timeoutOffset := -1 for i, ts := range win { if ts.timestamp.Add(l.WinDuration).After(now) { break } timeoutOffset = i } if timeoutOffset > -1 { win = win[timeoutOffset+1:] } // 判斷請求是否超限 var result bool if countReq(win) < l.maxReq { result = true } // 記錄這次的請求數(shù) var lastSlot *timeSlot if len(win) > 0 { lastSlot = win[len(win)-1] if lastSlot.timestamp.Add(l.SlotDuration).Before(now) { lastSlot = &timeSlot{timestamp: now, count: 1} win = append(win, lastSlot) } else { lastSlot.count++ } } else { lastSlot = &timeSlot{timestamp: now, count: 1} win = append(win, lastSlot) } l.storeWindow(uidOrIp, win) return result } func (l *SlidingWindowLimiter) getUidOrIp() string { return "127.0.0.1" } func (l *SlidingWindowLimiter) IsLimited() bool { return !l.validate(l.getUidOrIp()) } func main() { limiter := NewSliding(100*time.Millisecond, time.Second, 10) for i := 0; i < 5; i++ { fmt.Println(limiter.IsLimited()) } time.Sleep(100 * time.Millisecond) for i := 0; i < 5; i++ { fmt.Println(limiter.IsLimited()) } fmt.Println(limiter.IsLimited()) for _, v := range limiter.windows[limiter.getUidOrIp()] { fmt.Println(v.timestamp, v.count) } fmt.Println("a thousand years later...") time.Sleep(time.Second) for i := 0; i < 7; i++ { fmt.Println(limiter.IsLimited()) } for _, v := range limiter.windows[limiter.getUidOrIp()] { fmt.Println(v.timestamp, v.count) } }
3. 漏桶算法
算法思想
與令牌桶是“反向”的算法,當有請求到來時先放到木桶中,worker以固定的速度從木桶中取出請求進行相應(yīng)。如果木桶已經(jīng)滿了,直接返回請求頻率超限的錯誤碼或者頁面
特點
漏桶算法有以下特點:
- 漏桶具有固定容量,出水速率是固定常量(流出請求)
- 如果桶是空的,則不需流出水滴
- 可以以任意速率流入水滴到漏桶(流入請求)
- 如果流入水滴超出了桶的容量,則流入的水滴溢出(新請求被拒絕)
漏桶限制的是常量流出速率(即流出速率是一個固定常量值),所以最大的速率就是出水的速率,不能出現(xiàn)突發(fā)流量。
適用場景
流量最均勻的限流方式,一般用于流量“整形”,例如保護數(shù)據(jù)庫的限流。先把對數(shù)據(jù)庫的訪問加入到木桶中,worker再以db能夠承受的qps從木桶中取出請求,去訪問數(shù)據(jù)庫。不太適合電商搶購和微博出現(xiàn)熱點事件等場景的限流,一是應(yīng)對突發(fā)流量不是很靈活,二是為每個user_id/ip維護一個隊列(木桶),workder從這些隊列中拉取任務(wù),資源的消耗會比較大。
Go實現(xiàn)
通常使用隊列來實現(xiàn),在go語言中可以通過buffered channel來快速實現(xiàn),任務(wù)加入channel,開啟一定數(shù)量的 worker 從 channel 中獲取任務(wù)執(zhí)行。
// 封裝業(yè)務(wù)邏輯的執(zhí)行結(jié)果 type Result struct { Msg string } // 執(zhí)行的業(yè)務(wù)邏輯函數(shù) type Handler func() Result // 每個請求來了,把需要執(zhí)行的業(yè)務(wù)邏輯封裝成Task,放入木桶,等待worker取出執(zhí)行 type Task struct { handler Handler // worker從木桶中取出請求對象后要執(zhí)行的業(yè)務(wù)邏輯函數(shù) resChan chan Result // 等待worker執(zhí)行并返回結(jié)果的channel taskID int } func NewTask(id int, handler Handler) Task { return Task{ handler: handler, resChan: make(chan Result), taskID: id, } } // 漏桶 type LeakyBucket struct { BucketSize int // 木桶的大小 WorkerNum int // 同時從木桶中獲取任務(wù)執(zhí)行的worker數(shù)量 bucket chan Task // 存方任務(wù)的木桶 } func NewLeakyBucket(bucketSize int, workNum int) *LeakyBucket { return &LeakyBucket{ BucketSize: bucketSize, WorkerNum: workNum, bucket: make(chan Task, bucketSize), } } func (b *LeakyBucket) AddTask(task Task) bool { // 如果木桶已經(jīng)滿了,返回false select { case b.bucket <- task: default: fmt.Printf("request[id=%d] is refused\n", task.taskID) return false } // 如果成功入桶,調(diào)用者會等待worker執(zhí)行結(jié)果 resp := <-task.resChan fmt.Printf("request[id=%d] is run ok, resp[%v]\n", task.taskID, resp) return true } func (b *LeakyBucket) Start(ctx context.Context) { // 開啟worker從木桶拉取任務(wù)執(zhí)行 for i := 0; i < b.WorkerNum; i++ { go func(ctx context.Context) { for { select { case <-ctx.Done(): return default: task := <-b.bucket result := task.handler() task.resChan <- result } } }(ctx) } } func main() { bucket := NewLeakyBucket(10, 4) ctx, cancel := context.WithCancel(context.Background()) defer cancel() bucket.Start(ctx) // 開啟消費者 // 模擬20個并發(fā)請求 var wg sync.WaitGroup wg.Add(20) for i := 0; i < 20; i++ { go func(id int) { defer wg.Done() task := NewTask(id, func() Result { time.Sleep(300 * time.Millisecond) return Result{} }) bucket.AddTask(task) }(i) } wg.Wait() time.Sleep(10 * time.Second) }
4. 令牌桶算法
算法思想
令牌桶算法(Token Bucket)
是網(wǎng)絡(luò)流量整形(Traffic Shaping)
和速率限制(Rate Limiting)
中最常使用的一種算法。典型情況下,令牌桶算法用來控制發(fā)送到網(wǎng)絡(luò)上的數(shù)據(jù)的數(shù)目,并允許突發(fā)數(shù)據(jù)的發(fā)送。想象有一個木桶,以固定的速度往木桶里加入令牌,木桶滿了則不再加入令牌。服務(wù)收到請求時嘗試從木桶中取出一個令牌,如果能夠得到令牌則繼續(xù)執(zhí)行后續(xù)的業(yè)務(wù)邏輯;如果沒有得到令牌,直接返回反問頻率超限的錯誤碼或頁面等,不繼續(xù)執(zhí)行后續(xù)的業(yè)務(wù)邏輯
特點
由于木桶內(nèi)只要有令牌,請求就可以被處理,所以令牌桶算法可以支持突發(fā)流量。同時由于往木桶添加令牌的速度是固定的,且木桶的容量有上限,所以單位時間內(nèi)處理的請求數(shù)目也能夠得到控制,起到限流的目的。假設(shè)加入令牌的速度為 1token/10ms,桶的容量為500,在請求比較的少的時候(小于每10毫秒1個請求)時,木桶可以先"攢"一些令牌(最多500個)。當有突發(fā)流量時,一下把木桶內(nèi)的令牌取空,也就是有500個在并發(fā)執(zhí)行的業(yè)務(wù)邏輯,之后要等每10ms補充一個新的令牌才能接收一個新的請求。
木桶的容量 - 考慮業(yè)務(wù)邏輯的資源消耗和機器能承載并發(fā)處理多少業(yè)務(wù)邏輯。生成令牌的速度 - 太慢的話起不到“攢”令牌應(yīng)對突發(fā)流量的效果。
- 令牌按固定的速率被放入令牌桶中
- 桶中最多存放
B
個令牌,當桶滿時,新添加的令牌被丟棄或拒絕 - 如果桶中的令牌不足
N
個,則不會刪除令牌,且請求將被限流(丟棄或阻塞等待)
令牌桶限制的是平均流入速率(允許突發(fā)請求,只要有令牌就可以處理,支持一次拿3個令牌,4個令牌...),并允許一定程度突發(fā)流量。
適用場景
適合電商搶購或者微博出現(xiàn)熱點事件這種場景,因為在限流的同時可以應(yīng)對一定的突發(fā)流量。如果采用均勻速度處理請求的算法,在發(fā)生熱點時間的時候,會造成大量的用戶無法訪問,對用戶體驗的損害比較大。
Go實現(xiàn)
假設(shè)每100ms生產(chǎn)一個令牌,按user_id/IP記錄訪問最近一次訪問的時間戳 t_last 和令牌數(shù),每次請求時如果 now - last > 100ms, 增加 (now - last) / 100ms個令牌。然后,如果令牌數(shù) > 0,令牌數(shù) -1 繼續(xù)執(zhí)行后續(xù)的業(yè)務(wù)邏輯,否則返回請求頻率超限的錯誤碼或頁面。
type TokenBucket struct { lock sync.Mutex rate time.Duration // 多長時間生成一個令牌 capacity int // 桶的容量 tokens int // 桶中當前token數(shù)量 last time.Time // 桶上次放token的時間戳 s } func NewTokenBucket(bucketSize int, tokenRate time.Duration) *TokenBucket { return &TokenBucket{ capacity: bucketSize, rate: tokenRate, } } // 驗證是否能獲取一個令牌 返回是否被限流 func (t *TokenBucket) Allow() bool { t.lock.Lock() defer t.lock.Unlock() now := time.Now() if t.last.IsZero() { // 第一次訪問初始化為最大令牌數(shù) t.last, t.tokens = now, t.capacity } else { if t.last.Add(t.rate).Before(now) { // 如果 now 與上次請求的間隔超過了 token rate // 則增加令牌,更新last t.tokens += int(now.Sub(t.last) / t.rate) if t.tokens > t.capacity { t.tokens = t.capacity } t.last = now } } if t.tokens > 0 { // 如果令牌數(shù)大于0,取走一個令牌 t.tokens-- return true } // 沒有令牌,則拒絕 return false } func main() { tokenBucket := NewTokenBucket(5, 100*time.Millisecond) for i := 0; i < 10; i++ { fmt.Println(tokenBucket.Allow()) } time.Sleep(100 * time.Millisecond) fmt.Println(tokenBucket.Allow()) }
以上就是Go語言實現(xiàn)常見限流算法的示例代碼的詳細內(nèi)容,更多關(guān)于Go語言限流算法的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
golang的匿名函數(shù)和普通函數(shù)的區(qū)別解析
匿名函數(shù)是不具名的函數(shù),可以在不定義函數(shù)名的情況下直接使用,通常用于函數(shù)內(nèi)部的局部作用域中,這篇文章主要介紹了golang的匿名函數(shù)和普通函數(shù)的區(qū)別,需要的朋友可以參考下2023-03-03淺析Golang如何向已關(guān)閉的chan讀寫數(shù)據(jù)
這篇文章主要為大家詳細介紹了Golang如何向已關(guān)閉的chan讀寫數(shù)據(jù),文中的示例代碼講解詳細,感興趣的小伙伴可以跟隨小編一起學習一下2024-02-02