Netty開發(fā)及粘包實戰(zhàn)解決分析
1. Netty介紹
Netty是一款開源的Java網(wǎng)絡編程框架,廣泛應用于很多高流量的服務器端應用程序:
- 異步和事件驅動:Netty基于NIO(非阻塞I/O)構建,操作都是異步回調來觸發(fā)事件,如連接建立、數(shù)據(jù)到達等。
- 高性能:Netty的一大優(yōu)點就是高性能。它的設計能夠讓你最大限度地利用現(xiàn)代的多核硬件。
- 靈活的協(xié)議支持:Netty支持各種協(xié)議,包括TCP、UDP、HTTP/HTTPS、Unix Socket、WebSockets等。
- 零拷貝:Netty支持“零拷貝”,這可以減少不必要的系統(tǒng)調用,顯著提高數(shù)據(jù)處理性能。
Netty 目前最新版本是 4.1.95Final
很久之前 Netty就發(fā)布了 5 的測試版本,市場上都有很多介紹 Netty5 的書在賣了,但可惜問題太多,最終廢棄了,目前依然只維護 4 的版本。
1.1. 組件
1.1.1. EventLoopGroup
EventLoopGroup 是一個線程池,用于管理和調度 EventLoop 對象。在 Netty 中,每個 EventLoopGroup 有一個或多個 EventLoop,用于處理連接請求和 I/O 操作,而每個EventLoop是單線程的。
所以Netty可以通過EventLoopGroup的構造調參,來實現(xiàn)不同的Reactor模型:
(1)既可也是單Reactor單線程模型:
EventLoopGroup group = new NioEventLoopGroup(1); Bootstrap bootstrap = new Bootstrap(); bootstrap.group(group)
(2)也可以是 主從Reactor多線程模型:
EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(n); ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap.group(bossGroup, workerGroup)
主從分工
- BossEventLoopGroup:負責接收客戶端的連接請求。并將連接請求分配給workerEventLoopGroup中的某個EventLoop進行處理。BossGroup中通常只需一個EventLoop;
- WorkerEventLoopGroup:負責處理服務器端的連接和數(shù)據(jù)讀寫操作。每個EventLoop都綁定在一個具體的線程上,在運行過程中只處理該線程所監(jiān)聽的IO事件。 workerGroup通常需要多個EventLoop。
1.1.2. EventLoop
EventLoop 則是事件循環(huán)的核心,負責監(jiān)聽和處理 Channel 中的事件和 I/O 操作。在 EventLoopGroup 中,每個 EventLoop 都是獨立的,可以并發(fā)地處理多個 Channel 上的事件和 I/O 操作。
1.1.3. Channel 和 ByteBuf
定義
- Channel:代表了一個網(wǎng)絡通道,可以用于進行網(wǎng)絡通信。通過使用 Channel,我們可以連接到遠程服務器、發(fā)送和接收數(shù)據(jù)等。
- ByteBuf:則是用于管理和操作數(shù)據(jù)的緩沖區(qū),通過使用 ByteBuf,我們可以進行數(shù)據(jù)的讀、寫、復制、切片、合并等操作。
搭配使用
在 Netty 中,Channel 和 ByteBuf 是緊密結合的,通常一次數(shù)據(jù)傳輸會涉及到兩個 Channel 和兩個 ByteBuf 對象,分別代表了發(fā)送端和接收端的數(shù)據(jù)緩沖區(qū)。以下是 Channel 和 ByteBuf 的搭配使用流程:
- 創(chuàng)建 Channel:首先,我們需要創(chuàng)建一個 Channel 對象,用于表示一個網(wǎng)絡通道??梢酝ㄟ^ Bootstrap 或 ServerBootstrap 類來創(chuàng)建 Channel 對象,并配置其參數(shù)和屬性。
- 寫入數(shù)據(jù):當需要向遠程服務器發(fā)送數(shù)據(jù)時,我們需要先將數(shù)據(jù)寫入到 ByteBuf 對象中,然后將 ByteBuf 對象寫入到 Channel 對象中。在寫入數(shù)據(jù)時,可以通過 write() 或 writeAndFlush() 方法來實現(xiàn)。
- 讀取數(shù)據(jù):當遠程服務器發(fā)送數(shù)據(jù)時,我們需要通過 Channel 對象來讀取數(shù)據(jù)。讀取數(shù)據(jù)時,Channel 對象會將數(shù)據(jù)存儲到 ByteBuf 對象中,我們可以通過 read() 方法來獲取數(shù)據(jù)或數(shù)據(jù)大小。在讀取數(shù)據(jù)之后,我們需要及時釋放 ByteBuf 對象,以便避免內存泄漏和內存溢出等問題。
- 釋放資源:當數(shù)據(jù)傳輸完成后,我們需要釋放 Channel 和 ByteBuf 對象的資源。在 Netty 中,Channel 和 ByteBuf 對象都需要顯式地釋放資源,以避免內存泄漏和內存溢出等問題??梢酝ㄟ^ release() 方法來釋放 ByteBuf 對象,通過 close() 方法來釋放 Channel 對象。
1.1.4. ChannelPipeline 和 Channel
定義
- Channel:對象表示一個通信通道,可以進行數(shù)據(jù)的讀寫和事件的觸發(fā)等操作。
- ChannelPipeline:則是一個事件處理器的鏈表,用于處理 Channel 中的事件和數(shù)據(jù)。
每個 Channel 都有一個關聯(lián)的 ChannelPipeline 對象,當有事件發(fā)生時,Netty 會將事件從 Channel 中傳遞到 ChannelPipeline 中,然后按照順序依次觸發(fā)各個事件處理器 ChannelHandler 的邏輯。當事件處理完畢后,Netty 會將處理結果返回到 Channel 中,以便進行數(shù)據(jù)的讀寫等操作。
在 ChannelPipeline 中,可以添加多個事件處理器,用于處理不同類型的事件和數(shù)據(jù)。例如,可以添加一個消息解碼器、一個消息編碼器、一個業(yè)務邏輯處理器等。每個事件處理器都可以進行特定的邏輯處理,并將處理結果傳遞給下一個事件處理器。
1.2. 網(wǎng)絡協(xié)議
Netty是一個非常強大和靈活的網(wǎng)絡編程框架,它支持多種通信協(xié)議。以下是一些Netty支持的通信協(xié)議:
- TCP/IP 和 UDP/IP:Netty 提供了底層的網(wǎng)絡通信支持,可以構建基于TCP/IP或UDP/IP的應用。
- HTTP/HTTPS and HTTP/2:Netty 提供了HTTP、HTTPS以及HTTP/2的高級支持。
- WebSocket:Netty 支持 WebSocket,允許 Web 瀏覽器和服務器之間進行全雙工通信。
- Google Protobuf:Netty 為 Google 的 Protobuf 序列化庫提供了支持。
- SSL/TLS:通過JDK的Secure Socket Extension (JSSE),Netty 支持 SSL/TLS 實現(xiàn)安全通信。
- Unix Domain Socket:從 Netty 4.1版本開始,Netty也開始支持 Unix Domain Socket。
因為 Netty 支持的網(wǎng)絡協(xié)議豐富,所以當有非Http協(xié)議網(wǎng)絡通信的需求時,大家第一時間會想到 Netty。
2. 代碼示例
2.1. 基于tcp協(xié)議
2.1.1.服務端
pom
<dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>4.1.63.Final</version> </dependency>
服務端
@Component public class NettyServer { // 創(chuàng)建兩個線程組,分別用于接收客戶端連接和處理網(wǎng)絡IO操作 private final EventLoopGroup bossGroup = new NioEventLoopGroup(); private final EventLoopGroup workerGroup = new NioEventLoopGroup(); @PostConstruct public void start() throws InterruptedException { ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) // 指定使用 NioServerSocketChannel 作為通道實現(xiàn) .channel(NioServerSocketChannel.class) // 定義 ChannelPipeline(多個ChannelHandler組合) .childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new StringDecoder(CharsetUtil.UTF_8)); ch.pipeline().addLast(new StringEncoder(CharsetUtil.UTF_8)); ch.pipeline().addLast(new ServerHandler()); } }); // 綁定端口,開始接收進來的連接 ChannelFuture f = b.bind(8080).sync(); if (f.isSuccess()) { System.out.println("啟動Netty服務成功,端口號:" + 8080); } } @PreDestroy public void shutdown() { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } }
ChannelHandler 消息處理
public class ServerHandler extends SimpleChannelInboundHandler<String> { @Override protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception { System.out.println("Received message from client: " + msg); // 回復消息給客戶端 //ctx.writeAndFlush("Received your message: " + msg); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { cause.printStackTrace(); ctx.close(); } }
2.1.2.客戶端
客戶端
@DependsOn({"nettyServer"}) @Component public class NettyClient { private EventLoopGroup group; private Channel channel; @PostConstruct public void start() throws InterruptedException { group = new NioEventLoopGroup(); Bootstrap bootstrap = new Bootstrap(); bootstrap.group(group) .channel(NioSocketChannel.class) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) { ch.pipeline().addLast(new StringDecoder(CharsetUtil.UTF_8)); ch.pipeline().addLast(new StringEncoder(CharsetUtil.UTF_8)); socketChannel.pipeline().addLast(new ClientHandler()); } }); ChannelFuture future = bootstrap.connect("127.0.0.1", 8080).sync(); if (future.isSuccess()) { System.out.println("連接服務器成功"); } channel = future.channel(); } @PreDestroy public void destroy() { if (group != null) { group.shutdownGracefully(); } }
ChannelHandler 消息處理
public class ClientHandler extends SimpleChannelInboundHandler<String> { @Override protected void channelRead0(ChannelHandlerContext ctx, String msg) { System.out.println("Server response:" + msg); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { cause.printStackTrace(); ctx.close(); } }
2.2. 基于Unix Socket協(xié)議
其他的不變,這里只關注客戶服務端代碼。
2.2.1. 代碼
服務端
private final EventLoopGroup bossGroup = new KQueueEventLoopGroup(); private final EventLoopGroup workerGroup = new KQueueEventLoopGroup(); @PostConstruct public void start() throws InterruptedException { ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(KQueueServerDomainSocketChannel.class) .childHandler(new ChannelInitializer<KQueueDomainSocketChannel>() { @Override public void initChannel(KQueueDomainSocketChannel ch) throws Exception { ch.pipeline().addLast(new StringDecoder(CharsetUtil.UTF_8)); ch.pipeline().addLast(new StringEncoder(CharsetUtil.UTF_8)); ch.pipeline().addLast(new ServerHandler()); } }); ChannelFuture f = b.bind(new DomainSocketAddress("/tmp/test.sock")).sync(); if (f.isSuccess()) { System.out.println("啟動Netty服務成功,文件:" + "/tmp/test.sock"); } }
客戶端
public void start() throws InterruptedException { group = new KQueueEventLoopGroup(); Bootstrap bootstrap = new Bootstrap(); bootstrap.group(group) .channel(KQueueDomainSocketChannel.class) .handler(new ChannelInitializer<KQueueDomainSocketChannel>() { @Override protected void initChannel(KQueueDomainSocketChannel socketChannel) { socketChannel.pipeline().addLast(new StringDecoder(CharsetUtil.UTF_8)); socketChannel.pipeline().addLast(new StringEncoder(CharsetUtil.UTF_8)); socketChannel.pipeline().addLast(new ClientHandler()); } }); ChannelFuture future = bootstrap.connect(new DomainSocketAddress("/tmp/test.sock")).sync(); if (future.isSuccess()) { System.out.println("連接服務器成功"); } channel = future.channel(); }
2.2.2. 分析
Unix Socket協(xié)議
Unix Domain Socket(簡稱UDS)是一個用于實現(xiàn)本地進程間通信的協(xié)議。與使用網(wǎng)絡套接字(socket)進行通信不同,UDS僅用于同一臺機器上的相鄰進程之間的通信。
在Unix/Linux系統(tǒng)中,UDS通常被用于代替TCP/IP套接字來提高性能和安全性。不過它們可以通過文件系統(tǒng)路徑來建立連接,不能跨機器通信。
Netty中協(xié)議切換
通過對比上述代碼,可以看出netty中切換協(xié)議是比較簡單的,換成對應的 Channel 實現(xiàn)類,以及連接方式就可以了。
因為是mac中運行,示例代碼中用KQueueDomainSocketChannel替代DomainSocketChannel
2.3. 測試
Controller發(fā)消息
@RestController public class MsgController { @Autowired private NettyClient nettyClient; @PostMapping("/send") public ResponseEntity<Void> sendMsg(@RequestBody String msg) { System.out.println(msg.getBytes(StandardCharsets.UTF_8).length); try { for (int i = 0; i < 1000; i++) { nettyClient.send(msg); } return new ResponseEntity<>(HttpStatus.OK); } catch (Exception e) { return new ResponseEntity<>(HttpStatus.INTERNAL_SERVER_ERROR); } } }
測試結果
前面已經基于 TCP協(xié)議寫好了netty的客戶端、服務端,
現(xiàn)在寫接口,可以通過客戶端給服務端發(fā)消息,不過單次調用會一次性發(fā)1000遍。
接口調用傳入:hello
預期結果:
Received message from client: hello
Received message from client: hello
... ... // 同樣輸出1000遍
實際結果:
Received message from client: hello
Received message from client: hellohello
Received message from client: hellohe
Received message from client: llohellohellohellohello
Received message from client: hellohellohello
... ... // 無規(guī)則
出現(xiàn)問題的原因就是下一章要將的粘包、拆包問題。
3. 粘包、拆包問題
3.1. 問題分析
3.1.1. tcp協(xié)議出現(xiàn)問題的原因
粘包/拆包問題是由TCP協(xié)議本身造成的,和Netty本身無關,任何基于TCP協(xié)議實現(xiàn)數(shù)據(jù)傳輸?shù)募夹g都會面臨這個問題,原因如下:
- 應用程序寫入數(shù)據(jù)的字節(jié)大小大于套接字發(fā)送緩沖區(qū)的大小:這種情況下,會發(fā)生拆包現(xiàn)象,發(fā)送方的TCP協(xié)議棧會把一次應用程序的發(fā)送操作分成多個數(shù)據(jù)段進行發(fā)送。
- 進行了多次寫入操作,但數(shù)據(jù)沒有被及時發(fā)送出去:這可能是由于TCP的Nagle算法造成的。
- 應用程序讀取操作不及時:如果接收方面的應用層沒有及時讀取接收緩沖區(qū)的數(shù)據(jù),造成堆積,從而形成一個大的數(shù)據(jù)塊。如果此時應用層進行數(shù)據(jù)讀取,就容易讀取到多個TCP數(shù)據(jù)段的數(shù)據(jù),形成了粘包現(xiàn)象。
- 網(wǎng)絡環(huán)境等硬件問題:如網(wǎng)絡延遲、抖動等,也可能導致多個小的數(shù)據(jù)包合并為一個大包進行傳送,從而導致粘包。
解決粘包和拆包問題的基本策略就是在應用層引入數(shù)據(jù)邊界。常見的方法有:固定長度、分隔符、在包頭中加入長度字段等。
3.1.2. 其他協(xié)議為什么沒問題
HTTP 協(xié)議
HTTP 協(xié)議 基于 TCP 協(xié)議構建,而 TCP 是一種面向流的協(xié)議,所以理論上可能會有粘包問題。但是在實際應用中,HTTP 協(xié)議已經做了明確的分包處理,因此通常不需要開發(fā)者去處理粘包問題,HTTP 使用了一些特定的方式來定義數(shù)據(jù)包的邊界:
對于 HTTP/1.0 和 HTTP/1.1,一次完整的 HTTP 交互由一個請求和一個響應組成,它們都是相對獨立的。請求和響應都有明確的開始行(請求行或狀態(tài)行)和結束標志(如 Content-Length 頭或 chunked 編碼表示的消息體長度)。這樣可以很清楚地知道報文的開始和結束,避免了粘包問題。
對于 HTTP/2,它引入了二進制幀的概念。每個幀有明確的長度和類型,這也使得在接收端可以準確地解析出各個幀,避免粘包問題。
UDP 協(xié)議
UDP 協(xié)議 是一種無連接的、不可靠的協(xié)議,它并沒有像TCP協(xié)議那樣提供流量控制和擁塞控制等功能,因此在傳輸過程中可能會出現(xiàn)丟包或亂序等問題。由于UDP協(xié)議采用數(shù)據(jù)報方式進行傳輸,每個UDP數(shù)據(jù)報都有獨立的頭部標識,因此不會出現(xiàn)粘包問題。
WebSocket 協(xié)議
WebSocket 協(xié)議 建立連接后,客戶端和服務器之間會保持長時間的連接狀態(tài),可以隨時發(fā)送和接收數(shù)據(jù)。當服務器發(fā)送數(shù)據(jù)時,會將數(shù)據(jù)封裝到一個完整的WebSocket幀中,并通過TCP協(xié)議進行傳輸。而客戶端收到數(shù)據(jù)后,會從WebSocket幀中解析出數(shù)據(jù),并進行相應處理。這樣就避免了TCP協(xié)議中的“粘包”和“拆包”問題。
3.1.3. Unix Socket 為什么也有問題
Unix Socket(也被稱為 Unix Domain Socket,UDS)主要支持以下兩種類型的通信協(xié)議:
- 流式協(xié)議 (SOCK_STREAM): 類似于 TCP,在發(fā)送和接收數(shù)據(jù)時提供了字節(jié)流服務。數(shù)據(jù)在兩個方向上都是有序的,并且不會重復或者丟失。這種模式下,一端發(fā)送的數(shù)據(jù)順序和另一端接收的數(shù)據(jù)順序是相同的。
- 數(shù)據(jù)報協(xié)議 (SOCK_DGRAM): 這種類型的 socket 提供了一種無需連接的、固定大小的消息服務,類似于 UDP。每次讀操作都返回最多一條完整的消息;如果消息超出緩沖區(qū)的大小,那么該消息可能會被截斷。
Unix Socket 的這兩種模式在行為上與 TCP 和 UDP 很相似。因此在基于 SOCK_STREAM 協(xié)議使用 Netty 開發(fā)服務端和客戶端時,可能會出現(xiàn)類似粘包的問題。
前面有現(xiàn)成的基于Unix Stream協(xié)議實現(xiàn)的代碼,我們同樣調用接口試一下,發(fā)現(xiàn) Unix Socket 同樣會產生粘包問題
解決思路
結合HTTP、UDP、WebSocket 解決粘包/拆包問題的思路,同樣也可以推導解決TCP問題的思路:在發(fā)送數(shù)據(jù)時,應該設計一種協(xié)議來確定消息的邊界,比如:添加特殊的分隔符,或者在每個消息的頭部包含消息的長度等。
基于這個思路,Netty 框架提供了 LineBasedFrameDecoder、DelimiterBasedFrameDecoder和 LengthFieldBasedFrameDecoder等解決方案,下面一一介紹。
3.2. 解決方案
3.2.1. LineBasedFrameDecoder
使用行結束符作為數(shù)據(jù)包的分隔符。每條消息后面都有一個行結束符(例如 \n 或 \r\n),它會一直讀取字節(jié)直到遇到這個結束符,然后把之前讀取到的字節(jié)組裝成一條消息。
如果沒有找到行結束符,那么就認為當前還沒有讀取到完整的數(shù)據(jù)包,需要將已經讀取到的字節(jié)保存起來,等待下次讀取。
代碼-客戶端修改
發(fā)送消息的方法中,每條消息結尾都加上行結束符后綴:
public void send(String msg) { if (channel != null) { channel.writeAndFlush(msg + "\\n"); } else { System.out.println("message sending failed, connection not established"); } }
代碼-服務端修改
在 ChannelPipeline 中加上下列解碼的 ChannelHandler:
ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
局限性
- 固定的分隔符:主要是通過 \n 或 \r\n 來標識一個完整的消息。這意味著如果你的協(xié)議中沒有使用這兩個字符作為結束標記,或者這兩個字符在消息體中有特殊含義,則不能正確工作。
- 只支持文本數(shù)據(jù): 主要設計為處理文本協(xié)議。對于二進制數(shù)據(jù),尤其是包含
\n
或\r\n
的二進制數(shù)據(jù),可能會出現(xiàn)誤切割的情況。 - 無法處理大數(shù)據(jù)包: 如果一個非常大的數(shù)據(jù)塊在沒有任何分隔符的情況下被發(fā)送,會消耗大量內存來存儲這些數(shù)據(jù),直到找到一個分隔符。這可能會導致內存溢出問題。所以構造方法中要設置 maxLength 參數(shù)(如示例中的 1024)。
3.2.2. DelimiterBasedFrameDecoder
解決方式
和LineBasedFrameDecoder類似,當接收到數(shù)據(jù)時,會檢查是否存在分隔符。如果存在,它就認為已經讀取到了一個完整的消息,并將這個消息傳遞給下一個ChannelHandler進行處理。如果不存在,它將繼續(xù)等待,直到讀取到分隔符。
區(qū)別在于,前者的分隔符固定,而它的分隔符可以自定義。
代碼-客戶端修改
發(fā)送消息的方法中,每條消息結尾都加上行結束符后綴:
public void send(String msg) { if (channel != null) { channel.writeAndFlush(msg + "$_"); } else { System.out.println("message sending failed, connection not established"); } }
代碼-服務端修改
在 ChannelPipeline 中加上下列解碼的 ChannelHandler:
ByteBuf delimiter = Unpooled.copiedBuffer("$_".getBytes()); ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, delimiter));
局限性
- 依賴于特定的分隔符:需要依賴特定的分隔符來判定一個消息的結束,但是在某些情況下,這樣的分隔符可能并不存在,或者不能很好地被應用在該協(xié)議上。同樣可能出現(xiàn)誤判。
- 不適合二進制協(xié)議: 由于DelimiterBasedFrameDecoder主要是針對文本協(xié)議設計的,所以在處理一些二進制協(xié)議時可能會遇到困難。
- 內存問題: 如果一個非常大的數(shù)據(jù)塊在沒有任何分隔符的情況下被發(fā)送,DelimiterBasedFrameDecoder可能會消耗過多的內存來存儲這些數(shù)據(jù),直到找到一個分隔符。這可能會導致內存溢出問題。所以也需要設置 maxFrameLength(如示例中的 1024)。
3.2.3. FixedLengthFrameDecoder
解決方式
工作原理主要是每次從 ByteBuf 中讀取固定長度的字節(jié),然后構造成一個獨立的 frame 對象,傳遞給下一個 handler 處理。
這樣可以確保不會因為 TCP 粘包導致多個消息被當作一個消息處理,也不會因為 TCP 拆包導致一個消息被當作多個消息處理。
代碼-服務端修改
在 ChannelPipeline 中加上下列解碼的 ChannelHandler:
ch.pipeline().addLast(new FixedLengthFrameDecoder(5));
因為傳輸?shù)膮?shù)“hello”是5個字節(jié),這類就固定為5.
局限性
- 固定長度限制: FixedLengthFrameDecoder 只能處理固定長度的消息,如果實際應用中的消息長度不固定,那么就無法使用 FixedLengthFrameDecoder 進行解碼。相應地,如果消息長度小于固定長度,那么必須填充到固定長度,這就可能會浪費帶寬。
- 無內置校驗: FixedLengthFrameDecoder 僅僅是按照固定長度切分消息,它并不關心消息的完整性和正確性。如果你想對消息進行校驗,需要自己實現(xiàn)。
3.2.4. LengthFieldBasedFrameDecoder
解決方式
- 長度字段標識: LengthFieldBasedFrameDecoder 解決粘包問題的方式主要是通過在數(shù)據(jù)包中添加一個表示后續(xù)數(shù)據(jù)長度的字段,這個字段的位置和長度可以由開發(fā)者自定義,解碼器會根據(jù)這個長度字段得知具體的消息體長度,然后進行正確的截取。
- 校驗讀取: 當接收到新的數(shù)據(jù)包時,解碼器首先找到長度字段,讀取出消息體的長度,然后等待足夠長度的數(shù)據(jù)到達后,再從 ByteBuf 中讀取,形成一個完整的消息幀。
- 消除半包讀取: 通過以上方式,LengthFieldBasedFrameDecoder 可以確保每次都能從 ByteBuf 中讀取到完整的消息幀,不會出現(xiàn)只讀取到半個消息幀的情況。
在網(wǎng)絡通信中,發(fā)送和接收數(shù)據(jù)需要遵循同一種協(xié)議。LengthFieldBasedFrameDecoder 是一個基于長度字段的解碼器,而 LengthFieldPrepender 則是一個對應的編碼器,它會在消息體前面加上一個長度字段。
它們一般會配套使用,這樣發(fā)送端發(fā)送的數(shù)據(jù)和接收端接收的數(shù)據(jù)結構就會保持一致,從而能夠正確地進行解碼。
代碼-客戶端修改
添加ChannelHandler實現(xiàn),通過LengthFieldPrepender這個編碼器,在發(fā)送的消息前添加長度字段(這里的 4 是指長度字段本身占用的字節(jié)數(shù)量):
socketChannel.pipeline().addLast(new LengthFieldPrepender(4));
代碼-服務端修改
在 ChannelPipeline 中加上下列解碼的 ChannelHandler:
ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(1024, 0, 4, 0, 4));
4. Netty特性優(yōu)化
4.1. 內存池 PooledByteBufAllocator
內存池是一種用于管理和復用內存塊的技術??梢员苊忸l繁地分配和釋放內存,從而減少系統(tǒng)開銷和內存碎片問題,提高系統(tǒng)的效率和性能。
PooledByteBufAllocator(分配器 [?æl??ke?t?r]) 是 Netty 提供的一個基于內存池的 ByteBuf 分配器。與直接創(chuàng)建新的 ByteBuf 實例相比,PooledByteBufAllocator 提供了重用內存的能力,這可以顯著減少內存分配和垃圾收集的開銷,提高性能:
- 內存分區(qū):PooledByteBufAllocator 將內存劃分為多個 Arena,每個 Arena 進一步劃分為多個 Chunk 和 Page。通過這種方式,PooledByteBufAllocator 能夠滿足不同大小的內存需求,并且能夠快速找到合適的內存塊進行分配。
- 對象復用:當 ByteBuf 的引用計數(shù)為 0 時,它的內存會被返回到原來的 Arena 并可以被重用。這避免了頻繁創(chuàng)建和銷毀對象,降低了系統(tǒng)開銷。
- 線程本地緩存:PooledByteBufAllocator 使用了線程本地緩存技術(Thread Local Cache),每個線程都有自己的一份緩存池,這可以減少線程間的競爭,進一步提高性能。
- 內存分配策略:對于小于 Page 大小的內存分配請求,PooledByteBufAllocator 使用 jemalloc 策略進行內存分配。這是一種高效的內存分配策略,能夠減少內存碎片,提高內存使用率。
通過這些方式,PooledByteBufAllocator 可以有效地復用內存,提高了內存使用的效率和性能。
PooledByteBufAllocator 創(chuàng)建 ByteBuf 過程
PooledByteBufAllocator allocator = new PooledByteBufAllocator(); // 分別分配堆內存、堆外內存,內存大小也可以指定,如: allocator.heapBuffer(1024); ByteBuf heapBuffer = allocator.heapBuffer(); ByteBuf directBuffer = allocator.directBuffer(); // 正常將寫入數(shù)據(jù)或讀取 heapBuffer.writeBytes(data); byte b = heapBuffer.readByte(); // 記得不用時釋放內存,堆外內存不受垃圾回收,不釋放會有內存泄露 heapBuffer.release(); directBuffer.release();
不過實際項目中,很少有見過通過創(chuàng)建 PooledByteBufAllocator,再創(chuàng)建 ByteBuf 的。
基本都是由 Unpooled 工具類 創(chuàng)建 ByteBuf。
創(chuàng)建:堆內內存 OR 堆外內存?
(1)堆內內存:
如果你需要處理的數(shù)據(jù)比較小(比如幾 KB 或幾百 KB),而且需要進行頻繁的讀寫操作,那么建議使用堆內內存。
(2)堆外內存:
如果你需要處理的數(shù)據(jù)比較大(比如幾 MB 或幾十 MB),而且需要進行頻繁的 IO 操作,那么建議使用堆外內存。堆外內存是由操作系統(tǒng)管理的,數(shù)據(jù)存儲在操作系統(tǒng)的內存中,可以直接進行 IO 操作。此外,在使用堆外內存時,可以避免 Java 堆和操作系統(tǒng)之間的數(shù)據(jù)拷貝,減少了系統(tǒng)的開銷和延遲。
需要注意的是,堆外內存的申請和釋放需要調用 JNI 接口,因此申請和釋放堆外內存的開銷會比較高。因此一般來說:
對于小規(guī)模的數(shù)據(jù)處理應用,建議使用堆內內存;對于大規(guī)模的數(shù)據(jù)處理應用,建議使用堆外內存
4.2. 內存池 Unpooled
Unpooled 是 Netty 中一個工具類,用于創(chuàng)建不同類型的 ByteBuf 對象,而且同樣是使用PooledByteBufAllocator 類來分配和管理內存。
只不過它提供了一些靜態(tài)方法,可以很方便地創(chuàng)建 HeapBuf、DirectBuf、CompositeBuf 等類型的 ByteBuf 對象。常見方法:
- buffer():創(chuàng)建一個 HeapBuf 對象,使用 JVM 堆內存來存儲數(shù)據(jù)。
- directBuffer():創(chuàng)建一個 DirectBuf 對象,使用直接內存來存儲數(shù)據(jù)。
- wrappedBuffer():創(chuàng)建一個 CompositeBuf 對象,可以將多個 ByteBuf 對象合并成一個虛擬的 ByteBuf 對象。
- copiedBuffer():創(chuàng)建一個 HeapBuf 對象,并將字節(jié)數(shù)組的內容復制到 HeapBuf 中。
- unsafeBuffer():創(chuàng)建一個不安全的 ByteBuf 對象,用于一些特殊的場景,例如 JNI 調用等。
不過同樣要記得在使用完畢后,應該及時調用 release() 方法來釋放 ByteBuf 對象的資源哦。
回顧一下:考慮到Netty中 ByteBuf 等常用類,為避免頻繁地分配和釋放內存,通過內存池實現(xiàn)內存復用。但 ByteBuf 也是類,頻繁地創(chuàng)建、銷毀對象同樣有大量的性能開銷,怎么優(yōu)化?
那么接下來我們看一下 對象池。
4.3. 對象池 Recycler
Recycler (回收器,[?ri??sa?kl] )是 Netty是一個對象池,主要用于重用對象,避免頻繁創(chuàng)建和銷毀帶來的性能開銷。被廣泛地應用于各種場景中,例如 ByteBuf 對象池、EventExecutor 對象池、ChannelHandlerContext 對象池等等。我們還是來看看 ByteBuf。
ByteBuf 中包含一個 Recycler.Handle 對象,用于管理 ByteBuf 對象池的創(chuàng)建和銷毀。當需要創(chuàng)建一個新的 ByteBuf 對象時,無論通過前面介紹的PooledByteBufAllocator、Unpooled,都是通過 ByteBufAllocator 接口提供的 directBuffer() 或 heapBuffer() 等方法來創(chuàng)建。
這些方法就是基于Recycler,會自動從線程本地的對象池中獲取一個 ByteBuf 對象,如果對象池為空,則會創(chuàng)建一個新對象,并將其加入對象池中。當不再需要這個對象時,可以通過調用 release() 方法將其回收到對象池中,等待下次使用。
ChannelHandlerContext 對象池也類似,在 Netty 中,可以通過 ChannelHandlerContext 的 newContext() 方法來獲取一個新的 ChannelHandlerContext 對象,這個方法會從 Recycler 對象池中獲取一個 ChannelHandlerContext 對象并進行初始化,如果沒有可用的對象,則會創(chuàng)建一個新對象。在使用完后,通過調用 ChannelHandlerContext 的 recycle() 方法將其回收到對象池中,等待下次使用。
當然 Recycler 是 Netty 中實現(xiàn)對象池的機制,并不局限于只有 Netty 的這些組件類可以用,任何我們自定義的類都可以。下面看一個例子。
示例(任何對象)
public class UserCache { private static final Recycler<User> userRecycler = new Recycler<User>() { @Override protected User newObject(Handle<User> handle) { return new User(handle); } }; static final class User { private String name; private Recycler.Handle<User> handle; public void setName(String name) { this.name = name; } public String getName() { return name; } public User(Recycler.Handle<User> handle) { this.handle = handle; } public void recycle() { handle.recycle(this); } } public static void main(String[] args) { User user1 = userRecycler.get(); user1.setName("hello"); user1.recycle(); User user2 = userRecycler.get(); System.out.println(user1 == user2); } }
左邊的例子中,我們定義了一個User類,main方法中,user1.recycle(),user1回收了之后,然后 user2 再獲取。
- (1)user2獲取的依然是同一個對象,所以打印出的結果是:hello 和 true。
- (2)如果我們注釋掉 user1.cecycle(),user2 會獲取不到對象,打印的結果就是:null 和 false。
線程安全
另外,Recycler 使用線程本地變量(FastThreadLocal)來存儲對象,每個線程都有一個獨立的對象池。這個機制可以保證對象的安全性和線程互相獨立,避免了線程安全問題和競爭條件的出現(xiàn)。
那么這個 FastThreadLocal 是啥?和常見的 ThreadLocal 有啥關系呢?
4.4. 本地線程優(yōu)化 FastThreadLocal
FastThreadLocal(更快的ThreadLocal) 是 Netty 自己研發(fā)的一個工具類,用于替換 Java 原生的 ThreadLocal。主要有以下幾個原因:
- 性能:與 ThreadLocal 相比,F(xiàn)astThreadLocal 在存取線程局部變量時有更快的速度。在 ThreadLocal 中,每次獲取變量都需要通過哈希映射進行查找,當線程局部變量很多時,這會成為一個性能瓶頸。而 FastThreadLocal 則將所有線程的局部變量存儲在一個數(shù)組中,通過索引快速定位,提高了存取速度。
- 避免內存泄漏:ThreadLocal 在使用不當時,很容易造成內存泄漏,需要我們在使用后再手動調用reomve()方法。而 FastThreadLocal 能有效避免這個問題。它會在每個線程結束時自動清理線程局部變量,而不是依賴于 JVM 的垃圾回收。
- 更好的整合:Netty 中很多地方使用了線程局部變量,例如 ByteBuf 的內存池、Recycler 對象池等。有了自己的 FastThreadLocal,Netty 可以更好地控制和優(yōu)化這些功能,提高整體性能。
代碼示例
public class FastThreadLocalDemo { private static final FastThreadLocal<Integer> THREAD_LOCAL = new FastThreadLocal<Integer>() { @Override protected Integer initialValue() throws Exception { return 1; } }; public static void main(String[] args) { new FastThreadLocalThread(() -> { for (int i = 0; i < 10; i++) { System.out.println(Thread.currentThread().getName() + " --> " + THREAD_LOCAL.get()); THREAD_LOCAL.set(THREAD_LOCAL.get() + 1); } }, "FastThreadLocalThread-1").start(); } }
注意事項
FastThreadLocal 的使用方式和 ThreadLocal差別不大,但是有幾點需要注意:
- 使用 FastThreadLocal 的線程最好是 FastThreadLocalThread 類型或者其子類。FastThreadLocal 會在這些線程中有更好的性能。如果使用的是Thread或其他實現(xiàn)的話,F(xiàn)astThreadLocal 仍然可以工作,但性能會降級。
- 相比于 ThreadLocal,F(xiàn)astThreadLocal 的優(yōu)勢在于當一個線程有多個線程本地變量時,它可以通過減少哈希沖突和查找來提高性能。但是如果一個線程只有一個或者很少的線程本地變量,那么 ThreadLocal 可能會有更好的性能。
- 當你不再需要使用 FastThreadLocal 中的對象時,還是應該調用 remove() 來避免內存泄漏。
雖說在使用了 FastThreadLocalThread 實例的情況下,在線程結束時,F(xiàn)astThreadLocal 會自動清理所有線程局部變量。但顯式地調用 remove() 方法仍然是一個好的實踐。特別是在長生命周期的線程或者使用了線程池的情況下,顯式地清理線程局部變量可以幫助避免潛在的內存泄漏問題。
以上就是Netty開發(fā)及粘包實戰(zhàn)解決分析的詳細內容,更多關于Netty開發(fā)粘包解決的資料請關注腳本之家其它相關文章!
以上就是Netty開發(fā)及粘包實戰(zhàn)解決分析的詳細內容,更多關于Netty開發(fā)粘包解決的資料請關注腳本之家其它相關文章!
相關文章
Spring Boot結成MyBatis-Plus最全配置指南
本文主要介紹了Spring Boot結成MyBatis-Plus最全配置指南,包括依賴引入、配置數(shù)據(jù)源、Mapper 掃描、基本CRUD操作等,具有一定的參考價值,感興趣的可以了解一下2025-03-03spring基于通用Dao的多數(shù)據(jù)源配置詳解
這篇文章主要為大家詳細介紹了spring基于通用Dao的多數(shù)據(jù)源配置,具有一定的參考價值,感興趣的小伙伴們可以參考一下解2018-03-03SpringBoot使用@SpringBootTest注解開發(fā)單元測試教程
這篇文章主要介紹了SpringBoot使用@SpringBootTest注解開發(fā)單元測試教程,本文通過詳細的案例過程來說明如何使用該項技術,需要的朋友可以參考下2021-06-06Java中快速排序優(yōu)化技巧之隨機取樣、三數(shù)取中和插入排序
快速排序是一種常用的基于比較的排序算法,下面這篇文章主要給大家介紹了關于Java中快速排序優(yōu)化技巧之隨機取樣、三數(shù)取中和插入排序的相關資料,文中通過代碼介紹的非常詳細,需要的朋友可以參考下2023-09-09