亚洲乱码中文字幕综合,中国熟女仑乱hd,亚洲精品乱拍国产一区二区三区,一本大道卡一卡二卡三乱码全集资源,又粗又黄又硬又爽的免费视频

elasticsearch節(jié)點(diǎn)的transport請(qǐng)求發(fā)送處理分析

 更新時(shí)間:2022年04月21日 15:54:35   作者:zziawan  
這篇文章主要為大家介紹了elasticsearch節(jié)點(diǎn)的transport請(qǐng)求發(fā)送處理分析,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪

transport請(qǐng)求的發(fā)送和處理過程

前一篇分析對(duì)nettytransport的啟動(dòng)及連接,本篇主要分析transport請(qǐng)求的發(fā)送和處理過程。

cluster中各個(gè)節(jié)點(diǎn)之間需要相互發(fā)送很多信息,如master檢測(cè)其它節(jié)點(diǎn)是否存在,node節(jié)點(diǎn)定期檢測(cè)master節(jié)點(diǎn)是否存儲(chǔ),cluster狀態(tài)的發(fā)布及搜索數(shù)據(jù)請(qǐng)求等等。為了保證信息傳輸,elasticsearch定義了一個(gè)19字節(jié)長(zhǎng)度的信息頭HEADER_SIZE = 2 + 4 + 8 + 1 + 4,以'E','S'開頭,接著是4字節(jié)int信息長(zhǎng)度,然后是8字節(jié)long型信息id,接著是一個(gè)字節(jié)的status,最后是4字節(jié)int型version。

所有的節(jié)點(diǎn)間的信息都是以這19個(gè)字節(jié)開始。同時(shí)elasticsearch對(duì)于節(jié)點(diǎn)間的所有action都定義 了名字,如對(duì)master的周期檢測(cè)action,internal:discovery/zen/fd/master_ping,每個(gè)action對(duì)應(yīng)著相應(yīng)的messagehandler。接下來會(huì)進(jìn)行詳分析。

request的發(fā)送過程

代碼在nettytransport中如下所示:

public void sendRequest(final DiscoveryNode node, final long requestId, final String action, final TransportRequest request, TransportRequestOptions options) throws IOException, TransportException {
        //參數(shù)說明:node發(fā)送的目的節(jié)點(diǎn),requestId請(qǐng)求id,action action名稱,request請(qǐng)求,options包括以下幾種操作 RECOVERY,BULK,REG,STATE,PING;
     Channel targetChannel = nodeChannel(node, options);//獲取對(duì)應(yīng)節(jié)點(diǎn)的channel,channel在連接節(jié)點(diǎn)時(shí)初始化完成(請(qǐng)參考上一篇)
        if (compress) {
            options.withCompress(true);
        }
        byte status = 0;
     //設(shè)置status 包括以下幾種STATUS_REQRES = 1 << 0; STATUS_ERROR = 1 << 1; STATUS_COMPRESS = 1 << 2;
    status = TransportStatus.setRequest(status); 
     ReleasableBytesStreamOutput bStream = new ReleasableBytesStreamOutput(bigArrays);//初始寫出流
        boolean addedReleaseListener = false;
        try {
            bStream.skip(NettyHeader.HEADER_SIZE);//留出message header的位置
            StreamOutput stream = bStream;
            // only compress if asked, and, the request is not bytes, since then only
            // the header part is compressed, and the "body" can't be extracted as compressed
            if (options.compress() && (!(request instanceof BytesTransportRequest))) {
                status = TransportStatus.setCompress(status);
                stream = CompressorFactory.defaultCompressor().streamOutput(stream);
            }
            stream = new HandlesStreamOutput(stream);
            // we pick the smallest of the 2, to support both backward and forward compatibility
            // note, this is the only place we need to do this, since from here on, we use the serialized version
            // as the version to use also when the node receiving this request will send the response with
            Version version = Version.smallest(this.version, node.version());
            stream.setVersion(version);
            stream.writeString(transportServiceAdapter.action(action, version));
            ReleasableBytesReference bytes;
            ChannelBuffer buffer;
            // it might be nice to somehow generalize this optimization, maybe a smart "paged" bytes output
            // that create paged channel buffers, but its tricky to know when to do it (where this option is
            // more explicit).
            if (request instanceof BytesTransportRequest) {
                BytesTransportRequest bRequest = (BytesTransportRequest) request;
                assert node.version().equals(bRequest.version());
                bRequest.writeThin(stream);
                stream.close();
                bytes = bStream.bytes();
                ChannelBuffer headerBuffer = bytes.toChannelBuffer();
                ChannelBuffer contentBuffer = bRequest.bytes().toChannelBuffer();
                buffer = ChannelBuffers.wrappedBuffer(NettyUtils.DEFAULT_GATHERING, headerBuffer, contentBuffer);
            } else {
                request.writeTo(stream);
                stream.close();
                bytes = bStream.bytes();
                buffer = bytes.toChannelBuffer();
            }
            NettyHeader.writeHeader(buffer, requestId, status, version);//寫信息頭
            ChannelFuture future = targetChannel.write(buffer);//寫buffer同時(shí)獲取future,發(fā)送信息發(fā)生在這里
            ReleaseChannelFutureListener listener = new ReleaseChannelFutureListener(bytes);
            future.addListener(listener);//添加listener
            addedReleaseListener = true;
            transportServiceAdapter.onRequestSent(node, requestId, action, request, options);
        } finally {
            if (!addedReleaseListener) {
                Releasables.close(bStream.bytes());
            }
        }
    }

