Redisson分布式限流器RRateLimiter的使用及原理小結(jié)
一、基本使用
1.1 創(chuàng)建限流器
/** * Returns rate limiter instance by name * * @param name of rate limiter * @return RateLimiter object */ RRateLimiter getRateLimiter(String name);
/** * Initializes RateLimiter's state and stores config to Redis server. * * @param mode - rate mode * @param rate - rate * @param rateInterval - rate time interval * @param rateIntervalUnit - rate time interval unit * @return true if rate was set and false otherwise */ boolean trySetRate(RateType mode, long rate, long rateInterval, RateIntervalUnit rateIntervalUnit);
trySetRate
用于設(shè)置限流參數(shù)。其中 RateType 包含 OVERALL
和 PER_CLIENT
兩個枚舉常量,分別表示全局限流和單機限流。后面三個參數(shù)表明了令牌的生成速率,即每 rateInterval
生成 rate
個令牌,rateIntervalUnit
為 rateInterval
的時間單位。
1.2 獲取令牌
/** * Acquires a specified permits from this RateLimiter, * blocking until one is available. * * Acquires the given number of permits, if they are available * and returns immediately, reducing the number of available permits * by the given amount. * * @param permits the number of permits to acquire */ void acquire(long permits); /** * Acquires the given number of permits only if all are available * within the given waiting time. * * Acquires the given number of permits, if all are available and returns immediately, * with the value true, reducing the number of available permits by one. * * If no permit is available then the current thread becomes * disabled for thread scheduling purposes and lies dormant until * the specified waiting time elapses. * * If a permits is acquired then the value true is returned. * * If the specified waiting time elapses then the value false * is returned. If the time is less than or equal to zero, the method * will not wait at all. * * @param permits amount * @param timeout the maximum time to wait for a permit * @param unit the time unit of the timeout argument * @return true if a permit was acquired and false * if the waiting time elapsed before a permit was acquired */ boolean tryAcquire(long permits, long timeout, TimeUnit unit);
acquire
和 tryAcquire
均可用于獲取指定數(shù)量的令牌,不過 acquire
會阻塞等待,而 tryAcquire
會等待 timeout
時間,如果仍然沒有獲得指定數(shù)量的令牌直接返回 false
。
1.3 使用示例
@Slf4j @SpringBootTest class RateLimiterTest { @Autowired private RedissonClient redissonClient; private static final int threadCount = 10; @Test void test() throws InterruptedException { RRateLimiter rateLimiter = redissonClient.getRateLimiter("my_limiter"); rateLimiter.trySetRate(RateType.OVERALL, 10, 1, RateIntervalUnit.SECONDS); CountDownLatch latch = new CountDownLatch(threadCount); for (int i = 0; i < threadCount; i++) { new Thread(() -> { rateLimiter.tryAcquire(5, 3, TimeUnit.SECONDS); latch.countDown(); log.info("latch count {}", latch.getCount()); }).start(); } latch.await(); } }
2024-01-16 20:14:27 INFO [Thread-2] atreus.ink.rate.RateLimiterTest : latch count 9
2024-01-16 20:14:27 INFO [Thread-3] atreus.ink.rate.RateLimiterTest : latch count 8
2024-01-16 20:14:28 INFO [Thread-1] atreus.ink.rate.RateLimiterTest : latch count 7
2024-01-16 20:14:29 INFO [Thread-10] atreus.ink.rate.RateLimiterTest : latch count 6
2024-01-16 20:14:29 INFO [Thread-8] atreus.ink.rate.RateLimiterTest : latch count 5
2024-01-16 20:14:30 INFO [Thread-5] atreus.ink.rate.RateLimiterTest : latch count 4
2024-01-16 20:14:30 INFO [Thread-4] atreus.ink.rate.RateLimiterTest : latch count 3
2024-01-16 20:14:30 INFO [Thread-6] atreus.ink.rate.RateLimiterTest : latch count 2
2024-01-16 20:14:30 INFO [Thread-7] atreus.ink.rate.RateLimiterTest : latch count 1
2024-01-16 20:14:30 INFO [Thread-9] atreus.ink.rate.RateLimiterTest : latch count 0
二、實現(xiàn)原理
Redisson 的 RRateLimiter 基于令牌桶實現(xiàn),令牌桶的主要特點如下:
- 令牌以固定速率生成。
- 生成的令牌放入令牌桶中存放,如果令牌桶滿了則多余的令牌會直接丟棄,當(dāng)請求到達(dá)時,會嘗試從令牌桶中取令牌,取到了令牌的請求可以執(zhí)行。
- 如果桶空了,那么嘗試取令牌的請求會被直接丟棄。
RRateLimiter 在創(chuàng)建限流器時通過下面 Lua 腳本設(shè)置限流器的相關(guān)參數(shù):
redis.call('hsetnx', KEYS[1], 'rate', ARGV[1]); redis.call('hsetnx', KEYS[1], 'interval', ARGV[2]); return redis.call('hsetnx', KEYS[1], 'type', ARGV[3]);
而獲取令牌則是通過以下的 Lua 腳本實現(xiàn):
-- 請求參數(shù)示例 -- KEYS[1] my_limiter -- KEYS[2] {my_limiter}:value -- KEYS[4] {my_limiter}:permits -- ARGV[1] 3 本次請求的令牌數(shù) -- ARGV[2] 1705396021850 System.currentTimeMillis() -- ARGV[3] 6966135962453115904 ThreadLocalRandom.current().nextLong() -- 讀取 RRateLimiter.trySetRate 中配置的限流器信息 local rate = redis.call('hget', KEYS[1], 'rate'); -- 10 一個時間窗口內(nèi)產(chǎn)生的令牌數(shù) local interval = redis.call('hget', KEYS[1], 'interval'); -- 1000 一個時間窗口對應(yīng)的毫秒數(shù) local type = redis.call('hget', KEYS[1], 'type'); -- 0 全局限流 assert(rate ~= false and interval ~= false and type ~= false, 'RateLimiter is not initialized') local valueName = KEYS[2]; -- {my_limiter}:value 當(dāng)前可用令牌數(shù)字符串的 key local permitsName = KEYS[4]; -- {my_limiter}:permits 授權(quán)記錄有序集合的 key -- 單機限流配置 無需考慮 if type == '1' then valueName = KEYS[3]; permitsName = KEYS[5]; end; -- 查詢當(dāng)前可用的令牌數(shù) 查詢失敗表明是首次請求令牌 local currentValue = redis.call('get', valueName); if currentValue == false then -- 首次請求令牌 -- 單次請求的令牌數(shù)不能超過一個時間窗口內(nèi)產(chǎn)生的令牌數(shù) assert(tonumber(rate) >= tonumber(ARGV[1]), 'Requested permits amount could not exceed defined rate'); -- 更新當(dāng)前可用令牌數(shù)以及令牌授權(quán)記錄 {my_limiter}:permits -- set {my_limiter}:permits 10 redis.call('set', valueName, rate); -- zadd {my_limiter}:permits 1705396021850 6966135962453115904_1 redis.call('zadd', permitsName, ARGV[2], struct.pack('fI', ARGV[3], ARGV[1])); -- decrby {my_limiter}:permits 3 redis.call('decrby', valueName, ARGV[1]); return nil; else -- 再次請求令牌 -- 查詢可以回收的令牌對應(yīng)的授權(quán)記錄 即一個時間窗口前的所有授權(quán)記錄且包括一個時間窗口前這一時刻 -- 舊令牌回收的本質(zhì)是新令牌的加入 如果一個令牌是在一個時間窗口前被分配的 那經(jīng)過一個時間窗口后這個空出的位置應(yīng)該已經(jīng)由新令牌填充 -- zrangebyscore {my_limiter}:permits 0 1705396020850 local expiredValues = redis.call('zrangebyscore', permitsName, 0, tonumber(ARGV[2]) - interval); -- [1936135962853113704_2, 536135765023123704_5] -- 統(tǒng)計可以回收的令牌數(shù) local released = 0; for i, v in ipairs(expiredValues) do local random, permits = struct.unpack('fI', v); -- released = released + 2 -- released = released + 5 released = released + permits; end; -- 刪除授權(quán)記錄并回收令牌 if released > 0 then -- zrem {my_limiter}:permits 1936135962853113704_2 536135765023123704_5 redis.call('zrem', permitsName, unpack(expiredValues)); currentValue = tonumber(currentValue) + released; -- incrby {my_limiter}:value 7 redis.call('set', valueName, currentValue); end; if tonumber(currentValue) < tonumber(ARGV[1]) then -- 如果回收后可用令牌數(shù)仍然不足 返回需要等待的時間 -- zrangebyscore {my_limiter}:permits (1705396020850 1705396021850 withscores limit 0 1 local nearest = redis.call('zrangebyscore', permitsName, '(' .. (tonumber(ARGV[2]) - interval), tonumber(ARGV[2]), 'withscores', 'limit', 0, 1); local random, permits = struct.unpack('fI', nearest[1]); -- 1705396021650 - 1705396021850 + 1000 = 800 return tonumber(nearest[2]) - (tonumber(ARGV[2]) - interval); else redis.call('zadd', permitsName, ARGV[2], struct.pack('fI', ARGV[3], ARGV[1])); redis.call('decrby', valueName, ARGV[1]); return nil; end; end;
參考:
https://github.com/oneone1995/blog/issues/13
https://www.infoq.cn/article/Qg2tX8fyw5Vt-f3HH673
到此這篇關(guān)于Redisson分布式限流器RRateLimiter的使用及原理小結(jié)的文章就介紹到這了,更多相關(guān)Redisson RRateLimiter內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Spring?redis使用報錯Read?timed?out排查及解決過程
項目使用spring集成redis,偶爾會出現(xiàn)read timed out的情況,剛開始以為是網(wǎng)絡(luò)不穩(wěn)定引起的,后面發(fā)現(xiàn)影響業(yè)務(wù)測試的準(zhǔn)確性,這篇文章主要給大家介紹了關(guān)于Spring redis使用報錯Read timed out排查及解決過程的相關(guān)資料,需要的朋友可以參考下2024-02-02redis輕松處理經(jīng)緯度坐標(biāo)點數(shù)據(jù)的實現(xiàn)方法
這篇文章主要介紹了redis輕松處理經(jīng)緯度坐標(biāo)點數(shù)據(jù)的實現(xiàn)方法,文中通過示例代碼介紹的非常詳細(xì),具有一定的參考價值,感興趣的小伙伴們可以參考一下2021-10-10一文搞懂阿里云服務(wù)器部署Redis并整合Spring?Boot
這篇文章主要介紹了一文搞懂阿里云服務(wù)器部署Redis并整合Spring?Boot,文章圍繞主題展開詳細(xì)的內(nèi)容介紹,具有一定的參考價值,需要的小伙伴可以參考一下2022-09-09