利用Java搭建個(gè)簡(jiǎn)單的Netty通信實(shí)例教程
前言
看過(guò)dubbo源碼的同學(xué)應(yīng)該都清楚,使用dubbo協(xié)議的底層通信是使用的netty進(jìn)行交互,而最近看了dubbo的Netty部分后,自己寫(xiě)了個(gè)簡(jiǎn)單的Netty通信例子。
準(zhǔn)備
工程截圖
模塊詳解
- rpc-common
rpc-common作為各個(gè)模塊都需使用的模塊,工程中出現(xiàn)的是一些通信時(shí)請(qǐng)求的參數(shù)以及返回的參數(shù),還有一些序列化的工具。
- rpc-client
rpc-client中目前只是單單的一個(gè)NettyClient啟動(dòng)類。
- rpc-server
rpc-client中目前也只是單單的一個(gè)NettyServer服務(wù)啟動(dòng)類。
需要的依賴
目前所有的依賴項(xiàng)都出現(xiàn)在 rpc-common 下的 pom.xml中。
<dependencies> <!-- Netty --> <dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>4.1.10.Final</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> <version>1.7.25</version> </dependency> <!-- Protostuff --> <dependency> <groupId>com.dyuproject.protostuff</groupId> <artifactId>protostuff-core</artifactId> <version>1.0.9</version> </dependency> <dependency> <groupId>com.dyuproject.protostuff</groupId> <artifactId>protostuff-runtime</artifactId> <version>1.0.9</version> </dependency> <!-- Objenesis --> <dependency> <groupId>org.objenesis</groupId> <artifactId>objenesis</artifactId> <version>2.1</version> </dependency> <!-- fastjson --> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.38</version> </dependency> </dependencies>
實(shí)現(xiàn)
首先我們?cè)赾ommon中先定義本次的Request和Response的基類對(duì)象。
public class Request { private String requestId; private Object parameter; public String getRequestId() { return requestId; } public void setRequestId(String requestId) { this.requestId = requestId; } public Object getParameter() { return parameter; } public void setParameter(Object parameter) { this.parameter = parameter; } } public class Response { private String requestId; private Object result; public String getRequestId() { return requestId; } public void setRequestId(String requestId) { this.requestId = requestId; } public Object getResult() { return result; } public void setResult(Object result) { this.result = result; } }
使用fastJson進(jìn)行本次序列化
Netty對(duì)象的序列化轉(zhuǎn)換很好懂, ByteToMessageDecoder 和 MessageToByteEncoder 分別只要繼承它們,重寫(xiě)方法后,獲取到Object和Byte,各自轉(zhuǎn)換就OK。
不過(guò)如果是有要用到生產(chǎn)上的同學(xué),建議不要使用 fastJson,因?yàn)樗穆┒囱a(bǔ)丁真的是太多了,可以使用google的 protostuff。
public class RpcDecoder extends ByteToMessageDecoder { // 目標(biāo)對(duì)象類型進(jìn)行解碼 private Class<?> target; public RpcDecoder(Class target) { this.target = target; } @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { if (in.readableBytes() < 4) { // 不夠長(zhǎng)度丟棄 return; } in.markReaderIndex(); // 標(biāo)記一下當(dāng)前的readIndex的位置 int dataLength = in.readInt(); // 讀取傳送過(guò)來(lái)的消息的長(zhǎng)度。ByteBuf 的readInt()方法會(huì)讓他的readIndex增加4 if (in.readableBytes() < dataLength) { // 讀到的消息體長(zhǎng)度如果小于我們傳送過(guò)來(lái)的消息長(zhǎng)度,則resetReaderIndex. 這個(gè)配合markReaderIndex使用的。把readIndex重置到mark的地方 in.resetReaderIndex(); return; } byte[] data = new byte[dataLength]; in.readBytes(data); Object obj = JSON.parseObject(data, target); // 將byte數(shù)據(jù)轉(zhuǎn)化為我們需要的對(duì)象 out.add(obj); } } public class RpcEncoder extends MessageToByteEncoder { //目標(biāo)對(duì)象類型進(jìn)行編碼 private Class<?> target; public RpcEncoder(Class target) { this.target = target; } @Override protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception { if (target.isInstance(msg)) { byte[] data = JSON.toJSONBytes(msg); // 使用fastJson將對(duì)象轉(zhuǎn)換為byte out.writeInt(data.length); // 先將消息長(zhǎng)度寫(xiě)入,也就是消息頭 out.writeBytes(data); // 消息體中包含我們要發(fā)送的數(shù)據(jù) } } }
NetyServer
public class NettyServerHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { Request request = (Request) msg; System.out.println("Client Data:" + JSON.toJSONString(request)); Response response = new Response(); response.setRequestId(request.getRequestId()); response.setResult("Hello Client !"); // client接收到信息后主動(dòng)關(guān)閉掉連接 ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE); } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.flush(); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.close(); } } public class NettyServer { private static final Logger logger = LoggerFactory.getLogger(NettyServer.class); private String ip; private int port; public NettyServer(String ip, int port) { this.ip = ip; this.port = port; } public void server() throws Exception { EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { final ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 1024) .option(ChannelOption.SO_SNDBUF, 32 * 1024) .option(ChannelOption.SO_RCVBUF, 32 * 1024) .option(ChannelOption.SO_KEEPALIVE, true) .childHandler(new ChannelInitializer<SocketChannel>() { protected void initChannel(SocketChannel socketChannel) throws Exception { socketChannel.pipeline().addLast(new RpcDecoder(Request.class)) .addLast(new RpcEncoder(Response.class)) .addLast(new NettyServerHandler()); } }); serverBootstrap.childOption(ChannelOption.SO_KEEPALIVE, true); // 開(kāi)啟長(zhǎng)連接 ChannelFuture future = serverBootstrap.bind(ip, port).sync(); // if (future.isSuccess()) { // // new Register().register("/yanzhenyidai/com.yanzhenyidai.server", ip + ":" + port); // } future.channel().closeFuture().sync(); } finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } public static void main(String[] args) throws Exception { new NettyServer("127.0.0.1", 20000).server(); } }
關(guān)鍵名詞:
- EventLoopGroup
- workerGroup
- bossGroup
Server端的EventLoopGroup分為兩個(gè),一般workerGroup作為處理請(qǐng)求,bossGroup作為接收請(qǐng)求。
- ChannelOption
- SO_BACKLOG
- SO_SNDBUF
- SO_RCVBUF
- SO_KEEPALIVE
以上四個(gè)常量作為TCP連接中的屬性。
- ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
NettyServerHandler中出現(xiàn)的 ChannelFutureListener.CLOSE ,作為Server端主動(dòng)關(guān)閉與Client端的通信,如果沒(méi)有主動(dòng)Close,那么NettyClient將會(huì)一直處于阻塞狀態(tài),得不到NettyServer的返回信息。
NettyClient
public class NettyClient extends SimpleChannelInboundHandler<Response> { private final String ip; private final int port; private Response response; public NettyClient(String ip, int port) { this.ip = ip; this.port = port; } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.close(); } @Override protected void channelRead0(ChannelHandlerContext channelHandlerContext, Response response) throws Exception { this.response = response; } public Response client(Request request) throws Exception { EventLoopGroup group = new NioEventLoopGroup(); try { // 創(chuàng)建并初始化 Netty 客戶端 Bootstrap 對(duì)象 Bootstrap bootstrap = new Bootstrap(); bootstrap.group(group); bootstrap.channel(NioSocketChannel.class); bootstrap.handler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel channel) throws Exception { ChannelPipeline pipeline = channel.pipeline(); pipeline.addLast(new RpcDecoder(Response.class)); pipeline.addLast(new RpcEncoder(Request.class)); pipeline.addLast(NettyClient.this); } }); bootstrap.option(ChannelOption.TCP_NODELAY, true); // String[] discover = new Discover().discover("/yanzhenyidai/com.yanzhenyidai.server").split(":"); // 連接 RPC 服務(wù)器 ChannelFuture future = bootstrap.connect(ip, port).sync(); // 寫(xiě)入 RPC 請(qǐng)求數(shù)據(jù)并關(guān)閉連接 Channel channel = future.channel(); channel.writeAndFlush(request).sync(); channel.closeFuture().sync(); return response; } finally { group.shutdownGracefully(); } } public static void main(String[] args) throws Exception { Request request = new Request(); request.setRequestId(UUID.randomUUID().toString()); request.setParameter("Hello Server !"); System.out.println(JSON.toJSONString(new NettyClient("127.0.0.1", 30000).client(request))); } }
測(cè)試
如果以上所有內(nèi)容都準(zhǔn)備就緒,那么就可以進(jìn)行調(diào)試了。
啟動(dòng)順序,先啟動(dòng)NettyServer,再啟動(dòng)NettyClient。
總結(jié)
記得剛出來(lái)工作時(shí),有工作很多年的同事問(wèn)我了不了解Netty,當(dāng)時(shí)工作太短,直說(shuō)聽(tīng)過(guò)Putty,現(xiàn)在回想起來(lái)真的挺丟人的,哈哈。😋
Netty作為通信框架,如果你了解TCP,而且項(xiàng)目中有類似傳輸信息的需求,又不想集成HTTP或者Socket,那么Netty真的挺實(shí)用的。
參考資料:
本項(xiàng)目Github地址:Netty-RPC
到此這篇關(guān)于利用Java搭建個(gè)簡(jiǎn)單的Netty通信的文章就介紹到這了,更多相關(guān)Java搭建Netty通信內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Jlabel實(shí)現(xiàn)內(nèi)容自動(dòng)換行簡(jiǎn)單實(shí)例
這篇文章主要介紹了Jlabel實(shí)現(xiàn)內(nèi)容自動(dòng)換行簡(jiǎn)單實(shí)例,具有一定借鑒價(jià)值,需要的朋友可以參考下2018-01-01SpringBoot集成IJPay實(shí)現(xiàn)微信v3支付的示例代碼
本文主要介紹了SpringBoot集成IJPay實(shí)現(xiàn)微信v3支付的示例代碼,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2023-07-07關(guān)于MyBatis中Mapper?XML熱加載優(yōu)化
大家好,本篇文章主要講的是關(guān)于MyBatis中Mapper?XML熱加載優(yōu)化,感興趣的同學(xué)趕快來(lái)看一看吧,對(duì)你有幫助的話記得收藏一下2022-01-01Spring實(shí)戰(zhàn)之使用@Resource配置依賴操作示例
這篇文章主要介紹了Spring實(shí)戰(zhàn)之使用@Resource配置依賴操作,結(jié)合實(shí)例形式分析了Spring使用@Resource配置依賴具體步驟、實(shí)現(xiàn)及測(cè)試案例,需要的朋友可以參考下2019-12-12Mybatis中關(guān)于自定義mapper.xml時(shí),參數(shù)傳遞的方式及寫(xiě)法
這篇文章主要介紹了Mybatis中關(guān)于自定義mapper.xml時(shí),參數(shù)傳遞的方式及寫(xiě)法,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2023-12-12減小Maven項(xiàng)目生成的JAR包體積實(shí)現(xiàn)提升運(yùn)維效率
在Maven構(gòu)建Java項(xiàng)目過(guò)程中,減小JAR包體積可通過(guò)排除不必要的依賴和使依賴jar包獨(dú)立于應(yīng)用jar包來(lái)實(shí)現(xiàn),在pom.xml文件中使用<exclusions>標(biāo)簽排除不需要的依賴,有助于顯著降低JAR包大小,此外,將依賴打包到應(yīng)用外,可減少應(yīng)用包的體積2024-10-10Java?數(shù)據(jù)結(jié)構(gòu)與算法系列精講之隊(duì)列
這篇文章主要介紹了Java隊(duì)列數(shù)據(jù)結(jié)構(gòu)的實(shí)現(xiàn),隊(duì)列是一種特殊的線性表,只允許在表的隊(duì)頭進(jìn)行刪除操作,在表的后端進(jìn)行插入操作,隊(duì)列是一個(gè)有序表先進(jìn)先出,想了解更多相關(guān)資料的小伙伴可以參考下面文章的詳細(xì)內(nèi)容2022-02-02Java實(shí)戰(zhàn)之小米交易商城系統(tǒng)的實(shí)現(xiàn)
這篇文章將利用Java實(shí)現(xiàn)小米交易商城系統(tǒng),文中采用的技術(shù)有:JSP?、Spring、SpringMVC、MyBatis等,感興趣的小伙伴可以跟隨小編一起學(xué)習(xí)一下2022-04-04MyBatis中有關(guān)int和Integer的使用方式
這篇文章主要介紹了MyBatis中有關(guān)int和Integer的使用方式,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2023-03-03