Java使用Netty實(shí)現(xiàn)同時(shí)多端口監(jiān)聽
前言
Netty 是一個基于 Java NIO(非阻塞 I/O)的網(wǎng)絡(luò)應(yīng)用框架,它簡化了開發(fā)高性能、高可靠性網(wǎng)絡(luò)服務(wù)器和客戶端的過程,使得開發(fā)者能夠?qū)W⒂跇I(yè)務(wù)邏輯的實(shí)現(xiàn),而無需過多關(guān)注底層網(wǎng)絡(luò)通信的復(fù)雜細(xì)節(jié)。適用于 Web 服務(wù)器、即時(shí)通訊、游戲服務(wù)器等開發(fā),廣泛應(yīng)用于諸多領(lǐng)域。
環(huán)境
JDK:64位 Jdk1.8
SpringBoot:2.1.7.RELEASE
Netty:4.1.39.Final
實(shí)現(xiàn)功能
使用Netty監(jiān)聽多個端口接受設(shè)備發(fā)送的數(shù)據(jù),并發(fā)送數(shù)據(jù)給客戶端。
服務(wù)端
相關(guān)配置
pom.xml
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.39.Final</version>
</dependency>
application.yml
spring:
profiles:
# 環(huán)境 dev|test|prod
active: dev
netty-socket:
ports: 8801,8802
bufferSize: 2048
配置類
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
/**
* @author qf
* @since 2024/12/08 21:08
*/
@Component
@ConfigurationProperties(prefix = "netty-socket")
@Data
public class SocketConfig {
private List<Integer> ports;
private Integer bufferSize;
}
核心代碼
啟動類
CommandLineRunner:當(dāng)應(yīng)用程序啟動時(shí),CommandLineRunner 接口的實(shí)現(xiàn)類中的 run 方法會被調(diào)用
import cn.hutool.core.util.ObjectUtil;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import javax.annotation.PreDestroy;
import java.util.ArrayList;
import java.util.List;
@Slf4j
@Component
public class NettyServer {
@Value("${spring.profiles.active}")
private String active;
@Autowired
private SocketConfig socketConfig;
//用于關(guān)閉
private List<ChannelFuture> futures;
private EventLoopGroup bossGroup = null;
private EventLoopGroup workerGroup = null;
public List<ChannelFuture> getFutures() {
if (ObjectUtil.isEmpty(futures)) {
futures = new ArrayList<>();
}
return futures;
}
public void start() throws Exception {
bossGroup = new NioEventLoopGroup(1);
workerGroup = new NioEventLoopGroup();
try {
Integer bufferSize = socketConfig.getBufferSize();
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childOption(ChannelOption.RCVBUF_ALLOCATOR, new AdaptiveRecvByteBufAllocator(bufferSize, bufferSize, bufferSize))
.childHandler(new ListenChannelInitHandler());
if("dev".equals(active)){
bootstrap.handler(new LoggingHandler(LogLevel.INFO));
}
for (Integer port : socketConfig.getPorts()) {
ChannelFuture channelFuture = bootstrap.bind(port).sync();
getFutures().add(channelFuture);
channelFuture.addListener(future -> {
if (future.isSuccess()) {
log.info("端口:{} 監(jiān)聽成功!", port);
} else {
log.info("端口:{} 監(jiān)聽失敗!", port);
}
});
channelFuture.channel().closeFuture()
.addListener((ChannelFutureListener) listener -> channelFuture.channel().close());
}
} catch (Exception e) {
log.info("netty監(jiān)聽數(shù)據(jù)時(shí)發(fā)生異常:", e);
} finally {
//異步使用stop()關(guān)閉
// bossGroup.shutdownGracefully();
// workerGroup.shutdownGracefully();
}
}
@PreDestroy
public void stop() {
if (ObjectUtil.isNotEmpty(futures)) {
try {
for (ChannelFuture future : futures) {
future.channel().close().addListener(ChannelFutureListener.CLOSE);
future.awaitUninterruptibly();
}
if (ObjectUtil.isNotEmpty(bossGroup) && ObjectUtil.isNotEmpty(workerGroup)) {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
futures = null;
log.info(" netty服務(wù)關(guān)閉成功! ");
} catch (Exception e) {
log.error("關(guān)閉netty服務(wù)時(shí)發(fā)生異常: ", e);
}
}
}
}
端口枚舉
public enum PortEnum {
TEST1(8801,"測試1"),
TEST2(8802,"測試2");
private int port;
private String name;
PortEnum(int port, String name) {
this.port = port;
this.name = name;
}
public int getPort() {
return port;
}
public void setPort(int port) {
this.port = port;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
}
端口處理器
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import lombok.extern.slf4j.Slf4j;
import java.net.InetSocketAddress;
import static com.qf.netty.PortEnum.*;
/**
* 多端口處理器
*
* @author qf
* @since 2024/12/05 19:05
*/
@Slf4j
public class ListenChannelInitHandler extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
int port = socketChannel.localAddress().getPort();
if (TEST1.getPort() == port) {
//使用自定義分割符
customizeSplitHandler(socketChannel, true, "$");
socketChannel.pipeline().addLast(new StringDecoder());
socketChannel.pipeline().addLast(new StringEncoder());
// 添加自定義的業(yè)務(wù)處理器
socketChannel.pipeline().addLast(new DeviceAServiceHandler());
} else if (TEST2.getPort() == port) {
customizeSplitHandler(socketChannel, false, "#");
socketChannel.pipeline().addLast(new StringDecoder());
socketChannel.pipeline().addLast(new StringEncoder());
socketChannel.pipeline().addLast(new DeviceBServiceHandler());
} else {
log.error("監(jiān)聽的端口號暫無業(yè)務(wù)處理:{}", port);
}
// 添加異常處理器
socketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
// 獲取遠(yuǎn)程地址
InetSocketAddress remoteAddress = (InetSocketAddress) ctx.channel().remoteAddress();
String clientIP = remoteAddress.getAddress().getHostAddress();
// 記錄異常信息和客戶端IP地址
log.error("監(jiān)聽設(shè)備時(shí)出現(xiàn)異常,客戶端IP: {}", clientIP, cause);
ctx.close();
}
});
}
/**
* 自定義分隔符處理器
*
* @param socketChannel socketChannel
* @param stripDelimiter 是否去除分隔符
* @param split 分隔符
*/
private static void customizeSplitHandler(SocketChannel socketChannel, boolean stripDelimiter, String split) {
ByteBuf buffer = Unpooled.copiedBuffer(split.getBytes());
try {
socketChannel.pipeline().addLast(new DelimiterBasedFrameDecoder(2048, stripDelimiter, buffer));
} catch (Exception e) {
buffer.release();
log.error("監(jiān)聽工地微站設(shè)備時(shí)出現(xiàn)異常:", e);
}
}
}
業(yè)務(wù)處理器
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import lombok.extern.slf4j.Slf4j;
/**
* @author qf
* @since 2024/12/09 19:36
*/
@Slf4j
public class DeviceAServiceHandler extends SimpleChannelInboundHandler<String> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
// 處理接收到的消息
// 你可以在這里添加更復(fù)雜的業(yè)務(wù)邏輯,比如解析消息、訪問數(shù)據(jù)庫等。
log.info("發(fā)送的數(shù)據(jù)為------->:" + msg);
if (true) {
//發(fā)送數(shù)據(jù)給客戶端
ctx.writeAndFlush("helloA~");
}
}
}
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import lombok.extern.slf4j.Slf4j;
/**
* @author qf
* @since 2024/12/09 19:36
*/
@Slf4j
public class DeviceBServiceHandler extends SimpleChannelInboundHandler<String> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
// 處理接收到的消息
// 你可以在這里添加更復(fù)雜的業(yè)務(wù)邏輯,比如解析消息、訪問數(shù)據(jù)庫等。
log.info("發(fā)送的數(shù)據(jù)為------->:" + msg);
if (true) {
//發(fā)送數(shù)據(jù)給客戶端
ctx.writeAndFlush("helloB~");
}
}
}
客戶端
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringEncoder;
import java.net.InetSocketAddress;
/**
* @author qf
* @since 2024/10/11 18:01
*/
public class Hello1Client {
public static void main(String[] args) throws InterruptedException {
// 1. 啟動類,啟動客戶端
new Bootstrap()
// 2. 添加 EventLoop
.group(new NioEventLoopGroup())//如果服務(wù)器端發(fā)來數(shù)據(jù),客戶端的EventLoop就可以從selector觸發(fā)讀事件進(jìn)行處理
// 3. 選擇客戶端 channel 實(shí)現(xiàn),底層封裝了SocketChannel
.channel(NioSocketChannel.class)
// 4. 添加處理器
.handler(new ChannelInitializer<NioSocketChannel>() {
@Override // 在連接建立后被調(diào)用
protected void initChannel(NioSocketChannel ch) throws Exception {
ch.pipeline().addLast(new StringEncoder());//編碼器,將字符串編碼成ByteBuf進(jìn)行發(fā)送
ch.pipeline().addLast(new EchoClientHandler());
}
})
// 5. 連接到服務(wù)器
.connect(new InetSocketAddress("localhost", 8801))
.sync()//sync()是一個阻塞方法,只有連接建立后才會繼續(xù)執(zhí)行
.channel()//.channel()表示拿到服務(wù)器和客戶端之間的SocketChannel(連接對象)
// 6. 向服務(wù)器發(fā)送數(shù)據(jù)
.writeAndFlush("hello, world$");
}
}
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringEncoder;
import java.net.InetSocketAddress;
public class Hello2Client {
public static void main(String[] args) throws InterruptedException {
// 1. 啟動類,啟動客戶端
new Bootstrap()
// 2. 添加 EventLoop
.group(new NioEventLoopGroup())//如果服務(wù)器端發(fā)來數(shù)據(jù),客戶端的EventLoop就可以從selector觸發(fā)讀事件進(jìn)行處理
// 3. 選擇客戶端 channel 實(shí)現(xiàn),底層封裝了SocketChannel
.channel(NioSocketChannel.class)
// 4. 添加處理器
.handler(new ChannelInitializer<NioSocketChannel>() {
@Override // 在連接建立后被調(diào)用
protected void initChannel(NioSocketChannel ch) throws Exception {
ch.pipeline().addLast(new StringEncoder());//編碼器,將字符串編碼成ByteBuf進(jìn)行發(fā)送
ch.pipeline().addLast(new EchoClientHandler());
}
})
// 5. 連接到服務(wù)器
.connect(new InetSocketAddress("localhost", 8802))
.sync()//sync()是一個阻塞方法,只有連接建立后才會繼續(xù)執(zhí)行
.channel()//.channel()表示拿到服務(wù)器和客戶端之間的SocketChannel(連接對象)
// 6. 向服務(wù)器發(fā)送數(shù)據(jù)
.writeAndFlush("hello, world#");
}
}
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;
/**
* @author qf
* @since 2024/10/11 18:05
*/
public class EchoClientHandler extends ChannelInboundHandlerAdapter {
private final ByteBuf message;
public EchoClientHandler() {
message = Unpooled.buffer(256);
message.writeBytes("hello netty - ".getBytes(CharsetUtil.UTF_8));
}
@Override
public void channelActive(ChannelHandlerContext ctx) {
ctx.writeAndFlush(message);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
String data = ((ByteBuf) msg).toString(CharsetUtil.UTF_8);
System.out.println(data);
// ctx.write(msg);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) {
ctx.flush();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
// Close the connection when an exception is raised.
cause.printStackTrace();
ctx.close();
}
}
到此這篇關(guān)于Java使用Netty實(shí)現(xiàn)同時(shí)多端口監(jiān)聽的文章就介紹到這了,更多相關(guān)Java Netty 多端口監(jiān)聽內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
SpringBoot應(yīng)用自定義logback日志詳解
默認(rèn)情況下,SpringBoot內(nèi)部使用logback作為系統(tǒng)日志實(shí)現(xiàn)的框架,將日志輸出到控制臺,不會寫到日志文件。本篇文章主要講解下如何自定義logabck.xml以及對logback文件中配置做一個詳解,需要的可以參考一下2022-10-10
java中synchronized(同步代碼塊和同步方法)詳解及區(qū)別
這篇文章主要介紹了 java中synchronized(同步代碼塊和同步方法)詳解及區(qū)別的相關(guān)資料,需要的朋友可以參考下2017-02-02
springboot項(xiàng)目打docker鏡像實(shí)例(入門級)
最近做個項(xiàng)目,我們想把自己的程序打包成鏡像,并運(yùn)行在docker容器中,本文主要介紹了springboot項(xiàng)目打docker鏡像實(shí)例,具有一定的參考價(jià)值,感興趣的可以了解一下2024-06-06

