Java中常見的4種限流算法詳解
固定窗口
FixedWindowRateLimiter 類表示一個(gè)固定窗口限流器,使用 limit 和 interval 參數(shù)分別表示限制請(qǐng)求數(shù)量和時(shí)間間隔(毫秒)。在 allowRequest() 方法中,通過比較當(dāng)前時(shí)間與上一次請(qǐng)求時(shí)間來判斷是否需要重置請(qǐng)求數(shù)和上一次請(qǐng)求時(shí)間。如果請(qǐng)求數(shù)還沒有達(dá)到限制數(shù)量,允許請(qǐng)求并增加請(qǐng)求數(shù),否則拒絕請(qǐng)求。 缺點(diǎn):短時(shí)間內(nèi)可能會(huì)流量翻倍
public class FixedWindowRateLimiter { private final int limit; // 限制請(qǐng)求數(shù)量 private final AtomicInteger count; // 當(dāng)前請(qǐng)求數(shù) private final long interval; // 時(shí)間間隔(毫秒) private long lastRequestTime; // 上一次請(qǐng)求時(shí)間 public FixedWindowRateLimiter(int limit, long interval) { this.limit = limit; this.interval = interval; this.count = new AtomicInteger(0); this.lastRequestTime = System.currentTimeMillis(); } public synchronized boolean allowRequest() { long currentTime = System.currentTimeMillis(); if (currentTime - lastRequestTime > interval) { // 如果距離上一次請(qǐng)求時(shí)間已經(jīng)超過了時(shí)間間隔,重置請(qǐng)求數(shù)和上一次請(qǐng)求時(shí)間 count.set(0); lastRequestTime = currentTime; } // 如果請(qǐng)求數(shù)還沒有達(dá)到限制數(shù)量,允許請(qǐng)求并增加請(qǐng)求數(shù) if (count.get() < limit) { count.incrementAndGet(); return true; } return false; // 否則拒絕請(qǐng)求 } } // 使用示例 FixedWindowRateLimiter limiter = new FixedWindowRateLimiter(10, 1000); // 每秒最多處理10個(gè)請(qǐng)求 for (int i = 0; i < 20; i++) { // 嘗試發(fā)起20個(gè)請(qǐng)求 if (limiter.allowRequest()) { System.out.println("Allow request " + i); } else { System.out.println("Reject request " + i); } Thread.sleep(200); // 每次請(qǐng)求間隔200毫秒 }
滑動(dòng)窗口
相比于固定窗口,滑動(dòng)窗口有以下幾個(gè)好處:
- 平滑限流:滑動(dòng)窗口限流算法會(huì)以時(shí)間為軸將請(qǐng)求限制平均到每個(gè)時(shí)間段內(nèi),從而平滑了請(qǐng)求的涌入。相比于簡(jiǎn)單粗暴的限制請(qǐng)求的數(shù)量或速率等方式,這種平滑的限流方式能夠更好地保證服務(wù)的可用性和穩(wěn)定性。
- 精確控制:滑動(dòng)窗口限流算法可以根據(jù)具體的業(yè)務(wù)需要設(shè)置窗口大小和時(shí)間間隔,從而實(shí)現(xiàn)對(duì)請(qǐng)求的精確控制。通過適當(dāng)調(diào)整窗口大小和時(shí)間間隔,可以達(dá)到更好的限流效果。
關(guān)于這個(gè),我覺得sentinel中的滑動(dòng)窗口就非常的nice,下面是從sentinel中摘出來改一下的示例(同時(shí)也運(yùn)用在我本人的中間件內(nèi)),總得來說有三部分:
- 窗口存放的實(shí)體類(監(jiān)控指標(biāo)) HystrixEntity
- 窗口的定義 HystrixWindow
- 滑動(dòng)窗口具體實(shí)現(xiàn) HystrixWindowArray
// 窗口存放的實(shí)體類(監(jiān)控指標(biāo)) public class HystrixEntity { // 窗口請(qǐng)求數(shù) private AtomicInteger requestCount; // 窗口異常數(shù) private AtomicInteger errorCount; public HystrixEntity(){ this.requestCount=new AtomicInteger(0); this.errorCount=new AtomicInteger(0); } public int getRequestCountValue() { return requestCount.get(); } public int getErrorCountValue() { return errorCount.get(); } public void resetValue() { this.errorCount.set(0); this.requestCount.set(0); } public void addErrorCount(){ this.errorCount.addAndGet(1); } public void addRequestCount(){ this.requestCount.addAndGet(1); } } //窗口的定義 HystrixWindow public class HystrixWindow { // 窗口的長(zhǎng)度 單位:ms private final int windowLengthInMs; // 窗口的開始時(shí)間戳 單位:ms private long windowStartInMs; // 窗口內(nèi)存放的實(shí)體類 private HystrixEntity hystrixEntity; public HystrixWindow(int windowLengthInMs, long windowStartInMs, HystrixEntity hystrixEntity) { this.windowLengthInMs = windowLengthInMs; this.windowStartInMs = windowStartInMs; this.hystrixEntity = hystrixEntity; } public int getWindowLengthInMs() { return windowLengthInMs; } public long getWindowStartInMs() { return windowStartInMs; } public HystrixEntity getHystrixEntity() { return hystrixEntity; } public void setHystrixEntity(HystrixEntity hystrixEntity) { this.hystrixEntity = hystrixEntity; } /** * @Description 重置窗口 **/ public HystrixWindow resetTo(long startTime) { this.windowStartInMs = startTime; hystrixEntity.resetValue(); return this; } /** * @Description 判斷時(shí)間是否屬于該窗口 **/ public boolean isTimeInWindow(long timeMillis) { return windowStartInMs <= timeMillis && timeMillis < windowStartInMs + windowLengthInMs; } } //滑動(dòng)窗口具體實(shí)現(xiàn) public class HystrixWindowArray { // 單個(gè)窗口的長(zhǎng)度 private int windowLengthInMs; // 窗口數(shù)量 private int sampleCount; // 所有窗口的總長(zhǎng)度 private int intervalInMs; // 窗口數(shù)組 private final AtomicReferenceArray<HystrixWindow> array; /** * The conditional (predicate) update lock is used only when current bucket is deprecated. */ private final ReentrantLock updateLock = new ReentrantLock(); /** * @Param [sampleCount, intervalInMs] * sampleCount: 窗口數(shù)量 intervalInMs:所有窗口的總長(zhǎng)度 **/ public HystrixWindowArray(int sampleCount, int intervalInMs) { Assert.isTrue(sampleCount > 0, "bucket count is invalid: " + sampleCount); Assert.isTrue(intervalInMs > 0, "total time interval of the sliding window should be positive"); Assert.isTrue(intervalInMs % sampleCount == 0, "time span needs to be evenly divided"); this.windowLengthInMs = intervalInMs / sampleCount; this.intervalInMs = intervalInMs; this.sampleCount = sampleCount; this.array = new AtomicReferenceArray<>(sampleCount); } /** * 獲取當(dāng)前時(shí)間所在的窗口下標(biāo)索引 */ private int calculateTimeIdx(long timeMillis) { long timeId = timeMillis / windowLengthInMs; // Calculate current index so we can map the timestamp to the leap array. return (int)(timeId % array.length()); } /** * 獲取當(dāng)前時(shí)間所在的窗口開始時(shí)間 */ private long calculateWindowStart(long timeMillis) { return timeMillis - timeMillis % windowLengthInMs; } private HystrixEntity newEmptyWindowValue(long timeMillis){ return new HystrixEntity(); } /** * 獲取當(dāng)前窗口 */ public HystrixWindow currentWindow() { return currentWindow(System.currentTimeMillis()); } private HystrixWindow currentWindow(long timeMillis) { if (timeMillis < 0) { return null; } int idx = calculateTimeIdx(timeMillis); // Calculate current bucket start time. long windowStart = calculateWindowStart(timeMillis); while (true) { HystrixWindow old = array.get(idx); if (old == null) { // 如果獲取為空,說明窗口還沒創(chuàng)建,所以我們創(chuàng)建一個(gè)新的窗口(CAS保證線程安全) HystrixWindow window = new HystrixWindow(windowLengthInMs, windowStart, newEmptyWindowValue(timeMillis)); if (array.compareAndSet(idx, null, window)) { // 創(chuàng)建成功則返回當(dāng)前窗口 return window; } else { // 創(chuàng)建失敗說明發(fā)生了競(jìng)爭(zhēng),所以暫時(shí)先讓出CPU Thread.yield(); } } else if (windowStart == old.getWindowStartInMs()) { // 如果窗口已經(jīng)存在,則對(duì)比窗口的開始時(shí)間是否相同,相同說明是用同一個(gè)窗口,直接返回窗口就可以了 return old; } else if (windowStart > old.getWindowStartInMs()) { // 如果窗口已經(jīng)存在,而且窗口開始時(shí)間比之前的窗口開始時(shí)間要大 // 說明原來的窗口已經(jīng)過時(shí)了,需要替換一個(gè)新的窗口 // 所以加鎖防止競(jìng)爭(zhēng) if (updateLock.tryLock()) { try { // 這里我選擇直接重置之前的窗口() return resetWindowTo(old, windowStart); } finally { updateLock.unlock(); } } else { // Contention failed, the thread will yield its time slice to wait for bucket available. Thread.yield(); } } else if (windowStart < old.getWindowStartInMs()) { // 窗口的開始時(shí)間比之前的窗口開始時(shí)間還會(huì)小,這種屬于異常情況 // 要是真出現(xiàn)了也只能新建一個(gè)窗口返回了 return new HystrixWindow(windowLengthInMs, windowStart, newEmptyWindowValue(timeMillis)); } } } /** * 獲取當(dāng)前窗口內(nèi)的值 */ public HystrixEntity getWindowValue() { return getWindowValue(System.currentTimeMillis()); } public HystrixEntity getWindowValue(long timeMillis) { if (timeMillis < 0) { return null; } int idx = calculateTimeIdx(timeMillis); HystrixWindow bucket = array.get(idx); if (bucket == null || !bucket.isTimeInWindow(timeMillis)) { return null; } return bucket.getHystrixEntity(); } /** * 重置一個(gè)窗口 */ private HystrixWindow resetWindowTo(HystrixWindow window, long startTime){ return window.resetTo(startTime); } public List<HystrixEntity> values() { return values(System.currentTimeMillis()); } private List<HystrixEntity> values(long timeMillis) { if (timeMillis < 0) { return new ArrayList<HystrixEntity>(); } int size = array.length(); List<HystrixEntity> result = new ArrayList<HystrixEntity>(size); for (int i = 0; i < size; i++) { HystrixWindow window = array.get(i); if (window == null || isWindowDeprecated(timeMillis, window)) { continue; } result.add(window.getHystrixEntity()); } return result; } /** * 判斷窗口是否有效 */ public boolean isWindowDeprecated(long time, HystrixWindow window) { return time - window.getWindowStartInMs() > intervalInMs; } }
使用示例:
// 初始化 代表監(jiān)控1s內(nèi)的指標(biāo) 窗口數(shù)為2 HystrixWindowArray hystrixWindowArray = new HystrixWindowArray(2, 1000); // 指標(biāo)監(jiān)控 數(shù)量+1 windowArray.currentWindow().getHystrixEntity().addErrorCount(); windowArray.currentWindow().getHystrixEntity().addRequestCount(); // 獲取所有窗口的指標(biāo)累計(jì) 判斷是否超標(biāo),也就是1s內(nèi)的總計(jì) List<HystrixEntity> windowValues = windowArray.values(); Integer errorCount = hystrixEntities.stream().map(HystrixEntity::getErrorCountValue).reduce(Integer::sum).get(); Integer requestCount = hystrixEntities.stream().map(HystrixEntity::getRequestCountValue).reduce(Integer::sum).get();
令牌桶算法
思想: 固定時(shí)間內(nèi)(例如 1 秒)通過一個(gè)桶來存儲(chǔ)令牌,每當(dāng)接收到一個(gè)請(qǐng)求時(shí)就會(huì)消耗一個(gè)令牌。如果請(qǐng)求過來時(shí)沒有令牌,則無法繼續(xù)處理該請(qǐng)求。在sentinel中被稱為冷啟動(dòng)
優(yōu)點(diǎn):
相比于漏桶算法,令牌桶算法具有更好的適應(yīng)性,可以應(yīng)對(duì)短時(shí)間內(nèi)的流量波動(dòng)。(漏桶算法只能處理恒定速率的流量)
代碼如下:
public class TokenBucket { private long lastTime; // 上次請(qǐng)求時(shí)間 private double rate; // 令牌放入速率 private long capacity; // 令牌桶容量 private long tokens; // 當(dāng)前令牌數(shù)量 public TokenBucket(double rate, long capacity) { this.lastTime = System.currentTimeMillis(); this.rate = rate; this.capacity = capacity; this.tokens = capacity; } public synchronized boolean getToken() { long now = System.currentTimeMillis(); long timeElapsed = now - lastTime; tokens += timeElapsed * rate; if (tokens > capacity) { tokens = capacity; } lastTime = now; if (tokens >= 1) { tokens--; return true; } else { return false; } } }
漏斗桶算法
漏桶算法(Leaky Bucket)是網(wǎng)絡(luò)世界中流量整形(Traffic Shaping)或速率限制(Rate Limiting)時(shí)經(jīng)常使用的一種算法,它的主要目的是控制數(shù)據(jù)注入到網(wǎng)絡(luò)的速率,平滑網(wǎng)絡(luò)上的突發(fā)流量。漏桶算法提供了一種機(jī)制,通過它,突發(fā)流量可以被整形以便為網(wǎng)絡(luò)提供一個(gè)穩(wěn)定的流量。在sentinel中被稱為勻速器
優(yōu)點(diǎn):
- 控制速率:漏斗桶算法可以限制數(shù)據(jù)流量的傳輸速率,確保各個(gè)環(huán)節(jié)的數(shù)據(jù)處理能力都得到了滿足,從而避免了系統(tǒng)因?yàn)閿?shù)據(jù)過多而導(dǎo)致的崩潰和癱瘓。
- 平滑輸出:漏斗桶算法通過將數(shù)據(jù)分散到不同的時(shí)間段內(nèi)進(jìn)行處理,使得數(shù)據(jù)傳輸?shù)妮敵龈悠椒€(wěn),不會(huì)出現(xiàn)明顯的波動(dòng),提高了網(wǎng)絡(luò)的穩(wěn)定性。
代碼如下:
public class LeakyBucket { private int capacity; //漏桶容量 private int rate; //漏水速率 private int water; //當(dāng)前水量 private Instant timestamp; //上次漏水時(shí)間 public LeakyBucket(int capacity, int rate) { this.capacity = capacity; this.rate = rate; this.water = 0; this.timestamp = Instant.now(); } public synchronized boolean allow() { //判斷是否允許通過 Instant now = Instant.now(); long duration = now.toEpochMilli() - timestamp.toEpochMilli(); //計(jì)算距上次漏水過去了多久 int outflow = (int) (duration * rate / 1000); //計(jì)算過去的時(shí)間內(nèi)漏出的水量 water = Math.max(0, water - outflow); //更新當(dāng)前水量,不能小于0 if (water < capacity) { //如果漏桶還沒滿,放行 water++; timestamp = now; return true; } return false; //否則拒絕通過 } }
到此這篇關(guān)于Java中常見的4種限流算法詳解的文章就介紹到這了,更多相關(guān)Java限流算法內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Java Listener監(jiān)聽器使用規(guī)范詳細(xì)介紹
監(jiān)聽器是一個(gè)專門用于對(duì)其他對(duì)象身上發(fā)生的事件或狀態(tài)改變進(jìn)行監(jiān)聽和相應(yīng)處理的對(duì)象,當(dāng)被監(jiān)視的對(duì)象發(fā)生情況時(shí),立即采取相應(yīng)的行動(dòng)。監(jiān)聽器其實(shí)就是一個(gè)實(shí)現(xiàn)特定接口的普通java程序,這個(gè)程序?qū)iT用于監(jiān)聽另一個(gè)java對(duì)象的方法調(diào)用或?qū)傩愿淖?/div> 2023-01-01Java詳解如何將excel數(shù)據(jù)轉(zhuǎn)為樹形
在平常的辦公工作中,excel數(shù)據(jù)的操作是最常見的需求,今天就來看一下通過Java如何來實(shí)現(xiàn)將excel數(shù)據(jù)轉(zhuǎn)為樹形,感興趣的朋友可以了解下2022-08-08java打包maven啟動(dòng)報(bào)錯(cuò)jar中沒有主清單屬性
本文主要介紹了java打包maven啟動(dòng)報(bào)錯(cuò)jar中沒有主清單屬性,可能原因是創(chuàng)建springboot項(xiàng)目時(shí),自動(dòng)導(dǎo)入,下面就來介紹一下解決方法,感興趣的可以了解一下2024-03-03Idea 自動(dòng)生成測(cè)試的實(shí)現(xiàn)步驟
當(dāng)我們?cè)趯懲暌恍┙涌诜椒ê笮枰獪y(cè)試時(shí),一個(gè)一個(gè)新建測(cè)試類比較麻煩 idea給我們提供了快捷辦法,本文主要介紹了Idea 自動(dòng)生成測(cè)試的實(shí)現(xiàn)步驟,具有一定的參考價(jià)值,感興趣的可以了解一下2024-05-05Java中Json字符串直接轉(zhuǎn)換為對(duì)象的方法(包括多層List集合)
下面小編就為大家?guī)硪黄狫ava中Json字符串直接轉(zhuǎn)換為對(duì)象的方法(包括多層List集合)。小編覺得挺不錯(cuò)的,現(xiàn)在就分享給大家,也給大家做個(gè)參考。一起跟隨小編過來看看吧2016-08-08最新評(píng)論