亚洲乱码中文字幕综合,中国熟女仑乱hd,亚洲精品乱拍国产一区二区三区,一本大道卡一卡二卡三乱码全集资源,又粗又黄又硬又爽的免费视频

netty?pipeline中的inbound和outbound事件傳播分析

 更新時間:2023年04月25日 11:12:47   作者:寬仔的編程之路  
這篇文章主要為大家介紹了netty?pipeline中的inbound和outbound事件傳播分析,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪

傳播inbound事件

有關(guān)于inbound事件, 在概述中做過簡單的介紹, 就是以自己為基準, 流向自己的事件, 比如最常見的channelRead事件, 就是對方發(fā)來數(shù)據(jù)流的所觸發(fā)的事件, 己方要對這些數(shù)據(jù)進行處理, 這一小節(jié), 以激活channelRead為例講解有關(guān)inbound事件的處理流程。

在業(yè)務(wù)代碼中, 我們自己的handler往往會通過重寫channelRead方法來處理對方發(fā)來的數(shù)據(jù), 那么對方發(fā)來的數(shù)據(jù)是如何走到channelRead方法中了呢, 也是我們這一小節(jié)要剖析的內(nèi)容。

在業(yè)務(wù)代碼中, 傳遞channelRead事件方式是通過fireChannelRead方法進行傳播的。

兩種寫法

public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    //寫法1:
    ctx.fireChannelRead(msg);
    //寫法2
    ctx.pipeline().fireChannelRead(msg);
}

這里重寫了channelRead方法, 并且方法體內(nèi)繼續(xù)通過fireChannelRead方法進行傳播channelRead事件, 那么這兩種寫法有什么異同?

我們先以寫法2為例, 將這種寫法進行剖析。

這里首先獲取當(dāng)前contextpipeline對象, 然后通過pipeline對象調(diào)用自身的fireChannelRead方法進行傳播, 因為默認創(chuàng)建的DefaultChannelpipeline

DefaultChannelPipeline.fireChannelRead(msg)

public final ChannelPipeline fireChannelRead(Object msg) {
    AbstractChannelHandlerContext.invokeChannelRead(head, msg);
    return this;
}

這里首先調(diào)用的是AbstractChannelHandlerContext類的靜態(tài)方法invokeChannelRead, 參數(shù)傳入head節(jié)點和事件的消息

AbstractChannelHandlerContext.invokeChannelRead(head, msg)

static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
    final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        next.invokeChannelRead(m);
    } else {
        executor.execute(new Runnable() {
            @Override
            public void run() {
                next.invokeChannelRead(m);
            }
        });
    }
}

這里的m通常就是我們傳入的msg, 而next, 目前是head節(jié)點, 然后再判斷是否為當(dāng)前eventLoop線程, 如果不是則將方法包裝成task交給eventLoop線程處理

AbstractChannelHandlerContext.invokeChannelRead(m)

private void invokeChannelRead(Object msg) {
    if (invokeHandler()) {
        try {
            ((ChannelInboundHandler) handler()).channelRead(this, msg);
        } catch (Throwable t) {
            notifyHandlerException(t);
        }
    } else {
        fireChannelRead(msg);
    }
}

首先通過invokeHandler()判斷當(dāng)前handler是否已添加, 如果添加, 則執(zhí)行當(dāng)前handlerchanelRead方法, 其實這里就明白了, 通過fireChannelRead方法傳遞事件的過程中, 其實就是找到相關(guān)handler執(zhí)行其channelRead方法, 由于我們在這里的handler就是head節(jié)點, 所以我們跟到HeadContextchannelRead方法中

HeadContext的channelRead方法

public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    //向下傳遞channelRead事件
    ctx.fireChannelRead(msg);
}

在這里我們看到, 這里通過fireChannelRead方法繼續(xù)往下傳遞channelRead事件, 而這種調(diào)用方式, 就是我們剛才分析用戶代碼的第一種調(diào)用方式

public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    //寫法1:
    ctx.fireChannelRead(msg);
    //寫法2
    ctx.pipeline().fireChannelRead(msg);
}

