亚洲乱码中文字幕综合,中国熟女仑乱hd,亚洲精品乱拍国产一区二区三区,一本大道卡一卡二卡三乱码全集资源,又粗又黄又硬又爽的免费视频

使用Netty解碼自定義通信協(xié)議方式

 更新時間:2025年05月22日 09:44:15   作者:Linn-cn  
這篇文章主要介紹了使用Netty解碼自定義通信協(xié)議方式,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教

網(wǎng)絡(luò)協(xié)議的基本要素

一個完備的網(wǎng)絡(luò)協(xié)議需要具備哪些基本要素

  1. 魔數(shù):魔數(shù)是通信雙方協(xié)商的一個暗號,通常采用固定的幾個字節(jié)表示。魔數(shù)的作用是防止任何人隨便向服務(wù)器的端口上發(fā)送數(shù)據(jù)。
  2. 協(xié)議版本號:隨著業(yè)務(wù)需求的變化,協(xié)議可能需要對結(jié)構(gòu)或字段進行改動,不同版本的協(xié)議對應(yīng)的解析方法也是不同的。所以在生產(chǎn)級項目中強烈建議預(yù)留協(xié)議版本號這個字段。
  3. 序列化算法:表示數(shù)據(jù)發(fā)送方應(yīng)該采用何種方法將請求的對象轉(zhuǎn)化為二進制,以及如何再將二進制轉(zhuǎn)化為對象
  4. 報文類型:報文可能存在不同的類型。例如在 RPC 框架中有請求、響應(yīng)、心跳等類型的報文,在 IM 即時通信的場景中有登陸、創(chuàng)建群聊、發(fā)送消息、接收消息、退出群聊等類型的報文。
  5. 長度域字段:代表請求數(shù)據(jù)的長度,接收方根據(jù)長度域字段獲取一個完整的報文。
  6. 請求數(shù)據(jù):通常為序列化之后得到的二進制流
  7. 狀態(tài):狀態(tài)字段用于標(biāo)識請求是否正常。一般由被調(diào)用方設(shè)置。例如一次 RPC 調(diào)用失敗,狀態(tài)字段可被服務(wù)提供方設(shè)置為異常狀態(tài)。
  8. 保留字段:保留字段是可選項,為了應(yīng)對協(xié)議升級的可能性,可以預(yù)留若干字節(jié)的保留字段,以備不時之需。
+---------------------------------------------------------------+
?
| 魔數(shù) 2byte | 協(xié)議版本號 1byte | 序列化算法 1byte | 報文類型 1byte  |
?
+---------------------------------------------------------------+
?
| 狀態(tài) 1byte |        保留字段 4byte     |      數(shù)據(jù)長度 4byte     | 
?
+---------------------------------------------------------------+
?
|                   數(shù)據(jù)內(nèi)容 (長度不定)                          |
?
+---------------------------------------------------------------+

舉例如下:

如何實現(xiàn)自定義通信協(xié)議

Netty 作為一個非常優(yōu)秀的網(wǎng)絡(luò)通信框架,已經(jīng)為我們提供了非常豐富的編解碼抽象基類,幫助我們更方便地基于這些抽象基類擴展實現(xiàn)自定義協(xié)議。

Netty 常用編碼器類型:

  • MessageToByteEncoder 對象編碼成字節(jié)流;
  • MessageToMessageEncoder 一種消息類型編碼成另外一種消息類型。

Netty 常用解碼器類型:

  • ByteToMessageDecoder/ReplayingDecoder 將字節(jié)流解碼為消息對象;
  • MessageToMessageDecoder 將一種消息類型解碼為另外一種消息類型。

編解碼器可以分為一次解碼器和二次解碼器,一次解碼器用于解決 TCP 拆包/粘包問題,按協(xié)議解析后得到的字節(jié)數(shù)據(jù)。

如果你需要對解析后的字節(jié)數(shù)據(jù)做對象模型的轉(zhuǎn)換,這時候便需要用到二次解碼器,同理編碼器的過程是反過來的。

  • 一次編解碼器:MessageToByteEncoder/ByteToMessageDecoder。
  • 二次編解碼器:MessageToMessageEncoder/MessageToMessageDecoder。

抽象編碼類

通過抽象編碼類的繼承圖可以看出,編碼類是 ChanneOutboundHandler 的抽象類實現(xiàn),具體操作的是 Outbound 出站數(shù)據(jù)。

MessageToByteEncoder

MessageToByteEncoder 用于將對象編碼成字節(jié)流,MessageToByteEncoder 提供了唯一的 encode 抽象方法,我們只需要實現(xiàn)encode 方法即可完成自定義編碼。

編碼器實現(xiàn)非常簡單,不需要關(guān)注拆包/粘包問題。

