redis+lua實現(xiàn)分布式限流的示例
為什么使用redis+lua實現(xiàn)分布式限流
- 原子性:通過Lua腳本執(zhí)行限流邏輯,所有操作在一個原子上下文中完成,避免了多步操作導(dǎo)致的并發(fā)問題。
- 靈活性:Lua腳本可以編寫復(fù)雜的邏輯,比如滑動窗口限流,易于擴展和定制化。
- 性能:由于所有邏輯在Redis服務(wù)器端執(zhí)行,減少了網(wǎng)絡(luò)往返,提高了執(zhí)行效率。
使用ZSET也可以實現(xiàn)限流,為什么選擇lua的方式
使用zset需要額度解決這些問題
- 并發(fā)控制:需要額外的邏輯來保證操作的原子性和準(zhǔn)確性,可能需要配合Lua腳本或Lua腳本+WATCH/MULTI/EXEC模式來實現(xiàn)。
- 資源消耗:長期存儲請求記錄可能導(dǎo)致Redis占用更多的內(nèi)存資源。
為什么redis+zset不能保證原子性和準(zhǔn)確性
- 多步驟操作:滑動窗口限流通常需要執(zhí)行多個步驟,比如檢查當(dāng)前窗口的請求次數(shù)、添加新的請求記錄、可能還需要刪除過期的請求記錄等。這些操作如果分開執(zhí)行,就有可能在多線程或多進程環(huán)境下出現(xiàn)不一致的情況。
- 非原子性復(fù)合操作:雖然單個Redis命令是原子的,但當(dāng)你需要執(zhí)行一系列操作來維持限流狀態(tài)時(例如,先檢查計數(shù)、再增加計數(shù)、最后可能還要刪除舊記錄),沒有一個單一的Redis命令能完成這些復(fù)合操作。如果在這系列操作之間有其他客戶端修改了數(shù)據(jù),就會導(dǎo)致限流不準(zhǔn)確。
- 競爭條件:在高并發(fā)環(huán)境下,多個客戶端可能幾乎同時執(zhí)行限流檢查和增加請求的操作,如果沒有適當(dāng)?shù)耐綑C制,可能會導(dǎo)致請求計數(shù)錯誤。
實現(xiàn)
依賴
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.2.6.RELEASE</version> <relativePath/> <!-- lookup parent from repository --> </parent> <groupId>com.kang</groupId> <artifactId>rate-limiter-project</artifactId> <version>0.0.1-SNAPSHOT</version> <name>rate-limiter-project</name> <description>rate-limiter-project</description> <properties> <java.version>8</java.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> </dependency> <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-pool2</artifactId> <version>2.6.2</version> </dependency> <dependency> <groupId>com.google.guava</groupId> <artifactId>guava</artifactId> <version>31.0.1-jre</version> <!-- 請檢查最新版本 --> </dependency> <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-lang3</artifactId> <version>3.12.0</version> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <optional>true</optional> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> <configuration> <excludes> <exclude> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> </exclude> </excludes> </configuration> </plugin> </plugins> </build> </project>
lua腳本
-- KEYS[1] 是Redis中存儲計數(shù)的key,,, local key = KEYS[1] -- ARGV[1]是當(dāng)前時間戳-[當(dāng)前時間戳] local now = tonumber(ARGV[1]) -- ARGV[2]是最大請求次數(shù)-[最大請求次數(shù)] local maxRequests = tonumber(ARGV[2]) -- ARGV[3]是時間窗口長度-[時間窗口長度] local windowSize = tonumber(ARGV[3]) -- 獲取當(dāng)前時間窗口的起始時間 local windowStart = math.floor(now / windowSize) * windowSize -- 構(gòu)建時間窗口內(nèi)的key,用于區(qū)分不同窗口的計數(shù) local windowKey = key .. ':' .. tostring(windowStart) -- 獲取當(dāng)前窗口的計數(shù) local currentCount = tonumber(redis.call('get', windowKey) or '0') -- 如果當(dāng)前時間不在窗口內(nèi),重置計數(shù) if now > windowStart + windowSize then redis.call('del', windowKey) currentCount = 0 end -- 檢查是否超過限制 if currentCount + 1 <= maxRequests then -- 未超過,增加計數(shù)并返回成功,并設(shè)置鍵的過期時間為窗口剩余時間,以自動清理過期數(shù)據(jù)。如果超過最大請求次數(shù),則拒絕請求 redis.call('set', windowKey, currentCount + 1, 'EX', windowSize - (now - windowStart)) return 1 -- 成功 else return 0 -- 失敗 end
yaml
server: port: 10086 spring: redis: host: 127.0.0.1 port: 6379 database: 0 lettuce: pool: max-active: 20 max-idle: 10 min-idle: 5
代碼實現(xiàn)
啟動類
package com.kang.limter; import lombok.extern.slf4j.Slf4j; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; @Slf4j @SpringBootApplication public class RateLimiterProjectApplication { public static void main(String[] args) { SpringApplication.run(RateLimiterProjectApplication.class, args); log.info("RateLimiterProjectApplication start success"); } }
CacheConfig
package com.kang.limter.cache; import com.google.common.cache.CacheBuilder; import com.google.common.cache.CacheLoader; import com.google.common.cache.LoadingCache; import com.kang.limter.utils.LuaScriptUtils; import lombok.extern.slf4j.Slf4j; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.util.Collections; import java.util.List; import java.util.concurrent.TimeUnit; import static com.kang.limter.constant.SystemConstant.REDIS_RATE_LIMITER_LUA_SCRIPT_PATH; /** * @Author Emperor Kang * @ClassName CacheConfig * @Description 緩存配置 * @Date 2024/6/13 10:07 * @Version 1.0 * @Motto 讓營地比你來時更干凈 */ @Slf4j @Configuration public class CacheConfig { /** * 緩存配置,加載lua腳本 * @return */ @Bean(name = "rateLimiterLuaCache") public LoadingCache<String, String> rateLimiterLuaCache() { LoadingCache<String, String> cache = CacheBuilder.newBuilder() // 設(shè)置緩存的最大容量,最多100個鍵值對 .maximumSize(100) // 設(shè)置緩存項過期策略:寫入后2小時過期 .expireAfterWrite(2, TimeUnit.HOURS) // 緩存統(tǒng)計信息記錄 .recordStats() // 構(gòu)建緩存加載器,用于加載緩存項的值 .build(new CacheLoader<String, String>() { @Override public String load(String scriptPath) throws Exception { try { return LuaScriptUtils.loadLuaScript(scriptPath); } catch (Exception e) { log.error("加載lua腳本失敗:{}", e.getMessage()); return null; } } }); // 預(yù)熱緩存 warmUpCache(cache); return cache; } /** * 預(yù)熱緩存 */ private void warmUpCache(LoadingCache<String, String> cache) { try { // 假設(shè)我們有一個已知的腳本列表需要預(yù)熱 List<String> knownScripts = Collections.singletonList(REDIS_RATE_LIMITER_LUA_SCRIPT_PATH); for (String script : knownScripts) { String luaScript = LuaScriptUtils.loadLuaScript(script); // 手動初始化緩存 cache.put(script, luaScript); log.info("預(yù)加載Lua腳本成功: {}, length: {}", script, luaScript.length()); } } catch (Exception e) { log.error("預(yù)加載Lua腳本失敗: {}", e.getMessage(), e); } } }
- 這里使用緩存預(yù)熱加快lua腳本的加載速度,基于JVM內(nèi)存操作,所以很快
SystemConstant
package com.kang.limter.constant; /** * @Author Emperor Kang * @ClassName SystemConstant * @Description 系統(tǒng)常量 * @Date 2024/6/12 19:25 * @Version 1.0 * @Motto 讓營地比你來時更干凈 */ public class SystemConstant { /** * 限流配置緩存key前綴 */ public static final String REDIS_RATE_LIMITER_KEY_PREFIX = "outreach:config:limiter:%s"; /** * 限流lua腳本路徑 */ public static final String REDIS_RATE_LIMITER_LUA_SCRIPT_PATH = "classpath:lua/rate_limiter.lua"; }
RateLimiterController
package com.kang.limter.controller; import com.kang.limter.dto.RateLimiterRequestDto; import com.kang.limter.utils.RateLimiterUtil; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import static java.lang.Thread.sleep; /** * @Author Emperor Kang * @ClassName RateLimiterController * @Description TODO * @Date 2024/6/12 19:33 * @Version 1.0 * @Motto 讓營地比你來時更干凈 */ @Slf4j @RestController @RequestMapping("/rate/limiter") public class RateLimiterController { @Autowired private RateLimiterUtil rateLimiterUtil; @PostMapping("/test") public String test(@RequestBody RateLimiterRequestDto rateLimiterRequestDto) { // 是否限流 if (!rateLimiterUtil.tryAcquire(rateLimiterRequestDto.getInterfaceCode(), 5, 1000)) { log.info("觸發(fā)限流策略,InterfaceCode:{}", rateLimiterRequestDto.getInterfaceCode()); return "我被限流了InterfaceCode:" + rateLimiterRequestDto.getInterfaceCode(); } log.info("請求參數(shù):{}", rateLimiterRequestDto); try { log.info("開始加工邏輯"); sleep(1000); } catch (InterruptedException e) { log.error("休眠異常"); Thread.currentThread().interrupt(); return "加工異常"; } return "加工成功,成功返回"; } }
RateLimiterRequestDto
package com.kang.limter.dto; import lombok.Data; /** * @Author Emperor Kang * @ClassName RateLimiterRequestDto * @Description TODO * @Date 2024/6/12 19:39 * @Version 1.0 * @Motto 讓營地比你來時更干凈 */ @Data public class RateLimiterRequestDto { /** * 接口編碼 */ private String interfaceCode; }
ResourceLoaderException
package com.kang.limter.exception; /** * @Author Emperor Kang * @ClassName ResourceLoaderException * @Description 自定義資源加載異常 * @Date 2024/6/12 18:10 * @Version 1.0 * @Motto 讓營地比你來時更干凈 */ public class ResourceLoaderException extends Exception{ public ResourceLoaderException() { super(); } public ResourceLoaderException(String message) { super(message); } public ResourceLoaderException(String message, Throwable cause) { super(message, cause); } public ResourceLoaderException(Throwable cause) { super(cause); } protected ResourceLoaderException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) { super(message, cause, enableSuppression, writableStackTrace); } }
LuaScriptUtils
package com.kang.limter.utils; import com.kang.limter.exception.ResourceLoaderException; import lombok.extern.slf4j.Slf4j; import org.springframework.core.io.DefaultResourceLoader; import org.springframework.core.io.Resource; import org.springframework.core.io.ResourceLoader; import org.springframework.util.Assert; import java.io.BufferedReader; import java.io.InputStreamReader; import java.nio.charset.StandardCharsets; @Slf4j public class LuaScriptUtils { /** * 從類路徑下讀取Lua腳本內(nèi)容。 * @param scriptPath 類路徑下的Lua腳本文件路徑 * @return Lua腳本的文本內(nèi)容 */ public static String loadLuaScript(String scriptPath) throws ResourceLoaderException { Assert.notNull(scriptPath, "script path must not be null"); try { // 讀取lua腳本 ResourceLoader resourceLoader = new DefaultResourceLoader(); Resource resource = resourceLoader.getResource(scriptPath); try (BufferedReader reader = new BufferedReader(new InputStreamReader(resource.getInputStream(), StandardCharsets.UTF_8))) { StringBuilder scriptBuilder = new StringBuilder(); String line; while ((line = reader.readLine()) != null) { scriptBuilder.append(line).append("\n"); } String lua = scriptBuilder.toString(); log.debug("讀取的lua腳本為: {}", lua); return lua; } } catch (Exception e) { log.error("Failed to load Lua script from path: {}", scriptPath, e); throw new ResourceLoaderException("Failed to load Lua script from path: " + scriptPath, e); } } }
RateLimiterUtil
package com.kang.limter.utils; import com.google.common.cache.LoadingCache; import com.kang.limter.exception.ResourceLoaderException; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.data.redis.connection.ReturnType; import org.springframework.data.redis.core.RedisCallback; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.stereotype.Component; import org.springframework.util.Assert; import java.nio.charset.StandardCharsets; import static com.kang.limter.constant.SystemConstant.REDIS_RATE_LIMITER_KEY_PREFIX; import static com.kang.limter.constant.SystemConstant.REDIS_RATE_LIMITER_LUA_SCRIPT_PATH; /** * @Author Emperor Kang * @ClassName RateLimiterUtil * @Description 限流工具類 * @Date 2024/6/12 17:56 * @Version 1.0 * @Motto 讓營地比你來時更干凈 */ @Slf4j @Component public class RateLimiterUtil { @Autowired private StringRedisTemplate redisTemplate; @Autowired @Qualifier("rateLimiterLuaCache") private LoadingCache<String, String> rateLimiterLuaCache; /** * @param interfaceCode 接口標(biāo)識 * @param maxRequests 最大請求數(shù) * @param windowSizeMs 窗口大小 * @return boolean * @Description 嘗試獲取令牌 * @Author Emperor Kang * @Date 2024/6/12 17:57 * @Version 1.0 */ public boolean tryAcquire(String interfaceCode, int maxRequests, long windowSizeMs) { try { long currentTimeMillis = System.currentTimeMillis(); String luaScript = rateLimiterLuaCache.get(REDIS_RATE_LIMITER_LUA_SCRIPT_PATH); log.info("緩存查詢lua,length={}", luaScript.length()); if(StringUtils.isBlank(luaScript)){ log.info("從緩存中未獲取到lua腳本,嘗試手動讀取"); luaScript = LuaScriptUtils.loadLuaScript(REDIS_RATE_LIMITER_LUA_SCRIPT_PATH); } // 二次確認(rèn) if(StringUtils.isBlank(luaScript)){ log.info("lua腳本加載失敗,暫時放棄獲取許可,不再限流"); return true; } // 限流核心邏輯 String finalLuaScript = luaScript; Long result = redisTemplate.execute((RedisCallback<Long>) connection -> { // 用于存儲的key byte[] key = String.format(REDIS_RATE_LIMITER_KEY_PREFIX, interfaceCode).getBytes(StandardCharsets.UTF_8); // 當(dāng)前時間(毫秒) byte[] now = String.valueOf(currentTimeMillis).getBytes(StandardCharsets.UTF_8); // 最大請求數(shù) byte[] maxRequestsBytes = String.valueOf(maxRequests).getBytes(StandardCharsets.UTF_8); // 窗口大小 byte[] windowSizeBytes = String.valueOf(windowSizeMs).getBytes(StandardCharsets.UTF_8); // 執(zhí)行l(wèi)ua腳本 return connection.eval(finalLuaScript.getBytes(StandardCharsets.UTF_8), ReturnType.INTEGER, 1, key, now, maxRequestsBytes, windowSizeBytes); }); Assert.notNull(result, "執(zhí)行l(wèi)ua腳本響應(yīng)結(jié)果為null"); // 獲取結(jié)果 return result == 1L; } catch (ResourceLoaderException e) { log.error("加載lua腳本失敗", e); } catch (Exception e){ log.error("執(zhí)行限流邏輯異常", e); } return true; } }
lua腳本
-- KEYS[1] 是Redis中存儲計數(shù)的key,,, local key = KEYS[1] -- ARGV[1]是當(dāng)前時間戳-[當(dāng)前時間戳] local now = tonumber(ARGV[1]) -- ARGV[2]是最大請求次數(shù)-[最大請求次數(shù)] local maxRequests = tonumber(ARGV[2]) -- ARGV[3]是時間窗口長度-[時間窗口長度] local windowSize = tonumber(ARGV[3]) -- 獲取當(dāng)前時間窗口的起始時間 local windowStart = math.floor(now / windowSize) * windowSize -- 構(gòu)建時間窗口內(nèi)的key,用于區(qū)分不同窗口的計數(shù) local windowKey = key .. ':' .. tostring(windowStart) -- 獲取當(dāng)前窗口的計數(shù) local currentCount = tonumber(redis.call('get', windowKey) or '0') -- 如果當(dāng)前時間不在窗口內(nèi),重置計數(shù) if now > windowStart + windowSize then redis.call('del', windowKey) currentCount = 0 end -- 檢查是否超過限制 if currentCount + 1 <= maxRequests then -- 未超過,增加計數(shù)并返回成功,并設(shè)置鍵的過期時間為窗口剩余時間,以自動清理過期數(shù)據(jù)。如果超過最大請求次數(shù),則拒絕請求 redis.call('set', windowKey, currentCount + 1, 'EX', windowSize - (now - windowStart)) return 1 -- 成功 else return 0 -- 失敗 end
Jmeter壓測
200次請求/s,限流了195,而我們設(shè)置的最大令牌數(shù)就是5
到此這篇關(guān)于redis+lua實現(xiàn)分布式限流的示例的文章就介紹到這了,更多相關(guān)redis+lua分布式限流內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Redis的Hash類型及相關(guān)命令小結(jié)
edis Hash是一種數(shù)據(jù)結(jié)構(gòu),用于存儲字段和值的映射關(guān)系,本文就來介紹一下Redis的Hash類型及相關(guān)命令小結(jié),具有一定的參考價值,感興趣的可以了解一下2025-01-01Redis優(yōu)雅地實現(xiàn)延遲隊列的方法分享
Redisson是Redis服務(wù)器上的分布式可伸縮Java數(shù)據(jù)結(jié)構(gòu),這篇文中主要為大家介紹了Redisson實現(xiàn)的優(yōu)雅的延遲隊列的方法,需要的可以參考一下2023-02-02redis執(zhí)行l(wèi)ua腳本的實現(xiàn)方法
redis在2.6推出了腳本功能,允許開發(fā)者使用Lua語言編寫腳本傳到redis中執(zhí)行。本文就介紹了redis執(zhí)行l(wèi)ua腳本的實現(xiàn)方法,感興趣的可以了解一下2021-11-11Redis教程(二):String數(shù)據(jù)類型
這篇文章主要介紹了Redis教程(二):String數(shù)據(jù)類型,本文講解了String數(shù)據(jù)類型概述、相關(guān)命令列表、命令使用示例三部分內(nèi)容,需要的朋友可以參考下2015-04-04Redis Cluster集群數(shù)據(jù)分片機制原理
這篇文章主要介紹了Redis Cluster集群數(shù)據(jù)分片機制原理,文中通過示例代碼介紹的非常詳細,對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友可以參考下2020-04-04Redis如何實現(xiàn)計數(shù)統(tǒng)計
這篇文章主要介紹了Redis如何實現(xiàn)計數(shù)統(tǒng)計方式,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教2024-04-04基于Redis實現(xiàn)共享Session登錄的實現(xiàn)
本文主要介紹了基于Redis實現(xiàn)共享Session登錄的實現(xiàn),包括發(fā)送短信驗證碼、短信驗證碼登錄和注冊、以及登錄狀態(tài)校驗的流程,具有一定的參考價值,感興趣的可以了解一下2025-03-03