亚洲乱码中文字幕综合,中国熟女仑乱hd,亚洲精品乱拍国产一区二区三区,一本大道卡一卡二卡三乱码全集资源,又粗又黄又硬又爽的免费视频

Java使用Netty實(shí)現(xiàn)同時(shí)多端口監(jiān)聽

 更新時(shí)間:2025年10月21日 10:35:33   作者:齊 飛  
本文主要內(nèi)容為SpringBoot項(xiàng)目使用Netty監(jiān)聽多個端口接受客戶端數(shù)據(jù)_netty監(jiān)聽多個端口,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧

前言

Netty 是一個基于 Java NIO(非阻塞 I/O)的網(wǎng)絡(luò)應(yīng)用框架,它簡化了開發(fā)高性能、高可靠性網(wǎng)絡(luò)服務(wù)器和客戶端的過程,使得開發(fā)者能夠?qū)W⒂跇I(yè)務(wù)邏輯的實(shí)現(xiàn),而無需過多關(guān)注底層網(wǎng)絡(luò)通信的復(fù)雜細(xì)節(jié)。適用于 Web 服務(wù)器、即時(shí)通訊、游戲服務(wù)器等開發(fā),廣泛應(yīng)用于諸多領(lǐng)域。

環(huán)境

JDK:64位 Jdk1.8
SpringBoot:2.1.7.RELEASE
Netty:4.1.39.Final

實(shí)現(xiàn)功能

使用Netty監(jiān)聽多個端口接受設(shè)備發(fā)送的數(shù)據(jù),并發(fā)送數(shù)據(jù)給客戶端。

服務(wù)端

相關(guān)配置

pom.xml

        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>4.1.39.Final</version>
        </dependency>

application.yml

spring:
  profiles:
    # 環(huán)境 dev|test|prod
    active: dev
netty-socket:
  ports: 8801,8802
  bufferSize: 2048

配置類

import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;


/**
 * @author qf
 * @since 2024/12/08 21:08
 */
@Component
@ConfigurationProperties(prefix = "netty-socket")
@Data
public class SocketConfig {
    private List<Integer> ports;
    private Integer bufferSize;
}

核心代碼

啟動類

CommandLineRunner:當(dāng)應(yīng)用程序啟動時(shí),CommandLineRunner 接口的實(shí)現(xiàn)類中的 run 方法會被調(diào)用

import cn.hutool.core.util.ObjectUtil;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import javax.annotation.PreDestroy;
import java.util.ArrayList;
import java.util.List;

@Slf4j
@Component
public class NettyServer {

    @Value("${spring.profiles.active}")
    private String active;
    @Autowired
    private SocketConfig socketConfig;


    //用于關(guān)閉
    private List<ChannelFuture> futures;
    private EventLoopGroup bossGroup = null;
    private EventLoopGroup workerGroup = null;

    public List<ChannelFuture> getFutures() {
        if (ObjectUtil.isEmpty(futures)) {
            futures = new ArrayList<>();
        }
        return futures;
    }

    public void start() throws Exception {
        bossGroup = new NioEventLoopGroup(1);
        workerGroup = new NioEventLoopGroup();
        try {
            Integer bufferSize = socketConfig.getBufferSize();
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childOption(ChannelOption.RCVBUF_ALLOCATOR, new AdaptiveRecvByteBufAllocator(bufferSize, bufferSize, bufferSize))
                    .childHandler(new ListenChannelInitHandler());
            if("dev".equals(active)){
                bootstrap.handler(new LoggingHandler(LogLevel.INFO));
            }
            for (Integer port : socketConfig.getPorts()) {
                ChannelFuture channelFuture = bootstrap.bind(port).sync();
                getFutures().add(channelFuture);
                channelFuture.addListener(future -> {
                    if (future.isSuccess()) {
                        log.info("端口:{} 監(jiān)聽成功!", port);
                    } else {
                        log.info("端口:{} 監(jiān)聽失敗!", port);
                    }
                });
                channelFuture.channel().closeFuture()
                        .addListener((ChannelFutureListener) listener -> channelFuture.channel().close());
            }
        } catch (Exception e) {
            log.info("netty監(jiān)聽數(shù)據(jù)時(shí)發(fā)生異常:", e);
        } finally {
            //異步使用stop()關(guān)閉
//            bossGroup.shutdownGracefully();
//            workerGroup.shutdownGracefully();
        }
    }

