亚洲乱码中文字幕综合,中国熟女仑乱hd,亚洲精品乱拍国产一区二区三区,一本大道卡一卡二卡三乱码全集资源,又粗又黄又硬又爽的免费视频

Redisson如何解決Redis分布式鎖提前釋放問(wèn)題

 更新時(shí)間:2022年05月26日 15:18:12   作者:柏油  
本文主要介紹了Redisson如何解決Redis分布式鎖提前釋放問(wèn)題,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧

前言:

在分布式場(chǎng)景下,相信你或多或少需要使用分布式鎖來(lái)訪(fǎng)問(wèn)臨界資源,或者控制耗時(shí)操作的并發(fā)性。

當(dāng)然,實(shí)現(xiàn)分布式鎖的方案也比較多,比如數(shù)據(jù)庫(kù)、redis、zk 等等。本文主要結(jié)合一個(gè)線(xiàn)上案例,講解 redis 分布式鎖的相關(guān)實(shí)現(xiàn)。

一、問(wèn)題描述:

某天線(xiàn)上出現(xiàn)了數(shù)據(jù)重復(fù)處理問(wèn)題,經(jīng)排查后發(fā)現(xiàn),竟然是單次處理時(shí)間較長(zhǎng),redis 分布式鎖提前釋放導(dǎo)致相同請(qǐng)求并發(fā)處理。

其實(shí),這是一個(gè)鎖續(xù)約的問(wèn)題,對(duì)于一把分布式鎖,我們需要考慮,設(shè)置鎖多長(zhǎng)時(shí)間過(guò)期、出現(xiàn)異常如何釋放鎖?

以上問(wèn)題便是本文要討論的主題。

二、原因分析:

      項(xiàng)目采用較簡(jiǎn)單的自定義 redis 分布式鎖,為避免死鎖定義默認(rèn)過(guò)期時(shí)間 10s,如下:

    override fun lock() {

        while (true) {
            //嘗試獲取鎖
            if (tryLock()) {
                return
            }
            try {
                Thread.sleep(10)
            } catch (e: InterruptedException) {
                e.printStackTrace()
            }

        }
    }

    override fun tryLock(): Boolean {
        val value = getUniqueSign() // 隨機(jī)串
        val flag = redisTemplate!!.opsForValue().setIfAbsent(name, value, 10000, TimeUnit.MILLISECONDS)
        if (flag != null && flag) {
            VALUE_lOCAL.set(value)
            INTO_NUM_LOCAL.set(if (INTO_NUM_LOCAL.get() != null) INTO_NUM_LOCAL.get() + 1 else 1)
            return true
        }
        return false
    }

缺乏對(duì)鎖自動(dòng)續(xù)期等實(shí)現(xiàn)。

三、解決方案:

1、思考: 

針對(duì)這種場(chǎng)景,可以考慮的是如何給鎖自動(dòng)續(xù)期-當(dāng)業(yè)務(wù)沒(méi)有執(zhí)行結(jié)束的情況下,當(dāng)然也可以自定義實(shí)現(xiàn) 比如開(kāi)一個(gè)后臺(tái)線(xiàn)程定時(shí)的給這些拿到鎖的線(xiàn)程續(xù)期。

Redisson 也正是基于這種思路實(shí)現(xiàn)自動(dòng)續(xù)期的分布式鎖,各種異常情況也考慮的更加完善,綜合考慮采用 Redisson 的分布式鎖解決方案優(yōu)化。

2、Redisson簡(jiǎn)單配置:

@Configuration
@EnableConfigurationProperties(RedissonProperties::class)
class RedissonConfig {

    @Bean
    fun redissonClient(redissonProperties: RedissonProperties): RedissonClient {
        val config = Config()
        val singleServerConfig = redissonProperties.singleServerConfig!!
        config.useSingleServer().setAddress(singleServerConfig.address)
                .setDatabase(singleServerConfig.database)
                .setUsername(singleServerConfig.username)
                .setPassword(singleServerConfig.password)
                .setConnectionPoolSize(singleServerConfig.connectionPoolSize)
              .setConnectionMinimumIdleSize(singleServerConfig.connectionMinimumIdleSize)
                .setConnectTimeout(singleServerConfig.connectTimeout)
                .setIdleConnectionTimeout(singleServerConfig.idleConnectionTimeout)
                .setRetryInterval(singleServerConfig.retryInterval)
                .setRetryAttempts(singleServerConfig.retryAttempts)
                .setTimeout(singleServerConfig.timeout)
        return Redisson.create(config)
    }

}

