亚洲乱码中文字幕综合,中国熟女仑乱hd,亚洲精品乱拍国产一区二区三区,一本大道卡一卡二卡三乱码全集资源,又粗又黄又硬又爽的免费视频

redisson 實(shí)現(xiàn)分布式鎖的源碼解析

 更新時(shí)間:2022年05月05日 16:06:59   作者:心城以北  
這篇文章主要介紹了redisson 實(shí)現(xiàn)分布式鎖的源碼解析,通過模擬一個(gè)商品秒殺的場景結(jié)合示例代碼給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下

redisson

redisson 實(shí)現(xiàn)分布式鎖的機(jī)制如下:

依賴版本

implementation 'org.redisson:redisson-spring-boot-starter:3.17.0'

測試代碼

下面是模擬一個(gè)商品秒殺的場景,示例代碼如下:

public class RedissonTest {
    public static void main(String[] args) {
        //1. 配置部分
        Config config = new Config();
        String address = "redis://127.0.0.1:6379";
        SingleServerConfig serverConfig = config.useSingleServer();
        serverConfig.setAddress(address);
        serverConfig.setDatabase(0);
        config.setLockWatchdogTimeout(5000);
        Redisson redisson = (Redisson) Redisson.create(config);
        RLock rLock = redisson.getLock("goods:1000:1");
        //2. 加鎖
        rLock.lock();
        try {
            System.out.println("todo 邏輯處理 1000000.");
        } finally {
            if (rLock.isLocked() && rLock.isHeldByCurrentThread()) {
                //3. 解鎖
                rLock.unlock();
            }
        }
    }
}

加鎖設(shè)計(jì)

rLock.lock();是加鎖的核心代碼,我們一起來看看調(diào)用棧

加鎖的核心方法是:org.redisson.RedissonLock#tryLockInnerAsync

<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
        return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,
                "if (redis.call('exists', KEYS[1]) == 0) then " +
                        "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                        "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                        "return nil; " +
                        "end; " +
                        "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                        "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                        "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                        "return nil; " +
                        "end; " +
                        "return redis.call('pttl', KEYS[1]);",
                Collections.singletonList(getRawName()), unit.toMillis(leaseTime), getLockName(threadId));
    }

其實(shí)它的本質(zhì)是調(diào)用一段 LUA 腳本進(jìn)行加鎖。

鎖續(xù)期設(shè)計(jì)

鎖的續(xù)期是在 org.redisson.RedissonLock#tryAcquireAsync方法中調(diào)用 scheduleExpirationRenewal實(shí)現(xiàn)的。

續(xù)期需要注意的是,看門狗是設(shè)置在主線程的延遲隊(duì)列的線程中。

tryAcquireAsync 代碼如下:

private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
    RFuture<Long> ttlRemainingFuture;
    if (leaseTime != -1) {
        ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
    } else {
        ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,
                TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
    }
    
    CompletionStage<Long> f = ttlRemainingFuture.thenApply(ttlRemaining -> {
        // lock acquired
        if (ttlRemaining == null) {
            if (leaseTime != -1) {
                internalLockLeaseTime = unit.toMillis(leaseTime);
            } else {
                // 鎖過期時(shí)間續(xù)期
                scheduleExpirationRenewal(threadId);
            }
        }
        return ttlRemaining;
    });
    return new CompletableFutureWrapper<>(f);
}

鎖續(xù)期 scheduleExpirationRenewal代碼如下:

protected void scheduleExpirationRenewal(long threadId) {
    ExpirationEntry entry = new ExpirationEntry();
    ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);
    if (oldEntry != null) {
        oldEntry.addThreadId(threadId);
    } else {
        entry.addThreadId(threadId);
        try {
            renewExpiration();
        } finally {
            if (Thread.currentThread().isInterrupted()) {
                cancelExpirationRenewal(threadId);
            }
        }
    }
}

然后在調(diào)用 renewExpiration();執(zhí)行續(xù)期邏輯

private void renewExpiration() {
    ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
    if (ee == null) {
        return;
    }
    // 創(chuàng)建延遲任務(wù)
    Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
        @Override
        public void run(Timeout timeout) throws Exception {
            ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
            if (ent == null) {
                return;
            }
            Long threadId = ent.getFirstThreadId();
            if (threadId == null) {
                return;
            }
            // 真正的續(xù)期,調(diào)用 LUA 腳本續(xù)期
            RFuture<Boolean> future = renewExpirationAsync(threadId);
            future.whenComplete((res, e) -> {
                if (e != null) {
                    log.error("Can't update lock " + getRawName() + " expiration", e);
                    EXPIRATION_RENEWAL_MAP.remove(getEntryName());
                    return;
                }

                // 如果續(xù)期成功
                if (res) {
                    // reschedule itself
                    renewExpiration();
                } else {
                    cancelExpirationRenewal(null);
                }
            });
        }
    }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
    ee.setTimeout(task);
}

