redisson 實(shí)現(xiàn)分布式鎖的源碼解析
redisson
redisson 實(shí)現(xiàn)分布式鎖的機(jī)制如下:
依賴版本
implementation 'org.redisson:redisson-spring-boot-starter:3.17.0'
測試代碼
下面是模擬一個(gè)商品秒殺的場景,示例代碼如下:
public class RedissonTest { public static void main(String[] args) { //1. 配置部分 Config config = new Config(); String address = "redis://127.0.0.1:6379"; SingleServerConfig serverConfig = config.useSingleServer(); serverConfig.setAddress(address); serverConfig.setDatabase(0); config.setLockWatchdogTimeout(5000); Redisson redisson = (Redisson) Redisson.create(config); RLock rLock = redisson.getLock("goods:1000:1"); //2. 加鎖 rLock.lock(); try { System.out.println("todo 邏輯處理 1000000."); } finally { if (rLock.isLocked() && rLock.isHeldByCurrentThread()) { //3. 解鎖 rLock.unlock(); } } } }
加鎖設(shè)計(jì)
rLock.lock();
是加鎖的核心代碼,我們一起來看看調(diào)用棧
加鎖的核心方法是:org.redisson.RedissonLock#tryLockInnerAsync
<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) { return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command, "if (redis.call('exists', KEYS[1]) == 0) then " + "redis.call('hincrby', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + "redis.call('hincrby', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + "return redis.call('pttl', KEYS[1]);", Collections.singletonList(getRawName()), unit.toMillis(leaseTime), getLockName(threadId)); }
其實(shí)它的本質(zhì)是調(diào)用一段 LUA 腳本進(jìn)行加鎖。
鎖續(xù)期設(shè)計(jì)
鎖的續(xù)期是在 org.redisson.RedissonLock#tryAcquireAsync
方法中調(diào)用 scheduleExpirationRenewal
實(shí)現(xiàn)的。
續(xù)期需要注意的是,看門狗是設(shè)置在主線程的延遲隊(duì)列的線程中。
tryAcquireAsync
代碼如下:
private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) { RFuture<Long> ttlRemainingFuture; if (leaseTime != -1) { ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG); } else { ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime, TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG); } CompletionStage<Long> f = ttlRemainingFuture.thenApply(ttlRemaining -> { // lock acquired if (ttlRemaining == null) { if (leaseTime != -1) { internalLockLeaseTime = unit.toMillis(leaseTime); } else { // 鎖過期時(shí)間續(xù)期 scheduleExpirationRenewal(threadId); } } return ttlRemaining; }); return new CompletableFutureWrapper<>(f); }
鎖續(xù)期 scheduleExpirationRenewal
代碼如下:
protected void scheduleExpirationRenewal(long threadId) { ExpirationEntry entry = new ExpirationEntry(); ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry); if (oldEntry != null) { oldEntry.addThreadId(threadId); } else { entry.addThreadId(threadId); try { renewExpiration(); } finally { if (Thread.currentThread().isInterrupted()) { cancelExpirationRenewal(threadId); } } } }
然后在調(diào)用 renewExpiration();
執(zhí)行續(xù)期邏輯
private void renewExpiration() { ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName()); if (ee == null) { return; } // 創(chuàng)建延遲任務(wù) Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() { @Override public void run(Timeout timeout) throws Exception { ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName()); if (ent == null) { return; } Long threadId = ent.getFirstThreadId(); if (threadId == null) { return; } // 真正的續(xù)期,調(diào)用 LUA 腳本續(xù)期 RFuture<Boolean> future = renewExpirationAsync(threadId); future.whenComplete((res, e) -> { if (e != null) { log.error("Can't update lock " + getRawName() + " expiration", e); EXPIRATION_RENEWAL_MAP.remove(getEntryName()); return; } // 如果續(xù)期成功 if (res) { // reschedule itself renewExpiration(); } else { cancelExpirationRenewal(null); } }); } }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS); ee.setTimeout(task); }
renewExpirationAsync
方法, 里面還是一段 LUA 腳本,進(jìn)行重新設(shè)置鎖的過期時(shí)間。
protected RFuture<Boolean> renewExpirationAsync(long threadId) { return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return 1; " + "end; " + "return 0;", Collections.singletonList(getRawName()), internalLockLeaseTime, getLockName(threadId)); }
鎖的自旋重試
org.redisson.RedissonLock#lock(long, java.util.concurrent.TimeUnit, boolean)
在執(zhí)行獲取鎖失敗的時(shí)候,會進(jìn)入重試。其實(shí)這里就會執(zhí)行 18 行以后的 while (true)
邏輯
private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException { long threadId = Thread.currentThread().getId(); Long ttl = tryAcquire(-1, leaseTime, unit, threadId); // lock acquired if (ttl == null) { return; } CompletableFuture<RedissonLockEntry> future = subscribe(threadId); RedissonLockEntry entry; if (interruptibly) { entry = commandExecutor.getInterrupted(future); } else { entry = commandExecutor.get(future); } try { while (true) { ttl = tryAcquire(-1, leaseTime, unit, threadId); // lock acquired if (ttl == null) { break; } // waiting for message if (ttl >= 0) { try { // 阻塞鎖的超時(shí)時(shí)間,等鎖過期后再嘗試加鎖 entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { if (interruptibly) { throw e; } entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS); } } else { if (interruptibly) { entry.getLatch().acquire(); } else { entry.getLatch().acquireUninterruptibly(); } } } } finally { unsubscribe(entry, threadId); } // get(lockAsync(leaseTime, unit)); }
entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
其實(shí)這里就是一個(gè)間歇性自旋。 等到上次鎖過期的時(shí)間,在喚醒進(jìn)行搶鎖 entry.getLatch().acquire();
還有一個(gè)邏輯就是
CompletableFuture future = subscribe(threadId);
這里其實(shí)是會訂閱一個(gè)消息,如果解鎖過后,會發(fā)布解鎖的消息。
解鎖設(shè)計(jì)
rLock.unlock(); 的核心就是釋放鎖,撤銷續(xù)期和喚醒在等待加鎖的線程(發(fā)布解鎖成功消息)。
核心方法(解鎖): org.redisson.RedissonLock#unlockInnerAsync
protected RFuture<Boolean> unlockInnerAsync(long threadId) { return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " + "return nil;" + "end; " + "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " + "if (counter > 0) then " + "redis.call('pexpire', KEYS[1], ARGV[2]); " + "return 0; " + "else " + "redis.call('del', KEYS[1]); " + // 發(fā)布解鎖成功消息 "redis.call('publish', KEYS[2], ARGV[1]); " + "return 1; " + "end; " + "return nil;", Arrays.asList(getRawName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId)); }
還是 LUA 的執(zhí)行方式。
撤銷鎖續(xù)期
核心方法 org.redisson.RedissonBaseLock#unlockAsync(long)
@Override public RFuture<Void> unlockAsync(long threadId) { // 解鎖 RFuture<Boolean> future = unlockInnerAsync(threadId); // 撤銷續(xù)期 CompletionStage<Void> f = future.handle((opStatus, e) -> { cancelExpirationRenewal(threadId); if (e != null) { throw new CompletionException(e); } if (opStatus == null) { IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: " + id + " thread-id: " + threadId); throw new CompletionException(cause); } return null; }); return new CompletableFutureWrapper<>(f); }
解鎖成功喚排隊(duì)線程
在 org.redisson.pubsub.LockPubSub#onMessage
中回去喚醒阻塞的線程,讓執(zhí)行前面的鎖自旋邏輯,具體代碼如下:
@Override protected void onMessage(RedissonLockEntry value, Long message) { if (message.equals(UNLOCK_MESSAGE)) { Runnable runnableToExecute = value.getListeners().poll(); if (runnableToExecute != null) { runnableToExecute.run(); } value.getLatch().release(); } else if (message.equals(READ_UNLOCK_MESSAGE)) { while (true) { Runnable runnableToExecute = value.getListeners().poll(); if (runnableToExecute == null) { break; } runnableToExecute.run(); } value.getLatch().release(value.getLatch().getQueueLength()); } }
到此這篇關(guān)于redisson 實(shí)現(xiàn)分布式鎖的文章就介紹到這了,更多相關(guān)redisson 分布式鎖內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
詳解JavaWeb如何實(shí)現(xiàn)文件上傳和下載功能
這篇文章主要介紹了如何利用JavaWeb實(shí)現(xiàn)文件的上傳和下載功能,文中的示例代碼講解詳細(xì),對我們的學(xué)習(xí)或工作有一定的幫助,感興趣的小伙伴可以學(xué)習(xí)一下2021-12-12Admin - SpringBoot + Maven 多啟動環(huán)境配置實(shí)例詳解
這篇文章主要介紹了Admin - SpringBoot + Maven 多啟動環(huán)境配置,本文通過實(shí)例代碼給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2021-03-03idea環(huán)境下Maven無法正常下載pom中配置的包問題
這篇文章主要介紹了idea環(huán)境下Maven無法正常下載pom中配置的包的問題,本文給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2020-06-06Java規(guī)則引擎easy-rules詳細(xì)介紹
本文主要介紹了Java規(guī)則引擎easy-rules詳細(xì)介紹,文中通過示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2022-01-01初識Java基礎(chǔ)之?dāng)?shù)據(jù)類型與運(yùn)算符
Java是一種強(qiáng)類型語言,每個(gè)變量都必須聲明其數(shù)據(jù)類型,下面這篇文章主要給大家介紹了關(guān)于Java基礎(chǔ)之?dāng)?shù)據(jù)類型與運(yùn)算符的相關(guān)資料,文中通過示例代碼介紹的非常詳細(xì),需要的朋友可以參考下2021-10-10