@ConfigurationProperties(prefix = "xxx.redisson")
class RedissonProperties {
    var singleServerConfig: SingleServerConfig? = null
}

Redis 服務(wù)使用的騰訊云的哨兵模式架構(gòu),此架構(gòu)對(duì)外開(kāi)放一個(gè)代理地址訪(fǎng)問(wèn),因此這里配置單機(jī)模式配置即可。

如果你是自己搭建的 redis 哨兵模式架構(gòu),需要按照文檔配置相關(guān)必要參數(shù)

3、使用樣例:

    ...
  
    @Autowired
    lateinit var redissonClient: RedissonClient

 
    ... 

    fun xxx() {

      ...

      val lock = redissonClient.getLock("mylock")
      lock.lock()
      try {
        
        ... 

      } finally {
        lock.unlock()
      }

        ...

    }

使用方式和JDK提供的鎖是不是很像?是不是很簡(jiǎn)單?

正是Redisson這類(lèi)優(yōu)秀的開(kāi)源產(chǎn)品的出現(xiàn),才讓我們將更多的時(shí)間投入到業(yè)務(wù)開(kāi)發(fā)中...

四、源碼分析

下面來(lái)看看 Redisson 對(duì)常規(guī)分布式鎖的實(shí)現(xiàn),主要分析 RedissonLock

1、lock加鎖操作

    @Override
    public void lock() {
        try {
            lock(-1, null, false);
        } catch (InterruptedException e) {
            throw new IllegalStateException();
        }
    }



    // 租約期限, 也就是expire時(shí)間, -1代表未設(shè)置 將使用系統(tǒng)默認(rèn)的30s
    private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
        // 嘗試拿鎖, 如果能拿到就直接返回
        long threadId = Thread.currentThread().getId();
        Long ttl = tryAcquire(-1, leaseTime, unit, threadId);
        // lock acquired
        if (ttl == null) {
            return;
        }

        RFuture<RedissonLockEntry> future = subscribe(threadId);
        if (interruptibly) {
            commandExecutor.syncSubscriptionInterrupted(future);
        } else {
            commandExecutor.syncSubscription(future);
        }

        // 如果拿不到鎖就嘗試一直輪循, 直到成功獲取鎖或者異常終止
        try {
            while (true) {
                ttl = tryAcquire(-1, leaseTime, unit, threadId);
                // lock acquired
                if (ttl == null) {
                    break;
                }

                ...

            }
        } finally {
            unsubscribe(future, threadId);
        }
    }

1.1、tryAcquire

    private Long tryAcquire(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
        return get(tryAcquireAsync(waitTime, leaseTime, unit, threadId));
    }

    private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
        RFuture<Long> ttlRemainingFuture;
        // 調(diào)用真正獲取鎖的操作
        if (leaseTime != -1) {
            ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
        } else {
            ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,
                    TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
        }
        ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
            if (e != null) {
                return;
            }

            // lock acquired
            // 這里是成功獲取了鎖, 嘗試給鎖續(xù)約
            if (ttlRemaining == null) {
                if (leaseTime != -1) {
                    internalLockLeaseTime = unit.toMillis(leaseTime);
                } else {
                    scheduleExpirationRenewal(threadId);
                }
            }
        });
        return ttlRemainingFuture;
    }

    // 通過(guò)lua腳本真正執(zhí)行加鎖的操作
    <T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
        // 如果key不存在, 那正好, 直接set并設(shè)置過(guò)期時(shí)間
        // 如果key存在, 就有兩種情況需要考慮
        //   - 同一線(xiàn)程獲取重入鎖,直接將field(也就是getLockName(threadId))對(duì)應(yīng)的value值+1
        //   - 不同線(xiàn)程競(jìng)爭(zhēng)鎖, 此次加鎖失敗, 并直接返回此key對(duì)應(yīng)的過(guò)期時(shí)間
        return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,
                "if (redis.call('exists', KEYS[1]) == 0) then " +
                        "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                        "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                        "return nil; " +
                        "end; " +
                        "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                        "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                        "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                        "return nil; " +
                        "end; " +
                        "return redis.call('pttl', KEYS[1]);",
                Collections.singletonList(getRawName()), unit.toMillis(leaseTime), getLockName(threadId));
    }

1.2、續(xù)約