如下例子,展示了如何將字符串類型的數(shù)據(jù)寫入到 ByteBuf 實例,ByteBuf 實例將傳遞給 ChannelPipeline 鏈表中的下一個 ChannelOutboundHandler。

public class StringToByteEncoder extends MessageToByteEncoder<String> {

    @Override
    protected void encode(ChannelHandlerContext channelHandlerContext, String data, ByteBuf byteBuf) throws Exception {
        byteBuf.writeBytes(data.getBytes());
    }
}

encode什么時候被調(diào)用的

MessageToByteEncoder 重寫了 ChanneOutboundHandler 的 write() 方法,其主要邏輯分為以下幾個步驟:

  • acceptOutboundMessage 判斷是否有匹配的消息類型,如果匹配需要執(zhí)行編碼流程,如果不匹配直接繼續(xù)傳遞給下一個 ChannelOutboundHandler;
  • 分配 ByteBuf 資源,默認使用堆外內(nèi)存;
  • 調(diào)用子類實現(xiàn)的 encode 方法完成數(shù)據(jù)編碼,一旦消息被成功編碼,會通過調(diào)用 ReferenceCountUtil.release(cast) 自動釋放;
  • 如果 ByteBuf 可讀,說明已經(jīng)成功編碼得到數(shù)據(jù),然后寫入 ChannelHandlerContext 交到下一個節(jié)點;如果 ByteBuf 不可讀,則釋放 ByteBuf 資源,向下傳遞空的 ByteBuf 對象。
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
    ByteBuf buf = null;
    try {
        if (acceptOutboundMessage(msg)) { // 1. 消息類型是否匹配
            @SuppressWarnings("unchecked")
            I cast = (I) msg;
            buf = allocateBuffer(ctx, cast, preferDirect); // 2. 分配 ByteBuf 資源
            try {
                encode(ctx, cast, buf); // 3. 執(zhí)行 encode 方法完成數(shù)據(jù)編碼
            } finally {
                ReferenceCountUtil.release(cast);
            }
            if (buf.isReadable()) {
                ctx.write(buf, promise); // 4. 向后傳遞寫事件
            } else {
                buf.release();
                ctx.write(Unpooled.EMPTY_BUFFER, promise);
            }
            buf = null;
        } else {
            ctx.write(msg, promise);
        }
    } catch (EncoderException e) {
        throw e;
    } catch (Throwable e) {
        throw new EncoderException(e);
    } finally {
        if (buf != null) {
            buf.release();
        }
    }
}

MessageToMessageEncoder

MessageToMessageEncoder 與 MessageToByteEncoder 類似,同樣只需要實現(xiàn) encode 方法。

MessageToMessageEncoder常用的實現(xiàn)子類StringEncoder、LineEncoder、Base64Encoder等。

StringEncoder為例看下MessageToMessageEncoder 的用法。

源碼示例如下:將 CharSequence 類型(String、StringBuilder、StringBuffer 等)轉(zhuǎn)換成 ByteBuf 類型,結(jié)合 StringDecoder 可以直接實現(xiàn) String 類型數(shù)據(jù)的編解碼。

@Override
protected void encode(ChannelHandlerContext ctx, CharSequence msg, List<Object> out) throws Exception {
    if (msg.length() == 0) {
        return;
    }
    out.add(ByteBufUtil.encodeString(ctx.alloc(), CharBuffer.wrap(msg), charset));
}

抽象解碼類

解碼類是 ChanneInboundHandler 的抽象類實現(xiàn),操作的是 Inbound 入站數(shù)據(jù)。解碼器實現(xiàn)的難度要遠大于編碼器,因為解碼器需要考慮拆包/粘包問題。

由于接收方有可能沒有接收到完整的消息,所以解碼框架需要對入站的數(shù)據(jù)做緩沖操作,直至獲取到完整的消息。

ByteToMessageDecoder

使用 ByteToMessageDecoder,Netty 會自動進行內(nèi)存的釋放,我們不用操心太多的內(nèi)存管理方面的邏輯。

首先,我們看下 ByteToMessageDecoder 定義的抽象方法:

public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter {
    protected abstract void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception;
    protected void decodeLast(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        if (in.isReadable()) {
            decodeRemovalReentryProtection(ctx, in, out);
        }
    }
}

我們只需要實現(xiàn)一下decode()方法,這里的 in 大家可以看到,傳遞進來的時候就已經(jīng)是 ByteBuf 類型,所以我們不再需要強轉(zhuǎn),第三個參數(shù)是List類型,我們通過往這個List里面添加解碼后的結(jié)果對象,就可以自動實現(xiàn)結(jié)果往下一個 handler 進行傳遞,這樣,我們就實現(xiàn)了解碼的邏輯 handler。

