通俗易懂的Java常見限流算法具體實(shí)現(xiàn)
一、漏桶算法

1.漏桶算法的思想和原理
1.固定容量的漏桶:系統(tǒng)維護(hù)一個(gè)固定容量的漏桶,用來存放請求。
2.請求入桶:當(dāng)一個(gè)請求到達(dá)系統(tǒng)時(shí),相當(dāng)于將水倒入漏桶。如果漏桶已滿,多余的請求會(huì)被丟棄或拒絕。
3.恒定速率的出桶:漏桶以恒定的速率處理請求,就像漏斗中的水穩(wěn)定地漏出一樣。
4.平滑流量:通過漏桶的出水速率,可以平滑流入系統(tǒng)的請求,避免突發(fā)流量。
5.限流判斷:當(dāng)一個(gè)請求到達(dá)時(shí),會(huì)檢查漏桶是否已滿,如果漏桶已滿,則觸發(fā)限流機(jī)制,拒絕請求。
漏桶算法的實(shí)現(xiàn)步驟是,先聲明一個(gè)隊(duì)列用來保存請求,這個(gè)隊(duì)列相當(dāng)于漏斗,當(dāng)隊(duì)列容量滿了之后就放棄新來的請求,然后重新聲明一個(gè)線程定期(指定速率)從任務(wù)隊(duì)列中獲取一個(gè)或多個(gè)任務(wù)進(jìn)行執(zhí)行,這樣就實(shí)現(xiàn)了漏桶算法。
優(yōu)點(diǎn):可以有效控制流量,避免突發(fā)請求的沖擊,保持系統(tǒng)穩(wěn)定性;
缺點(diǎn):可能會(huì)影響請求響應(yīng)時(shí)間,且不使用大并發(fā)量的請求系統(tǒng);
2.具體實(shí)現(xiàn)
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
public class LeakyBucket {
private final long capacity; // 桶容量
private final long rate; // 漏水速率
private long water; // 當(dāng)前水量
private long lastLeakTime; // 上一次漏水時(shí)間
private final AtomicLong requestCount; // 請求計(jì)數(shù)
public LeakyBucket(long capacity, long rate) {
this.capacity = capacity;
this.rate = rate;
this.water = 0;
this.lastLeakTime = System.currentTimeMillis();
this.requestCount = new AtomicLong(0);
//以固定的速率漏水
ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
scheduler.scheduleAtFixedRate(this::leakWater, 0, 1, TimeUnit.SECONDS);
}
//限流
public synchronized boolean allowRequest() {
long currentTime = System.currentTimeMillis();
long elapsedTime = currentTime - lastLeakTime;
water = Math.max(0, water - elapsedTime * rate); // 漏水
lastLeakTime = currentTime;
if (water < capacity) {
water++;
requestCount.incrementAndGet();
return true; // 請求通過
}
return false; // 漏桶已滿,限流
}
public long getRequestCount() {
return requestCount.get();
}
//以固定速率漏水
private synchronized void leakWater() {
long currentTime = System.currentTimeMillis();
long elapsedTime = currentTime - lastLeakTime;
water = Math.max(0, water - elapsedTime * rate); // 漏水
lastLeakTime = currentTime;
}
public static void main(String[] args) {
// 創(chuàng)建一個(gè)容量為 10,速率為 2/S的漏桶
LeakyBucket leakyBucket = new LeakyBucket(10, 2);
// 模擬請求
for (int i = 0; i < 20; i++) {
boolean allowed = leakyBucket.allowRequest();
if (allowed) {
System.out.println("Request " + (i + 1) + ": Allowed");
} else {
System.out.println("Request " + (i + 1) + ": Limited");
}
}
// 輸出總請求數(shù)
System.out.println("Total requests: " + leakyBucket.getRequestCount());
}
}
二、令牌桶算法

