Redisson之分布式鎖原理全面分析
Redisson是一個(gè) Redis的開源客戶端,也提供了分布式鎖的實(shí)現(xiàn)。
Redisson官網(wǎng):
Redisson 分布式鎖使用
Redisson分布式鎖使用起來還是蠻簡(jiǎn)單的。
1、添加 Redisson 配置類
引入依賴:
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.16.5</version>
</dependency>
創(chuàng)建 Redisson 配置類,注入 RedissonClient客戶端。
@Configuration
public class RedissonConfig {
@Value("${spring.redis.host}")
private String host;
@Value("${spring.redis.port}")
private String port;
@Value("${spring.redis.password}")
private String password;
@Bean
public RedissonClient getRedisson() {
Config config = new Config();
/**
* reids配置,支持單機(jī)、主從、哨兵、集群等配置。這里使用單機(jī)配置
*/
config.useSingleServer().setAddress("redis://" + host + ":" + port).setPassword(password);
return Redisson.create(config);
}
}
2、使用 Redisson分布式鎖
代碼如下:
@Autowired
private RedisTemplate<String, String> redisTemplate;
@Autowired
private RedissonClient redissonClient;
public void disLockDemo(long productId) {
String lockKey = "DISTRIBUTE_LOCK:redissonLock:product_" + productId;
//設(shè)置鎖定資源名稱,并獲取分布式鎖對(duì)象。
RLock redissonLock = redissonClient.getLock(lockKey);
//1.加鎖
redissonLock.lock();
//boolean isLock = disLock.tryLock(500, 15000, TimeUnit.MILLISECONDS);
try {
//2.執(zhí)行業(yè)務(wù)代碼
// TODO
//if (isLock) {
// TODO
//}
} finally {
//3.解鎖
redissonLock.unlock();
}
}

Redisson 分布式鎖源碼分析
使用分布式鎖必須要考慮的一些問題:
- 互斥性:在任意時(shí)刻,只能有一個(gè)進(jìn)程持有鎖。
- 防死鎖:即使有一個(gè)進(jìn)程在持有鎖的期間崩潰而未能主動(dòng)釋放鎖,要有其他方式去釋放鎖從而保證其他進(jìn)程能獲取到鎖。
- 不能釋放別人的鎖:加鎖和解鎖的必須是同一個(gè)進(jìn)程。
- 鎖的續(xù)期問題:業(yè)務(wù)執(zhí)行時(shí)間超過鎖的過期時(shí)間時(shí),需要提前給鎖的續(xù)期。
Redisson 是 Redis 官方推薦分布式鎖實(shí)現(xiàn)方案,它采用 Watch Dog機(jī)制能夠很好的解決鎖續(xù)期的問題。
執(zhí)行 lua腳本保證了多條命令執(zhí)行的原子性操作。
帶著上面分布式鎖的一些問題查看源碼。
1、獲取分布式鎖對(duì)象
簡(jiǎn)單了解一下。
1.1 創(chuàng)建 RedissonClient
我們?cè)谂渲妙愔型ㄟ^ Redisson.create(config)方法創(chuàng)建了 RedissonClient對(duì)象,并注入到 IOC容器中。

1.2 獲取分布式鎖對(duì)象
使用 Redisson 客戶端來 獲取分布式鎖對(duì)象。

2、加鎖代碼

private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
//線程id
long threadId = Thread.currentThread().getId();
// 1.嘗試獲取鎖
Long ttl = this.tryAcquire(-1L, leaseTime, unit, threadId);
if (ttl != null) {
RFuture<RedissonLockEntry> future = this.subscribe(threadId);
if (interruptibly) {
this.commandExecutor.syncSubscriptionInterrupted(future);
} else {
this.commandExecutor.syncSubscription(future);
}
try {
//2.死循環(huán),反復(fù)去調(diào)用tryAcquire嘗試獲取鎖
while(true) {
// 再次嘗試獲取鎖
ttl = this.tryAcquire(-1L, leaseTime, unit, threadId);
// ttl為null時(shí)表示別的線程已經(jīng)unlock了,自己加鎖成功
if (ttl == null) {
return;
}
// 3.鎖互斥:通過 JDK的信號(hào)量 Semaphore來阻塞線程
if (ttl >= 0L) {
try {
((RedissonLockEntry)future.getNow()).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} catch (InterruptedException var13) {
if (interruptibly) {
throw var13;
}
((RedissonLockEntry)future.getNow()).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
}
} else if (interruptibly) {
((RedissonLockEntry)future.getNow()).getLatch().acquire();
} else {
((RedissonLockEntry)future.getNow()).getLatch().acquireUninterruptibly();
}
}
} finally {
// 4.無論是否獲得鎖,都要取消訂閱解鎖消息
this.unsubscribe(future, threadId);
}
}
}
2.1 異步加鎖機(jī)制
查看 tryAcquire()加鎖方法。

