淺談Java(SpringBoot)基于zookeeper的分布式鎖實(shí)現(xiàn)
通過(guò)zookeeper實(shí)現(xiàn)分布式鎖
1、創(chuàng)建zookeeper的client
首先通過(guò)CuratorFrameworkFactory創(chuàng)建一個(gè)連接zookeeper的連接CuratorFramework client
public class CuratorFactoryBean implements FactoryBean<CuratorFramework>, InitializingBean, DisposableBean { private static final Logger LOGGER = LoggerFactory.getLogger(ContractFileInfoController.class); private String connectionString; private int sessionTimeoutMs; private int connectionTimeoutMs; private RetryPolicy retryPolicy; private CuratorFramework client; public CuratorFactoryBean(String connectionString) { this(connectionString, 500, 500); } public CuratorFactoryBean(String connectionString, int sessionTimeoutMs, int connectionTimeoutMs) { this.connectionString = connectionString; this.sessionTimeoutMs = sessionTimeoutMs; this.connectionTimeoutMs = connectionTimeoutMs; } @Override public void destroy() throws Exception { LOGGER.info("Closing curator framework..."); this.client.close(); LOGGER.info("Closed curator framework."); } @Override public CuratorFramework getObject() throws Exception { return this.client; } @Override public Class<?> getObjectType() { return this.client != null ? this.client.getClass() : CuratorFramework.class; } @Override public boolean isSingleton() { return true; } @Override public void afterPropertiesSet() throws Exception { if (StringUtils.isEmpty(this.connectionString)) { throw new IllegalStateException("connectionString can not be empty."); } else { if (this.retryPolicy == null) { this.retryPolicy = new ExponentialBackoffRetry(1000, 2147483647, 180000); } this.client = CuratorFrameworkFactory.newClient(this.connectionString, this.sessionTimeoutMs, this.connectionTimeoutMs, this.retryPolicy); this.client.start(); this.client.blockUntilConnected(30, TimeUnit.MILLISECONDS); } } public void setConnectionString(String connectionString) { this.connectionString = connectionString; } public void setSessionTimeoutMs(int sessionTimeoutMs) { this.sessionTimeoutMs = sessionTimeoutMs; } public void setConnectionTimeoutMs(int connectionTimeoutMs) { this.connectionTimeoutMs = connectionTimeoutMs; } public void setRetryPolicy(RetryPolicy retryPolicy) { this.retryPolicy = retryPolicy; } public void setClient(CuratorFramework client) { this.client = client; } }
2、封裝分布式鎖
根據(jù)CuratorFramework創(chuàng)建InterProcessMutex(分布式可重入排它鎖)對(duì)一行數(shù)據(jù)進(jìn)行上鎖
public InterProcessMutex(CuratorFramework client, String path) { this(client, path, new StandardLockInternalsDriver()); }
使用 acquire方法
1、acquire() :入?yún)榭?,調(diào)用該方法后,會(huì)一直堵塞,直到搶奪到鎖資源,或者zookeeper連接中斷后,上拋異常。
2、acquire(long time, TimeUnit unit):入?yún)魅氤瑫r(shí)時(shí)間、單位,搶奪時(shí),如果出現(xiàn)堵塞,會(huì)在超過(guò)該時(shí)間后,返回false。
public void acquire() throws Exception { if (!this.internalLock(-1L, (TimeUnit)null)) { throw new IOException("Lost connection while trying to acquire lock: " + this.basePath); } } public boolean acquire(long time, TimeUnit unit) throws Exception { return this.internalLock(time, unit); }
釋放鎖 mutex.release();
public void release() throws Exception { Thread currentThread = Thread.currentThread(); InterProcessMutex.LockData lockData = (InterProcessMutex.LockData)this.threadData.get(currentThread); if (lockData == null) { throw new IllegalMonitorStateException("You do not own the lock: " + this.basePath); } else { int newLockCount = lockData.lockCount.decrementAndGet(); if (newLockCount <= 0) { if (newLockCount < 0) { throw new IllegalMonitorStateException("Lock count has gone negative for lock: " + this.basePath); } else { try { this.internals.releaseLock(lockData.lockPath); } finally { this.threadData.remove(currentThread); } } } } }
封裝后的DLock代碼
1、調(diào)用InterProcessMutex processMutex = dLock.mutex(path);
2、手動(dòng)釋放鎖processMutex.release();
3、需要手動(dòng)刪除路徑dLock.del(path);
推薦 使用:
都是 函數(shù)式編程
在業(yè)務(wù)代碼執(zhí)行完畢后 會(huì)釋放鎖和刪除path
1、這個(gè)有返回結(jié)果
public T mutex(String path, ZkLockCallback zkLockCallback, long time, TimeUnit timeUnit)
2、這個(gè)無(wú)返回結(jié)果
public void mutex(String path, ZkVoidCallBack zkLockCallback, long time, TimeUnit timeUnit)
public class DLock { private final Logger logger; private static final long TIMEOUT_D = 100L; private static final String ROOT_PATH_D = "/dLock"; private String lockRootPath; private CuratorFramework client; public DLock(CuratorFramework client) { this("/dLock", client); } public DLock(String lockRootPath, CuratorFramework client) { this.logger = LoggerFactory.getLogger(DLock.class); this.lockRootPath = lockRootPath; this.client = client; } public InterProcessMutex mutex(String path) { if (!StringUtils.startsWith(path, "/")) { path = Constant.keyBuilder(new Object[]{"/", path}); } return new InterProcessMutex(this.client, Constant.keyBuilder(new Object[]{this.lockRootPath, "", path})); } public <T> T mutex(String path, ZkLockCallback<T> zkLockCallback) throws ZkLockException { return this.mutex(path, zkLockCallback, 100L, TimeUnit.MILLISECONDS); } public <T> T mutex(String path, ZkLockCallback<T> zkLockCallback, long time, TimeUnit timeUnit) throws ZkLockException { String finalPath = this.getLockPath(path); InterProcessMutex mutex = new InterProcessMutex(this.client, finalPath); try { if (!mutex.acquire(time, timeUnit)) { throw new ZkLockException("acquire zk lock return false"); } } catch (Exception var13) { throw new ZkLockException("acquire zk lock failed.", var13); } T var8; try { var8 = zkLockCallback.doInLock(); } finally { this.releaseLock(finalPath, mutex); } return var8; } private void releaseLock(String finalPath, InterProcessMutex mutex) { try { mutex.release(); this.logger.info("delete zk node path:{}", finalPath); this.deleteInternal(finalPath); } catch (Exception var4) { this.logger.error("dlock", "release lock failed, path:{}", finalPath, var4); // LogUtil.error(this.logger, "dlock", "release lock failed, path:{}", new Object[]{finalPath, var4}); } } public void mutex(String path, ZkVoidCallBack zkLockCallback, long time, TimeUnit timeUnit) throws ZkLockException { String finalPath = this.getLockPath(path); InterProcessMutex mutex = new InterProcessMutex(this.client, finalPath); try { if (!mutex.acquire(time, timeUnit)) { throw new ZkLockException("acquire zk lock return false"); } } catch (Exception var13) { throw new ZkLockException("acquire zk lock failed.", var13); } try { zkLockCallback.response(); } finally { this.releaseLock(finalPath, mutex); } } public String getLockPath(String customPath) { if (!StringUtils.startsWith(customPath, "/")) { customPath = Constant.keyBuilder(new Object[]{"/", customPath}); } String finalPath = Constant.keyBuilder(new Object[]{this.lockRootPath, "", customPath}); return finalPath; } private void deleteInternal(String finalPath) { try { ((ErrorListenerPathable)this.client.delete().inBackground()).forPath(finalPath); } catch (Exception var3) { this.logger.info("delete zk node path:{} failed", finalPath); } } public void del(String customPath) { String lockPath = ""; try { lockPath = this.getLockPath(customPath); ((ErrorListenerPathable)this.client.delete().inBackground()).forPath(lockPath); } catch (Exception var4) { this.logger.info("delete zk node path:{} failed", lockPath); } } }
@FunctionalInterface public interface ZkLockCallback<T> { T doInLock(); } @FunctionalInterface public interface ZkVoidCallBack { void response(); } public class ZkLockException extends Exception { public ZkLockException() { } public ZkLockException(String message) { super(message); } public ZkLockException(String message, Throwable cause) { super(message, cause); } }
配置CuratorConfig
@Configuration public class CuratorConfig { @Value("${zk.connectionString}") private String connectionString; @Value("${zk.sessionTimeoutMs:500}") private int sessionTimeoutMs; @Value("${zk.connectionTimeoutMs:500}") private int connectionTimeoutMs; @Value("${zk.dLockRoot:/dLock}") private String dLockRoot; @Bean public CuratorFactoryBean curatorFactoryBean() { return new CuratorFactoryBean(connectionString, sessionTimeoutMs, connectionTimeoutMs); } @Bean @Autowired public DLock dLock(CuratorFramework client) { return new DLock(dLockRoot, client); } }
測(cè)試代碼
@RestController @RequestMapping("/dLock") public class LockController { @Autowired private DLock dLock; @RequestMapping("/lock") public Map testDLock(String no){ final String path = Constant.keyBuilder("/test/no/", no); Long mutex=0l; try { System.out.println("在拿鎖:"+path+System.currentTimeMillis()); mutex = dLock.mutex(path, () -> { try { System.out.println("拿到鎖了" + System.currentTimeMillis()); Thread.sleep(10000); System.out.println("操作完成了" + System.currentTimeMillis()); } finally { return System.currentTimeMillis(); } }, 1000, TimeUnit.MILLISECONDS); } catch (ZkLockException e) { System.out.println("拿不到鎖呀"+System.currentTimeMillis()); } return Collections.singletonMap("ret",mutex); } @RequestMapping("/dlock") public Map testDLock1(String no){ final String path = Constant.keyBuilder("/test/no/", no); Long mutex=0l; try { System.out.println("在拿鎖:"+path+System.currentTimeMillis()); InterProcessMutex processMutex = dLock.mutex(path); processMutex.acquire(); System.out.println("拿到鎖了" + System.currentTimeMillis()); Thread.sleep(10000); processMutex.release(); System.out.println("操作完成了" + System.currentTimeMillis()); } catch (ZkLockException e) { System.out.println("拿不到鎖呀"+System.currentTimeMillis()); e.printStackTrace(); }catch (Exception e){ e.printStackTrace(); } return Collections.singletonMap("ret",mutex); } @RequestMapping("/del") public Map delDLock(String no){ final String path = Constant.keyBuilder("/test/no/", no); dLock.del(path); return Collections.singletonMap("ret",1); } }
以上所述是小編給大家介紹的Java(SpringBoot)基于zookeeper的分布式鎖實(shí)現(xiàn)詳解整合,希望對(duì)大家有所幫助,如果大家有任何疑問(wèn)請(qǐng)給我留言,小編會(huì)及時(shí)回復(fù)大家的。在此也非常感謝大家對(duì)腳本之家網(wǎng)站的支持!
- 使用dubbo+zookeeper+spring boot構(gòu)建服務(wù)的方法詳解
- SpringBoot中dubbo+zookeeper實(shí)現(xiàn)分布式開(kāi)發(fā)的應(yīng)用詳解
- SpringBoot集成Curator實(shí)現(xiàn)Zookeeper基本操作的代碼示例
- SpringBoot系列教程之dubbo和Zookeeper集成方法
- SpringBoot整合Dubbo+Zookeeper實(shí)現(xiàn)RPC調(diào)用
- springboot應(yīng)用訪問(wèn)zookeeper的流程
- SpringBoot整合Zookeeper詳細(xì)教程
- Java Spring Boot 集成Zookeeper
- SpringBoot讀取ZooKeeper(ZK)屬性的方法實(shí)現(xiàn)
相關(guān)文章
Java關(guān)鍵字final的實(shí)現(xiàn)原理分析
這篇文章主要介紹了Java關(guān)鍵字final的實(shí)現(xiàn)原理分析,在JDK8之前,如果在匿名內(nèi)部類(lèi)中需要訪問(wèn)局部變量,那么這個(gè)局部變量一定是final修飾的,但final關(guān)鍵字可以省略,需要的朋友可以參考下2024-01-01java學(xué)生管理系統(tǒng)界面簡(jiǎn)單實(shí)現(xiàn)(全)
這篇文章主要為大家詳細(xì)介紹了java學(xué)生管理系統(tǒng)界面的簡(jiǎn)單實(shí)現(xiàn),文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2018-01-01利用Java實(shí)現(xiàn)和可被K整除的子數(shù)組完整實(shí)例
這篇文章主要給大家介紹了關(guān)于利用Java實(shí)現(xiàn)和可被K整除的子數(shù)組的相關(guān)資料,這道題來(lái)自力扣,通過(guò)學(xué)習(xí)這道題的解題思路以及代碼對(duì)大家的學(xué)習(xí)或者工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2024-01-01SpringBoot集成QQ第三方登陸的實(shí)現(xiàn)
這篇文章主要介紹了SpringBoot集成QQ第三方登陸的實(shí)現(xiàn),文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2020-11-11java基本教程之synchronized關(guān)鍵字 java多線程教程
這篇文章主要介紹了java的synchronized原理、synchronized基本規(guī)則、synchronized方法 和 synchronized代碼塊、實(shí)例鎖和全局鎖2014-01-01springboot項(xiàng)目使用nohup將日志指定輸出文件過(guò)大問(wèn)題及解決辦法
在Spring Boot項(xiàng)目中,使用nohup命令重定向日志輸出到文件可能會(huì)使日志文件過(guò)大,文章介紹了兩種解決方法:一是創(chuàng)建腳本直接清除日志文件,二是創(chuàng)建腳本保留部分日志內(nèi)容,并將這些腳本加入定時(shí)任務(wù)中,這可以有效控制日志文件的大小,避免占用過(guò)多磁盤(pán)空間2024-10-10Java中的Runnable,Callable,F(xiàn)uture,F(xiàn)utureTask的比較
這篇文章主要介紹了Java中的Runnable,Callable,F(xiàn)uture,F(xiàn)utureTask的比較的相關(guān)資料,需要的朋友可以參考下2017-02-02Lombok的@CustomLog流暢的公司多場(chǎng)景日志
這篇文章主要為大家介紹了Lombok的@CustomLog流暢的公司多場(chǎng)景日志開(kāi)發(fā)詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2023-02-02