    @PreDestroy
    public void stop() {
        if (ObjectUtil.isNotEmpty(futures)) {
            try {
                for (ChannelFuture future : futures) {
                    future.channel().close().addListener(ChannelFutureListener.CLOSE);
                    future.awaitUninterruptibly();
                }
                if (ObjectUtil.isNotEmpty(bossGroup) && ObjectUtil.isNotEmpty(workerGroup)) {
                    bossGroup.shutdownGracefully();
                    workerGroup.shutdownGracefully();
                }
                futures = null;
                log.info(" netty服務(wù)關(guān)閉成功! ");
            } catch (Exception e) {
                log.error("關(guān)閉netty服務(wù)時(shí)發(fā)生異常: ", e);
            }
        }
    }
}

端口枚舉

public enum PortEnum {
    TEST1(8801,"測試1"),
    TEST2(8802,"測試2");
    private int port;
    private String name;

    PortEnum(int port, String name) {
        this.port = port;
        this.name = name;
    }

    public int getPort() {
        return port;
    }

    public void setPort(int port) {
        this.port = port;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }
}

端口處理器

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import lombok.extern.slf4j.Slf4j;

import java.net.InetSocketAddress;

import static com.qf.netty.PortEnum.*;

/**
 * 多端口處理器
 *
 * @author qf
 * @since 2024/12/05 19:05
 */
@Slf4j
public class ListenChannelInitHandler extends ChannelInitializer<SocketChannel> {

    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {
        int port = socketChannel.localAddress().getPort();

        if (TEST1.getPort() == port) {
            //使用自定義分割符
            customizeSplitHandler(socketChannel, true, "$");
            socketChannel.pipeline().addLast(new StringDecoder());
            socketChannel.pipeline().addLast(new StringEncoder());
            // 添加自定義的業(yè)務(wù)處理器
            socketChannel.pipeline().addLast(new DeviceAServiceHandler());
        } else if (TEST2.getPort() == port) {
            customizeSplitHandler(socketChannel, false, "#");
            socketChannel.pipeline().addLast(new StringDecoder());
            socketChannel.pipeline().addLast(new StringEncoder());
            socketChannel.pipeline().addLast(new DeviceBServiceHandler());
        } else {
            log.error("監(jiān)聽的端口號暫無業(yè)務(wù)處理:{}", port);
        }
        // 添加異常處理器
        socketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter() {
            @Override
            public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
                // 獲取遠(yuǎn)程地址
                InetSocketAddress remoteAddress = (InetSocketAddress) ctx.channel().remoteAddress();
                String clientIP = remoteAddress.getAddress().getHostAddress();

                // 記錄異常信息和客戶端IP地址
                log.error("監(jiān)聽設(shè)備時(shí)出現(xiàn)異常,客戶端IP: {}", clientIP, cause);
                ctx.close();
            }
        });
    }

    /**
     * 自定義分隔符處理器
     *
     * @param socketChannel  socketChannel
     * @param stripDelimiter 是否去除分隔符
     * @param split          分隔符
     */
    private static void customizeSplitHandler(SocketChannel socketChannel, boolean stripDelimiter, String split) {
        ByteBuf buffer = Unpooled.copiedBuffer(split.getBytes());
        try {
            socketChannel.pipeline().addLast(new DelimiterBasedFrameDecoder(2048, stripDelimiter, buffer));
        } catch (Exception e) {
            buffer.release();
            log.error("監(jiān)聽工地微站設(shè)備時(shí)出現(xiàn)異常:", e);
        }
    }
}

業(yè)務(wù)處理器

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import lombok.extern.slf4j.Slf4j;

/**
 * @author qf
 * @since 2024/12/09 19:36
 */
@Slf4j
public class DeviceAServiceHandler extends SimpleChannelInboundHandler<String> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
        // 處理接收到的消息
        // 你可以在這里添加更復(fù)雜的業(yè)務(wù)邏輯,比如解析消息、訪問數(shù)據(jù)庫等。
        log.info("發(fā)送的數(shù)據(jù)為------->:" + msg);
        if (true) {
            //發(fā)送數(shù)據(jù)給客戶端
            ctx.writeAndFlush("helloA~");
        }
    }
}
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import lombok.extern.slf4j.Slf4j;