為什么存取解碼后的數(shù)據(jù)是用List

由于 TCP 粘包問題,ByteBuf 中可能包含多個有效的報文,或者不夠一個完整的報文。

Netty 會重復(fù)回調(diào) decode() 方法,直到?jīng)]有解碼出新的完整報文可以添加到 List 當(dāng)中,或者 ByteBuf 沒有更多可讀取的數(shù)據(jù)為止。

如果此時 List 的內(nèi)容不為空,那么會傳遞給 ChannelPipeline 中的下一個ChannelInboundHandler。

static void fireChannelRead(ChannelHandlerContext ctx, CodecOutputList msgs, int numElements) {
    for (int i = 0; i < numElements; i ++) {
        //循環(huán)傳播  有多少調(diào)用多少
        ctx.fireChannelRead(msgs.getUnsafe(i));
    }
}

decodeLast

ByteToMessageDecoder 還定義了 decodeLast() 方法。為什么抽象解碼器要比編碼器多一個 decodeLast() 方法呢?

因為 decodeLast 在 Channel 關(guān)閉后會被調(diào)用一次,主要用于處理 ByteBuf 最后剩余的字節(jié)數(shù)據(jù)。Netty 中 decodeLast 的默認實現(xiàn)只是簡單調(diào)用了 decode() 方法。

如果有特殊的業(yè)務(wù)需求,則可以通過重寫 decodeLast() 方法擴展自定義邏輯。

ReplayingDecoder

ByteToMessageDecoder 還有一個抽象子類是 ReplayingDecoder。

它封裝了緩沖區(qū)的管理,在讀取緩沖區(qū)數(shù)據(jù)時,你無須再對字節(jié)長度進行檢查。

因為如果沒有足夠長度的字節(jié)數(shù)據(jù),ReplayingDecoder 將終止解碼操作。

ReplayingDecoder 的性能相比直接使用 ByteToMessageDecoder 要慢,大部分情況下并不推薦使用 ReplayingDecoder。

MessageToMessageDecoder

與 ByteToMessageDecoder 不同的是 MessageToMessageDecoder 并不會對數(shù)據(jù)報文進行緩存,它主要用作轉(zhuǎn)換消息模型。

比較推薦的做法是使用 ByteToMessageDecoder 解析 TCP 協(xié)議,解決拆包/粘包問題。

解析得到有效的 ByteBuf 數(shù)據(jù),然后傳遞給后續(xù)的 MessageToMessageDecoder 做數(shù)據(jù)對象的轉(zhuǎn)換,具體流程如下圖所示:

案例如下:

public class MyTcpDecoder extends ByteToMessageDecoder {
?
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        // 檢查ByteBuf數(shù)據(jù)是否完整
        if (in.readableBytes() < 4) {
            return;
        }
?
        // 標(biāo)記ByteBuf讀取索引位置
        in.markReaderIndex();
?
        // 讀取數(shù)據(jù)包長度
        int length = in.readInt();
?
        // 如果ByteBuf中可讀字節(jié)數(shù)不足一個數(shù)據(jù)包長度,則將讀取索引位置恢復(fù)到標(biāo)記位置,等待下一次讀取
        if (in.readableBytes() < length) {
            in.resetReaderIndex();
            return;
        }
?
        // 讀取數(shù)據(jù)
        ByteBuf data = in.readBytes(length);
?
        // 將數(shù)據(jù)傳遞給下一個解碼器進行轉(zhuǎn)換,轉(zhuǎn)換后的數(shù)據(jù)對象添加到out中
        ctx.fireChannelRead(data);
    }
}
?
public class MyDataDecoder extends MessageToMessageDecoder<ByteBuf> {
?
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception {
        // 將讀取到的ByteBuf數(shù)據(jù)轉(zhuǎn)換為自定義的數(shù)據(jù)對象
        MyData data = decode(msg);
        if (data != null) {
            // 將轉(zhuǎn)換后的數(shù)據(jù)對象添加到out中,表示解碼成功
            out.add(data);
        }
    }
?
    private MyData decode(ByteBuf buf) {
        // 實現(xiàn)自定義的數(shù)據(jù)轉(zhuǎn)換邏輯
        // ...
        return myData;
    }
}

實戰(zhàn)案例

如何判斷 ByteBuf 是否存在完整的報文? 最常用的做法就是通過讀取消息長度 dataLength 進行判斷。

如果 ByteBuf 的可讀數(shù)據(jù)長度小于 dataLength,說明 ByteBuf 還不夠獲取一個完整的報文。

在該協(xié)議前面的消息頭部分包含了魔數(shù)、協(xié)議版本號、數(shù)據(jù)長度等固定字段,共 14 個字節(jié)。