以上就是request的發(fā)送過程,獲取目標(biāo)node的channel封裝請(qǐng)求寫入信息頭,然后發(fā)送并使用listener監(jiān)聽,這里transportRequest是一個(gè)抽象類,它繼承了TransportMessage同時(shí)實(shí)現(xiàn)了streamable接口。cluster中對(duì)它的實(shí)現(xiàn)非常多,各個(gè)功能都有相應(yīng)的request,這里就不一一列舉,后面的代碼分析中會(huì)時(shí)常涉及。

request的接受過程

request發(fā)送只是transport的一部分功能,有發(fā)送就要有接收,這樣transport的功能才完整。接下來就是對(duì)接收過程的分析。上一篇中簡(jiǎn)單介紹過netty的使用,message的處理是通過MessageHandler處理,因此nettyTransport的信息處理邏輯都在MessageChannelHandler的messageReceived()方法中,代碼如下所示:

public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
        Transports.assertTransportThread();
        Object m = e.getMessage();
        if (!(m instanceof ChannelBuffer)) {//非buffer之間返回
            ctx.sendUpstream(e);
            return;
        }
     //解析message頭
        ChannelBuffer buffer = (ChannelBuffer) m;
        int size = buffer.getInt(buffer.readerIndex() - 4);
        transportServiceAdapter.received(size + 6);
        // we have additional bytes to read, outside of the header
        boolean hasMessageBytesToRead = (size - (NettyHeader.HEADER_SIZE - 6)) != 0;
        int markedReaderIndex = buffer.readerIndex();
        int expectedIndexReader = markedReaderIndex + size;
        // netty always copies a buffer, either in NioWorker in its read handler, where it copies to a fresh
        // buffer, or in the cumlation buffer, which is cleaned each time
        StreamInput streamIn = ChannelBufferStreamInputFactory.create(buffer, size);
      //讀取信息頭中的幾個(gè)重要元數(shù)據(jù)
        long requestId = buffer.readLong();
        byte status = buffer.readByte();
        Version version = Version.fromId(buffer.readInt());
        StreamInput wrappedStream;
      …………
        if (TransportStatus.isRequest(status)) {//處理請(qǐng)求
            String action = handleRequest(ctx.getChannel(), wrappedStream, requestId, version);
            if (buffer.readerIndex() != expectedIndexReader) {
                if (buffer.readerIndex() < expectedIndexReader) {
                    logger.warn("Message not fully read (request) for [{}] and action [{}], resetting", requestId, action);
                } else {
                    logger.warn("Message read past expected size (request) for [{}] and action [{}], resetting", requestId, action);
                }
                buffer.readerIndex(expectedIndexReader);
            }
        } else {//處理響應(yīng)
            TransportResponseHandler handler = transportServiceAdapter.onResponseReceived(requestId);
            // ignore if its null, the adapter logs it
            if (handler != null) {
                if (TransportStatus.isError(status)) {
                    handlerResponseError(wrappedStream, handler);
                } else {
                    handleResponse(ctx.getChannel(), wrappedStream, handler);
                }
            } else {
                // if its null, skip those bytes
                buffer.readerIndex(markedReaderIndex + size);
            }
          …………
        wrappedStream.close();
    }

