Java Spring Boot 集成Zookeeper
集成步驟
1.pom.xml文件配置,引入相關(guān)jar包
Curator是Netflix公司開源的一套zookeeper客戶端框架,解決了很多Zookeeper客戶端非常底層的細(xì)節(jié)開發(fā)工作,包括連接重連、反復(fù)注冊Watcher和NodeExistsException異常等等。
<!-- 封裝了一些高級特性,如:Cache事件監(jiān)聽、選舉、分布式鎖、分布式Barrier -->
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>2.10.0</version>
<exclusions>
<exclusion>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.13</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</exclusion>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>特殊說明: 1.無需引入curator-framework,因為curator-recipes自動關(guān)聯(lián)依賴引入curator-framework。 2.curator會默認(rèn)引入zookeeper的jar報,需要檢查版本與服務(wù)器的版本是否一致,如果不一致則需要排除引入 3.
2. 核心配置類
@Configuration
public class ZookeeperConfig implements Serializable
{
private static final long serialVersionUID = -9025878246972668136L;
private final ZooKeeperProperty zooKeeperProperty;
public ZookeeperConfig(ZooKeeperProperty zooKeeperProperty) {
this.zooKeeperProperty = zooKeeperProperty;
}
@Bean
public CuratorFramework curatorFramework()
{
RetryPolicy retryPolicy = new ExponentialBackoffRetry(zooKeeperProperty.getBaseSleepTime(),
zooKeeperProperty.getMaxRetries());
CuratorFramework client = CuratorFrameworkFactory.builder()
.connectString(zooKeeperProperty.getServers())
.connectionTimeoutMs(zooKeeperProperty.getConnectionTimeout())
.sessionTimeoutMs(zooKeeperProperty.getSessionTimeout())
.retryPolicy(retryPolicy)
.build();
client.start();
return client;
}
@Bean
@ConditionalOnMissingBean
public ZooKeeperUtils zooKeeperTemplate(CuratorFramework client) {
return new ZooKeeperUtils(client);
}
}
@ConfigurationProperties(prefix="zookeeper")
@Component
public class ZooKeeperProperty implements Serializable
{
private static final long serialVersionUID = 8650758711482699256L;
/**
* zk連接集群,多個用逗號隔開
*/
private String servers;
/**
* 會話超時時間
*/
private int sessionTimeout = 60000;
/**
* 連接超時時間
*/
private int connectionTimeout = 15000;
/**
* 初始重試等待時間(毫秒)
*/
private int baseSleepTime = 1000;
/**
* 重試最大次數(shù)
*/
private int maxRetries = 10;
//省略get、set方法
......
}3.常用API功能
@Component
public class ZooKeeperUtils
{
private static final Logger logger = LoggerFactory
.getLogger(ZooKeeperUtils.class);
/**
* 路徑分隔符
*/
private static final String PATH_SEPARATOR = "/";
/**
* zk連接
*/
private final CuratorFramework client;
public ZooKeeperUtils(CuratorFramework client)
{
this.client = client;
}
/**
* 創(chuàng)建空節(jié)點,默認(rèn)持久節(jié)點
*
* @param path
* 節(jié)點路徑
* @param node
* 節(jié)點名稱
* @return 完整路徑
*/
public String createNode(String path, String node)
{
return createNode(path, node, CreateMode.PERSISTENT);
}
/**
* 創(chuàng)建帶類型的空節(jié)點
*
* @param path
* 節(jié)點路徑
* @param node
* 節(jié)點名稱
* @param createMode
* 類型 CreateMode.PERSISTENT: 創(chuàng)建節(jié)點后,不刪除就永久存在
* CreateMode.PERSISTENT_SEQUENTIAL:節(jié)點path末尾會追加一個10位數(shù)的單調(diào)遞增的序列
* CreateMode.EPHEMERAL:創(chuàng)建后,回話結(jié)束節(jié)點會自動刪除
* CreateMode.EPHEMERAL_SEQUENTIAL:節(jié)點path末尾會追加一個10位數(shù)的單調(diào)遞增的序列
* @return 路徑
*/
public String createNode(String path, String node, CreateMode createMode)
{
path = buildPath(path, node);
logger.info("create node for path: {} with createMode: {}", path,
createMode.name());
try
{
client.create().creatingParentsIfNeeded().withMode(createMode)
.forPath(path);
logger.info("create node :{} sucessfully", node);
return path;
}
catch (Exception e)
{
logger.error(
"create node for path: {} with createMode: {} failed!",
path, createMode.name(), e);
return null;
}
}
/**
* 創(chuàng)建節(jié)點,默認(rèn)持久節(jié)點
*
* @param path
* 節(jié)點路徑
* @param node
* 節(jié)點名稱
* @param value
* 節(jié)點值
* @return 完整路徑
*/
public String createNode(String path, String node, String value)
{
return createNode(path, node, value, CreateMode.PERSISTENT);
}
/**
* 創(chuàng)建節(jié)點,默認(rèn)持久節(jié)點
*
* @param path
* 節(jié)點路徑
* @param node
* 節(jié)點名稱
* @param value
* 節(jié)點值
* @param createMode
* 節(jié)點類型
* @return 完整路徑
*/
public String createNode(String path, String node, String value,
CreateMode createMode)
{
if (Objects.isNull(value))
{
logger.error("ZooKeeper節(jié)點值不能為空!");
}
path = buildPath(path, node);
logger.info("create node for path: {}, value: {}, with createMode: {}",
path, value, createMode.name());
try
{
client.create().creatingParentsIfNeeded().withMode(createMode)
.forPath(path, value.getBytes());
return path;
}
catch (Exception e)
{
logger.error(
"create node for path: {}, value: {}, with createMode: {} failed!",
path, value, createMode.name(), e);
}
return null;
}
/**
* 獲取節(jié)點數(shù)據(jù)
*
* @param path
* 路徑
* @param node
* 節(jié)點名稱
* @return 完整路徑
*/
public String get(String path, String node)
{
path = buildPath(path, node);
try
{
byte[] bytes = client.getData().forPath(path);
if (bytes.length > 0)
{
return new String(bytes);
}
}
catch (Exception e)
{
logger.error("get value for path: {}, node: {} failed!", path,
node, e);
}
return null;
}
/**
* 更新節(jié)點數(shù)據(jù)
*
* @param path
* 節(jié)點路徑
* @param node
* 節(jié)點名稱
* @param value
* 更新值
* @return 完整路徑
*/
public String update(String path, String node, String value)
{
if (Objects.isNull(value))
{
logger.error("ZooKeeper節(jié)點值不能為空!");
}
path = buildPath(path, node);
logger.info("update path: {} to value: {}", path, value);
try
{
client.setData().forPath(path, value.getBytes());
return path;
}
catch (Exception e)
{
logger.error("update path: {} to value: {} failed!", path, value);
}
return null;
}
/**
* 刪除節(jié)點,并且遞歸刪除子節(jié)點
*
* @param path
* 路徑
* @param node
* 節(jié)點名稱
* @return 路徑
*/
public boolean delete(String path, String node)
{
path = buildPath(path, node);
logger.info("delete node for path: {}", path);
try
{
client.delete().deletingChildrenIfNeeded().forPath(path);
return true;
}
catch (Exception e)
{
logger.error("delete node for path: {} failed!", path);
}
return false;
}
/**
* 獲取子節(jié)點
*
* @param path
* 節(jié)點路徑
* @return
*/
public List<String> getChildren(String path)
{
if (StringUtils.isEmpty(path))
{
return null;
}
if (!path.startsWith(PATH_SEPARATOR))
{
path = PATH_SEPARATOR + path;
}
try
{
return client.getChildren().forPath(path);
}
catch (Exception e)
{
logger.error("get children path:{} error", path, e);
}
return null;
}
/**
* 判斷節(jié)點是否存在
*
* @param path
* 路徑
* @param node
* 節(jié)點名稱
* @return 結(jié)果
*/
public boolean exists(String path, String node)
{
try
{
List<String> list = getChildren(path);
return !CollectionUtils.isEmpty(list) && list.contains(node);
}
catch (Exception e)
{
return false;
}
}
/**
* 申請鎖,指定請求等待時間
*
* @param path
* 加鎖zk節(jié)點
* @param time
* 時間
* @param unit
* 時間單位
* @param runnable
* 執(zhí)行方法
*/
public void lock(String path, long time, TimeUnit unit, Runnable runnable)
{
try
{
InterProcessMutex lock = new InterProcessMutex(client, path);
if (lock.acquire(time, unit))
{
try
{
runnable.run();
}
finally
{
lock.release();
}
}
else
{
logger.error("獲取鎖超時:{}!", path);
}
}
catch (Exception e)
{
logger.error("獲取鎖失敗: {}!", path);
}
}
/**
* 申請鎖,指定請求等待時間
*
* @param path
* 加鎖zk節(jié)點
* @param time
* 時間
* @param unit
* 時間單位
* @param callable
* 執(zhí)行方法
* @return .
*/
public <T> T lock(String path, long time, TimeUnit unit,
Callable<T> callable)
{
try
{
InterProcessMutex lock = new InterProcessMutex(client, path);
if (lock.acquire(time, unit))
{
try
{
return callable.call();
}
finally
{
lock.release();
}
}
else
{
logger.error("獲取鎖超時:{}!", path);
}
}
catch (Exception e)
{
logger.error("獲取鎖失敗: {}!", path);
}
return null;
}
/* *//**
* 對一個節(jié)點進(jìn)行監(jiān)聽,監(jiān)聽事件包括指定的路徑節(jié)點的增、刪、改的操作
*
* @param path
* 節(jié)點路徑
* @param listener
* 回調(diào)方法
* @throws Exception
*/
public void watchNode(String path,boolean dataIsCompressed,final ZooKeeperCallback zooKeeperCallback)throws Exception
{
try
{
final NodeCache nodeCache = new NodeCache(client, path,dataIsCompressed);
nodeCache.getListenable().addListener(new NodeCacheListener()
{
public void nodeChanged() throws Exception
{
ChildData childData = nodeCache.getCurrentData();
logger.info("ZNode節(jié)點狀態(tài)改變, path={}", childData.getPath());
logger.info("ZNode節(jié)點狀態(tài)改變, data={}", childData.getData());
logger.info("ZNode節(jié)點狀態(tài)改變, stat={}", childData.getStat());
//處理業(yè)務(wù)邏輯
zooKeeperCallback.call();
}
});
nodeCache.start();
}
catch (Exception e)
{
logger.error("創(chuàng)建NodeCache監(jiān)聽失敗, path={}",path);
}
}
/**
* 對指定的路徑節(jié)點的一級子目錄進(jìn)行監(jiān)聽,不對該節(jié)點的操作進(jìn)行監(jiān)聽,對其子目錄的節(jié)點進(jìn)行增、刪、改的操作監(jiān)聽
*
* @param path
* 節(jié)點路徑
* @param listener
* 回調(diào)方法
*/
public void watchChildren(String path, PathChildrenCacheListener listener)
{
try
{
PathChildrenCache pathChildrenCache = new PathChildrenCache(client,
path, true);
pathChildrenCache.start(PathChildrenCache.StartMode.NORMAL);
pathChildrenCache.getListenable().addListener(listener);
}
catch (Exception e)
{
logger.error("watch children failed for path: {}", path, e);
}
}
/**
* 將指定的路徑節(jié)點作為根節(jié)點(祖先節(jié)點),對其所有的子節(jié)點操作進(jìn)行監(jiān)聽,呈現(xiàn)樹形目錄的監(jiān)聽,可以設(shè)置監(jiān)聽深度,最大監(jiān)聽深度為2147483647(
* int類型的最大值)
*
* @param path
* 節(jié)點路徑
* @param maxDepth
* 回調(diào)方法
* @param listener
* 監(jiān)聽
*/
public void watchTree(String path, int maxDepth, TreeCacheListener listener)
{
try
{
TreeCache treeCache = TreeCache.newBuilder(client, path)
.setMaxDepth(maxDepth).build();
treeCache.start();
treeCache.getListenable().addListener(listener);
}
catch (Exception e)
{
logger.error("watch tree failed for path: {}", path, e);
}
}
public String buildPath(String path, String node)
{
if (StringUtils.isEmpty(path) || StringUtils.isEmpty(node))
{
logger.error("ZooKeeper路徑或者節(jié)點名稱不能為空!");
}
if (!path.startsWith(PATH_SEPARATOR))
{
path = PATH_SEPARATOR + path;
}
if (PATH_SEPARATOR.equals(path))
{
return path + node;
}
else
{
return path + PATH_SEPARATOR + node;
}
}
}4.基本使用
@Autowired
private ZooKeeperUtils zooKeeperUtil;
@RequestMapping("/addNode")
public String addNode()
{
String path= zooKeeperUtil.createNode("/zookeeper", "node1");
return path;
}特殊說明:關(guān)于zookeeper的分布式鎖,后續(xù)講解常用分布式鎖的時候,會詳細(xì)說明。
常見錯誤和解決辦法
問題1:調(diào)用api創(chuàng)建zookeeper節(jié)點時,報KeeperErrorCode = Unimplemented for /test錯誤。
原因:服務(wù)器安裝zookeeper的版本與程序中的zookeeper版本不一致。
解決方案: 登錄服務(wù)器,查看zookeeper安裝版本,執(zhí)行如下命令:
echo stat|nc 127.0.0.1 2181