renewExpirationAsync方法, 里面還是一段 LUA 腳本,進(jìn)行重新設(shè)置鎖的過期時(shí)間。

    protected RFuture<Boolean> renewExpirationAsync(long threadId) {
        return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                        "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                        "return 1; " +
                        "end; " +
                        "return 0;",
                Collections.singletonList(getRawName()),
                internalLockLeaseTime, getLockName(threadId));
    }

鎖的自旋重試

org.redisson.RedissonLock#lock(long, java.util.concurrent.TimeUnit, boolean)在執(zhí)行獲取鎖失敗的時(shí)候,會進(jìn)入重試。其實(shí)這里就會執(zhí)行 18 行以后的 while (true) 邏輯

private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
    long threadId = Thread.currentThread().getId();
    Long ttl = tryAcquire(-1, leaseTime, unit, threadId);
    // lock acquired
    if (ttl == null) {
        return;
    }

    CompletableFuture<RedissonLockEntry> future = subscribe(threadId);
    RedissonLockEntry entry;
    if (interruptibly) {
        entry = commandExecutor.getInterrupted(future);
    } else {
        entry = commandExecutor.get(future);
    }

    try {
        while (true) {
            ttl = tryAcquire(-1, leaseTime, unit, threadId);
            // lock acquired
            if (ttl == null) {
                break;
            }

            // waiting for message
            if (ttl >= 0) {
                try {
                    // 阻塞鎖的超時(shí)時(shí)間,等鎖過期后再嘗試加鎖
                    entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                } catch (InterruptedException e) {
                    if (interruptibly) {
                        throw e;
                    }
                    entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                }
            } else {
                if (interruptibly) {
                    entry.getLatch().acquire();
                } else {
                    entry.getLatch().acquireUninterruptibly();
                }
            }
        }
    } finally {
        unsubscribe(entry, threadId);
    }
//        get(lockAsync(leaseTime, unit));
}

entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);其實(shí)這里就是一個(gè)間歇性自旋。 等到上次鎖過期的時(shí)間,在喚醒進(jìn)行搶鎖 entry.getLatch().acquire();

還有一個(gè)邏輯就是

CompletableFuture future = subscribe(threadId);

這里其實(shí)是會訂閱一個(gè)消息,如果解鎖過后,會發(fā)布解鎖的消息。

解鎖設(shè)計(jì)

rLock.unlock(); 的核心就是釋放鎖,撤銷續(xù)期和喚醒在等待加鎖的線程(發(fā)布解鎖成功消息)。

核心方法(解鎖): org.redisson.RedissonLock#unlockInnerAsync

protected RFuture<Boolean> unlockInnerAsync(long threadId) {
        return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
                        "return nil;" +
                        "end; " +
                        "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
                        "if (counter > 0) then " +
                        "redis.call('pexpire', KEYS[1], ARGV[2]); " +
                        "return 0; " +
                        "else " +
                        "redis.call('del', KEYS[1]); " +
                        // 發(fā)布解鎖成功消息
                        "redis.call('publish', KEYS[2], ARGV[1]); " +
                        "return 1; " +
                        "end; " +
                        "return nil;",
                Arrays.asList(getRawName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));
    }

還是 LUA 的執(zhí)行方式。

撤銷鎖續(xù)期

核心方法 org.redisson.RedissonBaseLock#unlockAsync(long)

@Override
public RFuture<Void> unlockAsync(long threadId) {
    // 解鎖
    RFuture<Boolean> future = unlockInnerAsync(threadId);
    // 撤銷續(xù)期
    CompletionStage<Void> f = future.handle((opStatus, e) -> {
        cancelExpirationRenewal(threadId);
        if (e != null) {
            throw new CompletionException(e);
        }
        if (opStatus == null) {
            IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "
                    + id + " thread-id: " + threadId);
            throw new CompletionException(cause);
        }
        return null;
    });
    return new CompletableFutureWrapper<>(f);
}

解鎖成功喚排隊(duì)線程

org.redisson.pubsub.LockPubSub#onMessage中回去喚醒阻塞的線程,讓執(zhí)行前面的鎖自旋邏輯,具體代碼如下:

@Override
protected void onMessage(RedissonLockEntry value, Long message) {
    if (message.equals(UNLOCK_MESSAGE)) {
        Runnable runnableToExecute = value.getListeners().poll();
        if (runnableToExecute != null) {
            runnableToExecute.run();
        }
        value.getLatch().release();
    } else if (message.equals(READ_UNLOCK_MESSAGE)) {
        while (true) {
            Runnable runnableToExecute = value.getListeners().poll();
            if (runnableToExecute == null) {
                break;
            }
            runnableToExecute.run();
        }
        value.getLatch().release(value.getLatch().getQueueLength());
    }
}

到此這篇關(guān)于redisson 實(shí)現(xiàn)分布式鎖的文章就介紹到這了,更多相關(guān)redisson 分布式鎖內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • 詳解JavaWeb如何實(shí)現(xiàn)文件上傳和下載功能

    詳解JavaWeb如何實(shí)現(xiàn)文件上傳和下載功能

    這篇文章主要介紹了如何利用JavaWeb實(shí)現(xiàn)文件的上傳和下載功能,文中的示例代碼講解詳細(xì),對我們的學(xué)習(xí)或工作有一定的幫助,感興趣的小伙伴可以學(xué)習(xí)一下
    2021-12-12
  • Java 隨機(jī)生成任意組電話號碼過程解析

    Java 隨機(jī)生成任意組電話號碼過程解析

    這篇文章主要介紹了Java 隨機(jī)生成任意組電話號碼過程解析,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下
    2019-10-10
  • Spring配置文件的拆分和整合過程分析

    Spring配置文件的拆分和整合過程分析

    在實(shí)際應(yīng)用里,隨著應(yīng)用規(guī)模的增加,系統(tǒng)中 Bean 數(shù)量也大量增加,導(dǎo)致配置文件非常龐大。為了避免這種情況的產(chǎn)生,提高配置文件的可讀性與可維護(hù)性,可以將Spring 配置文件分解成多個(gè)配置文件,感興趣的朋友跟隨小編一起看看吧
    2022-10-10
  • springboot后端解決跨域問題

    springboot后端解決跨域問題

    今天小編就為大家分享一篇關(guān)于springboot后端解決跨域問題,小編覺得內(nèi)容挺不錯的,現(xiàn)在分享給大家,具有很好的參考價(jià)值,需要的朋友一起跟隨小編來看看吧
    2019-03-03
  • SpringBoot模板引擎之Thymeleaf的使用

    SpringBoot模板引擎之Thymeleaf的使用

    這篇文章主要介紹了SpringBoot模板引擎之Thymeleaf的使用,模板引擎是以業(yè)務(wù)邏輯層和表現(xiàn)層分離為目的的,將規(guī)定格式的模板代碼轉(zhuǎn)換為業(yè)務(wù)數(shù)據(jù)的算法實(shí)現(xiàn),它可以是一個(gè)過程代碼、一個(gè)類,甚至是一個(gè)類庫,需要的朋友可以參考下
    2023-10-10
  • Spring中常見的7種BeanDefinition詳解

    Spring中常見的7種BeanDefinition詳解

    在?Spring?容器中,我們廣泛使用的是一個(gè)一個(gè)的?Bean,BeanDefinition?從名字上就可以看出是關(guān)于?Bean?的定義,下面就跟隨小編一起深入了解一下常見的7中BeanDefinition吧
    2023-09-09
  • Admin - SpringBoot + Maven 多啟動環(huán)境配置實(shí)例詳解

    Admin - SpringBoot + Maven 多啟動環(huán)境配置實(shí)例詳解

    這篇文章主要介紹了Admin - SpringBoot + Maven 多啟動環(huán)境配置,本文通過實(shí)例代碼給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下
    2021-03-03
  • idea環(huán)境下Maven無法正常下載pom中配置的包問題

    idea環(huán)境下Maven無法正常下載pom中配置的包問題

    這篇文章主要介紹了idea環(huán)境下Maven無法正常下載pom中配置的包的問題,本文給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下
    2020-06-06
  • Java規(guī)則引擎easy-rules詳細(xì)介紹

    Java規(guī)則引擎easy-rules詳細(xì)介紹

    本文主要介紹了Java規(guī)則引擎easy-rules詳細(xì)介紹,文中通過示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下
    2022-01-01
  • 初識Java基礎(chǔ)之?dāng)?shù)據(jù)類型與運(yùn)算符

    初識Java基礎(chǔ)之?dāng)?shù)據(jù)類型與運(yùn)算符

    Java是一種強(qiáng)類型語言,每個(gè)變量都必須聲明其數(shù)據(jù)類型,下面這篇文章主要給大家介紹了關(guān)于Java基礎(chǔ)之?dāng)?shù)據(jù)類型與運(yùn)算符的相關(guān)資料,文中通過示例代碼介紹的非常詳細(xì),需要的朋友可以參考下
    2021-10-10

最新評論