Netty?拆包沾包問題解決方案詳解
上一篇說到Springboot整合Netty,自定義協(xié)議實現(xiàn),本文聊一些拆包/沾包問題。
拆包/沾包問題
TCP是面向字節(jié)流的協(xié)議,在發(fā)送方發(fā)送的若干包數(shù)據(jù)到接收方接收時,這些數(shù)據(jù)包可能會被粘成一個數(shù)據(jù)包,而從接收緩沖區(qū)看,后一包數(shù)據(jù)的頭緊接著前一包數(shù)據(jù)的尾,這就形成沾包問題。
但如果一次請求發(fā)送的數(shù)據(jù)量比較大,超過了緩沖區(qū)大小,TCP 就會將其拆分為多次發(fā)送,這就是拆包問題,也就是將一個大的包拆分為多個小包進行發(fā)送,接收端接收到多個包才能組成一個完整數(shù)據(jù)。
為什么UDP沒有粘包
粘包/拆包問題在數(shù)據(jù)鏈路層、網(wǎng)絡(luò)層以及傳輸層都有可能發(fā)生。日常的網(wǎng)絡(luò)應(yīng)用開發(fā)大都在傳輸層進行,由于UDP有消息保護邊界,不會發(fā)生粘包/拆包問題。
而TCP是面向字節(jié)流,沒有邊界,操作系統(tǒng)在發(fā)送 TCP 數(shù)據(jù)的時候,底層會有一個緩沖區(qū),通過這個緩沖區(qū)來進行優(yōu)化,例如緩沖區(qū)為1024個字節(jié)大小,如果一次發(fā)送數(shù)據(jù)量小于1024,則會合并多個數(shù)據(jù)作為一個數(shù)據(jù)包發(fā)送;如果一次發(fā)送數(shù)據(jù)量大于1024,則會將這個包拆分成多個數(shù)據(jù)包進行發(fā)送。上述兩種情況也是沾包和拆包問題。
上圖出現(xiàn)的四種情況包括:
- 正常發(fā)送,兩個包恰好滿足TCP緩沖區(qū)的大小或達到TCP等待時長,分別發(fā)送兩個包。
- 沾包:D1、D2都過小,兩者進行了沾包處理。
- 拆包沾包:D2過大,進行了拆包處理,而拆出去的一部分D2_1又與D1進行粘包處理。
- 沾包拆包:D1過大,進行了拆包處理,而拆出去的一部分D1_2又與D2進行粘包處理。
解決方案
對于粘包和拆包問題,通??梢允褂眠@四種解決方案:
- 使用固定數(shù)據(jù)長度進行發(fā)送,發(fā)送端將每個包都封裝成固定的長度,比如100字節(jié)大小。如果不足100字節(jié)可通過補0等填充到指定長度再發(fā)送。
- 發(fā)送端在每個包的末尾使用固定的分隔符,例如
##@##
。如果發(fā)生拆包需等待多個包發(fā)送過來之后再找到其中的##@##
進行合并。如果發(fā)送沾包則找到其中的##@##
進行拆分。 - 將消息分為頭部和消息體,頭部中保存整個消息的長度,這種情況下接收端只有在讀取到足夠長度的消息之后,才算是接收到一個完整的消息。
- 通過自定義協(xié)議進行粘包和拆包的處理。
Netty拆包沾包處理
Netty對解決粘包和拆包的方案做了抽象,提供了一些解碼器(Decoder)來解決粘包和拆包的問題。如:
LineBasedFrameDecoder
:以行為單位進行數(shù)據(jù)包的解碼,使用換行符\n
或者\r\n
作為依據(jù),遇到\n
或者\r\n
都認為是一條完整的消息。
DelimiterBasedFrameDecoder
:以特殊的符號作為分隔來進行數(shù)據(jù)包的解碼。 FixedLengthFrameDecoder
:以固定長度進行數(shù)據(jù)包的解碼。
LenghtFieldBasedFrameDecode
:適用于消息頭包含消息長度的協(xié)議(最常用)。
基于Netty進行網(wǎng)絡(luò)讀寫的程序,可以直接使用這些Decoder來完成數(shù)據(jù)包的解碼。對于高并發(fā)、大流量的系統(tǒng)來說,每個數(shù)據(jù)包都不應(yīng)該傳輸多余的數(shù)據(jù)(所以補齊的方式不可?。?code>LenghtFieldBasedFrameDecode更適合這樣的場景。
LineBasedFrameDecoder
使用LineBasedFrameDecoder解決粘包問題,其會根據(jù)"\n"或"\r\n"對二進制數(shù)據(jù)進行拆分,封裝到不同的ByteBuf實例中
/** * 服務(wù)啟動器 * * @return */ @Bean public ServerBootstrap serverBootstrap() { ServerBootstrap serverBootstrap = new ServerBootstrap() // 指定使用的線程組 .group(boosGroup(), workerGroup()) // 指定使用的通道 .channel(NioServerSocketChannel.class) // 指定連接超時時間 .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyProperties.getTimeout()) // 通過換行符處理沾包/拆包 .childHandler(new NettyServerLineBasedHandler()); return serverBootstrap; }
public class NettyServerLineBasedHandler extends ChannelInitializer<SocketChannel> { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { ChannelPipeline pipeline = socketChannel.pipeline(); // 使用LineBasedFrameDecoder解決粘包問題,其會根據(jù)"\n"或"\r\n"對二進制數(shù)據(jù)進行拆分,封裝到不同的ByteBuf實例中,并且每次查找的最大長度為1024字節(jié) pipeline.addLast(new LineBasedFrameDecoder(1024, true, true)); // 將上一步解碼后的數(shù)據(jù)轉(zhuǎn)碼為Message實例 pipeline.addLast(new MessageDecodeHandler()); // 對發(fā)送客戶端的數(shù)據(jù)進行編碼 pipeline.addLast(new MessageEncodeHandler()); // 對數(shù)據(jù)進行最終處理 pipeline.addLast(new ServerListenerHandler()); } }
DelimiterBasedFrameDecoder
以特殊的符號作為分隔來進行數(shù)據(jù)包的解碼,上文中就是以##@##
作為分割符作為示例展開講解的。這里再粘貼一下關(guān)鍵代碼: 使用DelimiterBasedFrameDecoder
處理拆包/沾包,并且每次查找的最大長度為1024字節(jié)。
@Override protected void initChannel(SocketChannel socketChannel) throws Exception { // 數(shù)據(jù)分割符 String delimiterStr = "##@##"; ByteBuf delimiter = Unpooled.copiedBuffer(delimiterStr.getBytes()); ChannelPipeline pipeline = socketChannel.pipeline(); // 使用自定義分隔符處理拆包/沾包,并且每次查找的最大長度為1024字節(jié) pipeline.addLast(new DelimiterBasedFrameDecoder(1024, delimiter)); // 將上一步解碼后的數(shù)據(jù)轉(zhuǎn)碼為Message實例 pipeline.addLast(new MessageDecodeHandler()); // 對發(fā)送客戶端的數(shù)據(jù)進行編碼,并添加數(shù)據(jù)分隔符 pipeline.addLast(new MessageEncodeHandler(delimiterStr)); // 對數(shù)據(jù)進行最終處理 pipeline.addLast(new ServerListenerHandler()); }
MessageEncodeHandler
對發(fā)送數(shù)據(jù)進行添加分割符并編碼操作
public class MessageEncodeHandler extends MessageToByteEncoder<Message> { // 數(shù)據(jù)分割符 String delimiter; public MessageEncodeHandler(String delimiter) { this.delimiter = delimiter; } @Override protected void encode(ChannelHandlerContext channelHandlerContext, Message message, ByteBuf out) throws Exception { out.writeBytes((message.toJsonString() + delimiter).getBytes(CharsetUtil.UTF_8)); } }
FixedLengthFrameDecoder
服務(wù)端代碼設(shè)置,在NettyConfig
配置中將worker處理器改為NettyServerFixedLengthHandler
,使用固定100字節(jié)長度處理消息。
/** * 服務(wù)啟動器 * * @return */ @Bean public ServerBootstrap serverBootstrap() { ServerBootstrap serverBootstrap = new ServerBootstrap() // 指定使用的線程組 .group(boosGroup(), workerGroup()) // 指定使用的通道 .channel(NioServerSocketChannel.class) // 指定連接超時時間 .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyProperties.getTimeout()) // 指定為固定長度字節(jié)的處理器 .childHandler(new NettyServerFixedLengthHandler()); return serverBootstrap; }
NettyServerFixedLengthHandler
類代碼,使用FixedLengthFrameDecoder
設(shè)置按固定100字節(jié)數(shù)去拆分接收到的ByteBuf。并自定義一個消息編碼器,對字節(jié)長度不足100字節(jié)的消息進行補0操作。
public class NettyServerFixedLengthHandler extends ChannelInitializer<SocketChannel> { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { // 固定字節(jié)長度 Integer length = 100; ChannelPipeline pipeline = socketChannel.pipeline(); // 按固定100字節(jié)數(shù)拆分接收到的ByteBuf的解碼器 pipeline.addLast(new FixedLengthFrameDecoder(length)); // 將上一步解碼后的數(shù)據(jù)轉(zhuǎn)碼為Message實例 pipeline.addLast(new MessageDecodeHandler()); // 對發(fā)送客戶端的數(shù)據(jù)進行自定義編碼,并設(shè)置字節(jié)長度不足補0 pipeline.addLast(new MessageEncodeFixedLengthHandler(length)); // 對數(shù)據(jù)進行最終處理 pipeline.addLast(new ServerListenerHandler()); } }
自定義MessageEncodeFixedLengthHandler
編碼類,使用固定字節(jié)長度編碼消息,字節(jié)長度不足時補0。
public class MessageEncodeFixedLengthHandler extends MessageToByteEncoder<Message> { private int length; public MessageEncodeFixedLengthHandler(int length) { this.length = length; } /** * 使用固定字節(jié)長度編碼消息,字節(jié)長度不足時補0 * * @param ctx the {@link ChannelHandlerContext} which this {@link MessageToByteEncoder} belongs to * @param msg the message to encode * @param out the {@link ByteBuf} into which the encoded message will be written * @throws Exception */ @Override protected void encode(ChannelHandlerContext ctx, Message msg, ByteBuf out) throws Exception { String jsonStr = msg.toJsonString(); // 如果長度不足,則進行補0 if (jsonStr.length() < length) { jsonStr = addSpace(jsonStr); } // 使用Unpooled.wrappedBuffer實現(xiàn)零拷貝,將字符串轉(zhuǎn)為ByteBuf ctx.writeAndFlush(Unpooled.wrappedBuffer(jsonStr.getBytes())); } /** * 如果沒有達到指定長度進行補0 * * @param msg * @return */ private String addSpace(String msg) { StringBuilder builder = new StringBuilder(msg); for (int i = 0; i < length - msg.length(); i++) { builder.append(0); } return builder.toString(); } }
LenghtFieldBasedFrameDecode
LenghtFieldBasedFrameDecode
適用于消息頭包含消息長度的協(xié)議,根據(jù)消息長度判斷是否讀取完一個數(shù)據(jù)包。
/** * 服務(wù)啟動器 * * @return */ @Bean public ServerBootstrap serverBootstrap() { ServerBootstrap serverBootstrap = new ServerBootstrap() // 指定使用的線程組 .group(boosGroup(), workerGroup()) // 指定使用的通道 .channel(NioServerSocketChannel.class) // 指定連接超時時間 .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyProperties.getTimeout()) // 請求頭包含數(shù)據(jù)長度 .childHandler(new NettyServerLenghtFieldBasedHandler()); return serverBootstrap; }
public class NettyServerLenghtFieldBasedHandler extends ChannelInitializer<SocketChannel> { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { ChannelPipeline pipeline = socketChannel.pipeline(); // 請求頭包含數(shù)據(jù)長度,根據(jù)長度進行沾包拆包處理 /** * maxFrameLength:指定了每個包所能傳遞的最大數(shù)據(jù)包大小; * lengthFieldOffset:指定了長度字段在字節(jié)碼中的偏移量; * lengthFieldLength:指定了長度字段所占用的字節(jié)長度; * lengthAdjustment:對一些不僅包含有消息頭和消息體的數(shù)據(jù)進行消息頭的長度的調(diào)整,這樣就可以只得到消息體的數(shù)據(jù),這里的lengthAdjustment指定的就是消息頭的長度; * initialBytesToStrip:對于長度字段在消息頭中間的情況,可以通過initialBytesToStrip忽略掉消息頭以及長度字段占用的字節(jié)。 */ pipeline.addLast(new LengthFieldBasedFrameDecoder(1024, 0, 2, 0, 2)); // 在請求頭添加字節(jié)長度字段 pipeline.addLast(new LengthFieldPrepender(2)); // 將上一步解碼后的數(shù)據(jù)轉(zhuǎn)碼為Message實例 pipeline.addLast(new MessageDecodeHandler()); // 對發(fā)送客戶端的數(shù)據(jù)進行編碼,字節(jié)長度不足補0 pipeline.addLast(new MessageEncodeHandler()); // 對數(shù)據(jù)進行最終處理 pipeline.addLast(new ServerListenerHandler()); } }
總結(jié)
造成TCP協(xié)議粘包/拆包問題的原因是TCP協(xié)議數(shù)據(jù)傳輸是基于字節(jié)流的,它不包含消息、數(shù)據(jù)包等概念,是無界的,需要應(yīng)用層協(xié)議自己設(shè)計消息的邊界,即消息幀(Message Framing)。如果應(yīng)用層協(xié)議沒有使用基于長度或者基于分隔符(終結(jié)符)劃分邊界等方式進行處理,則會導(dǎo)致多個消息的粘包和拆包。
以上就是Netty 拆包沾包問題解決方案示例的詳細內(nèi)容,更多關(guān)于Netty 拆包沾包解決方案的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
SpringBoot2整合activiti6環(huán)境搭建過程解析
這篇文章主要介紹了SpringBoot2整合activiti6環(huán)境搭建過程解析,文中通過示例代碼介紹的非常詳細,對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友可以參考下2019-11-11java學(xué)生信息管理系統(tǒng)MVC架構(gòu)詳解
這篇文章主要為大家詳細介紹了java學(xué)生信息管理系統(tǒng)MVC架構(gòu)的相關(guān)資料,具有一定的參考價值,感興趣的小伙伴們可以參考一下2017-11-11Spring boot從安裝到交互功能實現(xiàn)零基礎(chǔ)全程詳解
這篇文章主要介紹了Spring boot從安裝到交互功能得實現(xiàn)全程詳解,文中通過示例代碼介紹的非常詳細,對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2022-07-07java隨機抽取指定范圍內(nèi)不重復(fù)的n個數(shù)
這篇文章主要為大家詳細介紹了java隨機抽取指定范圍內(nèi)不重復(fù)的n個數(shù),感興趣的小伙伴們可以參考一下2016-02-02