深入理解Netty核心類及其作用
MessageToByteEncoder
MessageToByteEncoder是一個(gè)抽象編碼器,子類可重寫encode方法把對(duì)象編碼為ByteBuf輸出。
MessageToByteEncoder繼承自ChannelOutboundHandlerAdapter,encode在出站是被調(diào)用。
public class MyMessageEncoder extends MessageToByteEncoder<MessagePO> { @Override protected void encode(ChannelHandlerContext ctx, MessagePO msg, ByteBuf out) throws Exception { System.out.println("MyMessageEncoder.encode,被調(diào)用"); String json = JSONObject.toJSONString(msg); out.writeInt(json.getBytes(StandardCharsets.UTF_8).length); out.writeBytes(json.getBytes(StandardCharsets.UTF_8)); } }
ByteToMessageDecoder
ByteToMessageDecoder是一種ChannelInboundHandler,可以稱為解碼器,負(fù)責(zé)將byte字節(jié)流(ByteBuf)轉(zhuǎn)換成一種Message,Message是應(yīng)用可以自己定義的一種Java對(duì)象。
ByteToMessageDecoder:用于將字節(jié)轉(zhuǎn)為消息,需要檢測(cè)緩沖區(qū)是否有足夠的字節(jié)。
public class MyMessageDecoder extends ByteToMessageDecoder { @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { System.out.println("MyMessageDecoder.decode,被調(diào)用"); while (in.readableBytes() >= 4){ int num = in.readInt(); System.out.println("解碼出一個(gè)整數(shù):"+num); out.add(num); } } }
ReplayingDecoder
ReplayingDecoder:繼承自ByteToMessageDecoder,不需要檢測(cè)緩沖區(qū)是否有足夠的字節(jié),但是ReplayingDecoder的速度略慢于ByteToMessageDecoder,而且并不是所有的ByteBuf都支持。
項(xiàng)目復(fù)雜度高用ReplayingDecoder,否則使用ByteToMessageDecoder。
public class MyMessageDecoder extends ReplayingDecoder<Void> { @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { System.out.println("MyMessageDecoder.decode,被調(diào)用"); int length = in.readInt(); byte[] content = new byte[length]; in.readBytes(content); String json = new String(content,StandardCharsets.UTF_8); MessagePO po = JSONObject.parseObject(json,MessagePO.class); out.add(po); } }
MessageToMessageEncoder
用于從一種消息編碼為另外一種消息,例如從POJO到POJO,是一種ChannelOutboundHandler
MessageToMessageDecoder
從一種消息解碼為另一種消息,例如POJO到POJO,是一種ChannelInboundHandler
MessageToMessageCodec
整合了MessageToMessageEncoder 和 MessageToMessageDecoder
public class RequestMessageCodec extends MessageToMessageCodec<String, RequestData> { @Override protected void encode(ChannelHandlerContext ctx, RequestData msg, List<Object> out) throws Exception { System.out.println("RequestMessageCodec.encode 被調(diào)用 " + msg); String json = JSONObject.toJSONString(msg); out.add(json); } @Override protected void decode(ChannelHandlerContext ctx, String msg, List<Object> out) throws Exception { System.out.println("RequestMessageCodec.decode 被調(diào)用 " + msg); RequestData po = JSONObject.parseObject(msg, RequestData.class); out.add(po); } }
ChannelInitializer
ChannelInitializer是一種特殊的ChannelInboundHandler,可以通過一種簡(jiǎn)單的方式(調(diào)用initChannel方法)來(lái)初始化Channel。
通常在Bootstrap.handler(ChannelHandler)
, ServerBootstrap.handler(ChannelHandler)
和 ServerBootstrap.childHandler(ChannelHandler)
中給Channel設(shè)置ChannelPipeline。
注意:當(dāng)initChannel被執(zhí)行完后,會(huì)將當(dāng)前的handler從Pipeline中移除。
Bootstrap bootstrap = new Bootstrap().group(group)//設(shè)置線程組 .channel(NioSocketChannel.class)//設(shè)置客戶端通道的實(shí)現(xiàn)類 .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new NettyClientHandler());//加入自己的處理器 } });
ServerBootstrap bootstrap = new ServerBootstrap().group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class)//使用NioServerSocketChannel作為服務(wù)器的通道實(shí)現(xiàn) .option(ChannelOption.SO_BACKLOG, 128)//設(shè)置線程隊(duì)列等待連接的個(gè)數(shù) .childOption(ChannelOption.SO_KEEPALIVE, true)//設(shè)置保持活動(dòng)連接狀態(tài) // .handler(null)//該Handler對(duì)應(yīng)bossGroup .childHandler(new ChannelInitializer<SocketChannel>() {//給workerGroup的EventLoop對(duì)應(yīng)的管道設(shè)置處理器 @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new NettyServerHandler()); } });
SimpleChannelInboundHandler
SimpleChannelInboundHandler繼承自ChannelInboundHandlerAdapter,可以通過泛型來(lái)規(guī)定消息類型。
處理入站的數(shù)據(jù)我們只需要實(shí)現(xiàn)channelRead0方法。
SimpleChannelInboundHandler在接收到數(shù)據(jù)后會(huì)自動(dòng)release掉數(shù)據(jù)占用的Bytebuffer資源,ChannelInboundHandlerAdapter不會(huì)自動(dòng)釋放。
public class MyClientHandler extends SimpleChannelInboundHandler<MessagePO> { @Override protected void channelRead0(ChannelHandlerContext ctx, MessagePO msg) throws Exception { System.out.println("收到服務(wù)端消息:" + msg); } }
DefaultEventLoopGroup
在向pipline中添加ChannelHandler時(shí),可以提供一個(gè)新的線程組,Handler業(yè)務(wù)會(huì)在該線程中執(zhí)行。
當(dāng)加ChannelHandler需要執(zhí)行多線程并發(fā)業(yè)務(wù)時(shí),DefaultEventLoopGroup可以派上大用處。
如果沒有設(shè)置DefaultEventLoopGroup,默認(rèn)使用的是EventLoopGroup workerGroup = new NioEventLoopGroup();
DefaultEventLoopGroup businessGroup = new DefaultEventLoopGroup(100); ... addLast(businessGroup, new MyNettyServerHandler())
/** * 讀取客戶端發(fā)送過來(lái)的消息 */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf byteBuf = (ByteBuf) msg; System.out.println("收到客戶信息:" + byteBuf.toString(CharsetUtil.UTF_8)); System.out.println("客戶端地址:" + ctx.channel().remoteAddress()); System.out.println("處理線程:" + Thread.currentThread().getName()); ctx.executor().parent().execute(()->{ try { System.out.println("parent().execute Thread = " + Thread.currentThread().getName()); TimeUnit.SECONDS.sleep(2L); System.out.println("parent任務(wù)執(zhí)行完成1"); } catch (InterruptedException e) { e.printStackTrace(); } }); ctx.executor().parent().execute(()->{ try { System.out.println("parent().execute Thread = " + Thread.currentThread().getName()); TimeUnit.SECONDS.sleep(2L); System.out.println("parent任務(wù)執(zhí)行完成2"); } catch (InterruptedException e) { e.printStackTrace(); } }); ctx.executor().parent().execute(()->{ try { System.out.println("parent().execute Thread = " + Thread.currentThread().getName()); TimeUnit.SECONDS.sleep(2L); System.out.println("parent任務(wù)執(zhí)行完成3"); } catch (InterruptedException e) { e.printStackTrace(); } }); }
以上代碼執(zhí)行日志如下:
收到客戶信息:Hello 服務(wù)端
客戶端地址:/127.0.0.1:60345
處理線程:defaultEventLoopGroup-4-1
parent().execute Thread = defaultEventLoopGroup-4-2
parent().execute Thread = defaultEventLoopGroup-4-3
程序繼續(xù)~~ defaultEventLoopGroup-4-1
parent().execute Thread = defaultEventLoopGroup-4-4
parent任務(wù)執(zhí)行完成1
parent任務(wù)執(zhí)行完成3
parent任務(wù)執(zhí)行完成2
EventLoop定時(shí)任務(wù)
可以在Handler中通過方法ctx.channel().eventLoop().schedule()
添加定時(shí)任務(wù)
ctx.channel().eventLoop().schedule(()->{ try { System.out.println("Thread.currentThread().getName() = " + Thread.currentThread().getName()); TimeUnit.SECONDS.sleep(2L); System.out.println("定時(shí)任務(wù)執(zhí)行完成"); } catch (InterruptedException e) { e.printStackTrace(); } },10L,TimeUnit.SECONDS);
到此這篇關(guān)于深入理解Netty核心類及其作用的文章就介紹到這了,更多相關(guān)Netty核心類內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
詳解javaweb中jstl如何循環(huán)List中的Map數(shù)據(jù)
這篇文章主要介紹了詳解javaweb中jstl如何循環(huán)List中的Map數(shù)據(jù)的相關(guān)資料,希望通過本文能幫助到大家,讓大家理解掌握這部分內(nèi)容,需要的朋友可以參考下2017-10-10BMIDE環(huán)境導(dǎo)入項(xiàng)目報(bào)編碼錯(cuò)誤解決方案
這篇文章主要介紹了BMIDE環(huán)境導(dǎo)入項(xiàng)目報(bào)編碼錯(cuò)誤解決方案,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2020-10-10Java實(shí)現(xiàn)Word/Pdf/TXT轉(zhuǎn)html的實(shí)例代碼
本文主要介紹了Java實(shí)現(xiàn)Word/Pdf/TXT轉(zhuǎn)html的實(shí)例代碼,代碼簡(jiǎn)單易懂,非常不錯(cuò),具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2020-02-02Java中StringBuilder與StringBuffer的區(qū)別
在Java編程中,字符串的拼接是一項(xiàng)常見的操作。為了有效地處理字符串的拼接需求,Java提供了兩個(gè)主要的類:StringBuilder和StringBuffer,本文主要介紹了Java中StringBuilder與StringBuffer的區(qū)別,感興趣的可以了解一下2023-08-08基于Spring Boot保護(hù)Web應(yīng)用程序
這篇文章主要介紹了基于Spring Boot保護(hù)Web應(yīng)用程序,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2020-03-03詳解Spring依賴注入的三種方式以及優(yōu)缺點(diǎn)
IoC?和?DI?是?Spring?中最重要的兩個(gè)概念,其中?IoC(Inversion?of?Control)為控制反轉(zhuǎn)的思想,而?DI(Dependency?Injection)依賴注入為其(IoC)具體實(shí)現(xiàn)。那么?DI?實(shí)現(xiàn)依賴注入的方式有幾種?這些注入方式又有什么不同?本文就來(lái)和大家一起詳細(xì)聊聊2022-08-08通過AOP攔截Spring?Boot日志并將其存入數(shù)據(jù)庫(kù)功能實(shí)現(xiàn)
本文介紹了如何使用Spring Boot和AOP技術(shù)實(shí)現(xiàn)攔截系統(tǒng)日志并保存到數(shù)據(jù)庫(kù)中的功能,包括配置數(shù)據(jù)庫(kù)連接、定義日志實(shí)體類、定義日志攔截器、使用AOP攔截日志并保存到數(shù)據(jù)庫(kù)中等步驟,感興趣的朋友一起看看吧2023-08-08使用IDEA創(chuàng)建一個(gè)vert.x項(xiàng)目的方法
這篇文章主要介紹了使用IDEA創(chuàng)建一個(gè)vert.x項(xiàng)目的方法,小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過來(lái)看看吧2018-09-09Spring項(xiàng)目中使用Junit單元測(cè)試并配置數(shù)據(jù)源的操作
這篇文章主要介紹了Spring項(xiàng)目中使用Junit單元測(cè)試并配置數(shù)據(jù)源的操作,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-09-09