Netty分布式解碼器讀取數(shù)據(jù)不完整的邏輯剖析
概述
在我們上一個(gè)章節(jié)遺留過一個(gè)問題, 就是如果Server在讀取客戶端的數(shù)據(jù)的時(shí)候, 如果一次讀取不完整, 就觸發(fā)channelRead事件, 那么Netty是如何處理這類問題的, 在這一章中, 會(huì)對(duì)此做詳細(xì)剖析
之前的章節(jié)我們學(xué)習(xí)過pipeline, 事件在pipeline中傳遞, handler可以將事件截取并對(duì)其處理, 而之后剖析的編解碼器, 其實(shí)就是一個(gè)handler, 截取byteBuf中的字節(jié), 然后組建成業(yè)務(wù)需要的數(shù)據(jù)進(jìn)行繼續(xù)傳播
編碼器, 通常是OutBoundHandler, 也就是以自身為基準(zhǔn), 對(duì)那些對(duì)外流出的數(shù)據(jù)做處理, 所以也叫編碼器, 將數(shù)據(jù)經(jīng)過編碼發(fā)送出去
解碼器, 通常是inboundHandler, 也就是以自身為基準(zhǔn), 對(duì)那些流向自身的數(shù)據(jù)做處理, 所以也叫解碼器, 將對(duì)向的數(shù)據(jù)接收之后經(jīng)過解碼再進(jìn)行使用
同樣, 在netty的編碼器中, 也會(huì)對(duì)半包和粘包問題做相應(yīng)的處理
什么是半包, 顧名思義, 就是不完整的數(shù)據(jù)包, 因?yàn)閚etty在輪詢讀事件的時(shí)候, 每次將channel中讀取的數(shù)據(jù), 不一定是一個(gè)完整的數(shù)據(jù)包, 這種情況, 就叫半包
粘包同樣也不難理解, 如果client往server發(fā)送數(shù)據(jù)包, 如果發(fā)送頻繁很有可能會(huì)將多個(gè)數(shù)據(jù)包的數(shù)據(jù)都發(fā)送到通道中, 如果在server在讀取的時(shí)候可能會(huì)讀取到超過一個(gè)完整數(shù)據(jù)包的長(zhǎng)度, 這種情況叫粘包
有關(guān)半包和粘包, 入下圖所示:

6-0-1
netty對(duì)半包的或者粘包的處理其實(shí)也很簡(jiǎn)單, 通過之前的學(xué)習(xí), 我們知道, 每個(gè)handler是和channel唯一綁定的, 一個(gè)handler只對(duì)應(yīng)一個(gè)channel, 所以將channel中的數(shù)據(jù)讀取時(shí)候經(jīng)過解析, 如果不是一個(gè)完整的數(shù)據(jù)包, 則解析失敗, 將這塊數(shù)據(jù)包進(jìn)行保存, 等下次解析時(shí)再和這個(gè)數(shù)據(jù)包進(jìn)行組裝解析, 直到解析到完整的數(shù)據(jù)包, 才會(huì)將數(shù)據(jù)包進(jìn)行向下傳遞
具體流程是在代碼中如何體現(xiàn)的呢?我們進(jìn)入到源碼分析中
第一節(jié): ByteToMessageDecoder
ByteToMessageDecoder解碼器, 顧名思義, 是一個(gè)將Byte解析成消息的解碼器,
我們看他的定義
public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter{
//類體省略
}這里繼承了ChannelInboundHandlerAdapter, 根據(jù)之前的學(xué)習(xí), 我們知道, 這是個(gè)inbound類型的handler, 也就是處理流向自身事件的handler
其次, 該類通過abstract關(guān)鍵字修飾, 說明是個(gè)抽象類, 在我們實(shí)際使用的時(shí)候, 并不是直接使用這個(gè)類, 而是使用其子類, 類定義了解碼器的骨架方法, 具體實(shí)現(xiàn)邏輯交給子類, 同樣, 在半包處理中也是由該類進(jìn)行實(shí)現(xiàn)的
netty中很多解碼器都實(shí)現(xiàn)了這個(gè)類, 并且, 我們也可以通過實(shí)現(xiàn)該類進(jìn)行自定義解碼器
我們重點(diǎn)關(guān)注一下該類的一個(gè)屬性:
ByteBuf cumulation;
這個(gè)屬性, 就是有關(guān)半包處理的關(guān)鍵屬性, 從概述中我們知道, netty會(huì)將不完整的數(shù)據(jù)包進(jìn)行保存, 這個(gè)數(shù)據(jù)包就是保存在這個(gè)屬性中
之前的學(xué)習(xí)我們知道, ByteBuf讀取完數(shù)據(jù)會(huì)傳遞channelRead事件, 傳播過程中會(huì)調(diào)用handler的channelRead方法, ByteToMessageDecoder的channelRead方法, 就是編碼的關(guān)鍵部分
我們看其channelRead方法
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//如果message是byteBuf類型
if (msg instanceof ByteBuf) {
//簡(jiǎn)單當(dāng)成一個(gè)arrayList, 用于盛放解析到的對(duì)象
CodecOutputList out = CodecOutputList.newInstance();
try {
ByteBuf data = (ByteBuf) msg;
//當(dāng)前累加器為空, 說明這是第一次從io流里面讀取數(shù)據(jù)
first = cumulation == null;
if (first) {
//如果是第一次, 則將累加器賦值為剛讀進(jìn)來的對(duì)象
cumulation = data;
} else {
//如果不是第一次, 則把當(dāng)前累加的數(shù)據(jù)和讀進(jìn)來的數(shù)據(jù)進(jìn)行累加
cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data);
}
//調(diào)用子類的方法進(jìn)行解析
callDecode(ctx, cumulation, out);
} catch (DecoderException e) {
throw e;
} catch (Throwable t) {
throw new DecoderException(t);
} finally {
if (cumulation != null && !cumulation.isReadable()) {
numReads = 0;
cumulation.release();
cumulation = null;
} else if (++ numReads >= discardAfterReads) {
numReads = 0;
discardSomeReadBytes();
}
//記錄list長(zhǎng)度
int size = out.size();
decodeWasNull = !out.insertSinceRecycled();
//向下傳播
fireChannelRead(ctx, out, size);
out.recycle();
}
} else {
//不是byteBuf類型則向下傳播
ctx.fireChannelRead(msg);
}
}這方法比較長(zhǎng), 帶大家一步步剖析
首先判斷如果傳來的數(shù)據(jù)是ByteBuf, 則進(jìn)入if塊中
CodecOutputList out = CodecOutputList.newInstance() 這里就當(dāng)成一個(gè)ArrayList就好, 用于盛放解碼完成的數(shù)據(jù)
ByteBuf data = (ByteBuf) msg 這步將數(shù)據(jù)轉(zhuǎn)化成ByteBuf
first = cumulation == null 這里表示如果cumulation == null, 說明沒有存儲(chǔ)板半包數(shù)據(jù), 則將當(dāng)前的數(shù)據(jù)保存在屬性cumulation中
如果 cumulation != null , 說明存儲(chǔ)了半包數(shù)據(jù), 則通過cumulator.cumulate(ctx.alloc(), cumulation, data)將讀取到的數(shù)據(jù)和原來的數(shù)據(jù)進(jìn)行累加, 保存在屬性cumulation中
我們看cumulator屬性
private Cumulator cumulator = MERGE_CUMULATOR;
這里調(diào)用了其靜態(tài)屬性MERGE_CUMULATOR, 我們跟過去:
public static final Cumulator MERGE_CUMULATOR = new Cumulator() {
@Override
public ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) {
ByteBuf buffer;
//不能到過最大內(nèi)存
if (cumulation.writerIndex() > cumulation.maxCapacity() - in.readableBytes()
|| cumulation.refCnt() > 1) {
buffer = expandCumulation(alloc, cumulation, in.readableBytes());
} else {
buffer = cumulation;
}
//將當(dāng)前數(shù)據(jù)buffer
buffer.writeBytes(in);
in.release();
return buffer;
}
};這里創(chuàng)建了Cumulator類型的靜態(tài)對(duì)象, 并重寫了cumulate方法, 這里cumulate方法, 就是用于將ByteBuf進(jìn)行拼接的方法:
方法中, 首先判斷cumulation的寫指針+in的可讀字節(jié)數(shù)是否超過了cumulation的最大長(zhǎng)度, 如果超過了, 將對(duì)cumulation進(jìn)行擴(kuò)容, 如果沒超過, 則將其賦值到局部變量buffer中
然后將in的數(shù)據(jù)寫到buffer中, 將in進(jìn)行釋放, 返回寫入數(shù)據(jù)后的ByteBuf
回到channelRead方法中:
最后通過callDecode(ctx, cumulation, out)方法進(jìn)行解碼, 這里傳入了Context對(duì)象, 緩沖區(qū)cumulation和集合out:
我們跟到callDecode(ctx, cumulation, out)方法中:
protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
try {
//只要累加器里面有數(shù)據(jù)
while (in.isReadable()) {
int outSize = out.size();
//判斷當(dāng)前List是否有對(duì)象
if (outSize > 0) {
//如果有對(duì)象, 則向下傳播事件
fireChannelRead(ctx, out, outSize);
//清空當(dāng)前l(fā)ist
out.clear();
//解碼過程中如ctx被removed掉就break
if (ctx.isRemoved()) {
break;
}
outSize = 0;
}
//當(dāng)前可讀數(shù)據(jù)長(zhǎng)度
int oldInputLength = in.readableBytes();
//子類實(shí)現(xiàn)
//子類解析, 解析玩對(duì)象放到out里面
decode(ctx, in, out);
if (ctx.isRemoved()) {
break;
}
//List解析前大小 和解析后長(zhǎng)度一樣(什么沒有解析出來)
if (outSize == out.size()) {
//原來可讀的長(zhǎng)度==解析后可讀長(zhǎng)度
//說明沒有讀取數(shù)據(jù)(當(dāng)前累加的數(shù)據(jù)并沒有拼成一個(gè)完整的數(shù)據(jù)包)
if (oldInputLength == in.readableBytes()) {
//跳出循環(huán)(下次在讀取數(shù)據(jù)才能進(jìn)行后續(xù)的解析)
break;
} else {
//沒有解析到數(shù)據(jù), 但是進(jìn)行讀取了
continue;
}
}
//out里面有數(shù)據(jù), 但是沒有從累加器讀取數(shù)據(jù)
if (oldInputLength == in.readableBytes()) {
throw new DecoderException(
StringUtil.simpleClassName(getClass()) +
".decode() did not read anything but decoded a message.");
}
if (isSingleDecode()) {
break;
}
}
} catch (DecoderException e) {
throw e;
} catch (Throwable cause) {
throw new DecoderException(cause);
}
}這里首先循環(huán)判斷傳入的ByteBuf是否有可讀字節(jié), 如果還有可讀字節(jié)說明沒有解碼完成, 則循環(huán)繼續(xù)解碼
然后判斷集合out的大小, 如果大小大于1, 說明out中盛放了解碼完成之后的數(shù)據(jù), 然后將事件向下傳播, 并清空out
因?yàn)槲覀兊谝淮谓獯aout是空的, 所以這里不會(huì)進(jìn)入if塊, 這部分我們稍后分析, 這里繼續(xù)往下看
通過 int oldInputLength = in.readableBytes() 獲取當(dāng)前ByteBuf, 其實(shí)也就是屬性cumulation的可讀字節(jié)數(shù), 這里就是一個(gè)備份用于比較, 我們繼續(xù)往下看:
decode(ctx, in, out)方法是最終的解碼操作, 這部會(huì)讀取cumulation并且將解碼后的數(shù)據(jù)放入到集合out中, 在ByteToMessageDecoder中的該方法是一個(gè)抽象方法, 讓子類進(jìn)行實(shí)現(xiàn), 我們使用的netty很多的解碼都是繼承了ByteToMessageDecoder并實(shí)現(xiàn)了decode方法從而完成了解碼操作, 同樣我們也可以遵循相應(yīng)的規(guī)則進(jìn)行自定義解碼器, 在之后的小節(jié)中會(huì)講解netty定義的解碼器, 并剖析相關(guān)的實(shí)現(xiàn)細(xì)節(jié), 這里我們繼續(xù)往下看:
if (outSize == out.size()) 這個(gè)判斷表示解析之前的out大小和解析之后out大小進(jìn)行比較, 如果相同, 說明并沒有解析出數(shù)據(jù), 我們進(jìn)入到if塊中:
if (oldInputLength == in.readableBytes()) 表示cumulation的可讀字節(jié)數(shù)在解析之前和解析之后是相同的, 說明解碼方法中并沒有解析數(shù)據(jù), 也就是當(dāng)前的數(shù)據(jù)并不是一個(gè)完整的數(shù)據(jù)包, 則跳出循環(huán), 留給下次解析, 否則, 說明沒有解析到數(shù)據(jù), 但是讀取了, 所以跳過該次循環(huán)進(jìn)入下次循環(huán)
最后判斷 if (oldInputLength == in.readableBytes()) , 這里代表out中有數(shù)據(jù), 但是并沒有從cumulation讀數(shù)據(jù), 說明這個(gè)out的內(nèi)容是非法的, 直接拋出異常
我們回到channRead方法中
我們關(guān)注finally中的內(nèi)容:
finally {
if (cumulation != null && !cumulation.isReadable()) {
numReads = 0;
cumulation.release();
cumulation = null;
} else if (++ numReads >= discardAfterReads) {
numReads = 0;
discardSomeReadBytes();
}
//記錄list長(zhǎng)度
int size = out.size();
decodeWasNull = !out.insertSinceRecycled();
//向下傳播
fireChannelRead(ctx, out, size);
out.recycle();
}首先判斷cumulation不為null, 并且沒有可讀字節(jié), 則將累加器進(jìn)行釋放, 并設(shè)置為null
之后記錄out的長(zhǎng)度, 通過fireChannelRead(ctx, out, size)將channelRead事件進(jìn)行向下傳播, 并回收out對(duì)象
我們跟到fireChannelRead(ctx, out, size)方法中:
static void fireChannelRead(ChannelHandlerContext ctx, CodecOutputList msgs, int numElements) {
//遍歷List
for (int i = 0; i < numElements; i ++) {
//逐個(gè)向下傳遞
ctx.fireChannelRead(msgs.getUnsafe(i));
}
}這里遍歷out集合, 并將里面的元素逐個(gè)向下傳遞
以上就是有關(guān)解碼的骨架邏輯
更多關(guān)于Netty分布式解碼器讀取數(shù)據(jù)的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
Spring boot創(chuàng)建自定義starter的完整步驟
這篇文章主要給大家介紹了關(guān)于Spring boot創(chuàng)建自定義starter的相關(guān)資料,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家學(xué)習(xí)或者使用Spring boot具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面來一起學(xué)習(xí)學(xué)習(xí)吧2019-09-09
Java精品項(xiàng)目瑞吉外賣之登陸的完善與退出功能篇
這篇文章主要為大家詳細(xì)介紹了java精品項(xiàng)目-瑞吉外賣訂餐系統(tǒng),此項(xiàng)目過大,分為多章獨(dú)立講解,本篇內(nèi)容為新增菜品和分頁查詢功能的實(shí)現(xiàn),文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2022-05-05
java枚舉使用詳細(xì)介紹及實(shí)現(xiàn)
這篇文章主要介紹了java枚舉使用詳細(xì)介紹及實(shí)現(xiàn)的相關(guān)資料,需要的朋友可以參考下2017-06-06
IDEA使用學(xué)生郵箱無法注冊(cè)問題:JetBrains Account connection error: 拒絕連接
這篇文章主要介紹了IDEA使用學(xué)生郵箱無法注冊(cè)問題:JetBrains Account connection error: 拒絕連接,文中通過圖文及示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2020-07-07
使用Spring掃描Mybatis的mapper接口的三種配置
這篇文章主要介紹了使用Spring掃描Mybatis的mapper接口的三種配置,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-08-08
Spring security實(shí)現(xiàn)權(quán)限管理示例
這篇文章主要介紹了Spring security實(shí)現(xiàn)權(quán)限管理示例,這里整理了詳細(xì)的代碼,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下。2017-01-01

