netty?pipeline中的inbound和outbound事件傳播分析
傳播inbound事件
有關(guān)于inbound
事件, 在概述中做過簡單的介紹, 就是以自己為基準, 流向自己的事件, 比如最常見的channelRead
事件, 就是對方發(fā)來數(shù)據(jù)流的所觸發(fā)的事件, 己方要對這些數(shù)據(jù)進行處理, 這一小節(jié), 以激活channelRead
為例講解有關(guān)inbound
事件的處理流程。
在業(yè)務(wù)代碼中, 我們自己的handler
往往會通過重寫channelRead
方法來處理對方發(fā)來的數(shù)據(jù), 那么對方發(fā)來的數(shù)據(jù)是如何走到channelRead
方法中了呢, 也是我們這一小節(jié)要剖析的內(nèi)容。
在業(yè)務(wù)代碼中, 傳遞channelRead
事件方式是通過fireChannelRead
方法進行傳播的。
兩種寫法
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { //寫法1: ctx.fireChannelRead(msg); //寫法2 ctx.pipeline().fireChannelRead(msg); }
這里重寫了channelRead
方法, 并且方法體內(nèi)繼續(xù)通過fireChannelRead
方法進行傳播channelRead
事件, 那么這兩種寫法有什么異同?
我們先以寫法2為例, 將這種寫法進行剖析。
這里首先獲取當(dāng)前context
的pipeline
對象, 然后通過pipeline
對象調(diào)用自身的fireChannelRead
方法進行傳播, 因為默認創(chuàng)建的DefaultChannelpipeline
。
DefaultChannelPipeline.fireChannelRead(msg)
public final ChannelPipeline fireChannelRead(Object msg) { AbstractChannelHandlerContext.invokeChannelRead(head, msg); return this; }
這里首先調(diào)用的是AbstractChannelHandlerContext
類的靜態(tài)方法invokeChannelRead
, 參數(shù)傳入head
節(jié)點和事件的消息
AbstractChannelHandlerContext.invokeChannelRead(head, msg)
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) { final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next); EventExecutor executor = next.executor(); if (executor.inEventLoop()) { next.invokeChannelRead(m); } else { executor.execute(new Runnable() { @Override public void run() { next.invokeChannelRead(m); } }); } }
這里的m
通常就是我們傳入的msg
, 而next
, 目前是head
節(jié)點, 然后再判斷是否為當(dāng)前eventLoop
線程, 如果不是則將方法包裝成task
交給eventLoop
線程處理
AbstractChannelHandlerContext.invokeChannelRead(m)
private void invokeChannelRead(Object msg) { if (invokeHandler()) { try { ((ChannelInboundHandler) handler()).channelRead(this, msg); } catch (Throwable t) { notifyHandlerException(t); } } else { fireChannelRead(msg); } }
首先通過invokeHandler()
判斷當(dāng)前handler
是否已添加, 如果添加, 則執(zhí)行當(dāng)前handler
的chanelRead
方法, 其實這里就明白了, 通過fireChannelRead
方法傳遞事件的過程中, 其實就是找到相關(guān)handler
執(zhí)行其channelRead
方法, 由于我們在這里的handler
就是head
節(jié)點, 所以我們跟到HeadContext
的channelRead
方法中
HeadContext的channelRead方法
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { //向下傳遞channelRead事件 ctx.fireChannelRead(msg); }
在這里我們看到, 這里通過fireChannelRead
方法繼續(xù)往下傳遞channelRead
事件, 而這種調(diào)用方式, 就是我們剛才分析用戶代碼的第一種調(diào)用方式
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { //寫法1: ctx.fireChannelRead(msg); //寫法2 ctx.pipeline().fireChannelRead(msg); }
這里直接通過context
對象調(diào)用fireChannelRead
方法, 那么和使用pipeline
調(diào)用有什么區(qū)別的, 我會回到HeadConetx
的channelRead
方法, 我們來剖析ctx.fireChannelRead(msg)
這句, 大家就會對這個問題有答案了, 跟到ctx
的fireChannelRead
方法中, 這里會走到AbstractChannelHandlerContext
類中的fireChannelRead
方法中
AbstractChannelHandlerContext.fireChannelRead(msg)
public ChannelHandlerContext fireChannelRead(final Object msg) { invokeChannelRead(findContextInbound(), msg); return this; }
這里我們看到, invokeChannelRead
方法中傳入了一個findContextInbound()
參數(shù), 而這findContextInbound
方法其實就是找到當(dāng)前Context
的下一個節(jié)點
AbstractChannelHandlerContext.findContextInbound()
private AbstractChannelHandlerContext findContextInbound() { AbstractChannelHandlerContext ctx = this; do { ctx = ctx.next; } while (!ctx.inbound); return ctx; }
這里的邏輯也比較簡單, 是通過一個doWhile
循環(huán), 找到當(dāng)前handlerContext
的下一個節(jié)點, 這里要注意循環(huán)的終止條件, while (!ctx.inbound)
表示下一個context
標志的事件不是inbound
的事件, 則循環(huán)繼續(xù)往下找, 言外之意就是要找到下一個標注inbound
事件的節(jié)點
有關(guān)事件的標注, 之前已經(jīng)進行了分析, 如果是用戶定義的handler
, 是通過handler
繼承的接口而定的, 如果tail
或者head
, 那么是在初始化的時候就已經(jīng)定義好, 這里不再贅述
回到AbstractChannelHandlerContext.fireChannelRead(msg)
AbstractChannelHandlerContext.fireChannelRead(msg)
public ChannelHandlerContext fireChannelRead(final Object msg) { invokeChannelRead(findContextInbound(), msg); return this; }
找到下一個節(jié)點后, 繼續(xù)調(diào)用invokeChannelRead
方法, 傳入下一個和消息對象
AbstractChannelHandlerContext.invokeChannelRead(final AbstractChannelHandlerContext next, Object msg)
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) { final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next); EventExecutor executor = next.executor(); if (executor.inEventLoop()) { next.invokeChannelRead(m); } else { executor.execute(new Runnable() { @Override public void run() { next.invokeChannelRead(m); } }); } }
這里的邏輯我們又不陌生了, 因為我們傳入的是當(dāng)前context
的下一個節(jié)點, 所以這里會調(diào)用下一個節(jié)點invokeChannelRead
方法, 因我們剛才剖析的是head
節(jié)點, 所以下一個節(jié)點有可能是用戶添加的handler
的包裝類HandlerConext
的對象
AbstractChannelHandlerContext.invokeChannelRead(Object msg)
private void invokeChannelRead(Object msg) { if (invokeHandler()) { try { ((ChannelInboundHandler) handler()).channelRead(this, msg); } catch (Throwable t) { //發(fā)生異常的時候在這里捕獲異常 notifyHandlerException(t); } } else { fireChannelRead(msg); } }
又是我們熟悉的邏輯, 調(diào)用了自身handler
的channelRead
方法, 如果是用戶自定義的handler
, 則會走到用戶定義的channelRead()
方法中去, 所以這里就解釋了為什么通過傳遞channelRead
事件, 最終會走到用戶重寫的channelRead
方法中去
同樣, 也解釋了該小節(jié)最初提到過的兩種寫法的區(qū)別
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { //寫法1: ctx.fireChannelRead(msg); //寫法2 ctx.pipeline().fireChannelRead(msg); }
- 寫法1是通過當(dāng)前節(jié)點往下傳播事件
- 寫法2是通過頭節(jié)點往下傳遞事件
- 所以, 在
handler
中如果要在channelRead
方法中傳遞channelRead
事件, 一定要采用寫法1的方式向下傳遞, 或者交給其父類處理, 如果采用2的寫法則每次事件傳輸?shù)竭@里都會繼續(xù)從head
節(jié)點傳輸, 從而陷入死循環(huán)或者發(fā)生異常 - 還有一點需要注意, 如果用戶代碼中
channelRead
方法, 如果沒有顯示的調(diào)用ctx.fireChannelRead(msg)
那么事件則不會再往下傳播, 則事件會在這里終止, 所以如果我們寫業(yè)務(wù)代碼的時候要考慮有關(guān)資源釋放的相關(guān)操作
如果ctx.fireChannelRead(msg)
則事件會繼續(xù)往下傳播, 如果每一個handler
都向下傳播事件, 當(dāng)然, 根據(jù)我們之前的分析channelRead
事件只會在標識為inbound
事件的HandlerConetext
中傳播, 傳播到最后, 則最終會調(diào)用到tail
節(jié)點的channelRead
方法
tailConext的channelRead方法
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { onUnhandledInboundMessage(msg); }
onUnhandledInboundMessage(msg)
protected void onUnhandledInboundMessage(Object msg) { try { logger.debug( "Discarded inbound message {} that reached at the tail of the pipeline. " + "Please check your pipeline configuration.", msg); } finally { //釋放資源 ReferenceCountUtil.release(msg); } }
這里做了釋放資源的相關(guān)的操作
到這里,對于inbound
事件的傳輸流程以及channelRead
方法的執(zhí)行流程已經(jīng)分析完畢。
傳播outBound事件
有關(guān)于outBound
事件, 和inbound
正好相反,以自己為基準, 流向?qū)Ψ降氖录? 比如最常見的wirte
事件
在業(yè)務(wù)代碼中, , 有可能使用wirte方法往寫數(shù)據(jù)
public void channelActive(ChannelHandlerContext ctx) throws Exception { ctx.channel().write("test data"); }
當(dāng)然, 直接調(diào)用write
方法是不能往對方channel
中寫入數(shù)據(jù)的, 因為這種方式只能寫入到緩沖區(qū), 還要調(diào)用flush
方法才能將緩沖區(qū)數(shù)據(jù)刷到channel
中, 或者直接調(diào)用writeAndFlush
方法, 有關(guān)邏輯, 我們會在后面章節(jié)中詳細講解, 這里只是以wirte
方法為例為了演示outbound
事件的傳播的流程
兩種寫法
public void channelActive(ChannelHandlerContext ctx) throws Exception { //寫法1 ctx.channel().write("test data"); //寫法2 ctx.write("test data"); }
這兩種寫法有什么區(qū)別, 首先分析第一種寫法
//這里獲取ctx所綁定的channel ctx.channel().write("test data");
AbstractChannel.write(Object msg)
public ChannelFuture write(Object msg) { //這里pipeline是DefaultChannelPipeline return pipeline.write(msg); }
繼續(xù)跟蹤DefaultChannelPipeline.write(msg)
DefaultChannelPipeline.write(msg)
public final ChannelFuture write(Object msg) { //從tail節(jié)點開始(從最后的節(jié)點往前寫) return tail.write(msg); }
這里調(diào)用tail
節(jié)點write
方法, 這里我們應(yīng)該能分析到, outbound
事件, 是通過tail
節(jié)點開始往上傳播的。
其實tail
節(jié)點并沒有重寫write
方法, 最終會調(diào)用其父類AbstractChannelHandlerContext.write方法
AbstractChannelHandlerContext.write(Object msg)
public ChannelFuture write(Object msg) { return write(msg, newPromise()); }
這里有個newPromise()
這個方法, 這里是創(chuàng)建一個Promise
對象, 有關(guān)Promise
的相關(guān)知識會在以后章節(jié)進行分析,繼續(xù)分析write
AbstractChannelHandlerContext.write(final Object msg, final ChannelPromise promise)
public ChannelFuture write(final Object msg, final ChannelPromise promise) { /** * 省略 * */ write(msg, false, promise); return promise; }
AbstractChannelHandlerContext.write(Object msg, boolean flush, ChannelPromise promise)
private void write(Object msg, boolean flush, ChannelPromise promise) { AbstractChannelHandlerContext next = findContextOutbound(); final Object m = pipeline.touch(msg, next); EventExecutor executor = next.executor(); if (executor.inEventLoop()) { if (flush) { next.invokeWriteAndFlush(m, promise); } else { //沒有調(diào)flush next.invokeWrite(m, promise); } } else { AbstractWriteTask task; if (flush) { task = WriteAndFlushTask.newInstance(next, m, promise); } else { task = WriteTask.newInstance(next, m, promise); } safeExecute(executor, task, promise, m); } }
這里跟我們之前分析過channelRead
方法有點類似, 但是事件傳輸?shù)姆较蛴兴煌? 這里findContextOutbound()
是獲取上一個標注outbound
事件的HandlerContext
AbstractChannelHandlerContext.findContextOutbound()
private AbstractChannelHandlerContext findContextOutbound() { AbstractChannelHandlerContext ctx = this; do { ctx = ctx.prev; } while (!ctx.outbound); return ctx; }
這里的邏輯跟之前的findContextInbound()
方法有點像, 只是過程是反過來的
在這里, 會找到當(dāng)前context
的上一個節(jié)點, 如果標注的事件不是outbound
事件, 則繼續(xù)往上找, 意思就是找到上一個標注outbound
事件的節(jié)點
回到AbstractChannelHandlerContext.write方法
AbstractChannelHandlerContext next = findContextOutbound();
這里將找到節(jié)點賦值到next
屬性中,因為我們之前分析的write
事件是從tail
節(jié)點傳播的, 所以上一個節(jié)點就有可能是用戶自定的handler
所屬的context
然后判斷是否為當(dāng)前eventLoop
線程, 如果是不是, 則封裝成task
異步執(zhí)行, 如果不是, 則繼續(xù)判斷是否調(diào)用了flush
方法, 因為我們這里沒有調(diào)用, 所以會執(zhí)行到next.invokeWrite(m, promise)
AbstractChannelHandlerContext.invokeWrite(Object msg, ChannelPromise promise)
private void invokeWrite(Object msg, ChannelPromise promise) { if (invokeHandler()) { invokeWrite0(msg, promise); } else { write(msg, promise); } }
這里會判斷當(dāng)前handler
的狀態(tài)是否是添加狀態(tài), 這里返回的是true
, 將會走到invokeWrite0(msg, promise)
這一步
AbstractChannelHandlerContext.invokeWrite0(Object msg, ChannelPromise promise)
private void invokeWrite0(Object msg, ChannelPromise promise) { try { //調(diào)用當(dāng)前handler的wirte()方法 ((ChannelOutboundHandler) handler()).write(this, msg, promise); } catch (Throwable t) { notifyOutboundHandlerException(t, promise); } }
這里的邏輯也似曾相識, 調(diào)用了當(dāng)前節(jié)點包裝的handler
的write
方法, 如果用戶沒有重寫write
方法, 則會交給其父類處理
ChannelOutboundHandlerAdapter.write
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { ctx.write(msg, promise); }
這里調(diào)用了當(dāng)前ctx
的write
方法, 這種寫法和我們小節(jié)開始的寫法是相同的, 我們回顧一下
public void channelActive(ChannelHandlerContext ctx) throws Exception { //寫法1 ctx.channel().write("test data"); //寫法2 ctx.write("test data"); }
我們跟到其write
方法中, 這里走到的是AbstractChannelHandlerContext
類的write
方法
AbstractChannelHandlerContext.write(Object msg, boolean flush, ChannelPromise promise)
private void write(Object msg, boolean flush, ChannelPromise promise) { AbstractChannelHandlerContext next = findContextOutbound(); final Object m = pipeline.touch(msg, next); EventExecutor executor = next.executor(); if (executor.inEventLoop()) { if (flush) { next.invokeWriteAndFlush(m, promise); } else { //沒有調(diào)flush next.invokeWrite(m, promise); } } else { AbstractWriteTask task; if (flush) { task = WriteAndFlushTask.newInstance(next, m, promise); } else { task = WriteTask.newInstance(next, m, promise); } safeExecute(executor, task, promise, m); } }
又是我們所熟悉邏輯, 找到當(dāng)前節(jié)點的上一個標注事件為outbound
事件的節(jié)點, 繼續(xù)執(zhí)行invokeWrite
方法, 根據(jù)之前的剖析, 我們知道最終會執(zhí)行到上一個handler
的write
方法中。
走到這里已經(jīng)不難理解, ctx.channel().write("test data")
其實是從tail
節(jié)點開始傳播寫事件, 而ctx.write("test data")
是從自身開始傳播寫事件。
所以, 在handler
中如果重寫了write
方法要傳遞write
事件, 一定采用ctx.write("test data")
這種方式或者交給其父類處理處理, 而不能采用ctx.channel().write("test data")
這種方式, 因為會造成每次事件傳輸?shù)竭@里都會從tail
節(jié)點重新傳輸, 導(dǎo)致不可預(yù)知的錯誤。
如果用代碼中沒有重寫handler
的write
方法, 則事件會一直往上傳輸, 當(dāng)傳輸完所有的outbound
節(jié)點之后, 最后會走到head
節(jié)點的wirte
方法中。
HeadContext.write
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { unsafe.write(msg, promise); }
我們看到write
事件最終會流向這里, 通過unsafe
對象進行最終的寫操作
inbound事件和outbound事件的傳輸流程圖
以上就是netty中pipeline的inbound和outbound事件傳播分析的詳細內(nèi)容,更多關(guān)于netty pipeline事件傳播的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
Java如何使用ReentrantLock實現(xiàn)長輪詢
這篇文章主要介紹了如何使用ReentrantLock實現(xiàn)長輪詢,對ReentrantLock感興趣的同學(xué),可以參考下2021-04-04SpringBoot webSocket實現(xiàn)發(fā)送廣播、點對點消息和Android接收
這篇文章主要介紹了SpringBoot webSocket實現(xiàn)發(fā)送廣播、點對點消息和Android接收,具有一定的參考價值,感興趣的小伙伴們可以參考一下。2017-03-03解讀@NoArgsConstructor,@AllArgsConstructor,@RequiredArgsConstr
這篇文章主要介紹了解讀@NoArgsConstructor,@AllArgsConstructor,@RequiredArgsConstructor的區(qū)別及在springboot常用地方,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教2023-12-12Java數(shù)據(jù)結(jié)構(gòu)之散列表詳解
散列表(Hash table,也叫哈希表),是根據(jù)關(guān)鍵碼值(Key value)而直接進行訪問的數(shù)據(jù)結(jié)構(gòu)。本文將為大家具體介紹一下散列表的原理及其代碼實現(xiàn)2022-01-01java Iterator接口和LIstIterator接口分析
這篇文章主要介紹了java Iterator接口和LIstIterator接口分析的相關(guān)資料,需要的朋友可以參考下2017-05-05