亚洲乱码中文字幕综合,中国熟女仑乱hd,亚洲精品乱拍国产一区二区三区,一本大道卡一卡二卡三乱码全集资源,又粗又黄又硬又爽的免费视频

Redisson分布式鎖源碼解析

 更新時(shí)間:2018年08月02日 08:25:22   投稿:laozhang  
文章給大家分享了關(guān)于Redisson分布式鎖源碼相關(guān)的知識(shí)點(diǎn)內(nèi)容,有興趣的朋友們可以參考學(xué)習(xí)下。

Redisson鎖繼承Implements Reentrant Lock,所以具備 Reentrant Lock 鎖中的一些特性:超時(shí),重試,可中斷等。加上Redisson中Redis具備分布式的特性,所以非常適合用來做Java中的分布式鎖。 下面我們對其加鎖、解鎖過程中的源碼細(xì)節(jié)進(jìn)行一一分析。

鎖的接口定義了一下方法:

分布式鎖當(dāng)中加鎖,我們常用的加鎖接口:

boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException;

下面我們來看一下方法的具體實(shí)現(xiàn):

public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
  long time = unit.toMillis(waitTime);
  long current = System.currentTimeMillis();
  final long threadId = Thread.currentThread().getId();
  Long ttl = tryAcquire(leaseTime, unit, threadId);
  // lock acquired
  if (ttl == null) {
   return true;
  }
  
  time -= (System.currentTimeMillis() - current);
  if (time <= 0) {
   acquireFailed(threadId);
   return false;
  }
  
  current = System.currentTimeMillis();
  final RFuture subscribeFuture = subscribe(threadId);
  if (!await(subscribeFuture, time, TimeUnit.MILLISECONDS)) {
   if (!subscribeFuture.cancel(false)) {
    subscribeFuture.addListener(new FutureListener() {
     @Override
     public void operationComplete(Future future) throws Exception {
      if (subscribeFuture.isSuccess()) {
       unsubscribe(subscribeFuture, threadId);
      }
     }
    });
   }
   acquireFailed(threadId);
   return false;
  }

  try {
   time -= (System.currentTimeMillis() - current);
   if (time <= 0) {
    acquireFailed(threadId);
    return false;
   }
  
   while (true) {
    long currentTime = System.currentTimeMillis();
    ttl = tryAcquire(leaseTime, unit, threadId);
    // lock acquired
    if (ttl == null) {
     return true;
    }

    time -= (System.currentTimeMillis() - currentTime);
    if (time = 0 && ttl < time) {
     getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
    } else {
     getEntry(threadId).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
    }

    time -= (System.currentTimeMillis() - currentTime);
    if (time <= 0) {
     acquireFailed(threadId);
     return false;
    }
   }
  } finally {
   unsubscribe(subscribeFuture, threadId);
  }
//  return get(tryLockAsync(waitTime, leaseTime, unit));
 }