1.令牌桶算法流程:
1.放入令牌到桶:按照固定的速率被放入令牌桶中,比如每秒放5個(gè)、10個(gè)、100個(gè)令牌到桶中。
2.獲取令牌:所有的請求在處理之前都需要拿到一個(gè)可用的令牌才會(huì)被處理。
3.令牌桶滿了拒絕:桶中最多能放1000個(gè)令牌,當(dāng)桶滿時(shí),就不能繼續(xù)放入了,新添加的令牌要么被丟棄,要么就直接拒絕。
優(yōu)點(diǎn):
1.避免了突發(fā)流量對系統(tǒng)的沖擊。
2.可以根據(jù)需求調(diào)整令牌生成速率和令牌桶的容量,以適應(yīng)不同的流星控制需求。
缺點(diǎn):1.不適合瞬時(shí)突發(fā)流量,令牌桶算法可能無法處理突然涌入的大量請求,因?yàn)榱钆仆暗牧钆粕伤俾适枪潭ǖ摹?/p>
2.如果請求需要等待令牌桶中的令牌,可能會(huì)導(dǎo)致一些請求的響應(yīng)時(shí)間增加。
2.具體實(shí)現(xiàn)
2.1 編程實(shí)現(xiàn)
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
public class TokenBucket {
private final long capacity; // 令牌桶容量
private final long rate; // 令牌生成速率
private AtomicLong tokens; // 當(dāng)前令牌數(shù)量
private ScheduledExecutorService scheduler;
public TokenBucket(long capacity, long rate) {
this.capacity = capacity;
this.rate = rate;
this.tokens = new AtomicLong(0);
this.scheduler = Executors.newScheduledThreadPool(1);
ScheduledFuture<?> future = scheduler.scheduleAtFixedRate(this::addToken, 0, 1, TimeUnit.SECONDS);
}
public boolean allowRequest() {
long currentTokens = tokens.get();
if (currentTokens > 0) {
tokens.decrementAndGet();
return true; // 有令牌,允許請求通過
}
return false; // 無令牌,限流
}
//添加令牌
private void addToken() {
long newTokens = Math.min(capacity, tokens.get() + rate);
tokens.set(newTokens);
}
public void shutdown() {
scheduler.shutdown();
}
public static void main(String[] args) {
TokenBucket tokenBucket = new TokenBucket(10, 2); // 創(chuàng)建容量為10,速率為2的令牌桶
// 模擬請求
for (int i = 0; i < 20; i++) {
boolean allowed = tokenBucket.allowRequest();
if (allowed) {
System.out.println("Request " + (i + 1) + ": Allowed");
} else {
System.out.println("Request " + (i + 1) + ": Limited");
}
}
tokenBucket.shutdown();
}
}
2.2 使用 Google 開源的 guava 包
(1)導(dǎo)入依賴
<!-- https://mvnrepository.com/artifact/com.google.guava/guava -->
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>28.2-jre</version>
</dependency>
(2)代碼實(shí)現(xiàn)
import java.lang.annotation.*;
import java.util.concurrent.TimeUnit;
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface Limiter {
int NOT_LIMITED = 0;
String LIMIT_ERROR = "使用太頻繁了,稍后再試..." ;
/**
* 限流key,唯一
*
* @return
*/
String key() default "";
/**
* 時(shí)間單位內(nèi)允許的次數(shù)
*
* @return
*/
double qps() default NOT_LIMITED;
/**
* 最大等待時(shí)間
*
* @return
*/
int timeout() default NOT_LIMITED;
/**
* 最大等待時(shí)間單位
*
* @return
*/
TimeUnit timeUnit() default TimeUnit.MILLISECONDS;
}
import cn.hutool.core.util.StrUtil;
import com.google.common.util.concurrent.RateLimiter;
import com.hytera.annotation.Limiter;
import lombok.extern.slf4j.Slf4j;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.reflect.MethodSignature;
import org.springframework.core.annotation.AnnotationUtils;
import org.springframework.stereotype.Component;
import utils.IpUtil;
import java.lang.reflect.Method;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
/**
* @Author: zt 2024/1/9 17:48
* @CreateTime: 2024/1/9 17:48
* @描述:限流
**/
@Slf4j
@Aspect
@Component
public class RateLimiterAspect {
private static final ConcurrentMap<String, RateLimiter> RATE_LIMITER_CACHE = new ConcurrentHashMap<>();
@Around("@annotation(com.hytera.annotation.Limiter)")
public Object pointcut(ProceedingJoinPoint point) throws Throwable {
MethodSignature signature = (MethodSignature) point.getSignature();
Method method = signature.getMethod();
Limiter rateLimiter = AnnotationUtils.findAnnotation(method, Limiter.class);
if (rateLimiter != null && rateLimiter.qps() > Limiter.NOT_LIMITED) {
double qps = rateLimiter.qps();
String ip = IpUtil.getIpAddress();
String key = StrUtil.isEmpty(rateLimiter.key())?method.getName()+"-"+IpUtil.getIpAddress():rateLimiter.key()+"-"+ ip;
RateLimiter limiter = RATE_LIMITER_CACHE.get(key);
if (limiter == null) {
RATE_LIMITER_CACHE.put(key, RateLimiter.create(qps));
log.debug("【{}】的QPS設(shè)置為: {}", method.getName(), RATE_LIMITER_CACHE.get(key).getRate());
}else {
//超時(shí)或者獲取不到令牌,則報(bào)錯(cuò)
boolean b = limiter.tryAcquire(rateLimiter.timeout(), rateLimiter.timeUnit());
if (b) {
throw new RuntimeException(Limiter.LIMIT_ERROR);//自定義異常
}
}
}
return point.proceed();
}
}
三、Nginx限流
Nginx 提供了兩種限流手段:一是控制速率,二是控制并發(fā)連接數(shù)。
一、控制速率
我們需要使用 limit_req_zone 用來限制單位時(shí)間內(nèi)的請求數(shù),即速率限制,示例配置如下:
#限制每個(gè) IP 訪問的速度為 2r/s,因?yàn)?Nginx 的限流統(tǒng)計(jì)是基于毫秒的,我們設(shè)置的速度是 2r/s,轉(zhuǎn)換一下就是 500ms 內(nèi)單個(gè) IP 只允許通過 1 個(gè)請求,從 501ms 開始才允許通過第 2 個(gè)請求。
limit_req_zone $binary_remote_addr zone=mylimit:10m rate=2r/s;
server {
location / {
limit_req zone=mylimit;
}
}
#使用 burst 關(guān)鍵字,控制一個(gè) IP 單位總時(shí)間內(nèi)的總訪問次數(shù)
#burst=4,設(shè)置一個(gè)大小為4的緩沖區(qū)域,當(dāng)大量請求到來,請求數(shù)量超過限流頻率時(shí),將其放入緩沖區(qū)域
limit_req_zone $binary_remote_addr zone=mylimit:10m rate=2r/s;
server {
location / {
limit_req zone=mylimit burst=4;
}
}
二、控制并發(fā)連接數(shù)
#limit_conn perip 10 表示限制單個(gè) IP 同時(shí)最多能持有 10 個(gè)連接;
#limit_conn perserver 100 表示 server 同時(shí)能處理并發(fā)連接的總數(shù)為 100 個(gè)。
limit_conn_zone $binary_remote_addr zone=perip:10m;
limit_conn_zone $server_name zone=perserver:10m;
server {
...
limit_conn perip 10;
limit_conn perserver 100;
}
四、Redis+Lua限流
1.Lua介紹
Lua 是一種輕量小巧的腳本語言,用標(biāo)準(zhǔn)C語言編寫并以源代碼形式開放, 其設(shè)計(jì)目的是為了嵌入應(yīng)用程序中,從而為應(yīng)用程序提供靈活的擴(kuò)展和定制功。
2.Lua優(yōu)勢:
(1)減少網(wǎng)絡(luò)開銷: 不使用 Lua 的代碼需要向 Redis 發(fā)送多次請求, 而腳本只需一次即可, 減少網(wǎng)絡(luò)傳輸;
(2)原子操作: Redis 將整個(gè)腳本作為一個(gè)原子執(zhí)行, 無需擔(dān)心并發(fā), 也就無需事務(wù);(3)復(fù)用: 腳本會(huì)永久保存 Redis 中, 其他客戶端可繼續(xù)使用。
3.具體實(shí)現(xiàn):
(1)編寫Lua腳本(將其放在resources/scripts/redis目錄下):
-- 下標(biāo)從 1 開始
local key = KEYS[1]
local now = tonumber(ARGV[1])
local ttl = tonumber(ARGV[2])
local expired = tonumber(ARGV[3])
-- 最大訪問量
local max = tonumber(ARGV[4])
-- 清除過期的數(shù)據(jù)
-- 移除指定分?jǐn)?shù)區(qū)間內(nèi)的所有元素,expired 即已經(jīng)過期的 score
-- 根據(jù)當(dāng)前時(shí)間毫秒數(shù) - 超時(shí)毫秒數(shù),得到過期時(shí)間 expired
redis.call('zremrangebyscore', key, 0, expired)
-- 獲取 zset 中的當(dāng)前元素個(gè)數(shù)
local current = tonumber(redis.call('zcard', key))
local next = current + 1
if next > max then
-- 達(dá)到限流大小 返回 0
return 0;
else
-- 往 zset 中添加一個(gè)值、得分均為當(dāng)前時(shí)間戳的元素,[value,score]
redis.call("zadd", key, now, now)
-- 每次訪問均重新設(shè)置 zset 的過期時(shí)間,單位毫秒
redis.call("pexpire", key, ttl)
return next
end
(2)代碼實(shí)現(xiàn):
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.data.redis.core.script.RedisScript;
import org.springframework.scripting.support.ResourceScriptSource;
@Configuration
public class RedisConfig {
@Bean
@SuppressWarnings("unchecked")
public RedisScript<Long> limitRedisScript() {
DefaultRedisScript redisScript = new DefaultRedisScript<>();
redisScript.setScriptSource(new ResourceScriptSource(new ClassPathResource("scripts/redis/limit.lua")));
redisScript.setResultType(Long.class);
return redisScript;
}
}
import org.springframework.core.annotation.AliasFor;
import org.springframework.core.annotation.AnnotationUtils;
import java.lang.annotation.*;
import java.util.concurrent.TimeUnit;
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface RateLimiter {
long DEFAULT_REQUEST = 10;
/**
* max 最大請求數(shù)
*/
@AliasFor("max") long value() default DEFAULT_REQUEST;
/**
* 限流key
*/
String key() default "";
/**
* 超時(shí)時(shí)長,默認(rèn)1分鐘
*/
long timeout() default 1;
/**
* 超時(shí)時(shí)間單位,默認(rèn) 分鐘
*/
TimeUnit timeUnit() default TimeUnit.MINUTES;
}
import cn.hutool.core.util.StrUtil;
import com.xkcoding.ratelimit.redis.annotation.RateLimiter;
import com.xkcoding.ratelimit.redis.util.IpUtil;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Pointcut;
import org.aspectj.lang.reflect.MethodSignature;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.annotation.AnnotationUtils;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.core.script.RedisScript;
import org.springframework.stereotype.Component;
import java.lang.reflect.Method;
import java.time.Instant;
import java.util.Collections;
import java.util.concurrent.TimeUnit;
/**
* <p>
* 限流切面
* </p>
*
* @author yangkai.shen
* @date Created in 2019-09-30 10:30
*/
@Slf4j
@Aspect
@Component
@RequiredArgsConstructor(onConstructor_ = @Autowired)
public class RateLimiterAspect {
private final static String SEPARATOR = ":";
private final static String REDIS_LIMIT_KEY_PREFIX = "limit:";
private final StringRedisTemplate stringRedisTemplate;
private final RedisScript<Long> limitRedisScript;
@Around("@annotation(com.xkcoding.ratelimit.redis.annotation.RateLimiter)")
public Object pointcut(ProceedingJoinPoint point) throws Throwable {
MethodSignature signature = (MethodSignature) point.getSignature();
Method method = signature.getMethod();
// 通過 AnnotationUtils.findAnnotation 獲取 RateLimiter 注解
RateLimiter rateLimiter = AnnotationUtils.findAnnotation(method, RateLimiter.class);
if (rateLimiter != null) {
String key = rateLimiter.key();
// 默認(rèn)用類名+方法名做限流的 key 前綴
if (StrUtil.isBlank(key)) {
key = method.getDeclaringClass().getName() + StrUtil.DOT + method.getName();
}
// 最終限流的 key 為 前綴 + IP地址
key = key + SEPARATOR + IpUtil.getIpAddr();
long max = rateLimiter.max();
long timeout = rateLimiter.timeout();
TimeUnit timeUnit = rateLimiter.timeUnit();
boolean limited = shouldLimited(key, max, timeout, timeUnit);
if (limited) {
throw new RuntimeException("手速太快了,慢點(diǎn)兒吧~");
}
}
return point.proceed();
}
private boolean shouldLimited(String key, long max, long timeout, TimeUnit timeUnit) {
// 最終的 key 格式為:
// limit:自定義key:IP
// limit:類名.方法名:IP
key = REDIS_LIMIT_KEY_PREFIX + key;
// 統(tǒng)一使用單位毫秒
long ttl = timeUnit.toMillis(timeout);
// 當(dāng)前時(shí)間毫秒數(shù)
long now = Instant.now().toEpochMilli();
long expired = now - ttl;
Long executeTimes = stringRedisTemplate.execute(limitRedisScript, Collections.singletonList(key), now + "", ttl + "", expired + "", max + "");
if (executeTimes != null) {
if (executeTimes == 0) {
log.error("【{}】在單位時(shí)間 {} 毫秒內(nèi)已達(dá)到訪問上限,當(dāng)前接口上限 {}", key, ttl, max);
return true;
} else {
log.info("【{}】在單位時(shí)間 {} 毫秒內(nèi)訪問 {} 次", key, ttl, executeTimes);
return false;
}
}
return false;
}總結(jié)
到此這篇關(guān)于Java常見限流算法具體實(shí)現(xiàn)的文章就介紹到這了,更多相關(guān)Java常見限流算法內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
springboot通過注解、接口創(chuàng)建定時(shí)任務(wù)詳解
使用SpringBoot創(chuàng)建定時(shí)任務(wù)其實(shí)是挺簡單的,這篇文章主要給大家介紹了關(guān)于springboot如何通過注解、接口創(chuàng)建這兩種方法實(shí)現(xiàn)定時(shí)任務(wù)的相關(guān)資料,需要的朋友可以參考下2021-07-07
JAVA中的函數(shù)式接口Function和BiFunction詳解
這篇文章主要介紹了JAVA中的函數(shù)式接口Function和BiFunction詳解,JDK的函數(shù)式接口都加上了@FunctionalInterface注解進(jìn)行標(biāo)識,但是無論是否加上該注解只要接口中只有一個(gè)抽象方法,都是函數(shù)式接口,需要的朋友可以參考下2024-01-01
關(guān)于SpringBoot Actuator漏洞補(bǔ)救方案
SpringBoot Actuator模塊提供了健康檢查,審計(jì),指標(biāo)收集,HTTP 跟蹤等,是幫助我們監(jiān)控和管理SpringBoot應(yīng)用的模塊,本文將主要介紹SpringBoot Actuator漏洞的補(bǔ)救方案,需要的朋友可以參考下2023-06-06
Jenkins如何使用DockerFile自動(dòng)部署Java項(xiàng)目
這篇文章主要介紹了Jenkins如何使用DockerFile自動(dòng)部署Java項(xiàng)目,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2020-08-08
基于strict-origin-when-cross-origin問題的解決
這篇文章主要介紹了基于strict-origin-when-cross-origin問題的解決方案,具有很好的參考價(jià)值,希望對大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2023-03-03
Java實(shí)現(xiàn)讀取和寫入properties文件
這篇文章主要介紹了Java實(shí)現(xiàn)讀取和寫入properties文件方式,具有很好的參考價(jià)值,希望對大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2023-08-08
Springboot詳解RocketMQ實(shí)現(xiàn)消息發(fā)送與接收流程
這篇文章主要介紹了SpringBoot整合RocketMQ實(shí)現(xiàn)消息發(fā)送和接收功能,我們使用主流的SpringBoot框架整合RocketMQ來講解,使用方便快捷,本文分步驟給大家介紹的非常詳細(xì),需要的朋友可以參考下2022-06-06