當(dāng)前引入的zookeeper版本為3.4.13,而zookeeper的版本與curator對應(yīng)關(guān)系如下:
Curator 2.x.x - compatible with both ZooKeeper 3.4.x and ZooKeeper 3.5.x Curator 4.x.x - compatible only with ZooKeeper 3.5.x and includes support for new features such as dynamic reconfiguration, etc. Curator 5.x.x compatible only with ZooKeeper 3.6.x+
問題2:啟動項目的日志中會有Will not attempt to authenticate using SASL錯誤
起初我認(rèn)為是zookeeper需要進(jìn)行SASL認(rèn)證,但是通過查閱相關(guān)資料后,才知道3.4之前版本,zookeeper默認(rèn)會采用SASL認(rèn)證,3.4以后的版本沒有此類問題。
到此這篇關(guān)于Java Spring Boot 集成Zookeeper的文章就介紹到這了,更多相關(guān)Spring Boot 集成Zookeeper內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
- 淺談Java(SpringBoot)基于zookeeper的分布式鎖實現(xiàn)
- 使用dubbo+zookeeper+spring boot構(gòu)建服務(wù)的方法詳解
- SpringBoot中dubbo+zookeeper實現(xiàn)分布式開發(fā)的應(yīng)用詳解
- SpringBoot集成Curator實現(xiàn)Zookeeper基本操作的代碼示例
- SpringBoot系列教程之dubbo和Zookeeper集成方法
- SpringBoot整合Dubbo+Zookeeper實現(xiàn)RPC調(diào)用
- springboot應(yīng)用訪問zookeeper的流程
- SpringBoot整合Zookeeper詳細(xì)教程
- SpringBoot讀取ZooKeeper(ZK)屬性的方法實現(xiàn)
相關(guān)文章
Java使用FutureTask實現(xiàn)預(yù)加載的示例詳解
基于FutureTask的特性,通??梢允褂肍utureTask做一些預(yù)加載工作,比如一些時間較長的計算等,本文就來和大家講講具體實現(xiàn)方法吧,感興趣的可以了解一下2023-06-06
Spring boot2基于Mybatis實現(xiàn)多表關(guān)聯(lián)查詢
這篇文章主要介紹了Spring boot2基于Mybatis實現(xiàn)多表關(guān)聯(lián)查詢,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友可以參考下2020-04-04
java開發(fā)使用StringUtils.split避坑詳解
這篇文章主要為大家介紹了java開發(fā)使用StringUtils.split避坑詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2022-11-11