通過(guò) scheduleExpirationRenewal 給鎖續(xù)約

    protected void scheduleExpirationRenewal(long threadId) {
        ExpirationEntry entry = new ExpirationEntry();
        ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);
        if (oldEntry != null) {
            oldEntry.addThreadId(threadId);
        } else {
            entry.addThreadId(threadId);
            // 續(xù)約操作
            renewExpiration();
        }
    }

    private void renewExpiration() {
        ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
        if (ee == null) {
            return;
        }
        
        // 設(shè)置延遲任務(wù)task, 在時(shí)長(zhǎng)internalLockLeaseTime/3之后執(zhí)行, 定期給鎖續(xù)期
        Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
            @Override
            public void run(Timeout timeout) throws Exception {
                ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
                if (ent == null) {
                    return;
                }
                Long threadId = ent.getFirstThreadId();
                if (threadId == null) {
                    return;
                }
                
                // 真正執(zhí)行續(xù)期命令操作
                RFuture<Boolean> future = renewExpirationAsync(threadId);
                future.onComplete((res, e) -> {
                    if (e != null) {
                        log.error("Can't update lock " + getRawName() + " expiration", e);
                        EXPIRATION_RENEWAL_MAP.remove(getEntryName());
                        return;
                    }
                    
                    // 這次續(xù)期之后, 繼續(xù)schedule自己, 達(dá)到持續(xù)續(xù)期的效果
                    if (res) {
                        // reschedule itself
                        renewExpiration();
                    }
                });
            }
        }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
        
        ee.setTimeout(task);
    }

    // 所謂續(xù)期, 就是將expire過(guò)期時(shí)間再延長(zhǎng)
    protected RFuture<Boolean> renewExpirationAsync(long threadId) {
        // 如果key以及當(dāng)前線(xiàn)程存在, 則延長(zhǎng)expire時(shí)間, 并返回1代表成功;否則返回0代表失敗
        return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                        "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                        "return 1; " +
                        "end; " +
                        "return 0;",
                Collections.singletonList(getRawName()),
                internalLockLeaseTime, getLockName(threadId));
    }

2、unlock解鎖操作

  public void unlock() {
        try {
            get(unlockAsync(Thread.currentThread().getId()));
        } catch (RedisException e) {
            ...
        }
     
    }

    public RFuture<Void> unlockAsync(long threadId) {
        RPromise<Void> result = new RedissonPromise<>();
        // 執(zhí)行解鎖操作
        RFuture<Boolean> future = unlockInnerAsync(threadId);

        // 操作成功之后做的事
        future.onComplete((opStatus, e) -> {
            // 取消續(xù)約task
            cancelExpirationRenewal(threadId);
            
            ...

        });

        return result;
    }

    protected RFuture<Boolean> unlockInnerAsync(long threadId) {
        // 如果key以及當(dāng)前線(xiàn)程對(duì)應(yīng)的記錄已經(jīng)不存在, 直接返回空
        // 否在將field(也就是getLockName(threadId))對(duì)應(yīng)的value減1
        //   - 如果減去1之后值還大于0, 那么重新延長(zhǎng)過(guò)期時(shí)間
        //   - 如果減去之后值小于等于0, 那么直接刪除key, 并發(fā)布訂閱消息
        return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
                        "return nil;" +
                        "end; " +
                        "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
                        "if (counter > 0) then " +
                        "redis.call('pexpire', KEYS[1], ARGV[2]); " +
                        "return 0; " +
                        "else " +
                        "redis.call('del', KEYS[1]); " +
                        "redis.call('publish', KEYS[2], ARGV[1]); " +
                        "return 1; " +
                        "end; " +
                        "return nil;",
                Arrays.asList(getRawName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));
    }

以上便是 redisson 客戶(hù)端工具對(duì) redis 分布式鎖的加/解鎖具體實(shí)現(xiàn),主要解決了以下幾個(gè)問(wèn)題

    1、死鎖問(wèn)題:設(shè)置過(guò)期時(shí)間

    2、可重入問(wèn)題:重入+1, 釋放鎖-1,當(dāng)值=0時(shí)代表完全釋放鎖

    3、續(xù)約問(wèn)題:可解決鎖提前釋放問(wèn)題

    4、鎖釋放:誰(shuí)加鎖就由誰(shuí)來(lái)釋放

總結(jié):

本文由一個(gè)線(xiàn)上問(wèn)題做引子,通過(guò) redis 分布式鎖的常用實(shí)現(xiàn)方案,最終選定 redisson 的解決方案; 并分析 redisson 的具體實(shí)現(xiàn)細(xì)節(jié)