首先我們看到調(diào)用tryAcquire嘗試獲取鎖,在這里是否能獲取到鎖,是根據(jù)鎖名稱的過期時(shí)間TTL來判定的(TTL

下面我們接著看一下tryAcquire的實(shí)現(xiàn):

private Long tryAcquire(long leaseTime, TimeUnit unit, long threadId) {
 return get(tryAcquireAsync(leaseTime, unit, threadId));
}

可以看到真正獲取鎖的操作經(jīng)過一層get操作里面執(zhí)行的,這里為何要這么操作,本人也不是太理解,如有理解錯(cuò)誤,歡迎指正。

get 是由CommandAsyncExecutor(一個(gè)線程Executor)封裝的一個(gè)Executor

設(shè)置一個(gè)單線程的同步控制器CountDownLatch,用于控制單個(gè)線程的中斷信息。個(gè)人理解經(jīng)過中間的這么一步:主要是為了支持線程可中斷操作。

public V get(RFuture future) {
 if (!future.isDone()) {
  final CountDownLatch l = new CountDownLatch(1);
  future.addListener(new FutureListener() {
   @Override
   public void operationComplete(Future future) throws Exception {
    l.countDown();
   }
  });
  
  boolean interrupted = false;
  while (!future.isDone()) {
   try {
    l.await();
   } catch (InterruptedException e) {
    interrupted = true;
   }
  }
  
  if (interrupted) {
   Thread.currentThread().interrupt();
  }
 }

 // commented out due to blocking issues up to 200 ms per minute for each thread:由于每個(gè)線程的阻塞問題,每分鐘高達(dá)200毫秒
 // future.awaitUninterruptibly();
 if (future.isSuccess()) {
  return future.getNow();
 }

 throw convertException(future);
}

我們進(jìn)一步往下看:

private RFuture tryAcquireAsync(long leaseTime, TimeUnit unit, final long threadId) {
 if (leaseTime != -1) {
  return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
 }
 RFuture ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
 ttlRemainingFuture.addListener(new FutureListener() {
  @Override
  public void operationComplete(Future future) throws Exception {
   if (!future.isSuccess()) {
    return;
   }

   Long ttlRemaining = future.getNow();
   // lock acquired
   if (ttlRemaining == null) {
    scheduleExpirationRenewal(threadId);
   }
  }
 });
 return ttlRemainingFuture;
}

首先判斷鎖是否有超時(shí)時(shí)間,有過期時(shí)間的話,會(huì)在后面獲取鎖的時(shí)候設(shè)置進(jìn)去。沒有過期時(shí)間的話,則會(huì)用默認(rèn)的

private long lockWatchdogTimeout = 30 * 1000;

下面我們在進(jìn)一步往下分析真正獲取鎖的操作:

RFuture tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand command) {
 internalLockLeaseTime = unit.toMillis(leaseTime);

 return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
    "if (redis.call('exists', KEYS[1]) == 0) then " +
     "redis.call('hset', KEYS[1], ARGV[2], 1); " +
     "redis.call('pexpire', KEYS[1], ARGV[1]); " +
     "return nil; " +
    "end; " +
    "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
     "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
     "redis.call('pexpire', KEYS[1], ARGV[1]); " +
     "return nil; " +
    "end; " +
    "return redis.call('pttl', KEYS[1]);",
    Collections.singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}

我把里面的重點(diǎn)信息做了以下三點(diǎn)總結(jié):

1:真正執(zhí)行的是一段具有原子性的Lua腳本,并且最終也是由CommandAsynExecutor去執(zhí)行。

2:鎖真正持久化到Redis時(shí),用的hash類型key field value

3:獲取鎖的三個(gè)參數(shù):getName()是邏輯鎖名稱,例如:分布式鎖要鎖住的methodName+params;internalLockLeaseTime是毫秒單位的鎖過期時(shí)間;getLockName則是鎖對應(yīng)的線程級(jí)別的名稱,因?yàn)橹С窒嗤€程可重入,不同線程不可重入,所以這里的鎖的生成方式是:UUID+":"threadId。有的同學(xué)可能會(huì)問,這樣不是很縝密:不同的JVM可能會(huì)生成相同的threadId,所以Redission這里加了一個(gè)區(qū)分度很高的UUID;

Lua腳本中的執(zhí)行分為以下三步:

1:exists檢查redis中是否存在鎖名稱;如果不存在,則獲取成功;同時(shí)把邏輯鎖名稱KEYS[1],線程級(jí)別的鎖名稱[ARGV[2],value=1,設(shè)置到redis。并設(shè)置邏輯鎖名稱的過期時(shí)間ARGV[2],返回;

2:如果檢查到存在KEYS[1],[ARGV[2],則說明獲取成功,此時(shí)會(huì)自增對應(yīng)的value值,記錄重入次數(shù);并更新鎖的過期時(shí)間

3:key不存,直接返回key的剩余過期時(shí)間(-2)

相關(guān)文章

  • 分布式系統(tǒng)中的降級(jí)熔斷設(shè)計(jì)問題面試

    分布式系統(tǒng)中的降級(jí)熔斷設(shè)計(jì)問題面試

    這篇文章主要為大家介紹了分布式系統(tǒng)中的降級(jí)熔斷設(shè)計(jì)問題面試解答,有需要的朋友可以借鑒參考下,希望能有所幫助,祝大家多多進(jìn)步,早日升職加薪
    2022-03-03
  • 解決Mybatis的@Param()注解導(dǎo)致分頁失效的問題

    解決Mybatis的@Param()注解導(dǎo)致分頁失效的問題

    這篇文章主要介紹了解決Mybatis的@Param()注解導(dǎo)致分頁失效的問題,本文給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下
    2021-04-04
  • spring拓展之如何定義自己的namespace

    spring拓展之如何定義自己的namespace

    這篇文章主要介紹了spring拓展之如何定義自己的namespace方式,具有很好的參考價(jià)值,希望對大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2021-09-09
  • Java四舍五入時(shí)保留指定小數(shù)位數(shù)的五種方式

    Java四舍五入時(shí)保留指定小數(shù)位數(shù)的五種方式

    這篇文章主要介紹了Java四舍五入時(shí)保留指定小數(shù)位數(shù)的五種方式,幫助大家更好的理解和使用Java,感興趣的朋友可以了解下
    2020-09-09
  • SpringMVC數(shù)據(jù)響應(yīng)詳細(xì)介紹

    SpringMVC數(shù)據(jù)響應(yīng)詳細(xì)介紹

    Spring MVC 是 Spring 提供的一個(gè)基于 MVC 設(shè)計(jì)模式的輕量級(jí) Web 開發(fā)框架,本質(zhì)上相當(dāng)于 Servlet,Spring MVC 角色劃分清晰,分工明細(xì),本章來講解SpringMVC數(shù)據(jù)響應(yīng)
    2023-02-02
  • 詳解Spring Boot中MyBatis的使用方法

    詳解Spring Boot中MyBatis的使用方法

    mybatis初期使用比較麻煩,需要各種配置文件、實(shí)體類、dao層映射關(guān)聯(lián)、還有一大推其它配置。當(dāng)然mybatis也發(fā)現(xiàn)了這種弊端。下面通過本文給大家詳細(xì)介紹Spring Boot中MyBatis的使用方法,感興趣的朋友一起看看吧
    2017-07-07
  • 基于@Bean修飾的方法參數(shù)的注入方式

    基于@Bean修飾的方法參數(shù)的注入方式

    這篇文章主要介紹了@Bean修飾的方法參數(shù)的注入方式,具有很好的參考價(jià)值,希望對大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2021-09-09
  • Java?在游戲中探索數(shù)組二維數(shù)組

    Java?在游戲中探索數(shù)組二維數(shù)組

    數(shù)組和二維數(shù)組感覺用王者榮耀的裝備欄來舉例解釋,應(yīng)該更易懂一些。從基礎(chǔ)開始講,后續(xù)會(huì)講到JAVA高級(jí),中間會(huì)穿插面試題和項(xiàng)目實(shí)戰(zhàn),希望能給大家?guī)韼椭?/div> 2022-03-03
  • Java實(shí)現(xiàn)局域網(wǎng)聊天室功能(私聊、群聊)

    Java實(shí)現(xiàn)局域網(wǎng)聊天室功能(私聊、群聊)

    這篇文章主要為大家詳細(xì)介紹了Java實(shí)現(xiàn)局域網(wǎng)聊天室功能,包括私聊、群聊,文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下
    2022-05-05
  • Spring AI 使用本地 Ollama Embeddings的操作方法

    Spring AI 使用本地 Ollama Embeddings的操作方法

    使用 OpenAI 的 Embeddings 接口是有費(fèi)用的,如果想對大量文檔進(jìn)行測試,使用本地部署的 Embeddings 就能省去大量的費(fèi)用,所以我們嘗試使用本地的 Ollama Embeddings,這篇文章主要介紹了Spring AI 使用本地 Ollama Embeddings,需要的朋友可以參考下
    2024-05-05

最新評(píng)論