Redisson如何解決Redis分布式鎖提前釋放問題
前言:
在分布式場景下,相信你或多或少需要使用分布式鎖來訪問臨界資源,或者控制耗時操作的并發(fā)性。
當(dāng)然,實現(xiàn)分布式鎖的方案也比較多,比如數(shù)據(jù)庫、redis、zk 等等。本文主要結(jié)合一個線上案例,講解 redis 分布式鎖的相關(guān)實現(xiàn)。
一、問題描述:
某天線上出現(xiàn)了數(shù)據(jù)重復(fù)處理問題,經(jīng)排查后發(fā)現(xiàn),竟然是單次處理時間較長,redis 分布式鎖提前釋放導(dǎo)致相同請求并發(fā)處理。
其實,這是一個鎖續(xù)約的問題,對于一把分布式鎖,我們需要考慮,設(shè)置鎖多長時間過期、出現(xiàn)異常如何釋放鎖?
以上問題便是本文要討論的主題。
二、原因分析:
項目采用較簡單的自定義 redis 分布式鎖,為避免死鎖定義默認(rèn)過期時間 10s,如下:
override fun lock() {
while (true) {
//嘗試獲取鎖
if (tryLock()) {
return
}
try {
Thread.sleep(10)
} catch (e: InterruptedException) {
e.printStackTrace()
}
}
}
override fun tryLock(): Boolean {
val value = getUniqueSign() // 隨機串
val flag = redisTemplate!!.opsForValue().setIfAbsent(name, value, 10000, TimeUnit.MILLISECONDS)
if (flag != null && flag) {
VALUE_lOCAL.set(value)
INTO_NUM_LOCAL.set(if (INTO_NUM_LOCAL.get() != null) INTO_NUM_LOCAL.get() + 1 else 1)
return true
}
return false
}缺乏對鎖自動續(xù)期等實現(xiàn)。
三、解決方案:
1、思考:
針對這種場景,可以考慮的是如何給鎖自動續(xù)期-當(dāng)業(yè)務(wù)沒有執(zhí)行結(jié)束的情況下,當(dāng)然也可以自定義實現(xiàn) 比如開一個后臺線程定時的給這些拿到鎖的線程續(xù)期。
Redisson 也正是基于這種思路實現(xiàn)自動續(xù)期的分布式鎖,各種異常情況也考慮的更加完善,綜合考慮采用 Redisson 的分布式鎖解決方案優(yōu)化。
2、Redisson簡單配置:
@Configuration
@EnableConfigurationProperties(RedissonProperties::class)
class RedissonConfig {
@Bean
fun redissonClient(redissonProperties: RedissonProperties): RedissonClient {
val config = Config()
val singleServerConfig = redissonProperties.singleServerConfig!!
config.useSingleServer().setAddress(singleServerConfig.address)
.setDatabase(singleServerConfig.database)
.setUsername(singleServerConfig.username)
.setPassword(singleServerConfig.password)
.setConnectionPoolSize(singleServerConfig.connectionPoolSize)
.setConnectionMinimumIdleSize(singleServerConfig.connectionMinimumIdleSize)
.setConnectTimeout(singleServerConfig.connectTimeout)
.setIdleConnectionTimeout(singleServerConfig.idleConnectionTimeout)
.setRetryInterval(singleServerConfig.retryInterval)
.setRetryAttempts(singleServerConfig.retryAttempts)
.setTimeout(singleServerConfig.timeout)
return Redisson.create(config)
}
}
@ConfigurationProperties(prefix = "xxx.redisson")
class RedissonProperties {
var singleServerConfig: SingleServerConfig? = null
}Redis 服務(wù)使用的騰訊云的哨兵模式架構(gòu),此架構(gòu)對外開放一個代理地址訪問,因此這里配置單機模式配置即可。
如果你是自己搭建的 redis 哨兵模式架構(gòu),需要按照文檔配置相關(guān)必要參數(shù)
3、使用樣例:
...
@Autowired
lateinit var redissonClient: RedissonClient
...
fun xxx() {
...
val lock = redissonClient.getLock("mylock")
lock.lock()
try {
...
} finally {
lock.unlock()
}
...
}使用方式和JDK提供的鎖是不是很像?是不是很簡單?
正是Redisson這類優(yōu)秀的開源產(chǎn)品的出現(xiàn),才讓我們將更多的時間投入到業(yè)務(wù)開發(fā)中...
四、源碼分析
下面來看看 Redisson 對常規(guī)分布式鎖的實現(xiàn),主要分析 RedissonLock
1、lock加鎖操作
@Override
public void lock() {
try {
lock(-1, null, false);
} catch (InterruptedException e) {
throw new IllegalStateException();
}
}
// 租約期限, 也就是expire時間, -1代表未設(shè)置 將使用系統(tǒng)默認(rèn)的30s
private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
// 嘗試拿鎖, 如果能拿到就直接返回
long threadId = Thread.currentThread().getId();
Long ttl = tryAcquire(-1, leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
return;
}
RFuture<RedissonLockEntry> future = subscribe(threadId);
if (interruptibly) {
commandExecutor.syncSubscriptionInterrupted(future);
} else {
commandExecutor.syncSubscription(future);
}
// 如果拿不到鎖就嘗試一直輪循, 直到成功獲取鎖或者異常終止
try {
while (true) {
ttl = tryAcquire(-1, leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
break;
}
...
}
} finally {
unsubscribe(future, threadId);
}
}1.1、tryAcquire
private Long tryAcquire(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
return get(tryAcquireAsync(waitTime, leaseTime, unit, threadId));
}
private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
RFuture<Long> ttlRemainingFuture;
// 調(diào)用真正獲取鎖的操作
if (leaseTime != -1) {
ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
} else {
ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,
TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
}
ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
if (e != null) {
return;
}
// lock acquired
// 這里是成功獲取了鎖, 嘗試給鎖續(xù)約
if (ttlRemaining == null) {
if (leaseTime != -1) {
internalLockLeaseTime = unit.toMillis(leaseTime);
} else {
scheduleExpirationRenewal(threadId);
}
}
});
return ttlRemainingFuture;
}
// 通過lua腳本真正執(zhí)行加鎖的操作
<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
// 如果key不存在, 那正好, 直接set并設(shè)置過期時間
// 如果key存在, 就有兩種情況需要考慮
// - 同一線程獲取重入鎖,直接將field(也就是getLockName(threadId))對應(yīng)的value值+1
// - 不同線程競爭鎖, 此次加鎖失敗, 并直接返回此key對應(yīng)的過期時間
return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"return redis.call('pttl', KEYS[1]);",
Collections.singletonList(getRawName()), unit.toMillis(leaseTime), getLockName(threadId));
}1.2、續(xù)約
通過 scheduleExpirationRenewal 給鎖續(xù)約
protected void scheduleExpirationRenewal(long threadId) {
ExpirationEntry entry = new ExpirationEntry();
ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);
if (oldEntry != null) {
oldEntry.addThreadId(threadId);
} else {
entry.addThreadId(threadId);
// 續(xù)約操作
renewExpiration();
}
}
private void renewExpiration() {
ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (ee == null) {
return;
}
// 設(shè)置延遲任務(wù)task, 在時長internalLockLeaseTime/3之后執(zhí)行, 定期給鎖續(xù)期
Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (ent == null) {
return;
}
Long threadId = ent.getFirstThreadId();
if (threadId == null) {
return;
}
// 真正執(zhí)行續(xù)期命令操作
RFuture<Boolean> future = renewExpirationAsync(threadId);
future.onComplete((res, e) -> {
if (e != null) {
log.error("Can't update lock " + getRawName() + " expiration", e);
EXPIRATION_RENEWAL_MAP.remove(getEntryName());
return;
}
// 這次續(xù)期之后, 繼續(xù)schedule自己, 達到持續(xù)續(xù)期的效果
if (res) {
// reschedule itself
renewExpiration();
}
});
}
}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
ee.setTimeout(task);
}
// 所謂續(xù)期, 就是將expire過期時間再延長
protected RFuture<Boolean> renewExpirationAsync(long threadId) {
// 如果key以及當(dāng)前線程存在, 則延長expire時間, 并返回1代表成功;否則返回0代表失敗
return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return 1; " +
"end; " +
"return 0;",
Collections.singletonList(getRawName()),
internalLockLeaseTime, getLockName(threadId));
}2、unlock解鎖操作
public void unlock() {
try {
get(unlockAsync(Thread.currentThread().getId()));
} catch (RedisException e) {
...
}
}
public RFuture<Void> unlockAsync(long threadId) {
RPromise<Void> result = new RedissonPromise<>();
// 執(zhí)行解鎖操作
RFuture<Boolean> future = unlockInnerAsync(threadId);
// 操作成功之后做的事
future.onComplete((opStatus, e) -> {
// 取消續(xù)約task
cancelExpirationRenewal(threadId);
...
});
return result;
}
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
// 如果key以及當(dāng)前線程對應(yīng)的記錄已經(jīng)不存在, 直接返回空
// 否在將field(也就是getLockName(threadId))對應(yīng)的value減1
// - 如果減去1之后值還大于0, 那么重新延長過期時間
// - 如果減去之后值小于等于0, 那么直接刪除key, 并發(fā)布訂閱消息
return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
"return nil;" +
"end; " +
"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
"if (counter > 0) then " +
"redis.call('pexpire', KEYS[1], ARGV[2]); " +
"return 0; " +
"else " +
"redis.call('del', KEYS[1]); " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; " +
"end; " +
"return nil;",
Arrays.asList(getRawName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));
}以上便是 redisson 客戶端工具對 redis 分布式鎖的加/解鎖具體實現(xiàn),主要解決了以下幾個問題
1、死鎖問題:設(shè)置過期時間
2、可重入問題:重入+1, 釋放鎖-1,當(dāng)值=0時代表完全釋放鎖
3、續(xù)約問題:可解決鎖提前釋放問題
4、鎖釋放:誰加鎖就由誰來釋放
總結(jié):
本文由一個線上問題做引子,通過 redis 分布式鎖的常用實現(xiàn)方案,最終選定 redisson 的解決方案; 并分析 redisson 的具體實現(xiàn)細節(jié)
相關(guān)參考:
到此這篇關(guān)于Redisson如何解決Redis分布式鎖提前釋放問題的文章就介紹到這了,更多相關(guān)Redis分布式鎖提前釋放內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Redis的六種底層數(shù)據(jù)結(jié)構(gòu)(小結(jié))
本文主要介紹了Redis的六種底層數(shù)據(jù)結(jié)構(gòu),文中通過示例代碼介紹的非常詳細,具有一定的參考價值,感興趣的小伙伴們可以參考一下2022-01-01
詳解redis desktop manager安裝及連接方式
這篇文章主要介紹了redis desktop manager安裝及連接方式,本文圖文并茂給大家介紹的非常詳細,具有一定的參考借鑒價值,需要的朋友可以參考下2019-09-09