以上就是信息處理邏輯,這個(gè)方法基礎(chǔ)自netty的SimpleChannelUpstreamHandler類。作為MessageHandler會(huì)在client和server啟動(dòng)時(shí)加入到handler鏈中,在信息到達(dá)后netty會(huì)自動(dòng)調(diào)用handler鏈依次處理。這是netty的內(nèi)容,就不詳細(xì)說明,請(qǐng)參考netty文檔。

request和response是如何被處理

request的處理

代碼如下所示:

protected String handleRequest(Channel channel, StreamInput buffer, long requestId, Version version) throws IOException {
        final String action = buffer.readString();//讀出action的名字
        transportServiceAdapter.onRequestReceived(requestId, action);
        final NettyTransportChannel transportChannel = new NettyTransportChannel(transport, transportServiceAdapter, action, channel, requestId, version, profileName);
        try {
            final TransportRequestHandler handler = transportServiceAdapter.handler(action, version);//獲取處理該信息的handler
            if (handler == null) {
                throw new ActionNotFoundTransportException(action);
            }
            final TransportRequest request = handler.newInstance();
            request.remoteAddress(new InetSocketTransportAddress((InetSocketAddress) channel.getRemoteAddress()));
            request.readFrom(buffer);
            if (handler.executor() == ThreadPool.Names.SAME) {
                //noinspection unchecked
                handler.messageReceived(request, transportChannel);//使用該handler處理信息。
            } else {
                threadPool.executor(handler.executor()).execute(new RequestHandler(handler, request, transportChannel, action));
            }
        } catch (Throwable e) {
            try {
                transportChannel.sendResponse(e);
            } catch (IOException e1) {
                logger.warn("Failed to send error message back to client for action [" + action + "]", e);
                logger.warn("Actual Exception", e1);
            }
        }
        return action;
    }

幾個(gè)關(guān)鍵部分在代碼中進(jìn)行了標(biāo)注。這里仍舊不能看到請(qǐng)求是如何處理的。因?yàn)閏luster中的請(qǐng)求各種各樣,如ping,discovery,index等等,因此不可能使用同一種處理方式。因此request最終又被提交給handler處理。每個(gè)功能請(qǐng)求都實(shí)現(xiàn)了自己的handler,當(dāng)請(qǐng)求被提交給handler時(shí)會(huì)做對(duì)應(yīng)的處理。這里再說一下transportServiceAdapter,消息的處理都是通過它適配轉(zhuǎn)發(fā)完成。request的完整處理流程是:messageReceived()方法收到信息判斷是request會(huì)將其轉(zhuǎn)發(fā)到transportServiceAdapter的handler方法,handler方法查找對(duì)應(yīng)的requesthandler,使用將信息轉(zhuǎn)發(fā)給該handler進(jìn)行處理。這里就不舉例說明,在后面的discover分析中我們會(huì)看到發(fā)現(xiàn),ping等請(qǐng)求的處理過程。

response的處理過程

response通過handleResponse方法進(jìn)行處理,代碼如下:

protected void handleResponse(Channel channel, StreamInput buffer, final TransportResponseHandler handler) {
        final TransportResponse response = handler.newInstance();
        response.remoteAddress(new InetSocketTransportAddress((InetSocketAddress) channel.getRemoteAddress()));
        response.remoteAddress();
        try {
            response.readFrom(buffer);
        } catch (Throwable e) {
            handleException(handler, new TransportSerializationException("Failed to deserialize response of type [" + response.getClass().getName() + "]", e));
            return;
        }
        try {
            if (handler.executor() == ThreadPool.Names.SAME) {
                //noinspection unchecked
                handler.handleResponse(response);//轉(zhuǎn)發(fā)給對(duì)應(yīng)的handler
            } else {
                threadPool.executor(handler.executor()).execute(new ResponseHandler(handler, response));
            }
        } catch (Throwable e) {
            handleException(handler, new ResponseHandlerFailureTransportException(e));
        }
    }

response的處理過程跟request很類似。每個(gè)request都會(huì)對(duì)應(yīng)一個(gè)handler和一個(gè)response的處理handler,會(huì)在時(shí)候的時(shí)候注冊(cè)到transportService中。請(qǐng)求到達(dá)時(shí)根據(jù)action名稱獲取到handler處理request,根據(jù)requestId獲取對(duì)應(yīng)的response handler進(jìn)行響應(yīng)。

最后總結(jié)