這里直接通過context對象調(diào)用fireChannelRead方法, 那么和使用pipeline調(diào)用有什么區(qū)別的, 我會回到HeadConetxchannelRead方法, 我們來剖析ctx.fireChannelRead(msg)這句, 大家就會對這個問題有答案了, 跟到ctxfireChannelRead方法中, 這里會走到AbstractChannelHandlerContext類中的fireChannelRead方法中

AbstractChannelHandlerContext.fireChannelRead(msg)

public ChannelHandlerContext fireChannelRead(final Object msg) {
    invokeChannelRead(findContextInbound(), msg);
    return this;
}

這里我們看到, invokeChannelRead方法中傳入了一個findContextInbound()參數(shù), 而這findContextInbound方法其實就是找到當(dāng)前Context的下一個節(jié)點

AbstractChannelHandlerContext.findContextInbound()

private AbstractChannelHandlerContext findContextInbound() {
    AbstractChannelHandlerContext ctx = this;
    do {
        ctx = ctx.next;
    } while (!ctx.inbound);
    return ctx;
}

這里的邏輯也比較簡單, 是通過一個doWhile循環(huán), 找到當(dāng)前handlerContext的下一個節(jié)點, 這里要注意循環(huán)的終止條件, while (!ctx.inbound)表示下一個context標志的事件不是inbound的事件, 則循環(huán)繼續(xù)往下找, 言外之意就是要找到下一個標注inbound事件的節(jié)點

有關(guān)事件的標注, 之前已經(jīng)進行了分析, 如果是用戶定義的handler, 是通過handler繼承的接口而定的, 如果tail或者head, 那么是在初始化的時候就已經(jīng)定義好, 這里不再贅述

回到AbstractChannelHandlerContext.fireChannelRead(msg)

AbstractChannelHandlerContext.fireChannelRead(msg)

public ChannelHandlerContext fireChannelRead(final Object msg) {
    invokeChannelRead(findContextInbound(), msg);
    return this;
}

找到下一個節(jié)點后, 繼續(xù)調(diào)用invokeChannelRead方法, 傳入下一個和消息對象

AbstractChannelHandlerContext.invokeChannelRead(final AbstractChannelHandlerContext next, Object msg)

static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
    final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        next.invokeChannelRead(m);
    } else {
        executor.execute(new Runnable() {
            @Override
            public void run() {
                next.invokeChannelRead(m);
            }
        });
    }
}

這里的邏輯我們又不陌生了, 因為我們傳入的是當(dāng)前context的下一個節(jié)點, 所以這里會調(diào)用下一個節(jié)點invokeChannelRead方法, 因我們剛才剖析的是head節(jié)點, 所以下一個節(jié)點有可能是用戶添加的handler的包裝類HandlerConext的對象

AbstractChannelHandlerContext.invokeChannelRead(Object msg)

private void invokeChannelRead(Object msg) {
    if (invokeHandler()) {
        try { 
            ((ChannelInboundHandler) handler()).channelRead(this, msg);
        } catch (Throwable t) {
            //發(fā)生異常的時候在這里捕獲異常
            notifyHandlerException(t);
        }
    } else {
        fireChannelRead(msg);
    }
}

又是我們熟悉的邏輯, 調(diào)用了自身handlerchannelRead方法, 如果是用戶自定義的handler, 則會走到用戶定義的channelRead()方法中去, 所以這里就解釋了為什么通過傳遞channelRead事件, 最終會走到用戶重寫的channelRead方法中去

同樣, 也解釋了該小節(jié)最初提到過的兩種寫法的區(qū)別

public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    //寫法1:
    ctx.fireChannelRead(msg);
    //寫法2
    ctx.pipeline().fireChannelRead(msg);
}
  • 寫法1是通過當(dāng)前節(jié)點往下傳播事件
  • 寫法2是通過頭節(jié)點往下傳遞事件
  • 所以, 在handler中如果要在channelRead方法中傳遞channelRead事件, 一定要采用寫法1的方式向下傳遞, 或者交給其父類處理, 如果采用2的寫法則每次事件傳輸?shù)竭@里都會繼續(xù)從head節(jié)點傳輸, 從而陷入死循環(huán)或者發(fā)生異常
  • 還有一點需要注意, 如果用戶代碼中channelRead方法, 如果沒有顯示的調(diào)用ctx.fireChannelRead(msg)那么事件則不會再往下傳播, 則事件會在這里終止, 所以如果我們寫業(yè)務(wù)代碼的時候要考慮有關(guān)資源釋放的相關(guān)操作