通過源碼,看到加鎖其實(shí)是通過一段 lua 腳本實(shí)現(xiàn)的,如下:
if (redis.call('exists', KEYS[1]) == 0) then
redis.call('hincrby', KEYS[1], ARGV[2], 1);
redis.call('pexpire', KEYS[1], ARGV[1]);
return nil;
end ;
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then
redis.call('hincrby', KEYS[1], ARGV[2], 1);
redis.call('pexpire', KEYS[1], ARGV[1]);
return nil;
end ;
return redis.call('pttl', KEYS[1]);
- KEYS[1] 代表的是你加鎖的 key。
- ARGV[1] 代表的是鎖 key 的默認(rèn)生存時(shí)間,默認(rèn) 30 秒。
- ARGV[2] 代表的是加鎖的客戶端的線程 ID。通過
getLockName方法組裝了一下。 - ARGV[2] 后面的 1:為了支持可重入鎖做的計(jì)數(shù)統(tǒng)計(jì)。
Redisson 實(shí)現(xiàn)分布式鎖的共享資源的存儲(chǔ)結(jié)構(gòu)是 hash數(shù)據(jù)結(jié)構(gòu):
key 是鎖的名稱,field 是客戶端 ID,value 是該客戶端加鎖(可重入)的次數(shù)。
假設(shè)此時(shí),客戶端 1 來嘗試加鎖,查看加鎖的 lua 腳本:
第一段 if 判斷語句,如果你要加鎖的那個(gè)鎖 key 不存在的話,進(jìn)行加鎖。此時(shí)鎖 key不存在,向Redis中設(shè)置一個(gè) hash 結(jié)構(gòu)的數(shù)據(jù),則客戶端 1加鎖成功,返回 null。
2.1.1 鎖的續(xù)期機(jī)制
客戶端 1 加鎖的那個(gè)鎖 key 默認(rèn)生存時(shí)間才 30 秒,如果超過了 30 秒,客戶端 1 還想一直持有這把鎖,就需要提前進(jìn)行鎖的續(xù)期操作。
Redisson 提供了一個(gè) Watch dog 機(jī)制來解決鎖的續(xù)期問題, 只要客戶端 1 一旦加鎖成功,就會(huì)啟動(dòng)一個(gè) Watch Dog。
進(jìn)入 scheduleExpirationRenewal方法,重點(diǎn)查看 renewExpiration方法。