相關(guān)參考:

到此這篇關(guān)于Redisson如何解決Redis分布式鎖提前釋放問(wèn)題的文章就介紹到這了,更多相關(guān)Redis分布式鎖提前釋放內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • 淺談Redis 緩存的三大問(wèn)題及其解決方案

    淺談Redis 緩存的三大問(wèn)題及其解決方案

    Redis 經(jīng)常用于系統(tǒng)中的緩存,這樣可以解決目前 IO 設(shè)備無(wú)法滿(mǎn)足互聯(lián)網(wǎng)應(yīng)用海量的讀寫(xiě)請(qǐng)求的問(wèn)題。本文主要介紹了淺談Redis 緩存的三大問(wèn)題及其解決方案,感興趣的可以了解一下
    2021-07-07
  • 一篇文章帶你徹底搞懂Redis?事務(wù)

    一篇文章帶你徹底搞懂Redis?事務(wù)

    這篇文章主要介紹了一篇文章帶你徹底搞懂Redis?事務(wù)的相關(guān)資料,需要的朋友可以參考下
    2022-10-10
  • Redis Cluster 集群搭建你會(huì)嗎

    Redis Cluster 集群搭建你會(huì)嗎

    這篇文章主要介紹了Redis Cluster 集群搭建過(guò)程,本文分步驟通過(guò)實(shí)例代碼給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下
    2021-08-08
  • 使用Redis實(shí)現(xiàn)用戶(hù)積分排行榜的教程

    使用Redis實(shí)現(xiàn)用戶(hù)積分排行榜的教程

    這篇文章主要介紹了使用Redis實(shí)現(xiàn)用戶(hù)積分排行榜的教程,包括一個(gè)用PHP腳本進(jìn)行操作的例子,需要的朋友可以參考下
    2015-04-04
  • Redis的六種底層數(shù)據(jù)結(jié)構(gòu)(小結(jié))

    Redis的六種底層數(shù)據(jù)結(jié)構(gòu)(小結(jié))

    本文主要介紹了Redis的六種底層數(shù)據(jù)結(jié)構(gòu),文中通過(guò)示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下
    2022-01-01
  • 詳解redis desktop manager安裝及連接方式

    詳解redis desktop manager安裝及連接方式

    這篇文章主要介紹了redis desktop manager安裝及連接方式,本文圖文并茂給大家介紹的非常詳細(xì),具有一定的參考借鑒價(jià)值,需要的朋友可以參考下
    2019-09-09
  • 淺析Redis如何保證數(shù)據(jù)不丟失

    淺析Redis如何保證數(shù)據(jù)不丟失

    Redis是一種Nosql類(lèi)型的數(shù)據(jù)存儲(chǔ),全稱(chēng)Remote?Dictionary?Server,也就是遠(yuǎn)程字典服務(wù)器,本文主要來(lái)和大家討論一下Redis如何保證數(shù)據(jù)不丟失,需要的可以參考下
    2024-02-02
  • 淺談Redis處理接口冪等性的兩種方案

    淺談Redis處理接口冪等性的兩種方案

    本文主要介紹了淺談Redis處理接口冪等性的兩種方案,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧
    2022-08-08
  • redis數(shù)據(jù)傾斜處理方法

    redis數(shù)據(jù)傾斜處理方法

    我們?cè)谑褂肦edis分片集群時(shí),集群最好的狀態(tài)就是每個(gè)實(shí)例可以處理相同或相近比例的請(qǐng)求,但如果不是這樣,則會(huì)出現(xiàn)某些實(shí)例壓力特別大,而某些實(shí)例特別空閑的情況發(fā)生,本文就一起來(lái)看下這種情況是如何發(fā)生的以及如何處理
    2022-12-12
  • Redis的使用模式之計(jì)數(shù)器模式實(shí)例

    Redis的使用模式之計(jì)數(shù)器模式實(shí)例

    這篇文章主要介紹了Redis的使用模式之計(jì)數(shù)器模式實(shí)例,本文講解了匯總計(jì)數(shù)器、按時(shí)間匯總的計(jì)數(shù)器、速度控制、使用 Hash 數(shù)據(jù)類(lèi)型維護(hù)大量計(jì)數(shù)器等內(nèi)容,需要的朋友可以參考下
    2015-03-03

最新評(píng)論