Zookeeper實現(xiàn)分布式鎖代碼實例
Zookeeper分布式鎖原理
臨時順序節(jié)點
該類型的節(jié)點創(chuàng)建完成之后,若客戶端與服務端斷開連接之前沒有執(zhí)行delete操作,Zookeeper會自動刪除該類型的節(jié)點,同時該類型的節(jié)點創(chuàng)建之后,Zookeeper會為該類型的節(jié)點在節(jié)點名稱的基礎上增加一個單調遞增的編號;
Zookeeper 分布式鎖應用了其 臨時順序節(jié)點 的特性。實現(xiàn)步驟如下:
獲取鎖
首先在Zookeeper中創(chuàng)建一個持久節(jié)點ParentLock,當?shù)谝粋€客戶端要獲取鎖時,在ParentLock節(jié)點下創(chuàng)建一個臨時順序節(jié)點Lock1;
接下來客戶端1會獲取ParentLock下的所有臨時順序子節(jié)點并進行排序,然后與自身創(chuàng)建的Lock1比較,判斷Lock1是不是最小的(最靠前的),如果是最靠前的,則獲取鎖成功;
此時,假設又有一個客戶端2前來獲取鎖,則在ParentLock下創(chuàng)建一個臨時順序節(jié)點Lock2;
接下來客戶端2會獲取ParentLock下的所有臨時順序子節(jié)點并進行排序,然后與自身創(chuàng)建的Lock2比較,判斷Lock2是不是最小的(最靠前的),結果發(fā)現(xiàn)Lock2不是最小的;
于是,Lock2向排序比它靠前的第一個節(jié)點Lock1注冊一個Watcher,用于監(jiān)聽Lock1是否存在,此時意味著客戶端2搶鎖失敗,客戶端2進入等待狀態(tài);
此時,假設現(xiàn)在又有一個客戶端3前來獲取鎖,則在ParentLock下創(chuàng)建一個臨時順序節(jié)點Lock3;
接下來客戶端3將獲取ParentLock下的所有臨時順序節(jié)點并排序,與自身創(chuàng)建的節(jié)點Lock3比較,判斷Lock3是不是最小的(最靠前的),結果發(fā)現(xiàn)Lock3不是最小的;
于是,Lock3向比它靠前的第一個節(jié)點Lock2注冊一個Watcher,用于監(jiān)聽Lock2是否存在,此時意味著Lock3也搶鎖失敗,進入阻塞狀態(tài); 這樣的話,客戶端1獲取到了鎖,客戶端2監(jiān)聽了Lock1、客戶端3監(jiān)聽了Lock2;
客戶端崩潰釋放鎖(避免死鎖)
假設由于網絡原因或者其他物理原因,導致客戶端1與Zookeeper失去連接,根據(jù)臨時節(jié)點的特性,Zookeeper會自動刪除相關聯(lián)的節(jié)點Lock1。 Lock1刪除之后,因為客戶端2在Lock1上注冊了Watcher,因此Lock1刪除之后,客戶端2會立即收到通知,這時客戶端2會再次獲取ParentLock下的所有臨時順序子節(jié)點并進行排序,然后與自身創(chuàng)建的Lock2比較,判斷Lock2是不是最小的(最靠前的),結果發(fā)現(xiàn)Lock2是最小的,因此客戶端2獲取鎖成功;
客戶端2崩潰同理;
顯示釋放(客戶端任務執(zhí)行完畢,主動釋放鎖)
當客戶端2執(zhí)行完任務之后,調用delete方法刪除Lock2節(jié)點,因為客戶端3在Lock2上注冊了Watcher,因此Lock2刪除之后,客戶端3會立即收到通知,這時客戶端3會再次獲取ParentLock下的所有臨時順序子節(jié)點并進行排序,然后與自身創(chuàng)建的Lock3比較,判斷Lock3是不是最小的(最靠前的),結果發(fā)現(xiàn)Lock3是最小的,因此客戶端3獲取鎖成功;
zookeeper分布式鎖實現(xiàn)
配置文件pom.xml
<dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-recipesss</artifactId> </dependency> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-framework</artifactId> <version>4.0.1</version> </dependency> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-recipes</artifactId> <version>4.0.1</version> </dependency>
配置文件application.properties
##ZooKeeper 集成 Curator zk.url = 127.0.0.1:2181
conf
@Configuration @Slf4j public class ZooKeeperConf { @Value("${zk.url}") private String zkUrl; @Bean public CuratorFramework getCuratorFramework() { // 用于重連策略,1000毫秒是初始化的間隔時間,3代表嘗試重連次數(shù) RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3); CuratorFramework client = CuratorFrameworkFactory.newClient(zkUrl, retryPolicy); //必須調用start開始連接ZooKeeper client.start(); // /** // * 使用Curator,可以通過LeaderSelector來實現(xiàn)領導選??; // * 領導選?。哼x出一個領導節(jié)點來負責其他節(jié)點;如果領導節(jié)點不可用,則在剩下的機器里再選出一個領導節(jié)點 // */ // // 構造一個監(jiān)聽器 // LeaderSelectorListenerAdapter listener = new LeaderSelectorListenerAdapter() { // @Override // public void takeLeadership(CuratorFramework curatorFramework) throws Exception { // log.info("get leadership"); // // 領導節(jié)點,方法結束后退出領導。zk會再次重新選擇領導 // // } // }; // LeaderSelector selector = new LeaderSelector(client, "/schedule", listener); // selector.autoRequeue(); // selector.start(); return client; } }
controller
@RestController @RequestMapping("/zookeeper") public class ZooKeeperController { @Autowired private CuratorFramework zkClient; @Autowired private ZooKeeperImpl zooKeeper; /** * zookeeper 獲取節(jié)點下的數(shù)據(jù) * <p> * post請求: http://localhost:8082/zookeeper/makeOrder?path=/task * * @param path * @return */ @PostMapping("/getData") public String getData(@RequestParam String path) { byte[] bytes = null; try { bytes = zkClient.getData().forPath(path); } catch (Exception e) { e.printStackTrace(); } String str = new String(bytes); return str; } @PostMapping("/create") public String create(@RequestParam String path) { try { zkClient.create().forPath(path); } catch (Exception e) { e.printStackTrace(); } return "success"; } @PostMapping("/delete") public String delete(@RequestParam String path) { try { zkClient.delete().forPath(path); } catch (Exception e) { e.printStackTrace(); } return "success"; } @PostMapping("/setData") public String setData(@RequestParam(value = "path") String path, @RequestParam(value = "data") String data) { try { zkClient.setData().forPath(path, data.getBytes()); } catch (Exception e) { e.printStackTrace(); } return "success"; } @PostMapping("/check") public String check(@RequestParam(value = "path") String path) { Stat stat = null; try { stat = zkClient.checkExists().forPath(path); } catch (Exception e) { e.printStackTrace(); } return "stat" + stat; } @PostMapping("/children") public String children(@RequestParam(value = "path") String path) { List<String> children = null; try { children = zkClient.getChildren().forPath(path); } catch (Exception e) { e.printStackTrace(); } return "children" + children; } @PostMapping("/watch") public String watch(@RequestParam(value = "path") String path) { Stat stat = null; try { stat = zkClient.checkExists().watched().forPath(path); } catch (Exception e) { e.printStackTrace(); } return "watch " + stat; } /** * zookeeper分布式鎖 * * @param product * @return */ @PostMapping("/makeOrder") public String makeOrder(@RequestParam(value = "product") String product) { zooKeeper.makeOrder(product); return "success"; } }
service
@Service @Slf4j public class ZooKeeperImpl { private static final String lockPath = "/lock/order"; @Autowired private CuratorFramework zkClient; public void makeOrder(String product) { log.info("try do job for " + product); String path = lockPath + "/" + product; try { // InterProcessMutex 構建一個分布式鎖 InterProcessMutex lock = new InterProcessMutex(zkClient, path); try { if (lock.acquire(5, TimeUnit.HOURS)) { // 模擬業(yè)務處理耗時5秒 Thread.sleep(5*1000); log.info("do job " + product + "done"); } } finally { // 釋放該鎖 lock.release(); } } catch (Exception e) { // zk異常 e.printStackTrace(); } } }
到此這篇關于Zookeeper實現(xiàn)分布式鎖代碼實例的文章就介紹到這了,更多相關Zookeeper分布式鎖內容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!
相關文章
Spring配置多個數(shù)據(jù)源并實現(xiàn)數(shù)據(jù)源的動態(tài)切換功能
這篇文章主要介紹了Spring配置多個數(shù)據(jù)源并實現(xiàn)數(shù)據(jù)源的動態(tài)切換功能,本文給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友參考下吧2024-01-01SpringBoot整合mybatis使用Druid做連接池的方式
這篇文章主要介紹了SpringBoot整合mybatis使用Druid做連接池的方式,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教2023-08-08SpringBoot實現(xiàn)Tomcat集群的會話管理功能
在使用 Tomcat 集群時,由于每個 Tomcat 實例的 Session 存儲是獨立的,導致無法實現(xiàn) Session 的共享,這可能影響到用戶跨節(jié)點的訪問,為了實現(xiàn)跨 Tomcat 實例共享 Session,可以使用 Spring Session 配合 Redis 進行集中式會話管理,需要的朋友可以參考下2024-12-12Mybatis自定義SQL的關系映射、分頁、排序功能的實現(xiàn)
這篇文章主要介紹了Mybatis自定義SQL的關系映射、分頁、排序功能的實現(xiàn),本文給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下2021-01-01