/**
 * @author qf
 * @since 2024/12/09 19:36
 */
@Slf4j
public class DeviceBServiceHandler extends SimpleChannelInboundHandler<String> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
        // 處理接收到的消息
        // 你可以在這里添加更復(fù)雜的業(yè)務(wù)邏輯,比如解析消息、訪問數(shù)據(jù)庫等。
        log.info("發(fā)送的數(shù)據(jù)為------->:" + msg);
        if (true) {
            //發(fā)送數(shù)據(jù)給客戶端
            ctx.writeAndFlush("helloB~");
        }
    }
}

客戶端

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringEncoder;

import java.net.InetSocketAddress;

/**
 * @author qf
 * @since 2024/10/11 18:01
 */
public class Hello1Client {
    public static void main(String[] args) throws InterruptedException {
        // 1. 啟動類,啟動客戶端
        new Bootstrap()
                // 2. 添加 EventLoop
                .group(new NioEventLoopGroup())//如果服務(wù)器端發(fā)來數(shù)據(jù),客戶端的EventLoop就可以從selector觸發(fā)讀事件進(jìn)行處理
                // 3. 選擇客戶端 channel 實(shí)現(xiàn),底層封裝了SocketChannel
                .channel(NioSocketChannel.class)
                // 4. 添加處理器
                .handler(new ChannelInitializer<NioSocketChannel>() {
                    @Override // 在連接建立后被調(diào)用
                    protected void initChannel(NioSocketChannel ch) throws Exception {
                        ch.pipeline().addLast(new StringEncoder());//編碼器,將字符串編碼成ByteBuf進(jìn)行發(fā)送
                        ch.pipeline().addLast(new EchoClientHandler());
                    }
                })
                // 5. 連接到服務(wù)器
                .connect(new InetSocketAddress("localhost", 8801))
                .sync()//sync()是一個阻塞方法,只有連接建立后才會繼續(xù)執(zhí)行
                .channel()//.channel()表示拿到服務(wù)器和客戶端之間的SocketChannel(連接對象)
                // 6. 向服務(wù)器發(fā)送數(shù)據(jù)
                .writeAndFlush("hello, world$");
    }
}
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringEncoder;

import java.net.InetSocketAddress;

public class Hello2Client {
    public static void main(String[] args) throws InterruptedException {
        // 1. 啟動類,啟動客戶端
        new Bootstrap()
                // 2. 添加 EventLoop
                .group(new NioEventLoopGroup())//如果服務(wù)器端發(fā)來數(shù)據(jù),客戶端的EventLoop就可以從selector觸發(fā)讀事件進(jìn)行處理
                // 3. 選擇客戶端 channel 實(shí)現(xiàn),底層封裝了SocketChannel
                .channel(NioSocketChannel.class)
                // 4. 添加處理器
                .handler(new ChannelInitializer<NioSocketChannel>() {
                    @Override // 在連接建立后被調(diào)用
                    protected void initChannel(NioSocketChannel ch) throws Exception {
                        ch.pipeline().addLast(new StringEncoder());//編碼器,將字符串編碼成ByteBuf進(jìn)行發(fā)送
                        ch.pipeline().addLast(new EchoClientHandler());
                    }
                })
                // 5. 連接到服務(wù)器
                .connect(new InetSocketAddress("localhost", 8802))
                .sync()//sync()是一個阻塞方法,只有連接建立后才會繼續(xù)執(zhí)行
                .channel()//.channel()表示拿到服務(wù)器和客戶端之間的SocketChannel(連接對象)
                // 6. 向服務(wù)器發(fā)送數(shù)據(jù)
                .writeAndFlush("hello, world#");
    }
}
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;

/**
 * @author qf
 * @since 2024/10/11 18:05
 */
public class EchoClientHandler extends ChannelInboundHandlerAdapter {

    private final ByteBuf message;

    public EchoClientHandler() {
        message = Unpooled.buffer(256);
        message.writeBytes("hello netty - ".getBytes(CharsetUtil.UTF_8));
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        ctx.writeAndFlush(message);
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        String data = ((ByteBuf) msg).toString(CharsetUtil.UTF_8);
        System.out.println(data);
//        ctx.write(msg);
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) {
        ctx.flush();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        // Close the connection when an exception is raised.
        cause.printStackTrace();
        ctx.close();
    }
}

