基于Zookeeper實(shí)現(xiàn)分布式鎖詳解
1、什么是Zookeeper?
Zookeeper是一個(gè)分布式的,開(kāi)源的分布式應(yīng)用程序協(xié)調(diào)服務(wù),是Hadoop和hbase的重要組件。
引用官網(wǎng)的圖例:
特征:
- zookeeper的數(shù)據(jù)機(jī)構(gòu)是一種節(jié)點(diǎn)樹(shù)的數(shù)據(jù)結(jié)構(gòu),zNode是基本的單位,znode是一種和unix文件系統(tǒng)相似的節(jié)點(diǎn),可以往這個(gè)節(jié)點(diǎn)存儲(chǔ)或向這個(gè)節(jié)點(diǎn)獲取數(shù)據(jù)
- 通過(guò)客戶(hù)端可以對(duì)znode進(jìn)行數(shù)據(jù)操作,還可以注冊(cè)watcher監(jiān)控znode的改變
2、Zookeeper節(jié)點(diǎn)類(lèi)型
- 持久節(jié)點(diǎn)(Persistent)
- 持久順序節(jié)點(diǎn)(Persistent_Sequential)
- 臨時(shí)節(jié)點(diǎn)(Ephemeral)
- 臨時(shí)順序節(jié)點(diǎn)(Ephemeral_Sequential)
3、Zookeeper環(huán)境搭建
下載zookeeper,官網(wǎng)鏈接,https://zookeeper.apache.org/releases.html#download,去官網(wǎng)找到對(duì)應(yīng)的軟件下載到本地
修改配置文件,${ZOOKEEPER_HOME}\conf,找到zoo_sample.cfg文件,先備份一份,另外一份修改為zoo.cfg
解壓后點(diǎn)擊zkServer.cmd運(yùn)行服務(wù)端:
4、Zookeeper基本使用
在cmd窗口或者直接在idea編輯器里的terminal輸入命令:
zkCli.cmd -server 127.0.0.1:2181
輸入命令help查看幫助信息:
ZooKeeper -server host:port -client-configuration properties-file cmd args addWatch [-m mode] path # optional mode is one of [PERSISTENT, PERSISTENT_RECURSIVE] - default is PERSISTENT_RECURSIVE addauth scheme auth close config [-c] [-w] [-s] connect host:port create [-s] [-e] [-c] [-t ttl] path [data] [acl] delete [-v version] path deleteall path [-b batch size] delquota [-n|-b|-N|-B] path get [-s] [-w] path getAcl [-s] path getAllChildrenNumber path getEphemerals path history listquota path ls [-s] [-w] [-R] path printwatches on|off quit reconfig [-s] [-v version] [[-file path] | [-members serverID=host:port1:port2;port3[,...]*]] | [-add serverId=host:port1:port2;port3[,...]]* [-remove serverId[,...]*] redo cmdno removewatches path [-c|-d|-a] [-l] set [-s] [-v version] path data setAcl [-s] [-v version] [-R] path acl setquota -n|-b|-N|-B val path stat [-w] path sync path version whoami
create [-s] [-e] [-c] [-t ttl] path [data] [acl]
,-s
表示順序節(jié)點(diǎn),-e
表示臨時(shí)節(jié)點(diǎn),若不指定表示持久節(jié)點(diǎn),acl
是來(lái)進(jìn)行權(quán)限控制的
[zk: 127.0.0.1:2181(CONNECTED) 1] create -s /zk-test 0 Created /zk-test0000000000
查看
[zk: 127.0.0.1:2181(CONNECTED) 4] ls / [zk-test0000000000, zookeeper]
設(shè)置修改節(jié)點(diǎn)數(shù)據(jù)
set /zk-test 123
獲取節(jié)點(diǎn)數(shù)據(jù)
get /zk-test
ps,zookeeper命令詳情查看help幫助文檔,也可以去官網(wǎng)看看文檔
ok,然后java寫(xiě)個(gè)例子,進(jìn)行watcher監(jiān)聽(tīng)
package com.example.concurrent.zkSample; import org.I0Itec.zkclient.IZkDataListener; import org.I0Itec.zkclient.ZkClient; /** * <pre> * Zookeeper 例子 * </pre> * * <pre> * @author mazq * 修改記錄 * 修改后版本: 修改人: 修改日期: 2021/12/09 16:57 修改內(nèi)容: * </pre> */ public class ZookeeperSample { public static void main(String[] args) { ZkClient client = new ZkClient("localhost:2181"); client.setZkSerializer(new MyZkSerializer()); client.subscribeDataChanges("/zk-test", new IZkDataListener() { @Override public void handleDataChange(String dataPath, Object data) throws Exception { System.out.println("監(jiān)聽(tīng)到節(jié)點(diǎn)數(shù)據(jù)改變!"); } @Override public void handleDataDeleted(String dataPath) throws Exception { System.out.println("監(jiān)聽(tīng)到節(jié)點(diǎn)數(shù)據(jù)被刪除了"); } }); try { Thread.sleep(1000 * 60 * 2); } catch (InterruptedException e) { e.printStackTrace(); } } }
5、Zookeeper應(yīng)用場(chǎng)景
Zookeeper有什么典型的應(yīng)用場(chǎng)景:
- 注冊(cè)中心(Dubbo)
- 命名服務(wù)
- Master選舉
- 集群管理
- 分布式隊(duì)列
- 分布式鎖
6、Zookeeper分布式鎖
Zookeeper適合用來(lái)做分布式鎖,然后具體實(shí)現(xiàn)是利用什么原理?我們知道zookeeper是類(lèi)似于unix的文件系統(tǒng),文件系統(tǒng)我們也知道在一個(gè)文件夾下面,會(huì)有文件名稱(chēng)不能一致的特性的,也就是互斥的特性。同樣zookeeper也有這個(gè)特性,在同個(gè)znode節(jié)點(diǎn)下面,子節(jié)點(diǎn)命名不能重復(fù)。所以利用這個(gè)特性可以來(lái)實(shí)現(xiàn)分布式鎖
業(yè)務(wù)場(chǎng)景:在高并發(fā)的情況下面進(jìn)行訂單場(chǎng)景,這是一個(gè)典型的電商場(chǎng)景
自定義的Zookeeper序列化類(lèi):
package com.example.concurrent.zkSample; import org.I0Itec.zkclient.exception.ZkMarshallingError; import org.I0Itec.zkclient.serialize.ZkSerializer; import java.io.UnsupportedEncodingException; public class MyZkSerializer implements ZkSerializer { private String charset = "UTF-8"; @Override public byte[] serialize(Object o) throws ZkMarshallingError { return String.valueOf(o).getBytes(); } @Override public Object deserialize(byte[] bytes) throws ZkMarshallingError { try { return new String(bytes , charset); } catch (UnsupportedEncodingException e) { throw new ZkMarshallingError(); } } }
訂單編號(hào)生成器類(lèi),因?yàn)镾impleDateFormat是線(xiàn)程不安全的,所以還是要加上ThreadLocal
package com.example.concurrent.zkSample; import java.text.DateFormat; import java.text.SimpleDateFormat; import java.util.Date; import java.util.concurrent.atomic.AtomicInteger; public class OrderCodeGenerator { private static final String DATE_FORMAT = "yyyyMMddHHmmss"; private static AtomicInteger ai = new AtomicInteger(0); private static int i = 0; private static ThreadLocal<SimpleDateFormat> threadLocal = new ThreadLocal<SimpleDateFormat>() { @Override protected SimpleDateFormat initialValue() { return new SimpleDateFormat(DATE_FORMAT); } }; public static DateFormat getDateFormat() { return (DateFormat) threadLocal.get(); } public static String generatorOrderCode() { try { return getDateFormat().format(new Date(System.currentTimeMillis())) + i++; } finally { threadLocal.remove(); } } }
pom.xml加上zookeeper客戶(hù)端的配置:
<dependency> <groupId>com.101tec</groupId> <artifactId>zkclient</artifactId> <version>0.10</version> </dependency>
實(shí)現(xiàn)一個(gè)zookeeper分布式鎖,思路是獲取節(jié)點(diǎn),這個(gè)是多線(xiàn)程競(jìng)爭(zhēng)的,能獲取到鎖,也就是創(chuàng)建節(jié)點(diǎn)成功,就執(zhí)行業(yè)務(wù),其它搶不到鎖的線(xiàn)程,阻塞等待,注冊(cè)watcher監(jiān)聽(tīng)鎖是否釋放了,釋放了,取消注冊(cè)watcher,繼續(xù)搶鎖
package com.example.concurrent.zkSample; import lombok.extern.slf4j.Slf4j; import org.I0Itec.zkclient.IZkDataListener; import org.I0Itec.zkclient.ZkClient; import org.I0Itec.zkclient.exception.ZkNodeExistsException; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; @Slf4j public class ZKDistributeLock implements Lock { private String localPath; private ZkClient zkClient; ZKDistributeLock(String localPath) { super(); this.localPath = localPath; zkClient = new ZkClient("localhost:2181"); zkClient.setZkSerializer(new MyZkSerializer()); } @Override public void lock() { while (!tryLock()) { waitForLock(); } } private void waitForLock() { // 創(chuàng)建countdownLatch協(xié)同 CountDownLatch countDownLatch = new CountDownLatch(1); // 注冊(cè)watcher監(jiān)聽(tīng) IZkDataListener listener = new IZkDataListener() { @Override public void handleDataChange(String path, Object o) throws Exception { //System.out.println("zookeeper data has change!!!"); } @Override public void handleDataDeleted(String s) throws Exception { // System.out.println("zookeeper data has delete!!!"); // 監(jiān)聽(tīng)到鎖釋放了,釋放線(xiàn)程 countDownLatch.countDown(); } }; zkClient.subscribeDataChanges(localPath , listener); // 線(xiàn)程等待 if (zkClient.exists(localPath)) { try { countDownLatch.await(); } catch (InterruptedException e) { e.printStackTrace(); } } // 取消注冊(cè) zkClient.unsubscribeDataChanges(localPath , listener); } @Override public void unlock() { zkClient.delete(localPath); } @Override public boolean tryLock() { try { zkClient.createEphemeral(localPath); } catch (ZkNodeExistsException e) { return false; } return true; } @Override public boolean tryLock(long time, TimeUnit unit) throws InterruptedException { return false; } @Override public void lockInterruptibly() throws InterruptedException { } @Override public Condition newCondition() { return null; } }
訂單服務(wù)api
package com.example.concurrent.zkSample; public interface OrderService { void createOrder(); }
訂單服務(wù)實(shí)現(xiàn)類(lèi),加上zookeeper分布式鎖
package com.example.concurrent.zkSample; import java.util.concurrent.locks.Lock; public class OrderServiceInvoker implements OrderService{ @Override public void createOrder() { Lock zkLock = new ZKDistributeLock("/zk-test"); //Lock zkLock = new ZKDistributeImproveLock("/zk-test"); String orderCode = null; try { zkLock.lock(); orderCode = OrderCodeGenerator.generatorOrderCode(); } finally { zkLock.unlock(); } System.out.println(String.format("thread name : %s , orderCode : %s" , Thread.currentThread().getName(), orderCode)); } }
因?yàn)榇罱ǚ植际江h(huán)境比較繁瑣,所以這里使用juc里的并發(fā)協(xié)同工具類(lèi),CyclicBarrier模擬多線(xiàn)程并發(fā)的場(chǎng)景,模擬分布式環(huán)境的高并發(fā)場(chǎng)景
package com.example.concurrent.zkSample; import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CyclicBarrier; public class ConcurrentDistributeTest { public static void main(String[] args) { // 多線(xiàn)程數(shù) int threadSize = 30; // 創(chuàng)建多線(xiàn)程循環(huán)屏障 CyclicBarrier cyclicBarrier = new CyclicBarrier(threadSize , ()->{ System.out.println("準(zhǔn)備完成!"); }) ; // 模擬分布式集群的場(chǎng)景 for (int i = 0 ; i < threadSize ; i ++) { new Thread(()->{ OrderService orderService = new OrderServiceInvoker(); // 所有線(xiàn)程都等待 try { cyclicBarrier.await(); } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } // 模擬并發(fā)請(qǐng)求 orderService.createOrder(); }).start(); } } }
跑多幾次,沒(méi)有發(fā)現(xiàn)訂單號(hào)重復(fù)的情況,分布式鎖還是有點(diǎn)效果的
thread name : Thread-6 , orderCode : 202112100945110
thread name : Thread-1 , orderCode : 202112100945111
thread name : Thread-13 , orderCode : 202112100945112
thread name : Thread-11 , orderCode : 202112100945113
thread name : Thread-14 , orderCode : 202112100945114
thread name : Thread-0 , orderCode : 202112100945115
thread name : Thread-8 , orderCode : 202112100945116
thread name : Thread-17 , orderCode : 202112100945117
thread name : Thread-10 , orderCode : 202112100945118
thread name : Thread-5 , orderCode : 202112100945119
thread name : Thread-2 , orderCode : 2021121009451110
thread name : Thread-16 , orderCode : 2021121009451111
thread name : Thread-19 , orderCode : 2021121009451112
thread name : Thread-4 , orderCode : 2021121009451113
thread name : Thread-18 , orderCode : 2021121009451114
thread name : Thread-3 , orderCode : 2021121009451115
thread name : Thread-9 , orderCode : 2021121009451116
thread name : Thread-12 , orderCode : 2021121009451117
thread name : Thread-15 , orderCode : 2021121009451118
thread name : Thread-7 , orderCode : 2021121009451219
package com.example.concurrent.zkSample; import java.util.concurrent.locks.Lock; public class OrderServiceInvoker implements OrderService{ @Override public void createOrder() { //Lock zkLock = new ZKDistributeLock("/zk-test"); //Lock zkLock = new ZKDistributeImproveLock("/zk-test"); String orderCode = null; try { //zkLock.lock(); orderCode = OrderCodeGenerator.generatorOrderCode(); } finally { //zkLock.unlock(); } System.out.println(String.format("thread name : %s , orderCode : %s" , Thread.currentThread().getName(), orderCode)); } }
跑多幾次,發(fā)現(xiàn)出現(xiàn)訂單號(hào)重復(fù)的情況,所以分布式鎖是可以保證分布式環(huán)境的線(xiàn)程安全的
7、公平式Zookeeper分布式鎖
上面例子是一種非公平鎖的方式,一旦監(jiān)聽(tīng)到鎖釋放了,所有線(xiàn)程都會(huì)去搶鎖,所以容易出現(xiàn)“驚群效應(yīng)”:
- 巨大的服務(wù)器性能損耗
- 網(wǎng)絡(luò)沖擊
- 可能造成宕機(jī)
所以,需要改進(jìn)分布式鎖,改成一種公平鎖的模式
公平鎖:多個(gè)線(xiàn)程按照申請(qǐng)鎖的順序去獲取鎖,線(xiàn)程會(huì)在隊(duì)列里排隊(duì),按照順序去獲取鎖。只有隊(duì)列第1個(gè)線(xiàn)程才能獲取到鎖,獲取到鎖之后,其它線(xiàn)程都會(huì)阻塞等待,等到持有鎖的線(xiàn)程釋放鎖,其它線(xiàn)程才會(huì)被喚醒。
非公平鎖:多個(gè)線(xiàn)程都會(huì)去競(jìng)爭(zhēng)獲取鎖,獲取不到就進(jìn)入隊(duì)列等待,競(jìng)爭(zhēng)得到就直接獲取鎖;然后持有鎖的線(xiàn)程釋放鎖之后,所有等待的線(xiàn)程就都會(huì)去競(jìng)爭(zhēng)鎖。
流程圖:
代碼改進(jìn):
package com.example.concurrent.zkSample;
import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.exception.ZkNodeExistsException;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
public class ZKDistributeImproveLock implements Lock {
private String localPath;
private ZkClient zkClient;
private String currentPath;
private String beforePath;
ZKDistributeImproveLock(String localPath) {
super();
this.localPath = localPath;
zkClient = new ZkClient("localhost:2181");
zkClient.setZkSerializer(new MyZkSerializer());
if (!zkClient.exists(localPath)) {
try {
this.zkClient.createPersistent(localPath);
} catch (ZkNodeExistsException e) {
}
}
}
@Override
public void lock() {
while (!tryLock()) {
waitForLock();
}
}
private void waitForLock() {
CountDownLatch countDownLatch = new CountDownLatch(1);
// 注冊(cè)watcher
IZkDataListener listener = new IZkDataListener() {
@Override
public void handleDataChange(String dataPath, Object data) throws Exception {
}
@Override
public void handleDataDeleted(String dataPath) throws Exception {
// 監(jiān)聽(tīng)到鎖釋放,喚醒線(xiàn)程
countDownLatch.countDown();
}
};
zkClient.subscribeDataChanges(beforePath, listener);
// 線(xiàn)程等待
if (zkClient.exists(beforePath)) {
try {
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 取消注冊(cè)
zkClient.unsubscribeDataChanges(beforePath , listener);
}
@Override
public void unlock() {
zkClient.delete(this.currentPath);
}
@Override
public boolean tryLock() {
if (this.currentPath == null) {
currentPath = zkClient.createEphemeralSequential(localPath +"/" , "123");
}
// 獲取Znode節(jié)點(diǎn)下面的所有子節(jié)點(diǎn)
List<String> children = zkClient.getChildren(localPath);
// 列表排序
Collections.sort(children);
if (currentPath.equals(localPath + "/" + children.get(0))) { // 當(dāng)前節(jié)點(diǎn)是第1個(gè)節(jié)點(diǎn)
return true;
} else {
//得到當(dāng)前的索引號(hào)
int index = children.indexOf(currentPath.substring(localPath.length() + 1));
//取到前一個(gè)
beforePath = localPath + "/" + children.get(index - 1);
}
return false;
}
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
return false;
}
@Override
public void lockInterruptibly() throws InterruptedException {
}
@Override
public Condition newCondition() {
return null;
}
}
thread name : Thread-13 , orderCode : 202112100936140
thread name : Thread-3 , orderCode : 202112100936141
thread name : Thread-14 , orderCode : 202112100936142
thread name : Thread-16 , orderCode : 202112100936143
thread name : Thread-1 , orderCode : 202112100936144
thread name : Thread-9 , orderCode : 202112100936145
thread name : Thread-4 , orderCode : 202112100936146
thread name : Thread-5 , orderCode : 202112100936147
thread name : Thread-7 , orderCode : 202112100936148
thread name : Thread-2 , orderCode : 202112100936149
thread name : Thread-17 , orderCode : 2021121009361410
thread name : Thread-15 , orderCode : 2021121009361411
thread name : Thread-0 , orderCode : 2021121009361412
thread name : Thread-10 , orderCode : 2021121009361413
thread name : Thread-18 , orderCode : 2021121009361414
thread name : Thread-19 , orderCode : 2021121009361415
thread name : Thread-8 , orderCode : 2021121009361416
thread name : Thread-12 , orderCode : 2021121009361417
thread name : Thread-11 , orderCode : 2021121009361418
thread name : Thread-6 , orderCode : 2021121009361419
8、zookeeper和Redis鎖對(duì)比?
Redis和Zookeeper都可以用來(lái)實(shí)現(xiàn)分布式鎖,兩者可以進(jìn)行對(duì)比:
基于Redis實(shí)現(xiàn)分布式鎖
- 實(shí)現(xiàn)比較復(fù)雜
- 存在死鎖的可能
- 性能比較好,基于內(nèi)存 ,而且保證的是高可用,redis優(yōu)先保證的是AP(分布式CAP理論)
基于Zookeeper實(shí)現(xiàn)分布式鎖
- 實(shí)現(xiàn)相對(duì)簡(jiǎn)單
- 可靠性高,因?yàn)閦ookeeper保證的是CP(分布式CAP理論)
- 性能相對(duì)較好 并發(fā)1~2萬(wàn)左右,并發(fā)太高,還是redis性能好
本博客代碼可以在GitHub找到下載鏈接
相關(guān)文章
java之StringBuffer常見(jiàn)使用方法解析
這篇文章主要介紹了java之StringBuffer常見(jiàn)使用方法解析,具有一定參考價(jià)值,需要的朋友可以了解下。2017-11-11Java中過(guò)濾器、監(jiān)聽(tīng)器和攔截器的區(qū)別詳解
這篇文章主要介紹了Java中過(guò)濾器、監(jiān)聽(tīng)器和攔截器的區(qū)別詳解,有些朋友可能不了解過(guò)濾器、監(jiān)聽(tīng)器和攔截器的區(qū)別,本文就來(lái)詳細(xì)講一下,相信看完你會(huì)有所收獲,需要的朋友可以參考下2024-01-01Elasticsearch8.1中的Script使用實(shí)例深入解讀
這篇文章主要為大家介紹了Elasticsearch8.1中的Script使用實(shí)例深入解讀,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2023-10-10java虛擬機(jī)原理:類(lèi)加載過(guò)程詳解
這篇文章主要介紹了Java中類(lèi)加載過(guò)程全面解析,具有一定參考價(jià)值,需要的朋友可以了解下,希望能夠給你帶來(lái)幫助2021-09-09Java?基于Hutool實(shí)現(xiàn)DES加解密示例詳解
這篇文章主要介紹了Java基于Hutool實(shí)現(xiàn)DES加解密,本文通過(guò)示例代碼給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2023-08-08Mybatis Integer類(lèi)型參數(shù)值為0時(shí)得到為空的解決方法
這篇文章主要介紹了Mybatis Integer類(lèi)型參數(shù)值為0時(shí)得到為空的解決方法,有需要的朋友們可以學(xué)習(xí)下。2019-08-08