使用Netty解碼自定義通信協(xié)議方式
網(wǎng)絡(luò)協(xié)議的基本要素
一個完備的網(wǎng)絡(luò)協(xié)議需要具備哪些基本要素
- 魔數(shù):魔數(shù)是通信雙方協(xié)商的一個暗號,通常采用固定的幾個字節(jié)表示。魔數(shù)的作用是防止任何人隨便向服務(wù)器的端口上發(fā)送數(shù)據(jù)。
- 協(xié)議版本號:隨著業(yè)務(wù)需求的變化,協(xié)議可能需要對結(jié)構(gòu)或字段進行改動,不同版本的協(xié)議對應(yīng)的解析方法也是不同的。所以在生產(chǎn)級項目中強烈建議預(yù)留協(xié)議版本號這個字段。
- 序列化算法:表示數(shù)據(jù)發(fā)送方應(yīng)該采用何種方法將請求的對象轉(zhuǎn)化為二進制,以及如何再將二進制轉(zhuǎn)化為對象
- 報文類型:報文可能存在不同的類型。例如在 RPC 框架中有請求、響應(yīng)、心跳等類型的報文,在 IM 即時通信的場景中有登陸、創(chuàng)建群聊、發(fā)送消息、接收消息、退出群聊等類型的報文。
- 長度域字段:代表請求數(shù)據(jù)的長度,接收方根據(jù)長度域字段獲取一個完整的報文。
- 請求數(shù)據(jù):通常為序列化之后得到的二進制流
- 狀態(tài):狀態(tài)字段用于標(biāo)識請求是否正常。一般由被調(diào)用方設(shè)置。例如一次 RPC 調(diào)用失敗,狀態(tài)字段可被服務(wù)提供方設(shè)置為異常狀態(tài)。
- 保留字段:保留字段是可選項,為了應(yīng)對協(xié)議升級的可能性,可以預(yù)留若干字節(jié)的保留字段,以備不時之需。
+---------------------------------------------------------------+ ? | 魔數(shù) 2byte | 協(xié)議版本號 1byte | 序列化算法 1byte | 報文類型 1byte | ? +---------------------------------------------------------------+ ? | 狀態(tài) 1byte | 保留字段 4byte | 數(shù)據(jù)長度 4byte | ? +---------------------------------------------------------------+ ? | 數(shù)據(jù)內(nèi)容 (長度不定) | ? +---------------------------------------------------------------+
舉例如下:
如何實現(xiàn)自定義通信協(xié)議
Netty 作為一個非常優(yōu)秀的網(wǎng)絡(luò)通信框架,已經(jīng)為我們提供了非常豐富的編解碼抽象基類,幫助我們更方便地基于這些抽象基類擴展實現(xiàn)自定義協(xié)議。
Netty 常用編碼器類型:
- MessageToByteEncoder 對象編碼成字節(jié)流;
- MessageToMessageEncoder 一種消息類型編碼成另外一種消息類型。
Netty 常用解碼器類型:
- ByteToMessageDecoder/ReplayingDecoder 將字節(jié)流解碼為消息對象;
- MessageToMessageDecoder 將一種消息類型解碼為另外一種消息類型。
編解碼器可以分為一次解碼器和二次解碼器,一次解碼器用于解決 TCP 拆包/粘包問題,按協(xié)議解析后得到的字節(jié)數(shù)據(jù)。
如果你需要對解析后的字節(jié)數(shù)據(jù)做對象模型的轉(zhuǎn)換,這時候便需要用到二次解碼器,同理編碼器的過程是反過來的。
- 一次編解碼器:MessageToByteEncoder/ByteToMessageDecoder。
- 二次編解碼器:MessageToMessageEncoder/MessageToMessageDecoder。
抽象編碼類
通過抽象編碼類的繼承圖可以看出,編碼類是 ChanneOutboundHandler 的抽象類實現(xiàn),具體操作的是 Outbound 出站數(shù)據(jù)。
MessageToByteEncoder
MessageToByteEncoder 用于將對象編碼成字節(jié)流,MessageToByteEncoder 提供了唯一的 encode 抽象方法,我們只需要實現(xiàn)encode 方法即可完成自定義編碼。
編碼器實現(xiàn)非常簡單,不需要關(guān)注拆包/粘包問題。
如下例子,展示了如何將字符串類型的數(shù)據(jù)寫入到 ByteBuf 實例,ByteBuf 實例將傳遞給 ChannelPipeline 鏈表中的下一個 ChannelOutboundHandler。
public class StringToByteEncoder extends MessageToByteEncoder<String> { @Override protected void encode(ChannelHandlerContext channelHandlerContext, String data, ByteBuf byteBuf) throws Exception { byteBuf.writeBytes(data.getBytes()); } }
encode什么時候被調(diào)用的
MessageToByteEncoder 重寫了 ChanneOutboundHandler 的 write() 方法,其主要邏輯分為以下幾個步驟:
- acceptOutboundMessage 判斷是否有匹配的消息類型,如果匹配需要執(zhí)行編碼流程,如果不匹配直接繼續(xù)傳遞給下一個 ChannelOutboundHandler;
- 分配 ByteBuf 資源,默認使用堆外內(nèi)存;
- 調(diào)用子類實現(xiàn)的 encode 方法完成數(shù)據(jù)編碼,一旦消息被成功編碼,會通過調(diào)用 ReferenceCountUtil.release(cast) 自動釋放;
- 如果 ByteBuf 可讀,說明已經(jīng)成功編碼得到數(shù)據(jù),然后寫入 ChannelHandlerContext 交到下一個節(jié)點;如果 ByteBuf 不可讀,則釋放 ByteBuf 資源,向下傳遞空的 ByteBuf 對象。
@Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { ByteBuf buf = null; try { if (acceptOutboundMessage(msg)) { // 1. 消息類型是否匹配 @SuppressWarnings("unchecked") I cast = (I) msg; buf = allocateBuffer(ctx, cast, preferDirect); // 2. 分配 ByteBuf 資源 try { encode(ctx, cast, buf); // 3. 執(zhí)行 encode 方法完成數(shù)據(jù)編碼 } finally { ReferenceCountUtil.release(cast); } if (buf.isReadable()) { ctx.write(buf, promise); // 4. 向后傳遞寫事件 } else { buf.release(); ctx.write(Unpooled.EMPTY_BUFFER, promise); } buf = null; } else { ctx.write(msg, promise); } } catch (EncoderException e) { throw e; } catch (Throwable e) { throw new EncoderException(e); } finally { if (buf != null) { buf.release(); } } }
MessageToMessageEncoder
MessageToMessageEncoder 與 MessageToByteEncoder 類似,同樣只需要實現(xiàn) encode 方法。
MessageToMessageEncoder常用的實現(xiàn)子類有StringEncoder
、LineEncoder
、Base64Encoder
等。
以StringEncoder
為例看下MessageToMessageEncoder
的用法。
源碼示例如下:將 CharSequence 類型(String、StringBuilder、StringBuffer 等)轉(zhuǎn)換成 ByteBuf 類型,結(jié)合 StringDecoder 可以直接實現(xiàn) String 類型數(shù)據(jù)的編解碼。
@Override protected void encode(ChannelHandlerContext ctx, CharSequence msg, List<Object> out) throws Exception { if (msg.length() == 0) { return; } out.add(ByteBufUtil.encodeString(ctx.alloc(), CharBuffer.wrap(msg), charset)); }
抽象解碼類
解碼類是 ChanneInboundHandler 的抽象類實現(xiàn),操作的是 Inbound 入站數(shù)據(jù)。解碼器實現(xiàn)的難度要遠大于編碼器,因為解碼器需要考慮拆包/粘包問題。
由于接收方有可能沒有接收到完整的消息,所以解碼框架需要對入站的數(shù)據(jù)做緩沖操作,直至獲取到完整的消息。
ByteToMessageDecoder
使用 ByteToMessageDecoder,Netty 會自動進行內(nèi)存的釋放,我們不用操心太多的內(nèi)存管理方面的邏輯。
首先,我們看下 ByteToMessageDecoder 定義的抽象方法:
public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter { protected abstract void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception; protected void decodeLast(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { if (in.isReadable()) { decodeRemovalReentryProtection(ctx, in, out); } } }
我們只需要實現(xiàn)一下decode()
方法,這里的 in 大家可以看到,傳遞進來的時候就已經(jīng)是 ByteBuf 類型,所以我們不再需要強轉(zhuǎn),第三個參數(shù)是List
類型,我們通過往這個List
里面添加解碼后的結(jié)果對象,就可以自動實現(xiàn)結(jié)果往下一個 handler 進行傳遞,這樣,我們就實現(xiàn)了解碼的邏輯 handler。
為什么存取解碼后的數(shù)據(jù)是用List
由于 TCP 粘包問題,ByteBuf 中可能包含多個有效的報文,或者不夠一個完整的報文。
Netty 會重復(fù)回調(diào) decode() 方法,直到?jīng)]有解碼出新的完整報文可以添加到 List 當(dāng)中,或者 ByteBuf 沒有更多可讀取的數(shù)據(jù)為止。
如果此時 List 的內(nèi)容不為空,那么會傳遞給 ChannelPipeline 中的下一個ChannelInboundHandler。
static void fireChannelRead(ChannelHandlerContext ctx, CodecOutputList msgs, int numElements) { for (int i = 0; i < numElements; i ++) { //循環(huán)傳播 有多少調(diào)用多少 ctx.fireChannelRead(msgs.getUnsafe(i)); } }
decodeLast
ByteToMessageDecoder 還定義了 decodeLast() 方法。為什么抽象解碼器要比編碼器多一個 decodeLast() 方法呢?
因為 decodeLast 在 Channel 關(guān)閉后會被調(diào)用一次,主要用于處理 ByteBuf 最后剩余的字節(jié)數(shù)據(jù)。Netty 中 decodeLast 的默認實現(xiàn)只是簡單調(diào)用了 decode() 方法。
如果有特殊的業(yè)務(wù)需求,則可以通過重寫 decodeLast() 方法擴展自定義邏輯。
ReplayingDecoder
ByteToMessageDecoder 還有一個抽象子類是 ReplayingDecoder。
它封裝了緩沖區(qū)的管理,在讀取緩沖區(qū)數(shù)據(jù)時,你無須再對字節(jié)長度進行檢查。
因為如果沒有足夠長度的字節(jié)數(shù)據(jù),ReplayingDecoder 將終止解碼操作。
ReplayingDecoder 的性能相比直接使用 ByteToMessageDecoder 要慢,大部分情況下并不推薦使用 ReplayingDecoder。
MessageToMessageDecoder
與 ByteToMessageDecoder 不同的是 MessageToMessageDecoder 并不會對數(shù)據(jù)報文進行緩存,它主要用作轉(zhuǎn)換消息模型。
比較推薦的做法是使用 ByteToMessageDecoder 解析 TCP 協(xié)議,解決拆包/粘包問題。
解析得到有效的 ByteBuf 數(shù)據(jù),然后傳遞給后續(xù)的 MessageToMessageDecoder 做數(shù)據(jù)對象的轉(zhuǎn)換,具體流程如下圖所示:
案例如下:
public class MyTcpDecoder extends ByteToMessageDecoder { ? @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { // 檢查ByteBuf數(shù)據(jù)是否完整 if (in.readableBytes() < 4) { return; } ? // 標(biāo)記ByteBuf讀取索引位置 in.markReaderIndex(); ? // 讀取數(shù)據(jù)包長度 int length = in.readInt(); ? // 如果ByteBuf中可讀字節(jié)數(shù)不足一個數(shù)據(jù)包長度,則將讀取索引位置恢復(fù)到標(biāo)記位置,等待下一次讀取 if (in.readableBytes() < length) { in.resetReaderIndex(); return; } ? // 讀取數(shù)據(jù) ByteBuf data = in.readBytes(length); ? // 將數(shù)據(jù)傳遞給下一個解碼器進行轉(zhuǎn)換,轉(zhuǎn)換后的數(shù)據(jù)對象添加到out中 ctx.fireChannelRead(data); } } ? public class MyDataDecoder extends MessageToMessageDecoder<ByteBuf> { ? @Override protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception { // 將讀取到的ByteBuf數(shù)據(jù)轉(zhuǎn)換為自定義的數(shù)據(jù)對象 MyData data = decode(msg); if (data != null) { // 將轉(zhuǎn)換后的數(shù)據(jù)對象添加到out中,表示解碼成功 out.add(data); } } ? private MyData decode(ByteBuf buf) { // 實現(xiàn)自定義的數(shù)據(jù)轉(zhuǎn)換邏輯 // ... return myData; } }
實戰(zhàn)案例
如何判斷 ByteBuf 是否存在完整的報文? 最常用的做法就是通過讀取消息長度 dataLength 進行判斷。
如果 ByteBuf 的可讀數(shù)據(jù)長度小于 dataLength,說明 ByteBuf 還不夠獲取一個完整的報文。
在該協(xié)議前面的消息頭部分包含了魔數(shù)、協(xié)議版本號、數(shù)據(jù)長度等固定字段,共 14 個字節(jié)。
固定字段長度和數(shù)據(jù)長度可以作為我們判斷消息完整性的依據(jù),具體編碼器實現(xiàn)ByteToMessageDecoder
邏輯示例如下:
/* +---------------------------------------------------------------+ | 魔數(shù) 2byte | 協(xié)議版本號 1byte | 序列化算法 1byte | 報文類型 1byte | +---------------------------------------------------------------+ | 狀態(tài) 1byte | 保留字段 4byte | 數(shù)據(jù)長度 4byte | +---------------------------------------------------------------+ | 數(shù)據(jù)內(nèi)容 (長度不定) | +---------------------------------------------------------------+ */ @Override public final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) { // 判斷 ByteBuf 可讀取字節(jié) if (in.readableBytes() < 14) { return; } // 標(biāo)記 ByteBuf 讀指針位置 in.markReaderIndex(); // 跳過魔數(shù) in.skipBytes(2); // 跳過協(xié)議版本號 in.skipBytes(1); byte serializeType = in.readByte(); // 跳過報文類型 in.skipBytes(1); // 跳過狀態(tài)字段 in.skipBytes(1); // 跳過保留字段 in.skipBytes(4); // 驗證報文長度,不對的話就重置指針位置 int dataLength = in.readInt(); if (in.readableBytes() < dataLength) { in.resetReaderIndex(); // 重置 ByteBuf 讀指針位置,這一步很重要 return; } byte[] data = new byte[dataLength]; in.readBytes(data); // 方式一:在解碼器中就將數(shù)據(jù)解碼成具體的對象 SerializeService serializeService = getSerializeServiceByType(serializeType); Object obj = serializeService.deserialize(data); if (obj != null) { out.add(obj); } // 方式二:這一步可以不在解碼器中處理,將請求數(shù)據(jù)讀取到一個新的byteBuf然后丟給handler處理 // 創(chuàng)建新的 ByteBuf 對象來存儲有效負載數(shù)據(jù) ByteBuf payload = Unpooled.buffer((int) dataSize); // 讀取有效負載數(shù)據(jù)并寫入到 payload 中 in.readBytes(payload); if (payload.isReadable()) { out.add(payload); } }
擴展
什么是字節(jié)序
字節(jié)順序,是指數(shù)據(jù)在內(nèi)存中的存放順序 使用16進制表示:0x12345678。在內(nèi)存中有兩種方法存儲這個數(shù)字,
不同在于,對于某一個要表示的值,是把值的低位存到低地址,還是把值的高位存到低地址。
字節(jié)順序分類
字節(jié)的排列方式有兩種。例如,將一個多字節(jié)對象的低位放在較小的地址處,高位放在較大的地址處,則稱小端序;反之則稱大端序。
典型的情況是整數(shù)在內(nèi)存中的存放方式(小端/主機字節(jié)序)和網(wǎng)絡(luò)傳輸?shù)膫鬏旐樞颍ù蠖?網(wǎng)絡(luò)字節(jié)序)
1. 網(wǎng)絡(luò)字節(jié)序(Network Order):TCP/IP各層協(xié)議將字節(jié)序定義為大端(Big Endian) ,因此TCP/IP協(xié)議中使用的字節(jié)序通常稱之為網(wǎng)絡(luò)字節(jié)序。
- 所以當(dāng)兩臺主機之間要通過TCP/IP協(xié)議進行通信的時候就需要調(diào)用相應(yīng)的函數(shù)進行主機序列(Little Endian)和網(wǎng)絡(luò)序(Big Endian)的轉(zhuǎn)換。
- 這樣一來,也就達到了與CPU、操作系統(tǒng)無關(guān),實現(xiàn)了網(wǎng)絡(luò)通信的標(biāo)準(zhǔn)化。
2. 主機字節(jié)序(Host Order): 整數(shù)在內(nèi)存中保存的順序,它遵循小端(Little Endian)規(guī)則(不一定,要看主機的CPU架構(gòu),不過大多數(shù)都是小端)。
- 同型號計算機上寫的程序,在相同的系統(tǒng)上面運行是沒有問題的。
總結(jié)
Java中虛擬機屏蔽了大小端問題,如果是Java之間通信則無需考慮,只有在跨語言通信的場景下才需要處理大小端問題。
回到本文的重點,我們在編解碼時也要注意大小端的問題,一般來說如果是小端序的話,我們用Netty取值的時候都要用LE結(jié)尾的方法。
以上為個人經(jīng)驗,希望能給大家一個參考,也希望大家多多支持腳本之家。
相關(guān)文章
Java擴展庫RxJava的基本結(jié)構(gòu)與適用場景小結(jié)
RxJava(GitHub: https://github.com/ReactiveX/RxJava)能夠幫助Java進行異步與事務(wù)驅(qū)動的程序編寫,這里我們來作一個Java擴展庫RxJava的基本結(jié)構(gòu)與適用場景小結(jié),剛接觸RxJava的同學(xué)不妨看一下^^2016-06-06java中進程與線程_三種實現(xiàn)方式總結(jié)(必看篇)
下面小編就為大家?guī)硪黄猨ava中進程與線程_三種實現(xiàn)方式總結(jié)(必看篇)。小編覺得挺不錯的,現(xiàn)在就分享給大家,也給大家做個參考。一起跟隨小編過來看看吧2017-06-06SpringBoot配置MyBatis-Plus實現(xiàn)增刪查改
本文主要介紹了SpringBoot配置MyBatis-Plus實現(xiàn)增刪查改,文中通過示例代碼介紹的非常詳細,具有一定的參考價值,感興趣的小伙伴們可以參考一下2021-08-08springboot中使用redis并且執(zhí)行調(diào)試lua腳本
今天有個項目需要使用redis,并且有使用腳本的需求,本文主要介紹了springboot中使用redis并且執(zhí)行調(diào)試lua腳本,文中通過示例代碼介紹的非常詳細,對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2022-04-04Java多線程并發(fā)JUC包CountDownLatch閉鎖的實例
這篇文章主要介紹了Java多線程并發(fā)JUC包CountDownLatch閉鎖的實例,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教2025-05-05Springboot WebFlux集成Spring Security實現(xiàn)JWT認證的示例
這篇文章主要介紹了Springboot WebFlux集成Spring Security實現(xiàn)JWT認證的示例,幫助大家更好的理解和學(xué)習(xí)使用springboot框架,感興趣的朋友可以了解下2021-04-04