到此這篇關(guān)于Java使用Netty實(shí)現(xiàn)同時(shí)多端口監(jiān)聽的文章就介紹到這了,更多相關(guān)Java Netty 多端口監(jiān)聽內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • java中接口和事件監(jiān)聽器的深入理解

    java中接口和事件監(jiān)聽器的深入理解

    這篇文章主要給大家介紹了關(guān)于java中接口和事件監(jiān)聽器的相關(guān)資料,文中通過示例代碼介紹的非常詳細(xì),對大家學(xué)習(xí)或者使用java具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面來一起學(xué)習(xí)學(xué)習(xí)吧
    2019-12-12
  • Java中關(guān)于OAuth2.0的原理分析

    Java中關(guān)于OAuth2.0的原理分析

    這篇文章主要介紹了Java中關(guān)于OAuth2.0的原理分析,OAuth是一個關(guān)于授權(quán)的開放網(wǎng)絡(luò)標(biāo)準(zhǔn),允許用戶授權(quán)第三 方應(yīng)用訪問他們存儲在另外的服務(wù)提供者上的信息,而不需要將用戶名和密碼提供給第三方移動應(yīng)用或分享他們數(shù)據(jù)的所有內(nèi)容,需要的朋友可以參考下
    2023-09-09
  • java設(shè)計(jì)模式之單例模式

    java設(shè)計(jì)模式之單例模式

    這篇文章主要為大家詳細(xì)介紹了java設(shè)計(jì)模式之單例模式,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下
    2017-10-10
  • SpringBoot應(yīng)用自定義logback日志詳解

    SpringBoot應(yīng)用自定義logback日志詳解

    默認(rèn)情況下,SpringBoot內(nèi)部使用logback作為系統(tǒng)日志實(shí)現(xiàn)的框架,將日志輸出到控制臺,不會寫到日志文件。本篇文章主要講解下如何自定義logabck.xml以及對logback文件中配置做一個詳解,需要的可以參考一下
    2022-10-10
  • java分布式面試系統(tǒng)限流最佳實(shí)踐

    java分布式面試系統(tǒng)限流最佳實(shí)踐

    這篇文章主要介紹了java分布式面試系統(tǒng)限流最佳實(shí)踐場景分析解答,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步早日升職加薪
    2022-03-03
  • java中synchronized(同步代碼塊和同步方法)詳解及區(qū)別

    java中synchronized(同步代碼塊和同步方法)詳解及區(qū)別

    這篇文章主要介紹了 java中synchronized(同步代碼塊和同步方法)詳解及區(qū)別的相關(guān)資料,需要的朋友可以參考下
    2017-02-02
  • SpringBoot框架如何操作Excel和PDF

    SpringBoot框架如何操作Excel和PDF

    Excel和PDF都是常見的辦公文件類型,在實(shí)際需求中有著較多的應(yīng)用,excel經(jīng)常用來處理數(shù)據(jù),PDF文件格式可以將文字、字型、格式、顏色及獨(dú)立于設(shè)備和分辨率的圖形圖像等封裝在一個文件中,本文就講述下SpringBoot框架如何操作這兩種類型的文件
    2021-06-06
  • springboot項(xiàng)目打docker鏡像實(shí)例(入門級)

    springboot項(xiàng)目打docker鏡像實(shí)例(入門級)

    最近做個項(xiàng)目,我們想把自己的程序打包成鏡像,并運(yùn)行在docker容器中,本文主要介紹了springboot項(xiàng)目打docker鏡像實(shí)例,具有一定的參考價(jià)值,感興趣的可以了解一下
    2024-06-06
  • Java編程基礎(chǔ)測試題分享

    Java編程基礎(chǔ)測試題分享

    這篇文章主要介紹了Java編程基礎(chǔ)測試題分享,具有一定參考價(jià)值,需要的朋友可以了解下。
    2017-10-10
  • common-upload上傳文件功能封裝類分享

    common-upload上傳文件功能封裝類分享

    本文介紹一個common-upload上傳封裝類,為了更方便的上傳文件,對common-upload進(jìn)行了一個簡單的封裝,大家參考使用吧
    2014-01-01

最新評論