elasticsearch集群cluster?discovery可配式模塊示例分析
前言
elasticsearch cluster實(shí)現(xiàn)了自己發(fā)現(xiàn)機(jī)制zen。Discovery功能主要包括以下幾部分內(nèi)容:master選舉,master錯(cuò)誤探測(cè),集群中其它節(jié)點(diǎn)探測(cè),單播多播ping。本篇會(huì)首先概述以下Discovery這一部分的功能,然后介紹節(jié)點(diǎn)檢測(cè)。其它內(nèi)容會(huì)在接下來(lái)介紹。
Discovery模塊的概述
discovery是可配式模塊,官方支持亞馬遜的Azure discovery,Google Compute Engine,EC2 Discovery三種發(fā)現(xiàn)機(jī)制,根據(jù)插件規(guī)則完全可以自己實(shí)現(xiàn)其它的發(fā)現(xiàn)機(jī)制。整個(gè)模塊通過(guò)實(shí)現(xiàn)guice的DiscoveryModule對(duì)外提供模塊的注冊(cè)和啟動(dòng), 默認(rèn)使用zen discovery。發(fā)現(xiàn)模塊對(duì)外接口為DiscoveryService,它的方法如下所示:
它本質(zhì)上是discovery的一個(gè)代理,所有的功能最終都是由所綁定的discovery所實(shí)現(xiàn)的。節(jié)點(diǎn)啟動(dòng)時(shí)通過(guò)DiscoveryModule獲取DiscoveryService,然后啟動(dòng)DiscoveryService,DiscoveryService啟動(dòng)綁定的Discovery,整個(gè)功能模塊就完成了加載和啟動(dòng)。這也是elasticsearch所有模塊的實(shí)現(xiàn)方式,通過(guò)module對(duì)外提供綁定和獲取,通過(guò)service接口對(duì)外提供模塊的功能,在后面的分析中會(huì)經(jīng)常遇到。
cluster節(jié)點(diǎn)探測(cè)
接下來(lái)分析cluster的一個(gè)重要功能就是節(jié)點(diǎn)探測(cè)。cluster中不能沒(méi)有master節(jié)點(diǎn),因此集群中所有節(jié)點(diǎn)都要周期探測(cè)master節(jié)點(diǎn),一旦無(wú)法檢測(cè)到,將會(huì)進(jìn)行master選舉。同時(shí)作為master,對(duì)于節(jié)點(diǎn)變動(dòng)也要時(shí)刻關(guān)注,因此它需要周期性探測(cè)集群中所有節(jié)點(diǎn),確保及時(shí)剔除已經(jīng)宕機(jī)的節(jié)點(diǎn)。這種相互間的心跳檢測(cè)就是cluster的faultdetection。下圖是faultdetection的繼承關(guān)系:
有兩種實(shí)現(xiàn)方式,分別是master探測(cè)集群中其它節(jié)點(diǎn)和其它節(jié)點(diǎn)對(duì)master節(jié)點(diǎn)的探測(cè)。
FaultDetection只要一個(gè)抽象方法handleTransportDisconnect,該方法在內(nèi)部類(lèi)FDConnectionListener中被調(diào)用。在elasticsearch中大量使用了listener的異步方式,異步可以極大提升系統(tǒng)性能。它的代碼如下所示:
private class FDConnectionListener implements TransportConnectionListener { @Override public void onNodeConnected(DiscoveryNode node) { } @Override public void onNodeDisconnected(DiscoveryNode node) { handleTransportDisconnect(node); } }
faultdetection啟動(dòng)時(shí)會(huì)注冊(cè)相應(yīng)的FDConnetionListener,當(dāng)探測(cè)到節(jié)點(diǎn)丟失,會(huì)通過(guò)onNodeDisconnected方法回調(diào)對(duì)于的handleTransportDisconnect進(jìn)行處理。
MasterFaultDetection的啟動(dòng)代碼
private?void?innerStart(final?DiscoveryNode masterNode) { this.masterNode = masterNode; this.retryCount = 0; this.notifiedMasterFailure.set(false); // 嘗試連接master節(jié)點(diǎn) try { transportService.connectToNode(masterNode); } catch (final Exception e) { // 連接失敗通知masterNode失敗 notifyMasterFailure(masterNode, "failed to perform initial connect [" + e.getMessage() + "]"); return; } //關(guān)閉之前的masterping,重啟新的masterping if (masterPinger != null) { masterPinger.stop(); } this.masterPinger = new MasterPinger(); // 周期之后啟動(dòng)masterPing,這里并沒(méi)有周期啟動(dòng)masterPing,只是設(shè)定了延遲時(shí)間。 threadPool.schedule(pingInterval, ThreadPool.Names.SAME, masterPinger); }
代碼有有詳細(xì)注釋?zhuān)筒辉龠^(guò)多解釋。
master連接失敗的邏輯
代碼如下:
private void notifyMasterFailure(final DiscoveryNode masterNode, final String reason) { if (notifiedMasterFailure.compareAndSet(false, true)) { threadPool.generic().execute(new Runnable() { @Override public void run() { //通知所有l(wèi)istener master丟失 for (Listener listener : listeners) { listener.onMasterFailure(masterNode, reason); } } }); stop("master failure, " + reason); } }
在ZenDiscovery中實(shí)現(xiàn)了listener.onMasterFailure接口。會(huì)進(jìn)行master丟失的相關(guān)處理,在后面再分析。
MasterPing的關(guān)鍵代碼
private class MasterPinger implements Runnable { private volatile boolean running = true; public void stop() { this.running = false; } @Override public void run() { if (!running) { // return and don't spawn... return; } final DiscoveryNode masterToPing = masterNode; final MasterPingRequest request = new MasterPingRequest(clusterService.localNode().id(), masterToPing.id(), clusterName); final TransportRequestOptions options = options().withType(TransportRequestOptions.Type.PING).withTimeout(pingRetryTimeout); transportService.sendRequest(masterToPing, MASTER_PING_ACTION_NAME, request, options, new BaseTransportResponseHandler<MasterPingResponseResponse>() { @Override public MasterPingResponseResponse newInstance() { return new MasterPingResponseResponse(); } @Override public void handleResponse(MasterPingResponseResponse response) { if (!running) { return; } // reset the counter, we got a good result MasterFaultDetection.this.retryCount = 0; // check if the master node did not get switched on us..., if it did, we simply return with no reschedule if (masterToPing.equals(MasterFaultDetection.this.masterNode())) { // 啟動(dòng)新的ping周期 threadPool.schedule(pingInterval, ThreadPool.Names.SAME, MasterPinger.this); } } @Override public void handleException(TransportException exp) { if (!running) { return; } synchronized (masterNodeMutex) { // check if the master node did not get switched on us... if (masterToPing.equals(MasterFaultDetection.this.masterNode())) { if (exp instanceof ConnectTransportException || exp.getCause() instanceof ConnectTransportException) { handleTransportDisconnect(masterToPing); return; } else if (exp.getCause() instanceof NoLongerMasterException) { logger.debug("[master] pinging a master {} that is no longer a master", masterNode); notifyMasterFailure(masterToPing, "no longer master"); return; } else if (exp.getCause() instanceof NotMasterException) { logger.debug("[master] pinging a master {} that is not the master", masterNode); notifyMasterFailure(masterToPing, "not master"); return; } else if (exp.getCause() instanceof NodeDoesNotExistOnMasterException) { logger.debug("[master] pinging a master {} but we do not exists on it, act as if its master failure", masterNode); notifyMasterFailure(masterToPing, "do not exists on master, act as master failure"); return; } int retryCount = ++MasterFaultDetection.this.retryCount; logger.trace("[master] failed to ping [{}], retry [{}] out of [{}]", exp, masterNode, retryCount, pingRetryCount); if (retryCount >= pingRetryCount) { logger.debug("[master] failed to ping [{}], tried [{}] times, each with maximum [{}] timeout", masterNode, pingRetryCount, pingRetryTimeout); // not good, failure notifyMasterFailure(masterToPing, "failed to ping, tried [" + pingRetryCount + "] times, each with maximum [" + pingRetryTimeout + "] timeout"); } else { // resend the request, not reschedule, rely on send timeout transportService.sendRequest(masterToPing, MASTER_PING_ACTION_NAME, request, options, this); } } } } ); } }
MasterPing是一個(gè)線(xiàn)程,在innerStart的方法中沒(méi)有設(shè)定周期啟動(dòng)masterping,但是masterping需要周期進(jìn)行,這個(gè)秘密就在run 方法中,如果ping成功就會(huì)重啟一個(gè)新的ping。這樣既保證了ping線(xiàn)程的唯一性同時(shí)也保證了ping的順序和間隔。
ping的方式跟之前一樣是也是通過(guò)transport發(fā)送一個(gè)masterpingrequest,進(jìn)行一個(gè)連接。節(jié)點(diǎn)收到該請(qǐng)求后,如果已不再是master會(huì)拋出NotMasterException,狀態(tài)更新出差會(huì)拋出其它異常,異常會(huì)通過(guò)。否則會(huì)正常響應(yīng)notifyMasterFailure方法處理跟啟動(dòng)邏輯一樣。
對(duì)于網(wǎng)絡(luò)問(wèn)題導(dǎo)致的無(wú)響應(yīng)情況,會(huì)調(diào)用handleTransportDisconnect(masterToPing)方法處理。masterfaultDetection對(duì)該方法的實(shí)現(xiàn)如下:
protected void handleTransportDisconnect(DiscoveryNode node) { //這里需要同步 synchronized (masterNodeMutex) { //master 已經(jīng)換成其它節(jié)點(diǎn),就沒(méi)必要再連接 if (!node.equals(this.masterNode)) { return; } if (connectOnNetworkDisconnect) { try { //嘗試再次連接 transportService.connectToNode(node); // if all is well, make sure we restart the pinger if (masterPinger != null) { masterPinger.stop(); } //連接成功啟動(dòng)新的masterping this.masterPinger = new MasterPinger(); // we use schedule with a 0 time value to run the pinger on the pool as it will run on later threadPool.schedule(TimeValue.timeValueMillis(0), ThreadPool.Names.SAME, masterPinger); } catch (Exception e) { //連接出現(xiàn)異常,啟動(dòng)master節(jié)點(diǎn)丟失通知 logger.trace("[master] [{}] transport disconnected (with verified connect)", masterNode); notifyMasterFailure(masterNode, "transport disconnected (with verified connect)"); } } else { //不需要重連,通知master丟失。 logger.trace("[master] [{}] transport disconnected", node); notifyMasterFailure(node, "transport disconnected"); } } }
這就是masterfaultDetection的整個(gè)流程:?jiǎn)?dòng)中如果master丟失則通知節(jié)點(diǎn)丟失,否則在一定延遲(3s)后啟動(dòng)masterping,masterping線(xiàn)程嘗試連接master節(jié)點(diǎn),如果master節(jié)點(diǎn)網(wǎng)絡(luò)失聯(lián),嘗試再次連接。master節(jié)點(diǎn)收到masterpingrequest后首先看一下自己還是不是master,如果不是則拋出異常,否則正常回應(yīng)。節(jié)點(diǎn)如果收到響應(yīng)是異常則啟動(dòng)master丟失通知,否則此次ping結(jié)束。在一定延遲后啟動(dòng)新的masterping線(xiàn)程。
NodeFaultDetection的邏輯跟實(shí)現(xiàn)上跟MasterFualtDetetion相似,區(qū)別主要在于ping異常處理上。當(dāng)某個(gè)節(jié)點(diǎn)出現(xiàn)異?;蛘邲](méi)有響應(yīng)時(shí),會(huì)啟動(dòng)節(jié)點(diǎn)丟失機(jī)制,只是受到通知后的處理邏輯不通。就不再詳細(xì)分析,有興趣可以參考具體代碼,希望大家以后多多支持腳本之家!
- Nacos?Discovery服務(wù)治理解決方案
- 關(guān)于IDEA中spring-cloud-starter-alibaba-nacos-discovery 無(wú)法引入問(wèn)題
- 關(guān)于Nacos和Eureka的區(qū)別及說(shuō)明
- 關(guān)于Nacos配置管理的統(tǒng)一配置管理、自動(dòng)刷新詳解
- elasticsearch集群發(fā)現(xiàn)zendiscovery的Ping機(jī)制分析
- elasticsearch的zenDiscovery和master選舉機(jī)制原理分析
- nacos-discovery包名層級(jí)問(wèn)題解決
相關(guān)文章
Java數(shù)據(jù)結(jié)構(gòu)中的HashMap和HashSet詳解
HashMap和HashSet都是存儲(chǔ)在哈希桶之中,通過(guò)本文我們可以先了解一些哈希桶是什么,本文結(jié)合實(shí)例代碼給大家介紹的非常詳細(xì),需要的朋友參考下吧2023-10-10解析web.xml中在Servlet中獲取context-param和init-param內(nèi)的參數(shù)
本篇文章是對(duì)web.xml中在Servlet中獲取context-param和init-param內(nèi)的參數(shù)進(jìn)行了詳細(xì)的分析介紹,需要的朋友參考下2013-07-07Springboot Vue可配置調(diào)度任務(wù)實(shí)現(xiàn)示例詳解
這篇文章主要為大家介紹了Springboot Vue可配置調(diào)度任務(wù)實(shí)現(xiàn)示例詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2023-01-01java分類(lèi)樹(shù),我從2s優(yōu)化到0.1s
這篇文章主要介紹了java分類(lèi)樹(shù),我從2s優(yōu)化到0.1s的相關(guān)資料,需要的朋友可以參考下2023-05-05詳解eclipse將項(xiàng)目打包成jar文件的兩種方法及問(wèn)題解決方法
本文給大家介紹了eclipse中將項(xiàng)目打包成jar文件的兩種方法及其遇到問(wèn)題解決方法,本文圖文并茂給大家介紹的非常詳細(xì),需要的朋友可以參考下2017-12-12淺談MyBatis-Plus學(xué)習(xí)之Oracle的主鍵Sequence設(shè)置的方法
這篇文章主要介紹了淺談MyBatis-Plus學(xué)習(xí)之Oracle的主鍵Sequence設(shè)置的方法,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2020-08-08spring中12種@Transactional的失效場(chǎng)景(小結(jié))
日常我們進(jìn)行業(yè)務(wù)開(kāi)發(fā)時(shí),基本上使用的都是聲明式事務(wù),即為使用@Transactional注解的方式,本文主要介紹了spring中12種@Transactional的失效場(chǎng)景,感興趣的小伙伴們可以參考一下2022-01-01