固定字段長度和數(shù)據(jù)長度可以作為我們判斷消息完整性的依據(jù),具體編碼器實現(xiàn)ByteToMessageDecoder邏輯示例如下:

/*
+---------------------------------------------------------------+
| 魔數(shù) 2byte | 協(xié)議版本號 1byte | 序列化算法 1byte | 報文類型 1byte  |
+---------------------------------------------------------------+
| 狀態(tài) 1byte |        保留字段 4byte     |      數(shù)據(jù)長度 4byte     | 
+---------------------------------------------------------------+
|                   數(shù)據(jù)內(nèi)容 (長度不定)                          |
+---------------------------------------------------------------+
 */
@Override
public final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
    // 判斷 ByteBuf 可讀取字節(jié)
    if (in.readableBytes() < 14) { 
        return;
    }
    // 標(biāo)記 ByteBuf 讀指針位置
    in.markReaderIndex();
    // 跳過魔數(shù)
    in.skipBytes(2);
    // 跳過協(xié)議版本號
    in.skipBytes(1);
    byte serializeType = in.readByte();
     // 跳過報文類型
    in.skipBytes(1);
    // 跳過狀態(tài)字段
    in.skipBytes(1);
    // 跳過保留字段
    in.skipBytes(4);
    
    // 驗證報文長度,不對的話就重置指針位置
    int dataLength = in.readInt();
    if (in.readableBytes() < dataLength) {
        in.resetReaderIndex(); // 重置 ByteBuf 讀指針位置,這一步很重要
        return;
    }
    byte[] data = new byte[dataLength];
    in.readBytes(data);
    // 方式一:在解碼器中就將數(shù)據(jù)解碼成具體的對象
    SerializeService serializeService = getSerializeServiceByType(serializeType);
    Object obj = serializeService.deserialize(data);
    if (obj != null) {
        out.add(obj);
    }
    // 方式二:這一步可以不在解碼器中處理,將請求數(shù)據(jù)讀取到一個新的byteBuf然后丟給handler處理
    // 創(chuàng)建新的 ByteBuf 對象來存儲有效負載數(shù)據(jù)
    ByteBuf payload = Unpooled.buffer((int) dataSize);

    // 讀取有效負載數(shù)據(jù)并寫入到 payload 中
    in.readBytes(payload);
    if (payload.isReadable()) {
        out.add(payload);
    }
}

擴展

什么是字節(jié)序

字節(jié)順序,是指數(shù)據(jù)在內(nèi)存中的存放順序 使用16進制表示:0x12345678。在內(nèi)存中有兩種方法存儲這個數(shù)字,

不同在于,對于某一個要表示的值,是把值的低位存到低地址,還是把值的高位存到低地址。

字節(jié)順序分類

字節(jié)的排列方式有兩種。例如,將一個多字節(jié)對象的低位放在較小的地址處,高位放在較大的地址處,則稱小端序;反之則稱大端序。

典型的情況是整數(shù)在內(nèi)存中的存放方式(小端/主機字節(jié)序)和網(wǎng)絡(luò)傳輸?shù)膫鬏旐樞颍ù蠖?網(wǎng)絡(luò)字節(jié)序)

1. 網(wǎng)絡(luò)字節(jié)序(Network Order):TCP/IP各層協(xié)議將字節(jié)序定義為大端(Big Endian) ,因此TCP/IP協(xié)議中使用的字節(jié)序通常稱之為網(wǎng)絡(luò)字節(jié)序。

  • 所以當(dāng)兩臺主機之間要通過TCP/IP協(xié)議進行通信的時候就需要調(diào)用相應(yīng)的函數(shù)進行主機序列(Little Endian)和網(wǎng)絡(luò)序(Big Endian)的轉(zhuǎn)換。
  • 這樣一來,也就達到了與CPU、操作系統(tǒng)無關(guān),實現(xiàn)了網(wǎng)絡(luò)通信的標(biāo)準(zhǔn)化。

2. 主機字節(jié)序(Host Order): 整數(shù)在內(nèi)存中保存的順序,它遵循小端(Little Endian)規(guī)則(不一定,要看主機的CPU架構(gòu),不過大多數(shù)都是小端)。

  • 同型號計算機上寫的程序,在相同的系統(tǒng)上面運行是沒有問題的。

總結(jié)

Java中虛擬機屏蔽了大小端問題,如果是Java之間通信則無需考慮,只有在跨語言通信的場景下才需要處理大小端問題。

回到本文的重點,我們在編解碼時也要注意大小端的問題,一般來說如果是小端序的話,我們用Netty取值的時候都要用LE結(jié)尾的方法。

以上為個人經(jīng)驗,希望能給大家一個參考,也希望大家多多支持腳本之家。

相關(guān)文章

最新評論