鎖續(xù)期的 lua 腳本如下:
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then
redis.call('pexpire', KEYS[1], ARGV[1]);
return 1;
end ;
return 0;
從源碼我們看到 leaseTime 必須是 -1 才會(huì)開啟 Watch Dog 機(jī)制,我們發(fā)現(xiàn):
- 如果想開啟 Watch Dog 機(jī)制必須使用默認(rèn)的加鎖時(shí)間為 30s。
- 如果自己自定義時(shí)間,即使用 tryLock,鎖并不會(huì)延長(zhǎng),不會(huì)觸發(fā)Watch Dog 機(jī)制。
Watch Dog 機(jī)制其實(shí)就是一個(gè)后臺(tái)定時(shí)任務(wù)線程,獲取鎖成功之后,會(huì)將持有鎖的線程放入到一個(gè) RedissonLock.EXPIRATION_RENEWAL_MAP里面,
然后每隔 10 秒 (internalLockLeaseTime / 3) 檢查一下,如果客戶端 1 還持有鎖 key(判斷客戶端是否還持有 key,
其實(shí)就是遍歷 EXPIRATION_RENEWAL_MAP 里面線程 id 然后根據(jù)線程 id 去 Redis 中查,
如果存在就會(huì)延長(zhǎng) key 的時(shí)間),那么就會(huì)不斷的延長(zhǎng)鎖 key 的生存時(shí)間。
如果服務(wù)宕機(jī)了,Watch Dog 機(jī)制線程也就沒有了,此時(shí)就不會(huì)延長(zhǎng) key 的過期時(shí)間,到了 30s 之后就會(huì)自動(dòng)過期了,其他線程就可以獲取到鎖。
2.1.2 可重入加鎖機(jī)制
Redisson 也是支持可重入鎖的,比如:客戶端 1 加鎖代碼:
@Override
public void demo() {
RLock lock = redissonSingle.getLock("myLock");
try {
lock.lock();
// TODO 執(zhí)行業(yè)務(wù)
//鎖重入
lock.lock();
} catch (Exception e) {
e.printStackTrace();
} finally {
// 釋放鎖
lock.unlock();
lock.unlock();
}
}
此時(shí),如果客戶端 1 又來嘗試加鎖,繼續(xù)分析加鎖的 lua 腳本。
- 首先,第一個(gè) if 判斷,你要加鎖的那個(gè)鎖 key 已經(jīng)存在了。
- 然后,第二個(gè) if 判斷,判斷一下,加鎖的那個(gè)鎖 key的 hash 數(shù)據(jù)結(jié)構(gòu)中,是否包含客戶端 1 的 ID,
- 此時(shí)數(shù)據(jù)結(jié)構(gòu)的是客戶端 1 的 ID,即包含客戶端 1的 ID,然后就執(zhí)行行可重入鎖的命令,將 hash 結(jié)構(gòu)的 value數(shù)據(jù) + 1,返回 null。
2.2 鎖互斥機(jī)制
上面客戶端 1加鎖成功,此時(shí),如果客戶端 2 來嘗試加鎖,繼續(xù)分析加鎖的 lua 腳本:
- 首先,第一個(gè) if 判斷,你要加鎖的那個(gè)鎖 key 已經(jīng)存在了。
- 然后,第二個(gè) if 判斷,判斷一下,加鎖的那個(gè)鎖 key的 hash 數(shù)據(jù)結(jié)構(gòu)中,是否包含客戶端 2 的 ID,如果包含就是執(zhí)行可重入鎖的賦值,此時(shí) hash數(shù)據(jù)結(jié)構(gòu)是客戶端 1 的 ID,不包含客戶端 2的 ID,所以,返回加鎖的那個(gè)鎖 key的剩余存活時(shí)間。
接著查看 lock方法中的 死循環(huán)部分。

流程大致如下:
- 嘗試獲取鎖,返回 null 則說明加鎖成功,返回一個(gè)ttl,則說明已經(jīng)存在該鎖,ttl 為鎖的剩余存活時(shí)間。
- 如果此時(shí)客戶端 2 進(jìn)程獲取鎖失敗,那么使用客戶端 2 的線程 id,通過 Redis 的 channel 訂閱鎖釋放的事件。
- 進(jìn)入死循環(huán)中,嘗試重新獲取鎖。
- 如果在重試中拿到了鎖,則直接返回。
- 如果鎖當(dāng)前還是被占用的,那么等待釋放鎖的消息。通過使用了 JDK 的信號(hào)量 Semaphore 來阻塞線程,當(dāng) ttl 為鎖的剩余存活時(shí)間為0后,信號(hào)量的 release() 方法會(huì)被調(diào)用,此時(shí)被信號(hào)量阻塞的等待隊(duì)列中的一個(gè)線程就可以繼續(xù)嘗試獲取鎖了。
注意:
當(dāng)鎖正在被占用時(shí),等待獲取鎖的進(jìn)程并不是真正通過一個(gè) while(true) 死循環(huán)去獲取鎖(占 CPU資源),而時(shí)使用 JDK 的信號(hào)量 Semaphore 來阻塞線程(間斷性的不斷嘗試獲取鎖),是會(huì)釋放 CPU資源的。
3、鎖釋放代碼