如果ctx.fireChannelRead(msg)則事件會繼續(xù)往下傳播, 如果每一個handler都向下傳播事件, 當(dāng)然, 根據(jù)我們之前的分析channelRead事件只會在標識為inbound事件的HandlerConetext中傳播, 傳播到最后, 則最終會調(diào)用到tail節(jié)點的channelRead方法

tailConext的channelRead方法

public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    onUnhandledInboundMessage(msg);
}

onUnhandledInboundMessage(msg)

protected void onUnhandledInboundMessage(Object msg) {
    try {
        logger.debug(
                "Discarded inbound message {} that reached at the tail of the pipeline. " +
                        "Please check your pipeline configuration.", msg);
    } finally {
        //釋放資源
        ReferenceCountUtil.release(msg);
    }
}

這里做了釋放資源的相關(guān)的操作

到這里,對于inbound事件的傳輸流程以及channelRead方法的執(zhí)行流程已經(jīng)分析完畢。

傳播outBound事件

有關(guān)于outBound事件, 和inbound正好相反,以自己為基準, 流向?qū)Ψ降氖录? 比如最常見的wirte事件

在業(yè)務(wù)代碼中, , 有可能使用wirte方法往寫數(shù)據(jù)

public void channelActive(ChannelHandlerContext ctx) throws Exception {
    ctx.channel().write("test data");
}

當(dāng)然, 直接調(diào)用write方法是不能往對方channel中寫入數(shù)據(jù)的, 因為這種方式只能寫入到緩沖區(qū), 還要調(diào)用flush方法才能將緩沖區(qū)數(shù)據(jù)刷到channel中, 或者直接調(diào)用writeAndFlush方法, 有關(guān)邏輯, 我們會在后面章節(jié)中詳細講解, 這里只是以wirte方法為例為了演示outbound事件的傳播的流程

兩種寫法

public void channelActive(ChannelHandlerContext ctx) throws Exception {
    //寫法1
    ctx.channel().write("test data");
    //寫法2
    ctx.write("test data");
}

這兩種寫法有什么區(qū)別, 首先分析第一種寫法

//這里獲取ctx所綁定的channel
ctx.channel().write("test data");

AbstractChannel.write(Object msg)

public ChannelFuture write(Object msg) {
	//這里pipeline是DefaultChannelPipeline
    return pipeline.write(msg);
}

繼續(xù)跟蹤DefaultChannelPipeline.write(msg)

DefaultChannelPipeline.write(msg)

public final ChannelFuture write(Object msg) {
    //從tail節(jié)點開始(從最后的節(jié)點往前寫)
    return tail.write(msg);
}

這里調(diào)用tail節(jié)點write方法, 這里我們應(yīng)該能分析到, outbound事件, 是通過tail節(jié)點開始往上傳播的。

其實tail節(jié)點并沒有重寫write方法, 最終會調(diào)用其父類AbstractChannelHandlerContext.write方法

AbstractChannelHandlerContext.write(Object msg)

public ChannelFuture write(Object msg) { 
    return write(msg, newPromise());
}

這里有個newPromise()這個方法, 這里是創(chuàng)建一個Promise對象, 有關(guān)Promise的相關(guān)知識會在以后章節(jié)進行分析,繼續(xù)分析write

AbstractChannelHandlerContext.write(final Object msg, final ChannelPromise promise)

public ChannelFuture write(final Object msg, final ChannelPromise promise) {
    /**
     * 省略
     * */
    write(msg, false, promise);
    return promise;
}

AbstractChannelHandlerContext.write(Object msg, boolean flush, ChannelPromise promise)

private void write(Object msg, boolean flush, ChannelPromise promise) { 
    AbstractChannelHandlerContext next = findContextOutbound();
    final Object m = pipeline.touch(msg, next);
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        if (flush) {
            next.invokeWriteAndFlush(m, promise);
        } else {
            //沒有調(diào)flush
            next.invokeWrite(m, promise);
        }
    } else {
        AbstractWriteTask task;
        if (flush) {
            task = WriteAndFlushTask.newInstance(next, m, promise);
        }  else {
            task = WriteTask.newInstance(next, m, promise);
        }
        safeExecute(executor, task, promise, m);
    }
}

