Redisson如何解決Redis分布式鎖提前釋放問(wèn)題
前言:
在分布式場(chǎng)景下,相信你或多或少需要使用分布式鎖來(lái)訪(fǎng)問(wèn)臨界資源,或者控制耗時(shí)操作的并發(fā)性。
當(dāng)然,實(shí)現(xiàn)分布式鎖的方案也比較多,比如數(shù)據(jù)庫(kù)、redis、zk 等等。本文主要結(jié)合一個(gè)線(xiàn)上案例,講解 redis 分布式鎖的相關(guān)實(shí)現(xiàn)。
一、問(wèn)題描述:
某天線(xiàn)上出現(xiàn)了數(shù)據(jù)重復(fù)處理問(wèn)題,經(jīng)排查后發(fā)現(xiàn),竟然是單次處理時(shí)間較長(zhǎng),redis 分布式鎖提前釋放
導(dǎo)致相同請(qǐng)求并發(fā)處理。
其實(shí),這是一個(gè)鎖續(xù)約
的問(wèn)題,對(duì)于一把分布式鎖,我們需要考慮,設(shè)置鎖多長(zhǎng)時(shí)間過(guò)期、出現(xiàn)異常如何釋放鎖?
以上問(wèn)題便是本文要討論的主題。
二、原因分析:
項(xiàng)目采用較簡(jiǎn)單的自定義 redis 分布式鎖,為避免死鎖定義默認(rèn)過(guò)期時(shí)間 10s,如下:
override fun lock() { while (true) { //嘗試獲取鎖 if (tryLock()) { return } try { Thread.sleep(10) } catch (e: InterruptedException) { e.printStackTrace() } } } override fun tryLock(): Boolean { val value = getUniqueSign() // 隨機(jī)串 val flag = redisTemplate!!.opsForValue().setIfAbsent(name, value, 10000, TimeUnit.MILLISECONDS) if (flag != null && flag) { VALUE_lOCAL.set(value) INTO_NUM_LOCAL.set(if (INTO_NUM_LOCAL.get() != null) INTO_NUM_LOCAL.get() + 1 else 1) return true } return false }
缺乏對(duì)鎖自動(dòng)續(xù)期等實(shí)現(xiàn)。
三、解決方案:
1、思考:
針對(duì)這種場(chǎng)景,可以考慮的是如何給鎖自動(dòng)續(xù)期-當(dāng)業(yè)務(wù)沒(méi)有執(zhí)行結(jié)束的情況下,當(dāng)然也可以自定義實(shí)現(xiàn) 比如開(kāi)一個(gè)后臺(tái)線(xiàn)程定時(shí)的給這些拿到鎖的線(xiàn)程續(xù)期。
Redisson 也正是基于這種思路實(shí)現(xiàn)自動(dòng)續(xù)期的分布式鎖,各種異常情況也考慮的更加完善,綜合考慮采用 Redisson 的分布式鎖解決方案優(yōu)化。
2、Redisson簡(jiǎn)單配置:
@Configuration @EnableConfigurationProperties(RedissonProperties::class) class RedissonConfig { @Bean fun redissonClient(redissonProperties: RedissonProperties): RedissonClient { val config = Config() val singleServerConfig = redissonProperties.singleServerConfig!! config.useSingleServer().setAddress(singleServerConfig.address) .setDatabase(singleServerConfig.database) .setUsername(singleServerConfig.username) .setPassword(singleServerConfig.password) .setConnectionPoolSize(singleServerConfig.connectionPoolSize) .setConnectionMinimumIdleSize(singleServerConfig.connectionMinimumIdleSize) .setConnectTimeout(singleServerConfig.connectTimeout) .setIdleConnectionTimeout(singleServerConfig.idleConnectionTimeout) .setRetryInterval(singleServerConfig.retryInterval) .setRetryAttempts(singleServerConfig.retryAttempts) .setTimeout(singleServerConfig.timeout) return Redisson.create(config) } } @ConfigurationProperties(prefix = "xxx.redisson") class RedissonProperties { var singleServerConfig: SingleServerConfig? = null }
Redis 服務(wù)使用的騰訊云的哨兵模式架構(gòu),此架構(gòu)對(duì)外開(kāi)放一個(gè)代理地址訪(fǎng)問(wèn),因此這里配置單機(jī)模式配置即可。
如果你是自己搭建的 redis 哨兵模式架構(gòu),需要按照文檔配置相關(guān)必要參數(shù)
3、使用樣例:
... @Autowired lateinit var redissonClient: RedissonClient ... fun xxx() { ... val lock = redissonClient.getLock("mylock") lock.lock() try { ... } finally { lock.unlock() } ... }
使用方式和JDK提供的鎖是不是很像?是不是很簡(jiǎn)單?
正是Redisson這類(lèi)優(yōu)秀的開(kāi)源產(chǎn)品的出現(xiàn),才讓我們將更多的時(shí)間投入到業(yè)務(wù)開(kāi)發(fā)中...
四、源碼分析
下面來(lái)看看 Redisson 對(duì)常規(guī)分布式鎖的實(shí)現(xiàn),主要分析 RedissonLock
1、lock加鎖操作
@Override public void lock() { try { lock(-1, null, false); } catch (InterruptedException e) { throw new IllegalStateException(); } } // 租約期限, 也就是expire時(shí)間, -1代表未設(shè)置 將使用系統(tǒng)默認(rèn)的30s private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException { // 嘗試拿鎖, 如果能拿到就直接返回 long threadId = Thread.currentThread().getId(); Long ttl = tryAcquire(-1, leaseTime, unit, threadId); // lock acquired if (ttl == null) { return; } RFuture<RedissonLockEntry> future = subscribe(threadId); if (interruptibly) { commandExecutor.syncSubscriptionInterrupted(future); } else { commandExecutor.syncSubscription(future); } // 如果拿不到鎖就嘗試一直輪循, 直到成功獲取鎖或者異常終止 try { while (true) { ttl = tryAcquire(-1, leaseTime, unit, threadId); // lock acquired if (ttl == null) { break; } ... } } finally { unsubscribe(future, threadId); } }
1.1、tryAcquire
private Long tryAcquire(long waitTime, long leaseTime, TimeUnit unit, long threadId) { return get(tryAcquireAsync(waitTime, leaseTime, unit, threadId)); } private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) { RFuture<Long> ttlRemainingFuture; // 調(diào)用真正獲取鎖的操作 if (leaseTime != -1) { ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG); } else { ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime, TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG); } ttlRemainingFuture.onComplete((ttlRemaining, e) -> { if (e != null) { return; } // lock acquired // 這里是成功獲取了鎖, 嘗試給鎖續(xù)約 if (ttlRemaining == null) { if (leaseTime != -1) { internalLockLeaseTime = unit.toMillis(leaseTime); } else { scheduleExpirationRenewal(threadId); } } }); return ttlRemainingFuture; } // 通過(guò)lua腳本真正執(zhí)行加鎖的操作 <T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) { // 如果key不存在, 那正好, 直接set并設(shè)置過(guò)期時(shí)間 // 如果key存在, 就有兩種情況需要考慮 // - 同一線(xiàn)程獲取重入鎖,直接將field(也就是getLockName(threadId))對(duì)應(yīng)的value值+1 // - 不同線(xiàn)程競(jìng)爭(zhēng)鎖, 此次加鎖失敗, 并直接返回此key對(duì)應(yīng)的過(guò)期時(shí)間 return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command, "if (redis.call('exists', KEYS[1]) == 0) then " + "redis.call('hincrby', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + "redis.call('hincrby', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + "return redis.call('pttl', KEYS[1]);", Collections.singletonList(getRawName()), unit.toMillis(leaseTime), getLockName(threadId)); }
1.2、續(xù)約
通過(guò) scheduleExpirationRenewal 給鎖續(xù)約
protected void scheduleExpirationRenewal(long threadId) { ExpirationEntry entry = new ExpirationEntry(); ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry); if (oldEntry != null) { oldEntry.addThreadId(threadId); } else { entry.addThreadId(threadId); // 續(xù)約操作 renewExpiration(); } } private void renewExpiration() { ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName()); if (ee == null) { return; } // 設(shè)置延遲任務(wù)task, 在時(shí)長(zhǎng)internalLockLeaseTime/3之后執(zhí)行, 定期給鎖續(xù)期 Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() { @Override public void run(Timeout timeout) throws Exception { ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName()); if (ent == null) { return; } Long threadId = ent.getFirstThreadId(); if (threadId == null) { return; } // 真正執(zhí)行續(xù)期命令操作 RFuture<Boolean> future = renewExpirationAsync(threadId); future.onComplete((res, e) -> { if (e != null) { log.error("Can't update lock " + getRawName() + " expiration", e); EXPIRATION_RENEWAL_MAP.remove(getEntryName()); return; } // 這次續(xù)期之后, 繼續(xù)schedule自己, 達(dá)到持續(xù)續(xù)期的效果 if (res) { // reschedule itself renewExpiration(); } }); } }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS); ee.setTimeout(task); } // 所謂續(xù)期, 就是將expire過(guò)期時(shí)間再延長(zhǎng) protected RFuture<Boolean> renewExpirationAsync(long threadId) { // 如果key以及當(dāng)前線(xiàn)程存在, 則延長(zhǎng)expire時(shí)間, 并返回1代表成功;否則返回0代表失敗 return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return 1; " + "end; " + "return 0;", Collections.singletonList(getRawName()), internalLockLeaseTime, getLockName(threadId)); }
2、unlock解鎖操作
public void unlock() { try { get(unlockAsync(Thread.currentThread().getId())); } catch (RedisException e) { ... } } public RFuture<Void> unlockAsync(long threadId) { RPromise<Void> result = new RedissonPromise<>(); // 執(zhí)行解鎖操作 RFuture<Boolean> future = unlockInnerAsync(threadId); // 操作成功之后做的事 future.onComplete((opStatus, e) -> { // 取消續(xù)約task cancelExpirationRenewal(threadId); ... }); return result; } protected RFuture<Boolean> unlockInnerAsync(long threadId) { // 如果key以及當(dāng)前線(xiàn)程對(duì)應(yīng)的記錄已經(jīng)不存在, 直接返回空 // 否在將field(也就是getLockName(threadId))對(duì)應(yīng)的value減1 // - 如果減去1之后值還大于0, 那么重新延長(zhǎng)過(guò)期時(shí)間 // - 如果減去之后值小于等于0, 那么直接刪除key, 并發(fā)布訂閱消息 return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " + "return nil;" + "end; " + "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " + "if (counter > 0) then " + "redis.call('pexpire', KEYS[1], ARGV[2]); " + "return 0; " + "else " + "redis.call('del', KEYS[1]); " + "redis.call('publish', KEYS[2], ARGV[1]); " + "return 1; " + "end; " + "return nil;", Arrays.asList(getRawName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId)); }
以上便是 redisson 客戶(hù)端工具對(duì) redis 分布式鎖的加/解鎖具體實(shí)現(xiàn),主要解決了以下幾個(gè)問(wèn)題
1、死鎖問(wèn)題:設(shè)置過(guò)期時(shí)間
2、可重入問(wèn)題:重入+1, 釋放鎖-1,當(dāng)值=0時(shí)代表完全釋放鎖
3、續(xù)約問(wèn)題:可解決鎖提前釋放問(wèn)題
4、鎖釋放:誰(shuí)加鎖就由誰(shuí)來(lái)釋放
總結(jié):
本文由一個(gè)線(xiàn)上問(wèn)題做引子,通過(guò) redis 分布式鎖的常用實(shí)現(xiàn)方案,最終選定 redisson 的解決方案; 并分析 redisson 的具體實(shí)現(xiàn)細(xì)節(jié)
相關(guān)參考:
到此這篇關(guān)于Redisson如何解決Redis分布式鎖提前釋放問(wèn)題的文章就介紹到這了,更多相關(guān)Redis分布式鎖提前釋放內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
使用Redis實(shí)現(xiàn)用戶(hù)積分排行榜的教程
這篇文章主要介紹了使用Redis實(shí)現(xiàn)用戶(hù)積分排行榜的教程,包括一個(gè)用PHP腳本進(jìn)行操作的例子,需要的朋友可以參考下2015-04-04Redis的六種底層數(shù)據(jù)結(jié)構(gòu)(小結(jié))
本文主要介紹了Redis的六種底層數(shù)據(jù)結(jié)構(gòu),文中通過(guò)示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2022-01-01詳解redis desktop manager安裝及連接方式
這篇文章主要介紹了redis desktop manager安裝及連接方式,本文圖文并茂給大家介紹的非常詳細(xì),具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2019-09-09Redis的使用模式之計(jì)數(shù)器模式實(shí)例
這篇文章主要介紹了Redis的使用模式之計(jì)數(shù)器模式實(shí)例,本文講解了匯總計(jì)數(shù)器、按時(shí)間匯總的計(jì)數(shù)器、速度控制、使用 Hash 數(shù)據(jù)類(lèi)型維護(hù)大量計(jì)數(shù)器等內(nèi)容,需要的朋友可以參考下2015-03-03