public RFuture<Void> unlockAsync(long threadId) {
RPromise<Void> result = new RedissonPromise();
// 1. 異步釋放鎖
RFuture<Boolean> future = this.unlockInnerAsync(threadId);
// 2. 取消 Watch Dog 機(jī)制
future.onComplete((opStatus, e) -> {
this.cancelExpirationRenewal(threadId);
if (e != null) {
result.tryFailure(e);
} else if (opStatus == null) {
IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: " + this.id + " thread-id: " + threadId);
result.tryFailure(cause);
} else {
result.trySuccess((Object)null);
}
});
return result;
}
3.1 異步釋放鎖機(jī)制
查看unlockInnerAsync方法。
釋放鎖也是執(zhí)行的 lua 腳本:
if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then
return nil;
end ;
local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1);
if (counter > 0) then
redis.call('pexpire', KEYS[1], ARGV[2]);
return 0;
else
redis.call('del', KEYS[1]);
redis.call('publish', KEYS[2], ARGV[1]);
return 1;
end ;
return nil;
首先,第一段 if 判斷語句,判斷 key 是否存在的話,進(jìn)行加鎖。此時(shí)鎖 key不存在,則客戶端 1加鎖成功,向Redis中設(shè)置一個(gè) hash 結(jié)構(gòu)的數(shù)據(jù)。返回 null。
然后,第二個(gè) if 判斷,判斷一下該客戶端對(duì)應(yīng)的鎖的 hash 結(jié)構(gòu)的 value 值是否遞減為 0,
- 如果遞減不為 0,則重入鎖的解鎖,返回0。
- 如果遞減為 0,則進(jìn)行刪除,返回1。
3.2 取消 Watch Dog機(jī)制
查看 cancelExpirationRenewal方法。

取消 Watch Dog 機(jī)制,即將 RedissonLock.EXPIRATION_RENEWAL_MAP 里面的線程 id 刪除。
3.3 通知阻塞等待的進(jìn)程
利用 Redis 的發(fā)布訂閱機(jī)制,廣播釋放鎖的消息,通知阻塞等待的進(jìn)程(向通道名為 redisson_lock__channel publish 一條 UNLOCK_MESSAGE 信息)。
總結(jié)
以上為個(gè)人經(jīng)驗(yàn),希望能給大家一個(gè)參考,也希望大家多多支持腳本之家。
相關(guān)文章
Java?GUI實(shí)現(xiàn)學(xué)生成績(jī)管理系統(tǒng)
這篇文章主要為大家詳細(xì)介紹了Java?GUI實(shí)現(xiàn)學(xué)生成績(jī)管理系統(tǒng),文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2018-01-01
使用@JsonFormat和@DateTimeFormat對(duì)Date格式化操作
這篇文章主要介紹了使用@JsonFormat和@DateTimeFormat對(duì)Date格式化操作,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-08-08
SpringCloud @FeignClient注入Spring容器原理分析
本文詳細(xì)分析了Spring Boot中@FeignClient注解的掃描和注入過程,重點(diǎn)探討了@EnableFeignClients注解的工作原理,通過源碼分析,揭示了@EnableFeignClients如何通過@Import注解和FeignClientsRegistrar類實(shí)現(xiàn)bean定義的加載2024-12-12
spring對(duì)JDBC和orm的支持實(shí)例詳解
這篇文章主要介紹了spring對(duì)JDBC和orm的支持實(shí)例詳解,需要的朋友可以參考下2017-09-09
解決rocketmq-spring-boot-starter導(dǎo)致的多消費(fèi)者實(shí)例重復(fù)消費(fèi)問題
這篇文章主要介紹了解決rocketmq-spring-boot-starter導(dǎo)致的多消費(fèi)者實(shí)例重復(fù)消費(fèi)問題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2024-06-06
JAVA匿名內(nèi)部類(Anonymous Classes)的具體使用
本文主要介紹了JAVA匿名內(nèi)部類,匿名內(nèi)部類在我們JAVA程序員的日常工作中經(jīng)常要用到,文中通過示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2021-08-08
java 數(shù)值類型分秒時(shí)間格式化的實(shí)例代碼
這篇文章主要介紹了java 數(shù)值類型分秒時(shí)間格式化的實(shí)例代碼的相關(guān)資料,將秒或分鐘的值轉(zhuǎn)換為xx天xx小時(shí)xx分鐘xx秒 如果 “xx” 為0 自動(dòng)缺省,需要的朋友可以參考下2017-07-07
Spring時(shí)間戳(日期)格式轉(zhuǎn)換方式
這篇文章主要介紹了Spring時(shí)間戳(日期)格式轉(zhuǎn)換方式,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2025-03-03
Quarkus中實(shí)現(xiàn)Resteasy的文件上傳下載操作
這篇文章主要為大家介紹了Quarkus中實(shí)現(xiàn)Resteasy的文件上傳下載的操作過程步驟,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步2022-02-02