這里跟我們之前分析過channelRead方法有點類似, 但是事件傳輸?shù)姆较蛴兴煌? 這里findContextOutbound()是獲取上一個標注outbound事件的HandlerContext

AbstractChannelHandlerContext.findContextOutbound()

private AbstractChannelHandlerContext findContextOutbound() {
    AbstractChannelHandlerContext ctx = this;
    do {
        ctx = ctx.prev;
    } while (!ctx.outbound);
    return ctx;
}

這里的邏輯跟之前的findContextInbound()方法有點像, 只是過程是反過來的

在這里, 會找到當(dāng)前context的上一個節(jié)點, 如果標注的事件不是outbound事件, 則繼續(xù)往上找, 意思就是找到上一個標注outbound事件的節(jié)點

回到AbstractChannelHandlerContext.write方法

AbstractChannelHandlerContext next = findContextOutbound();

這里將找到節(jié)點賦值到next屬性中,因為我們之前分析的write事件是從tail節(jié)點傳播的, 所以上一個節(jié)點就有可能是用戶自定的handler所屬的context

然后判斷是否為當(dāng)前eventLoop線程, 如果是不是, 則封裝成task異步執(zhí)行, 如果不是, 則繼續(xù)判斷是否調(diào)用了flush方法, 因為我們這里沒有調(diào)用, 所以會執(zhí)行到next.invokeWrite(m, promise)

AbstractChannelHandlerContext.invokeWrite(Object msg, ChannelPromise promise)

private void invokeWrite(Object msg, ChannelPromise promise) {
    if (invokeHandler()) {
        invokeWrite0(msg, promise);
    } else {
        write(msg, promise);
    }
}

這里會判斷當(dāng)前handler的狀態(tài)是否是添加狀態(tài), 這里返回的是true, 將會走到invokeWrite0(msg, promise)這一步

AbstractChannelHandlerContext.invokeWrite0(Object msg, ChannelPromise promise)

private void invokeWrite0(Object msg, ChannelPromise promise) {
    try {
        //調(diào)用當(dāng)前handler的wirte()方法
        ((ChannelOutboundHandler) handler()).write(this, msg, promise);
    } catch (Throwable t) {
        notifyOutboundHandlerException(t, promise);
    }
}

這里的邏輯也似曾相識, 調(diào)用了當(dāng)前節(jié)點包裝的handlerwrite方法, 如果用戶沒有重寫write方法, 則會交給其父類處理

ChannelOutboundHandlerAdapter.write

public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
    ctx.write(msg, promise);
}

這里調(diào)用了當(dāng)前ctxwrite方法, 這種寫法和我們小節(jié)開始的寫法是相同的, 我們回顧一下

public void channelActive(ChannelHandlerContext ctx) throws Exception {
    //寫法1
    ctx.channel().write("test data");
    //寫法2
    ctx.write("test data");
}

我們跟到其write方法中, 這里走到的是AbstractChannelHandlerContext類的write方法

AbstractChannelHandlerContext.write(Object msg, boolean flush, ChannelPromise promise)

private void write(Object msg, boolean flush, ChannelPromise promise) { 
    AbstractChannelHandlerContext next = findContextOutbound();
    final Object m = pipeline.touch(msg, next);
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        if (flush) {
            next.invokeWriteAndFlush(m, promise);
        } else {
            //沒有調(diào)flush
            next.invokeWrite(m, promise);
        }
    } else {
        AbstractWriteTask task;
        if (flush) {
            task = WriteAndFlushTask.newInstance(next, m, promise);
        }  else {
            task = WriteTask.newInstance(next, m, promise);
        }
        safeExecute(executor, task, promise, m);
    }
}

又是我們所熟悉邏輯, 找到當(dāng)前節(jié)點的上一個標注事件為outbound事件的節(jié)點, 繼續(xù)執(zhí)行invokeWrite方法, 根據(jù)之前的剖析, 我們知道最終會執(zhí)行到上一個handlerwrite方法中。

