ZooKeeper入門(mén)教程三分布式鎖實(shí)現(xiàn)及完整運(yùn)行源碼
ZooKeeper入門(mén)教程一簡(jiǎn)介與核心概念
ZooKeeper入門(mén)教程二在單機(jī)和集群環(huán)境下的安裝搭建及使用
1.0版本
首先我們先介紹一個(gè)簡(jiǎn)單的zookeeper實(shí)現(xiàn)分布式鎖的思路:
用zookeeper中一個(gè)臨時(shí)節(jié)點(diǎn)代表鎖,比如在/exlusive_lock下創(chuàng)建臨時(shí)子節(jié)點(diǎn)/exlusive_lock/lock。
- 所有客戶(hù)端爭(zhēng)相創(chuàng)建此節(jié)點(diǎn),但只有一個(gè)客戶(hù)端創(chuàng)建成功。
- 創(chuàng)建成功代表獲取鎖成功,此客戶(hù)端執(zhí)行業(yè)務(wù)邏輯
- 未創(chuàng)建成功的客戶(hù)端,監(jiān)聽(tīng)/exlusive_lock變更
- 獲取鎖的客戶(hù)端執(zhí)行完成后,刪除/exlusive_lock/lock,表示鎖被釋放
- 鎖被釋放后,其他監(jiān)聽(tīng)/exlusive_lock變更的客戶(hù)端得到通知,再次爭(zhēng)相創(chuàng)建臨時(shí)子節(jié)點(diǎn)/exlusive_lock/lock。此時(shí)相當(dāng)于回到了第2步。
我們的程序按照上述邏輯直至搶占到鎖,執(zhí)行完業(yè)務(wù)邏輯。
上述是較為簡(jiǎn)單的分布式鎖實(shí)現(xiàn)方式。能夠應(yīng)付一般使用場(chǎng)景,但存在著如下兩個(gè)問(wèn)題:
1、鎖的獲取順序和最初客戶(hù)端爭(zhēng)搶順序不一致,這不是一個(gè)公平鎖。每次鎖獲取都是當(dāng)次最先搶到鎖的客戶(hù)端。
2、羊群效應(yīng),所有沒(méi)有搶到鎖的客戶(hù)端都會(huì)監(jiān)聽(tīng)/exlusive_lock變更。當(dāng)并發(fā)客戶(hù)端很多的情況下,所有的客戶(hù)端都會(huì)接到通知去爭(zhēng)搶鎖,此時(shí)就出現(xiàn)了羊群效應(yīng)。
為了解決上面的問(wèn)題,我們重新設(shè)計(jì)。
2.0版本
我們?cè)?.0版本中,讓每個(gè)客戶(hù)端在/exlusive_lock下創(chuàng)建的臨時(shí)節(jié)點(diǎn)為有序節(jié)點(diǎn),這樣每個(gè)客戶(hù)端都在/exlusive_lock下有自己對(duì)應(yīng)的鎖節(jié)點(diǎn),而序號(hào)排在最前面的節(jié)點(diǎn),代表對(duì)應(yīng)的客戶(hù)端獲取鎖成功。排在后面的客戶(hù)端監(jiān)聽(tīng)自己前面一個(gè)節(jié)點(diǎn),那么在他前序客戶(hù)端執(zhí)行完成后,他將得到通知,獲得鎖成功。邏輯修改如下:
- 每個(gè)客戶(hù)端往/exlusive_lock下創(chuàng)建有序臨時(shí)節(jié)點(diǎn)/exlusive_lock/lock_。創(chuàng)建成功后/exlusive_lock下面會(huì)有每個(gè)客戶(hù)端對(duì)應(yīng)的節(jié)點(diǎn),如/exlusive_lock/lock_000000001
- 客戶(hù)端取得/exlusive_lock下子節(jié)點(diǎn),并進(jìn)行排序,判斷排在最前面的是否為自己。如果自己的鎖節(jié)點(diǎn)在第一位,代表獲取鎖成功,此客戶(hù)端執(zhí)行業(yè)務(wù)邏輯
- 如果自己的鎖節(jié)點(diǎn)不在第一位,則監(jiān)聽(tīng)自己前一位的鎖節(jié)點(diǎn)。例如,自己鎖節(jié)點(diǎn)lock_000000002,那么則監(jiān)聽(tīng)lock_000000001.
- 當(dāng)前一位鎖節(jié)點(diǎn)(lock_000000001)對(duì)應(yīng)的客戶(hù)端執(zhí)行完成,釋放了鎖,將會(huì)觸發(fā)監(jiān)聽(tīng)客戶(hù)端(lock_000000002)的邏輯。
- 監(jiān)聽(tīng)客戶(hù)端重新執(zhí)行第2步邏輯,判斷自己是否獲得了鎖。
如此修改后,每個(gè)客戶(hù)端只關(guān)心自己前序鎖是否釋放,所以每次只會(huì)有一個(gè)客戶(hù)端得到通知。而且,所有客戶(hù)端的執(zhí)行順序和最初鎖創(chuàng)建的順序是一致的。解決了1.0版本的兩個(gè)問(wèn)題。
接下來(lái)我們看看代碼如何實(shí)現(xiàn)。
LockSample類(lèi)
此類(lèi)是分布式鎖類(lèi),實(shí)現(xiàn)了2個(gè)分布式鎖的相關(guān)方法:
1、獲取鎖
2、釋放鎖
主要程序邏輯圍繞著這兩個(gè)方法的實(shí)現(xiàn),特別是獲取鎖的邏輯。我們先看一下該類(lèi)的成員變量:
private ZooKeeper zkClient; private static final String LOCK_ROOT_PATH = "/Locks"; private static final String LOCK_NODE_NAME = "Lock_"; private String lockPath;
定義了zkClient,用來(lái)操作zookeeper。
鎖的根路徑,及自增節(jié)點(diǎn)的前綴。此處生產(chǎn)環(huán)境應(yīng)該由客戶(hù)端傳入。
當(dāng)前鎖的路徑。
構(gòu)造方法
public LockSample() throws IOException { zkClient= new ZooKeeper("localhost:2181", 10000, new Watcher() { @Override public void process(WatchedEvent event) { if(event.getState()== Event.KeeperState.Disconnected){ System.out.println("失去連接"); } } }); }
創(chuàng)建zkClient,同時(shí)創(chuàng)建了狀態(tài)監(jiān)聽(tīng)。此監(jiān)聽(tīng)可以去掉,這里只是打印出失去連接狀態(tài)。
獲取鎖實(shí)現(xiàn)
暴露出來(lái)的獲取鎖的方法為acquireLock(),邏輯很簡(jiǎn)單:
public void acquireLock() throws InterruptedException, KeeperException { //創(chuàng)建鎖節(jié)點(diǎn) createLock(); //嘗試獲取鎖 attemptLock(); }
首先創(chuàng)建鎖節(jié)點(diǎn),然后嘗試去取鎖。真正的邏輯都在這兩個(gè)方法中。
createLock()
先判斷鎖的根節(jié)點(diǎn)/Locks是否存在,不存在的話創(chuàng)建。然后在/Locks下創(chuàng)建有序臨時(shí)節(jié)點(diǎn),并設(shè)置當(dāng)前的鎖路徑變量lockPath。
代碼如下:
private void createLock() throws KeeperException, InterruptedException { //如果根節(jié)點(diǎn)不存在,則創(chuàng)建根節(jié)點(diǎn) Stat stat = zkClient.exists(LOCK_ROOT_PATH, false); if (stat == null) { zkClient.create(LOCK_ROOT_PATH, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } // 創(chuàng)建EPHEMERAL_SEQUENTIAL類(lèi)型節(jié)點(diǎn) String lockPath = zkClient.create(LOCK_ROOT_PATH + "/" + LOCK_NODE_NAME, Thread.currentThread().getName().getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); System.out.println(Thread.currentThread().getName() + " 鎖創(chuàng)建: " + lockPath); this.lockPath=lockPath; }
attemptLock()
這是最核心的方法,客戶(hù)端嘗試去獲取鎖,是對(duì)2.0版本邏輯的實(shí)現(xiàn),這里就不再重復(fù)邏輯,直接看代碼:
private void attemptLock() throws KeeperException, InterruptedException { // 獲取Lock所有子節(jié)點(diǎn),按照節(jié)點(diǎn)序號(hào)排序 List<String> lockPaths = null; lockPaths = zkClient.getChildren(LOCK_ROOT_PATH, false); Collections.sort(lockPaths); int index = lockPaths.indexOf(lockPath.substring(LOCK_ROOT_PATH.length() + 1)); // 如果lockPath是序號(hào)最小的節(jié)點(diǎn),則獲取鎖 if (index == 0) { System.out.println(Thread.currentThread().getName() + " 鎖獲得, lockPath: " + lockPath); return ; } else { // lockPath不是序號(hào)最小的節(jié)點(diǎn),監(jiān)聽(tīng)前一個(gè)節(jié)點(diǎn) String preLockPath = lockPaths.get(index - 1); Stat stat = zkClient.exists(LOCK_ROOT_PATH + "/" + preLockPath, watcher); // 假如前一個(gè)節(jié)點(diǎn)不存在了,比如說(shuō)執(zhí)行完畢,或者執(zhí)行節(jié)點(diǎn)掉線,重新獲取鎖 if (stat == null) { attemptLock(); } else { // 阻塞當(dāng)前進(jìn)程,直到preLockPath釋放鎖,被watcher觀察到,notifyAll后,重新acquireLock System.out.println(" 等待前鎖釋放,prelocakPath:"+preLockPath); synchronized (watcher) { watcher.wait(); } attemptLock(); } } }
注意這一行代碼
Stat stat = zkClient.exists(LOCK_ROOT_PATH + "/" + preLockPath, watcher);
我們?cè)讷@取前一個(gè)節(jié)點(diǎn)的時(shí)候,同時(shí)設(shè)置了監(jiān)聽(tīng)watcher。如果前鎖存在,則阻塞主線程。
watcher定義代碼如下:
private Watcher watcher = new Watcher() { @Override public void process(WatchedEvent event) { System.out.println(event.getPath() + " 前鎖釋放"); synchronized (this) { notifyAll(); } } };
watcher只是notifyAll,讓主線程繼續(xù)執(zhí)行,以便再次調(diào)用attemptLock(),去嘗試獲取lock。如果沒(méi)有異常情況的話,此時(shí)當(dāng)前客戶(hù)端應(yīng)該能夠成功獲取鎖。
釋放鎖實(shí)現(xiàn)
釋放鎖原語(yǔ)實(shí)現(xiàn)很簡(jiǎn)單,參照releaseLock()方法。代碼如下:
public void releaseLock() throws KeeperException, InterruptedException { zkClient.delete(lockPath, -1); zkClient.close(); System.out.println(" 鎖釋放:" + lockPath); }
關(guān)于分布式鎖的代碼到此就講解完了,我們?cè)倏聪驴蛻?hù)端如何使用它。
我們創(chuàng)建一個(gè)TicketSeller類(lèi),作為客戶(hù)端來(lái)使用分布式鎖。
TicketSeller類(lèi)
sell()
不帶鎖的業(yè)務(wù)邏輯方法,代碼如下:
private void sell(){ System.out.println("售票開(kāi)始"); // 線程隨機(jī)休眠數(shù)毫秒,模擬現(xiàn)實(shí)中的費(fèi)時(shí)操作 int sleepMillis = (int) (Math.random() * 2000); try { //代表復(fù)雜邏輯執(zhí)行了一段時(shí)間 Thread.sleep(sleepMillis); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("售票結(jié)束"); }
僅是為了演示,sleep了一段時(shí)間。
sellTicketWithLock()
此方法中,加鎖后執(zhí)行業(yè)務(wù)邏輯,代碼如下:
public void sellTicketWithLock() throws KeeperException, InterruptedException, IOException { LockSample lock = new LockSample(); lock.acquireLock(); sell(); lock.releaseLock(); }
測(cè)試入口
接下來(lái)我們寫(xiě)一個(gè)main函數(shù)做測(cè)試:
public static void main(String[] args) throws KeeperException, InterruptedException, IOException { TicketSeller ticketSeller = new TicketSeller(); for(int i=0;i<1000;i++){ ticketSeller.sellTicketWithLock(); } }
main函數(shù)中我們循環(huán)調(diào)用ticketSeller.sellTicketWithLock(),執(zhí)行加鎖后的賣(mài)票邏輯。
測(cè)試方法
1、先啟動(dòng)一個(gè)java程序運(yùn)行,可以看到日志輸出如下:
main 鎖創(chuàng)建: /Locks/Lock_0000000391 main 鎖獲得, lockPath: /Locks/Lock_0000000391 售票開(kāi)始 售票結(jié)束 鎖釋放:/Locks/Lock_0000000391 main 鎖創(chuàng)建: /Locks/Lock_0000000392 main 鎖獲得, lockPath: /Locks/Lock_0000000392 售票開(kāi)始 售票結(jié)束 鎖釋放:/Locks/Lock_0000000392 main 鎖創(chuàng)建: /Locks/Lock_0000000393 main 鎖獲得, lockPath: /Locks/Lock_0000000393 售票開(kāi)始 售票結(jié)束 鎖釋放:/Locks/Lock_0000000393
可見(jiàn)每次執(zhí)行都是按照鎖的順序執(zhí)行,而且由于只有一個(gè)進(jìn)程,并沒(méi)有鎖的爭(zhēng)搶發(fā)生。
2、我們?cè)賳?dòng)一個(gè)同樣的程序,鎖的爭(zhēng)搶此時(shí)發(fā)生了,可以看到雙方的日志輸出如下:
程序1:
main 鎖獲得, lockPath: /Locks/Lock_0000000471 售票開(kāi)始 售票結(jié)束 鎖釋放:/Locks/Lock_0000000471 main 鎖創(chuàng)建: /Locks/Lock_0000000473 等待前鎖釋放,prelocakPath:Lock_0000000472 /Locks/Lock_0000000472 前鎖釋放 main 鎖獲得, lockPath: /Locks/Lock_0000000473 售票開(kāi)始 售票結(jié)束 鎖釋放:/Locks/Lock_0000000473
可以看到Lock_0000000471執(zhí)行完成后,該進(jìn)程獲取的鎖為L(zhǎng)ock_0000000473,這說(shuō)明Lock_0000000472被另外一個(gè)進(jìn)程創(chuàng)建了。此時(shí)Lock_0000000473在等待前鎖釋放。Lock_0000000472釋放后,Lock_0000000473才獲得鎖,然后才執(zhí)行業(yè)務(wù)邏輯。
我們?cè)倏闯绦?的日志:
main 鎖獲得, lockPath: /Locks/Lock_0000000472 售票開(kāi)始 售票結(jié)束 鎖釋放:/Locks/Lock_0000000472 main 鎖創(chuàng)建: /Locks/Lock_0000000474 等待前鎖釋放,prelocakPath:Lock_0000000473 /Locks/Lock_0000000473 前鎖釋放 main 鎖獲得, lockPath: /Locks/Lock_0000000474 售票開(kāi)始 售票結(jié)束 鎖釋放:/Locks/Lock_0000000474
可以看到,確實(shí)是進(jìn)程2獲取了Lock_0000000472。
zookeeper實(shí)現(xiàn)分布式鎖就先講到這。注意代碼只做演示用,并不適合生產(chǎn)環(huán)境使用。
代碼清單如下:
1、LockSample
import org.apache.zookeeper.*; import org.apache.zookeeper.data.Stat; import java.io.IOException; import java.util.Collections; import java.util.List; public class LockSample { //ZooKeeper配置信息 private ZooKeeper zkClient; private static final String LOCK_ROOT_PATH = "/Locks"; private static final String LOCK_NODE_NAME = "Lock_"; private String lockPath; // 監(jiān)控lockPath的前一個(gè)節(jié)點(diǎn)的watcher private Watcher watcher = new Watcher() { @Override public void process(WatchedEvent event) { System.out.println(event.getPath() + " 前鎖釋放"); synchronized (this) { notifyAll(); } } }; public LockSample() throws IOException { zkClient= new ZooKeeper("localhost:2181", 10000, new Watcher() { @Override public void process(WatchedEvent event) { if(event.getState()== Event.KeeperState.Disconnected){ System.out.println("失去連接"); } } }); } //獲取鎖的原語(yǔ)實(shí)現(xiàn). public void acquireLock() throws InterruptedException, KeeperException { //創(chuàng)建鎖節(jié)點(diǎn) createLock(); //嘗試獲取鎖 attemptLock(); } //創(chuàng)建鎖的原語(yǔ)實(shí)現(xiàn)。在lock節(jié)點(diǎn)下創(chuàng)建該線程的鎖節(jié)點(diǎn) private void createLock() throws KeeperException, InterruptedException { //如果根節(jié)點(diǎn)不存在,則創(chuàng)建根節(jié)點(diǎn) Stat stat = zkClient.exists(LOCK_ROOT_PATH, false); if (stat == null) { zkClient.create(LOCK_ROOT_PATH, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } // 創(chuàng)建EPHEMERAL_SEQUENTIAL類(lèi)型節(jié)點(diǎn) String lockPath = zkClient.create(LOCK_ROOT_PATH + "/" + LOCK_NODE_NAME, Thread.currentThread().getName().getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); System.out.println(Thread.currentThread().getName() + " 鎖創(chuàng)建: " + lockPath); this.lockPath=lockPath; } private void attemptLock() throws KeeperException, InterruptedException { // 獲取Lock所有子節(jié)點(diǎn),按照節(jié)點(diǎn)序號(hào)排序 List<String> lockPaths = null; lockPaths = zkClient.getChildren(LOCK_ROOT_PATH, false); Collections.sort(lockPaths); int index = lockPaths.indexOf(lockPath.substring(LOCK_ROOT_PATH.length() + 1)); // 如果lockPath是序號(hào)最小的節(jié)點(diǎn),則獲取鎖 if (index == 0) { System.out.println(Thread.currentThread().getName() + " 鎖獲得, lockPath: " + lockPath); return ; } else { // lockPath不是序號(hào)最小的節(jié)點(diǎn),監(jiān)控前一個(gè)節(jié)點(diǎn) String preLockPath = lockPaths.get(index - 1); Stat stat = zkClient.exists(LOCK_ROOT_PATH + "/" + preLockPath, watcher); // 假如前一個(gè)節(jié)點(diǎn)不存在了,比如說(shuō)執(zhí)行完畢,或者執(zhí)行節(jié)點(diǎn)掉線,重新獲取鎖 if (stat == null) { attemptLock(); } else { // 阻塞當(dāng)前進(jìn)程,直到preLockPath釋放鎖,被watcher觀察到,notifyAll后,重新acquireLock System.out.println(" 等待前鎖釋放,prelocakPath:"+preLockPath); synchronized (watcher) { watcher.wait(); } attemptLock(); } } } //釋放鎖的原語(yǔ)實(shí)現(xiàn) public void releaseLock() throws KeeperException, InterruptedException { zkClient.delete(lockPath, -1); zkClient.close(); System.out.println(" 鎖釋放:" + lockPath); } }
2、TicketSeller
import org.apache.zookeeper.KeeperException; import java.io.IOException; public class TicketSeller { private void sell(){ System.out.println("售票開(kāi)始"); // 線程隨機(jī)休眠數(shù)毫秒,模擬現(xiàn)實(shí)中的費(fèi)時(shí)操作 int sleepMillis = (int) (Math.random() * 2000); try { //代表復(fù)雜邏輯執(zhí)行了一段時(shí)間 Thread.sleep(sleepMillis); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("售票結(jié)束"); } public void sellTicketWithLock() throws KeeperException, InterruptedException, IOException { LockSample lock = new LockSample(); lock.acquireLock(); sell(); lock.releaseLock(); } public static void main(String[] args) throws KeeperException, InterruptedException, IOException { TicketSeller ticketSeller = new TicketSeller(); for(int i=0;i<1000;i++){ ticketSeller.sellTicketWithLock(); } } }
以上就是ZooKeeper入門(mén)教程三分布式鎖實(shí)現(xiàn)及完整運(yùn)行源碼的詳細(xì)內(nèi)容,更多關(guān)于ZooKeeper分布式鎖實(shí)現(xiàn)源碼的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
SpringBoot項(xiàng)目將mybatis升級(jí)為mybatis-plus的方法
本文主要介紹了SpringBoot項(xiàng)目將mybatis升級(jí)為mybatis-plus的方法,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2023-01-01SpringBoot項(xiàng)目如何將Bean注入到普通類(lèi)中
這篇文章主要介紹了SpringBoot項(xiàng)目如何將Bean注入到普通類(lèi)中,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-11-11IDEA配置Gradle及Gradle安裝的實(shí)現(xiàn)步驟
本文主要介紹了IDEA配置Gradle及Gradle安裝的實(shí)現(xiàn)步驟,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2023-08-08Java中?equals?重寫(xiě)時(shí)為什么一定也要重寫(xiě)?hashCode
這篇文章主要介紹了Java中?equals?重寫(xiě)時(shí)為什么一定也要重寫(xiě)?hashCode,equals?方法和?hashCode?方法是?Object?類(lèi)中的兩個(gè)基礎(chǔ)方法,它們共同協(xié)作來(lái)判斷兩個(gè)對(duì)象是否相等,所以之間到底有什么聯(lián)系呢,接下來(lái)和小編一起進(jìn)入文章學(xué)習(xí)該內(nèi)容吧2022-05-05spring?項(xiàng)目實(shí)現(xiàn)限流方法示例
這篇文章主要為大家介紹了spring項(xiàng)目實(shí)現(xiàn)限流的方法示例詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2022-07-07javaweb頁(yè)面附件、圖片下載及打開(kāi)(實(shí)現(xiàn)方法)
下面小編就為大家?guī)?lái)一篇javaweb頁(yè)面附件、圖片下載及打開(kāi)(實(shí)現(xiàn)方法)。小編覺(jué)得挺不錯(cuò)的,現(xiàn)在就分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧2017-06-06java 8 lambda表達(dá)式list操作分組、過(guò)濾、求和、最值、排序、去重代碼詳解
java8的lambda表達(dá)式提供了一些方便list操作的方法,主要涵蓋分組、過(guò)濾、求和、最值、排序、去重,本文通過(guò)實(shí)例代碼給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友參考下吧2024-01-01Spring Boot實(shí)現(xiàn)微信小程序登錄
這篇文章主要為大家詳細(xì)介紹了Spring Boot實(shí)現(xiàn)微信小程序登錄,文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2021-04-04