Redisson分布式閉鎖RCountDownLatch的使用詳細講解
更新時間:2023年02月11日 11:04:12 作者:每天都要進步一點點
分布式鎖和我們java基礎中學習到的synchronized略有不同,synchronized中我們的鎖是個對象,當前系統(tǒng)部署在不同的服務實例上,單純使用synchronized或者lock已經無法滿足對庫存一致性的判斷。本次主要講解基于rediss實現的分布式鎖
本篇文章基于redisson-3.17.6版本源碼進行分析
一、RCountDownLatch的使用
RCountDownLatch的功能跟CountDownLatch,用于實現某個線程需要等待其他線程都完成之后,我再去執(zhí)行,這種場景就可以使用CountDownLatch。
@Test public void testRCountDownLatch() { Config config = new Config(); config.useSingleServer().setAddress("redis://127.0.0.1:6379"); RedissonClient redissonClient = Redisson.create(config); RCountDownLatch rCountDownLatch = redissonClient.getCountDownLatch("anyCountDownLatch"); rCountDownLatch.trySetCount(5); for (int i = 1; i <= 5; i++) { new Thread(() -> { System.out.println(Thread.currentThread().getName() + "離開教師..."); rCountDownLatch.countDown(); }, "A" + i).start(); } try { rCountDownLatch.await(); } catch (InterruptedException e) { throw new RuntimeException(e); } System.out.println("班長鎖門..."); }
A1離開教師...
A2離開教師...
A4離開教師...
A3離開教師...
A5離開教師...
班長鎖門...
二、trySetCount()設置計數器
/** * 僅當先前的計數已達到零或根本未設置時才設置新的計數值。 */ boolean trySetCount(long count);
public RFuture<Boolean> trySetCountAsync(long count) { return commandExecutor.evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, // 往redis中寫入一個String類型的數據 anyCountDownLatch:5 "if redis.call('exists', KEYS[1]) == 0 then " + "redis.call('set', KEYS[1], ARGV[2]); " + "redis.call('publish', KEYS[2], ARGV[1]); " + "return 1 " + "else " + "return 0 " + "end", Arrays.asList(getRawName(), getChannelName()), CountDownLatchPubSub.NEW_COUNT_MESSAGE, count); }
同樣,在redis中寫入了一個{key}:{計數器總數}的String類型的數據。
三、countDown()源碼
減少鎖存器的計數器。當計數達到零時通知所有等待線程。
public RFuture<Void> countDownAsync() { return commandExecutor.evalWriteNoRetryAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, // 減少redis中計數器的值 "local v = redis.call('decr', KEYS[1]);" + // 計數器減為0后,刪除對應的key "if v <= 0 then redis.call('del', KEYS[1]) end;" + "if v == 0 then redis.call('publish', KEYS[2], ARGV[1]) end;", Arrays.<Object>asList(getRawName(), getChannelName()), CountDownLatchPubSub.ZERO_COUNT_MESSAGE); }
四、await()源碼
等到計數器達到零。
public void await() throws InterruptedException { // 如果計數器為0,直接返回 if (getCount() == 0) { return; } // 訂閱redisson_countdownlatch__channel__{anyCountDownLatch}的消息 CompletableFuture<RedissonCountDownLatchEntry> future = subscribe(); RedissonCountDownLatchEntry entry = commandExecutor.getInterrupted(future); try { // 不斷循環(huán)判斷計數器的值是否大于0,大于0說明還有線程沒執(zhí)行完成,在這里阻塞:LockSupport.park(this) while (getCount() > 0) { // waiting for open state entry.getLatch().await(); } } finally { unsubscribe(entry); } }
到此這篇關于Redisson分布式閉鎖RCountDownLatch的使用詳細講解的文章就介紹到這了,更多相關Redisson RCountDownLatch內容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!