elasticsearch集群發(fā)現(xiàn)zendiscovery的Ping機(jī)制分析
zenDiscovery實(shí)現(xiàn)機(jī)制
ping是集群發(fā)現(xiàn)的基本手段,通過在網(wǎng)絡(luò)上廣播或者指定ping某些節(jié)點(diǎn)獲取集群信息,從而可以找到集群的master加入集群。zenDiscovery實(shí)現(xiàn)了兩種ping機(jī)制:廣播與單播。本篇將詳細(xì)分析一些這MulticastZenPing機(jī)制的實(shí)現(xiàn)為后面的集群發(fā)現(xiàn)和master選舉做好鋪墊。
廣播的過程
首先看一下廣播(MulticastZenPing),廣播的原理很簡(jiǎn)單,節(jié)點(diǎn)啟動(dòng)后向網(wǎng)絡(luò)發(fā)送廣播信息,任何收到的節(jié)點(diǎn)只要集群名字相同都應(yīng)該對(duì)此廣播信息作出回應(yīng)。這樣該節(jié)點(diǎn)就獲取了集群的相關(guān)信息。它定義了一個(gè)action:"internal:discovery/zen/multicast"和廣播的信息頭:INTERNAL_HEADER 。之前說過NettyTransport是cluster通信的基礎(chǔ),但是廣播卻沒有使它。它使用了java的MulticastSocket。這里簡(jiǎn)單的介紹一下MulticastSocket的使用。它是一個(gè)UDP 機(jī)制的socket,用來進(jìn)行多個(gè)數(shù)據(jù)包的廣播。它可以幫到一個(gè)ip形成一個(gè)group,任何MulticastSocket都可以join進(jìn)來,組內(nèi)的socket發(fā)送的信息會(huì)被訂閱了改組的所有機(jī)器接收到。elasticsearch對(duì)其進(jìn)行了封裝形成了MulticastChannel,有興趣可以參考相關(guān)源碼。
首先看一下MulticastZenPing的幾個(gè)輔助內(nèi)部類:
它總共定義了4個(gè)內(nèi)部類,這些內(nèi)部類和它一起完成廣播功能。FinalizingPingCollection是一pingresponse的容器,所有的響應(yīng)都用它來存儲(chǔ)。MulticastPingResponseRequestHandler它是response處理類,類似于之前所說的nettytransportHandler,它雖然使用的不是netty,但是它也定義了一個(gè)messageReceived的方法,當(dāng)收到請(qǐng)求時(shí)直接返回一個(gè)response。
MulticastPingResponse就不用細(xì)說了,它就是一個(gè)響應(yīng)類。最后要著重說一下Receiver類,因?yàn)閺V播并不是使用NettyTransport,因此對(duì)于消息處理邏輯都在Receiver中。在初始化MulticastZenPing時(shí)會(huì)將receiver注冊(cè)進(jìn)去。
protected void doStart() throws ElasticsearchException { try { .... multicastChannel = MulticastChannel.getChannel(nodeName(), shared, new MulticastChannel.Config(port, group, bufferSize, ttl, networkService.resolvePublishHostAddress(address)), new Receiver());//將receiver注冊(cè)到channel中 } catch (Throwable t) { .... } }
Receiver類基礎(chǔ)了Listener,實(shí)現(xiàn)了3個(gè)方法,消息經(jīng)過onMessage方法區(qū)分,如果是內(nèi)部ping則使用handleNodePingRequest方法處理,否則使用handleExternalPingRequest處理,區(qū)分方法很簡(jiǎn)單,就是讀取信息都看它是否符合所定義的INTERNAL_HEADER 信息頭。
nodeping處理代碼
private void handleNodePingRequest(int id, DiscoveryNode requestingNodeX, ClusterName requestClusterName) { .... final DiscoveryNodes discoveryNodes = contextProvider.nodes(); final DiscoveryNode requestingNode = requestingNodeX; if (requestingNode.id().equals(discoveryNodes.localNodeId())) { // 自身發(fā)出的ping,忽略 return; } //只接受本集群ping if (!requestClusterName.equals(clusterName)) { ...return; } // 兩個(gè)client間不需要ping if (!discoveryNodes.localNode().shouldConnectTo(requestingNode)) {return; } //新建一個(gè)response final MulticastPingResponse multicastPingResponse = new MulticastPingResponse(); multicastPingResponse.id = id; multicastPingResponse.pingResponse = new PingResponse(discoveryNodes.localNode(), discoveryNodes.masterNode(), clusterName, contextProvider.nodeHasJoinedClusterOnce()); //無法連接的情況 if (!transportService.nodeConnected(requestingNode)) { // do the connect and send on a thread pool threadPool.generic().execute(new Runnable() { @Override public void run() { // connect to the node if possible try { transportService.connectToNode(requestingNode); transportService.sendRequest(requestingNode, ACTION_NAME, multicastPingResponse, new EmptyTransportResponseHandler(ThreadPool.Names.SAME) { @Override public void handleException(TransportException exp) { logger.warn("failed to receive confirmation on sent ping response to [{}]", exp, requestingNode); } }); } catch (Exception e) { if (lifecycle.started()) { logger.warn("failed to connect to requesting node {}", e, requestingNode); } } } }); } else { transportService.sendRequest(requestingNode, ACTION_NAME, multicastPingResponse, new EmptyTransportResponseHandler(ThreadPool.Names.SAME) { @Override public void handleException(TransportException exp) { if (lifecycle.started()) { logger.warn("failed to receive confirmation on sent ping response to [{}]", exp, requestingNode); } } }); } } }
另外的一個(gè)方法是處理外部ping信息,處理過程是返回cluster的信息(這種外部ping的具體作用沒有研究不是太清楚)。以上是響應(yīng)MulticastZenPing的過程,收到其它節(jié)點(diǎn)的響應(yīng)信息后它會(huì)把本節(jié)點(diǎn)及集群的master節(jié)點(diǎn)相關(guān)信息返回給廣播節(jié)點(diǎn)。這樣廣播節(jié)點(diǎn)就獲知了集群的相關(guān)信息。在MulticastZenPing類中還有一個(gè)類 MulticastPingResponseRequestHandler,它的作用是廣播節(jié)點(diǎn)對(duì)其它節(jié)點(diǎn)對(duì)廣播信息響應(yīng)的回應(yīng),廣播節(jié)點(diǎn)的第二次發(fā)送信息的過程。它跟其它TransportRequestHandler一樣它有messageReceived方法,在啟動(dòng)時(shí)注冊(cè)到transportserver中,只處理一類action:"internal:discovery/zen/multicast"。
ping請(qǐng)求的發(fā)送策略
代碼如下:
public void ping(final PingListener listener, final TimeValue timeout) { .... //產(chǎn)生一個(gè)id final int id = pingIdGenerator.incrementAndGet(); try { receivedResponses.put(id, new PingCollection()); sendPingRequest(id);//第一次發(fā)送ping請(qǐng)求 // 等待時(shí)間的1/2后再次發(fā)送一個(gè)請(qǐng)求 threadPool.schedule(TimeValue.timeValueMillis(timeout.millis() / 2), ThreadPool.Names.GENERIC, new AbstractRunnable() { @Override public void onFailure(Throwable t) { logger.warn("[{}] failed to send second ping request", t, id); finalizePingCycle(id, listener); } @Override public void doRun() { sendPingRequest(id); //再過1/2時(shí)間再次發(fā)送一個(gè)請(qǐng)求 threadPool.schedule(TimeValue.timeValueMillis(timeout.millis() / 2), ThreadPool.Names.GENERIC, new AbstractRunnable() { @Override public void onFailure(Throwable t) { logger.warn("[{}] failed to send third ping request", t, id); finalizePingCycle(id, listener); } @Override public void doRun() { // make one last ping, but finalize as soon as all nodes have responded or a timeout has past PingCollection collection = receivedResponses.get(id); FinalizingPingCollection finalizingPingCollection = new FinalizingPingCollection(id, collection, collection.size(), listener); receivedResponses.put(id, finalizingPingCollection); logger.trace("[{}] sending last pings", id); sendPingRequest(id); //最后一次發(fā)送請(qǐng)求,超時(shí)的1/4后 threadPool.schedule(TimeValue.timeValueMillis(timeout.millis() / 4), ThreadPool.Names.GENERIC, new AbstractRunnable() { @Override public void onFailure(Throwable t) { logger.warn("[{}] failed to finalize ping", t, id); } @Override protected void doRun() throws Exception { finalizePingCycle(id, listener); } }); } }); } }); } catch (Exception e) { logger.warn("failed to ping", e); finalizePingCycle(id, listener); } }
發(fā)送過程主要是調(diào)用sendPingRequest(id)方法,在該方法中會(huì)將id,信息頭,版本,本地節(jié)點(diǎn)信息一起寫入到BytesStreamOutput中然后將其進(jìn)行廣播,這個(gè)廣播信息會(huì)被其它機(jī)器上的Receiver接收并處理,并且響應(yīng)該ping請(qǐng)求。另外一個(gè)需要關(guān)注的是以上加說明的部分,它通過鏈時(shí)的定期發(fā)送請(qǐng)求,在等待時(shí)間內(nèi)可能會(huì)發(fā)出4次請(qǐng)求,這種發(fā)送方式會(huì)造成大量的ping請(qǐng)求重復(fù),幸好ping的資源消耗小,但是好處是可以盡可能保證在timeout這個(gè)時(shí)間段內(nèi)集群的新增節(jié)點(diǎn)都能收到這個(gè)ping信息。在單播中也采用了該策略。
總結(jié)
廣播的過程:廣播使用的是jdk的MulticastSocket,在timeout時(shí)間內(nèi)4次發(fā)生ping請(qǐng)求,ping請(qǐng)求包括一個(gè)id,信息頭,本地節(jié)點(diǎn)的一些信息;這些信息在其它節(jié)點(diǎn)中被接收到交給Receiver處理,Receiver會(huì)將集群的master和本機(jī)的相關(guān)信息通過transport返回給廣播節(jié)點(diǎn)。廣播節(jié)點(diǎn)收到這些信息后會(huì)理解使用transport返回一個(gè)空的response。至此一個(gè)廣播過程完成。
在節(jié)點(diǎn)分布在多個(gè)網(wǎng)段時(shí),廣播就失效了,因?yàn)閺V播信息不可達(dá)。這個(gè)時(shí)間就需要使用單播去ping指定的節(jié)點(diǎn)獲取cluster的相關(guān)信息。這就是單播的用處。單播使用的是NettyTransport,它會(huì)使用跟廣播一樣的鏈?zhǔn)秸?qǐng)求向指定的節(jié)點(diǎn)發(fā)送請(qǐng)求。信息的處理方式是之前所介紹的NettyTransport標(biāo)準(zhǔn)的信息處理過程。
以上就是elasticsearch集群發(fā)現(xiàn)zendiscovery的Ping機(jī)制分析的詳細(xì)內(nèi)容,更多關(guān)于elasticsearch集群發(fā)現(xiàn)zendiscovery Ping的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
SpringBoot使用Sa-Token實(shí)現(xiàn)賬號(hào)封禁、分類封禁、階梯封禁的示例代碼
本文主要介紹了SpringBoot使用Sa-Token實(shí)現(xiàn)賬號(hào)封禁、分類封禁、階梯封禁的示例代碼,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2023-07-07springboot yml定義屬性,下文中${} 引用說明
這篇文章主要介紹了springboot yml定義屬性,下文中${} 引用說明,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過來看看吧2020-04-04idea項(xiàng)目debug模式無法啟動(dòng)的解決
這篇文章主要介紹了idea項(xiàng)目debug模式無法啟動(dòng)的解決,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過來看看吧2021-02-02Spring監(jiān)聽器及定時(shí)任務(wù)實(shí)現(xiàn)方法詳解
這篇文章主要介紹了Spring監(jiān)聽器及定時(shí)任務(wù)實(shí)現(xiàn)方法詳解,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2020-07-07