nettyTransport的信息處理過程:信息通過request方法發(fā)送到目標(biāo)節(jié)點(diǎn),目標(biāo)節(jié)點(diǎn)的messagehandler會(huì)受到該信息,確定是request還是response,將他們分別轉(zhuǎn)發(fā)給transportServiceAdapter,TransportServiceAdapter會(huì)查詢到對(duì)應(yīng)的handler,信息最終會(huì)被轉(zhuǎn)發(fā)給對(duì)應(yīng)的handler處理并反饋。

對(duì)于nettyTransport信息發(fā)送的分析就到這里,在下一篇的cluster discovery分析中,我們會(huì)看到信息發(fā)送及處理的具體過程,希望大家以后多多支持腳本之家!

相關(guān)文章

  • Java基數(shù)排序radix sort原理及用法解析

    Java基數(shù)排序radix sort原理及用法解析

    這篇文章主要介紹了Java基數(shù)排序radix sort原理及用法解析,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下
    2020-06-06
  • Spring使用Setter完成依賴注入方式

    Spring使用Setter完成依賴注入方式

    這篇文章主要介紹了Spring使用Setter完成依賴注入方式,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2021-09-09
  • Java 3種方法實(shí)現(xiàn)進(jìn)制轉(zhuǎn)換

    Java 3種方法實(shí)現(xiàn)進(jìn)制轉(zhuǎn)換

    這篇文章主要介紹了Java 3種方法實(shí)現(xiàn)進(jìn)制轉(zhuǎn)換,幫助大家利用Java處理數(shù)據(jù),感興趣的朋友可以了解下
    2020-09-09
  • 解決idea創(chuàng)建版本時(shí)只有Java21和Java17選項(xiàng)

    解決idea創(chuàng)建版本時(shí)只有Java21和Java17選項(xiàng)

    你是否在使用IntelliJ?IDEA創(chuàng)建新項(xiàng)目時(shí)遇到了只有Java?21和Java?17的選項(xiàng)?別擔(dān)心,我們的指南將為你提供解決方案,通過簡(jiǎn)單的步驟,你將能夠選擇你需要的任何Java版本,繼續(xù)閱讀,讓我們開始吧!
    2024-03-03
  • 最全Gson使用

    最全Gson使用

    GSON彌補(bǔ)了JSON的許多不足的地方,在實(shí)際應(yīng)用中更加適用于Java開發(fā),本文主要介紹了最全Gson使用,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2023-05-05
  • Java線程等待用法實(shí)例分析

    Java線程等待用法實(shí)例分析

    這篇文章主要介紹了Java線程等待用法,結(jié)合實(shí)例形式分析了obj.wait()實(shí)現(xiàn)線程等待相關(guān)原理與操作技巧,需要的朋友可以參考下
    2018-09-09
  • 淺析Java中關(guān)鍵詞volatile底層的實(shí)現(xiàn)原理

    淺析Java中關(guān)鍵詞volatile底層的實(shí)現(xiàn)原理

    在 Java 并發(fā)編程中,有 3 個(gè)最常用的關(guān)鍵字:synchronized、ReentrantLock 和 volatile,這篇文章主要來和大家聊聊volatile底層的實(shí)現(xiàn)原理,感興趣的可以了解下
    2024-02-02
  • Java中實(shí)現(xiàn)WebSocket方法詳解

    Java中實(shí)現(xiàn)WebSocket方法詳解

    這篇文章主要介紹了Java中實(shí)現(xiàn)WebSocket方法詳解,WebSocket?是一種新型的網(wǎng)絡(luò)協(xié)議,它允許客戶端和服務(wù)器之間進(jìn)行雙向通信,可以實(shí)現(xiàn)實(shí)時(shí)數(shù)據(jù)交互,需要的朋友可以參考下
    2023-07-07
  • MyBatis的核心配置文件以及映射文件

    MyBatis的核心配置文件以及映射文件

    這篇文章主要介紹了MyBatis的核心配置文件以及映射文件,Mybatis它是一款半自動(dòng)的ORM持久層框架,具有較高的SQL靈活性,支持高級(jí)映射(一對(duì)一,一對(duì)多),動(dòng)態(tài)SQL,延遲加載和緩存等特性,但它的數(shù)據(jù)庫無關(guān)性較低,需要的朋友可以參考下
    2023-05-05
  • Idea中如何查看SpringSecurity各Filter信息

    Idea中如何查看SpringSecurity各Filter信息

    這篇文章主要介紹了Idea中如何查看SpringSecurity各Filter信息,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2023-01-01

最新評(píng)論