走到這里已經(jīng)不難理解, ctx.channel().write("test data")其實是從tail節(jié)點開始傳播寫事件, 而ctx.write("test data")是從自身開始傳播寫事件。

所以, 在handler中如果重寫了write方法要傳遞write事件, 一定采用ctx.write("test data")這種方式或者交給其父類處理處理, 而不能采用ctx.channel().write("test data")這種方式, 因為會造成每次事件傳輸?shù)竭@里都會從tail節(jié)點重新傳輸, 導(dǎo)致不可預(yù)知的錯誤。

如果用代碼中沒有重寫handlerwrite方法, 則事件會一直往上傳輸, 當(dāng)傳輸完所有的outbound節(jié)點之后, 最后會走到head節(jié)點的wirte方法中。

HeadContext.write

public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
    unsafe.write(msg, promise);
}

我們看到write事件最終會流向這里, 通過unsafe對象進行最終的寫操作

inbound事件和outbound事件的傳輸流程圖

以上就是netty中pipeline的inbound和outbound事件傳播分析的詳細內(nèi)容,更多關(guān)于netty pipeline事件傳播的資料請關(guān)注腳本之家其它相關(guān)文章!

相關(guān)文章

  • java 如何判斷是否是26個英文字母

    java 如何判斷是否是26個英文字母

    這篇文章主要介紹了java 如何判斷是否是26個英文字母的操作,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧
    2021-01-01
  • Java如何使用ReentrantLock實現(xiàn)長輪詢

    Java如何使用ReentrantLock實現(xiàn)長輪詢

    這篇文章主要介紹了如何使用ReentrantLock實現(xiàn)長輪詢,對ReentrantLock感興趣的同學(xué),可以參考下
    2021-04-04
  • SpringBoot讀取配置的6種方式

    SpringBoot讀取配置的6種方式

    本文主要介紹了SpringBoot讀取配置的6種方式,主要包括使用默認配置、使用application.properties文件、使用application.yml文件、使用@Value注解、使用Environment對象和使用ConfigurableEnvironment對象,感興趣的可以了解一下
    2023-08-08
  • SpringBoot webSocket實現(xiàn)發(fā)送廣播、點對點消息和Android接收

    SpringBoot webSocket實現(xiàn)發(fā)送廣播、點對點消息和Android接收

    這篇文章主要介紹了SpringBoot webSocket實現(xiàn)發(fā)送廣播、點對點消息和Android接收,具有一定的參考價值,感興趣的小伙伴們可以參考一下。
    2017-03-03
  • 解讀@NoArgsConstructor,@AllArgsConstructor,@RequiredArgsConstructor的區(qū)別及在springboot常用地方

    解讀@NoArgsConstructor,@AllArgsConstructor,@RequiredArgsConstr

    這篇文章主要介紹了解讀@NoArgsConstructor,@AllArgsConstructor,@RequiredArgsConstructor的區(qū)別及在springboot常用地方,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教
    2023-12-12
  • Java數(shù)據(jù)結(jié)構(gòu)之散列表詳解

    Java數(shù)據(jù)結(jié)構(gòu)之散列表詳解

    散列表(Hash table,也叫哈希表),是根據(jù)關(guān)鍵碼值(Key value)而直接進行訪問的數(shù)據(jù)結(jié)構(gòu)。本文將為大家具體介紹一下散列表的原理及其代碼實現(xiàn)
    2022-01-01
  • java  Iterator接口和LIstIterator接口分析

    java Iterator接口和LIstIterator接口分析

    這篇文章主要介紹了java Iterator接口和LIstIterator接口分析的相關(guān)資料,需要的朋友可以參考下
    2017-05-05
  • feign 如何獲取請求真實目的ip地址

    feign 如何獲取請求真實目的ip地址

    這篇文章主要介紹了feign 獲取請求真實目的ip地址操作,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2021-06-06
  • Java?Map雙列集合使代碼更高效

    Java?Map雙列集合使代碼更高效

    這篇文章主要介紹了Java?Map雙列集合使用,使你的代碼更高效,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪
    2023-12-12
  • Mybatis重置Criteria的正確姿勢分享

    Mybatis重置Criteria的正確姿勢分享

    這篇文章主要介紹了Mybatis重置Criteria的正確姿勢,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2021-12-12

最新評論