利用Netty+SpringBoot實(shí)現(xiàn)定時(shí)后端向前端推送數(shù)據(jù)
本文介紹BIO,NIO,AIO 及如何使用Spring Boot集成Netty,實(shí)現(xiàn)后臺(tái)向前端推送信息的功能。利用Spring Boot簡(jiǎn)化Netty的集成和配置。
1.BIO,NIO,AIO
BIO、NIO和AIO是Java編程語(yǔ)言中用于處理輸入輸出(IO)操作的三種不同的機(jī)制,它們分別代表 同步阻塞I/O,同步非阻塞I/O 和 異步非阻塞I/O。
1.1 BIO
BIO(Blocking IO) 是最傳統(tǒng)的IO模型,也稱(chēng)為同步阻塞IO。它實(shí)現(xiàn)的是同步阻塞模型,即服務(wù)器實(shí)現(xiàn)模式為一個(gè)連接一個(gè)線(xiàn)程,即客戶(hù)端有連接請(qǐng)求時(shí)服務(wù)器端就需要啟動(dòng)一個(gè)線(xiàn)程進(jìn)行處理。如果這個(gè)連接不做任何事情會(huì)造成不必要的線(xiàn)程開(kāi)銷(xiāo),并且線(xiàn)程在進(jìn)行IO操作期間是被阻塞的,無(wú)法進(jìn)行其他任務(wù)。在高并發(fā)環(huán)境下,BIO的性能較差,因?yàn)樗枰獮槊總€(gè)連接創(chuàng)建一個(gè)線(xiàn)程,而且線(xiàn)程切換開(kāi)銷(xiāo)較大,不過(guò)可以通過(guò)線(xiàn)程池機(jī)制改善。BIO適合一些簡(jiǎn)單的、低頻的、短連接的通信場(chǎng)景,例如HTTP請(qǐng)求。
1.2 NIO
NIO是Java 1.4引入的新IO模型,也稱(chēng)為同步非阻塞IO,它提供了一種基于事件驅(qū)動(dòng)的方式來(lái)處理I/O操作。
相比于傳統(tǒng)的BIO模型,NIO采用了Channel、Buffer和Selector等組件,線(xiàn)程可以對(duì)某個(gè)IO事件進(jìn)行監(jiān)聽(tīng),并繼續(xù)執(zhí)行其他任務(wù),不需要阻塞等待。當(dāng)IO事件就緒時(shí),線(xiàn)程會(huì)得到通知,然后可以進(jìn)行相應(yīng)的操作,實(shí)現(xiàn)了非阻塞式的高伸縮性網(wǎng)絡(luò)通信。在NIO模型中,數(shù)據(jù)總是從Channel讀入Buffer,或者從Buffer寫(xiě)入Channel,這種模式提高了IO效率,并且可以充分利用系統(tǒng)資源。
NIO主要由三部分組成:選擇器(Selector)、緩沖區(qū)(Buffer)和通道(Channel)。Channel是一個(gè)可以進(jìn)行數(shù)據(jù)讀寫(xiě)的對(duì)象,所有的數(shù)據(jù)都通過(guò)Buffer來(lái)處理,這種方式避免了直接將字節(jié)寫(xiě)入通道中,而是將數(shù)據(jù)寫(xiě)入包含一個(gè)或者多個(gè)字節(jié)的緩沖區(qū)。在多線(xiàn)程模式下,一個(gè)線(xiàn)程可以處理多個(gè)請(qǐng)求,這是通過(guò)將客戶(hù)端的連接請(qǐng)求注冊(cè)到多路復(fù)用器上,然后由多路復(fù)用器輪詢(xún)到連接有I/O請(qǐng)求時(shí)進(jìn)行處理。
對(duì)于NIO,如果從特性來(lái)看,它是非阻塞式IO,N是Non-Blocking的意思;如果從技術(shù)角度,NIO對(duì)于BIO來(lái)說(shuō)是一個(gè)新技術(shù),N的意思是New的意思。所以NIO也常常被稱(chēng)作Non-Blocking I/O或New I/O。
NIO適用于連接數(shù)目多且連接比較短(輕操作)的架構(gòu),例如聊天服務(wù)器、彈幕系統(tǒng)、服務(wù)器間通訊等。它通過(guò)引入非阻塞通道的概念,提高了系統(tǒng)的伸縮性和并發(fā)性能。同時(shí),NIO的使用也簡(jiǎn)化了程序編寫(xiě),提高了開(kāi)發(fā)效率。
1.3 AIO
Java AIO(Asynchronous I/O)是Java提供的異步非阻塞IO編程模型,從Java 7版本開(kāi)始支持,AIO又稱(chēng)NIO 2.0。
相比于NIO模型,AIO模型更進(jìn)一步地實(shí)現(xiàn)了異步非阻塞IO,提高了系統(tǒng)的并發(fā)性能和伸縮性。在NIO模型中,雖然可以通過(guò)多路復(fù)用器處理多個(gè)連接請(qǐng)求,但仍需要在每個(gè)連接上進(jìn)行讀寫(xiě)操作,這仍然存在一定的阻塞。而在AIO模型中,所有的IO操作都是異步的,不會(huì)阻塞任何線(xiàn)程,可以更好地利用系統(tǒng)資源。
AIO模型有以下特性:
- 異步能力:AIO模型的最大特性是異步能力,對(duì)于socket和I/O操作都有效。讀寫(xiě)操作都是異步的,完成后會(huì)自動(dòng)調(diào)用回調(diào)函數(shù)。
- 回調(diào)函數(shù):在AIO模型中,當(dāng)一個(gè)異步操作完成后,會(huì)通知相關(guān)線(xiàn)程進(jìn)行后續(xù)處理,這種處理方式稱(chēng)為“回調(diào)”。回調(diào)函數(shù)可以由開(kāi)發(fā)者自行定義,用于處理異步操作的結(jié)果。
- 非阻塞:AIO模型實(shí)現(xiàn)了完全的異步非阻塞IO,不會(huì)阻塞任何線(xiàn)程,可以更好地利用系統(tǒng)資源。
- 高性能:由于AIO模型的異步能力和非阻塞特性,它可以更好地處理高并發(fā)、高伸縮性的網(wǎng)絡(luò)通信場(chǎng)景,進(jìn)一步提高系統(tǒng)的性能和效率。
- 操作系統(tǒng)支持:AIO模型需要操作系統(tǒng)的支持,因此在不同的操作系統(tǒng)上可能會(huì)有不同的表現(xiàn)。在Linux內(nèi)核2.6版本之后增加了對(duì)真正異步IO的實(shí)現(xiàn)
2 Netty原理
2.1 Netty原理
Netty基于Java NIO(非阻塞IO)實(shí)現(xiàn),它采用事件驅(qū)動(dòng)的編程模型,將IO操作抽象為事件,通過(guò)事件處理器來(lái)處理這些事件。Netty的主要組件包括:
- Bootstrap:用于啟動(dòng)客戶(hù)端和服務(wù)器的引導(dǎo)類(lèi)
- Channel:代表IO操作的通道,用于網(wǎng)絡(luò)讀寫(xiě)操作
- ChannelHandler:用于處理IO事件的事件處理器
- EventLoopGroup:用于處理IO操作的多線(xiàn)程事件循環(huán)組
3 Spring Boot集成Netty和Websocket
在Spring Boot應(yīng)用程序中,我們可以通過(guò)集成Netty,實(shí)現(xiàn)后臺(tái)向前端推送信息的功能。首先,我們需要添加Netty依賴(lài),然后在Spring Boot應(yīng)用程序中創(chuàng)建一個(gè)NettyServer類(lèi),用于初始化Websocket通道。
1.引入依賴(lài)
<dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>4.1.111.Final</version> </dependency>
2.創(chuàng)建 NettyConfig 配置管理所有管道
import io.netty.channel.Channel; import io.netty.channel.group.ChannelGroup; import io.netty.channel.group.DefaultChannelGroup; import io.netty.util.concurrent.GlobalEventExecutor; import java.util.concurrent.ConcurrentHashMap; @SuppressWarnings("all") public class NettyConfig { /** * 定義全局單例channel組 管理所有channel */ private static volatile ChannelGroup channelGroup = null; /** * 存放請(qǐng)求ID與channel的對(duì)應(yīng)關(guān)系 */ private static volatile ConcurrentHashMap<String, Channel> channelMap = null; /** * 定義兩把鎖 */ private static final Object lock1 = new Object(); private static final Object lock2 = new Object(); public static ChannelGroup getChannelGroup() { if (null == channelGroup) { synchronized (lock1) { if (null == channelGroup) { channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); } } } return channelGroup; } public static ConcurrentHashMap<String, Channel> getChannelMap() { if (null == channelMap) { synchronized (lock2) { if (null == channelMap) { channelMap = new ConcurrentHashMap<>(); } } } return channelMap; } public static Channel getChannel(String userId) { if (null == channelMap) { return getChannelMap().get(userId); } return channelMap.get(userId); } }
3.創(chuàng)建MyChannelHandlerPool 通道組池
import io.netty.channel.group.ChannelGroup; import io.netty.channel.group.DefaultChannelGroup; import io.netty.util.concurrent.GlobalEventExecutor; /** * MyChannelHandlerPool * 通道組池,管理所有websocket連接 */ public class MyChannelHandlerPool { private MyChannelHandlerPool(){} public static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); }
4.創(chuàng)建NettyServer 初始化Netty
import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.http.HttpObjectAggregator; import io.netty.handler.codec.http.HttpServerCodec; import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler; import io.netty.handler.stream.ChunkedWriteHandler; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; /** * NettyServer Netty服務(wù)器配置 */ @Slf4j @Component @SuppressWarnings("all") public class NettyServer { private String url = "/admin/socket"; public NettyServer() {} public void start() throws Exception { // 主事件組 EventLoopGroup bossGroup = new NioEventLoopGroup(); // 執(zhí)行事件組 EventLoopGroup group = new NioEventLoopGroup(); try { ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.option(ChannelOption.SO_BACKLOG, 1024); int port = 12345; serverBootstrap.group(group, bossGroup) // 綁定線(xiàn)程池 .channel(NioServerSocketChannel.class) // 指定使用的channel .localAddress(port)// 綁定監(jiān)聽(tīng)端口 .childHandler(new ChannelInitializer<SocketChannel>() { // 綁定客戶(hù)端連接時(shí)候觸發(fā)操作 @Override protected void initChannel(SocketChannel ch) throws Exception { // 添加ObjectEncoder和ObjectDecoder來(lái)處理對(duì)象的序列化和反序列化 log.info("收到新連接"); //websocket協(xié)議本身是基于http協(xié)議的,所以這邊也要使用http解編碼器 ch.pipeline().addLast(new HttpServerCodec()); //以塊的方式來(lái)寫(xiě)的處理器 ch.pipeline().addLast(new ChunkedWriteHandler()); ch.pipeline().addLast(new HttpObjectAggregator(8192)); ch.pipeline().addLast(new WebSocketHandler()); ch.pipeline().addLast(new WebSocketServerProtocolHandler(url, null, true, 65536 * 10)); } }); // 綁定端口并同步等待直到綁定完成 ChannelFuture future = serverBootstrap.bind().sync(); log.info(NettyServer.class.getName() + "啟動(dòng)正在監(jiān)聽(tīng): " + future.channel().localAddress()); // 等待服務(wù)器通道關(guān)閉 future.channel().closeFuture().sync(); } finally { // 釋放線(xiàn)程池資源 group.shutdownGracefully().sync(); bossGroup.shutdownGracefully().sync(); } } }
5. 創(chuàng)建 WebSocketHandler 執(zhí)行任務(wù)
@Slf4j @Component @SuppressWarnings("all") public class WebSocketHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> { public static final String NETTY_START = "Netty-start"; public WebSocketHandler() {} private ScheduledFuture<?> sendDataTask; @Autowired private DeviceLevelFourService deviceLevelFourService; @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { //添加到channelGroup通道組 MyChannelHandlerPool.channelGroup.add(ctx.channel()); } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { log.info("與客戶(hù)端斷開(kāi)連接,通道關(guān)閉!"); //添加到channelGroup 通道組 MyChannelHandlerPool.channelGroup.remove(ctx.channel()); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { // 首次連接是FullHttpRequest,處理參數(shù) if (msg instanceof FullHttpRequest) { FullHttpRequest request = (FullHttpRequest) msg; String uri = request.uri(); Map<String, String> paramMap = getUrlParams(uri); log.info("接收到的參數(shù)是:" + JSON.toJSONString(paramMap)); // 如果url包含參數(shù),需要處理 if (uri.contains("?")) { String newUri = uri.substring(0, uri.indexOf("?")); log.info(newUri); request.setUri(newUri); } // 當(dāng)連接建立時(shí),啟動(dòng)定時(shí)任務(wù) sendDataTask = ctx.channel().eventLoop().scheduleAtFixedRate(new Runnable() { @Override public void run() { try { if (deviceLevelFourService == null) { deviceLevelFourService = SpringContextUtil.getBean(DeviceLevelFourServiceImpl.class); } // 此處可以接收路徑參數(shù) ,直接獲取前端傳遞參數(shù) // "ws://localhost:12345/admin/socket?id=1" String deviceId = paramMap.get("id"); /** * -------此處為自己的數(shù)據(jù)--------- */ // 調(diào)用service 得到前端需要的數(shù)據(jù),用JSON工具類(lèi)轉(zhuǎn)換推送到前端 List<DeviceNettyData> deviceNettyData = deviceLevelFourService.handlerDeviceData(Long.parseLong(deviceId)); String json = JSON.toJSONString(deviceNettyData, SerializerFeature.WriteMapNullValue); log.info(json); // 將 JSON 字符串封裝為 TextWebSocketFrame TextWebSocketFrame frameNetty = new TextWebSocketFrame(json); ctx.writeAndFlush(frameNetty); // 發(fā)送 WebSocket 幀 } catch (Exception e) { log.error(e.getMessage(), e); } } }, 0, 30, TimeUnit.SECONDS); // 立即開(kāi)始,每30秒發(fā)送一次 // 調(diào)用父類(lèi)方法,處理下一個(gè)handler super.channelRead(ctx, request); } else if (msg instanceof TextWebSocketFrame frame) { // 正常的TEXT消息類(lèi)型 sendAllMessage(frame.text()); // 繼續(xù)傳遞給后續(xù)handler super.channelRead(ctx, frame); } else { // 如果消息類(lèi)型不匹配,記錄警告或處理異常情況 log.error("未處理的消息類(lèi)型:" + msg.getClass()); super.channelRead(ctx, msg); // 仍然傳遞給后續(xù)處理 } } @Override protected void channelRead0(ChannelHandlerContext channelHandlerContext, TextWebSocketFrame textWebSocketFrame) throws Exception { log.info(channelHandlerContext.name()); } //讀取完成刷新 @Override public void channelReadComplete(ChannelHandlerContext ctx) { ctx.flush(); } //異常則關(guān)閉ChannelHandlerContext連接 @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { // Close the connection when an exception is raised. cause.printStackTrace(); ctx.close(); } private void sendAllMessage(String message){ //收到信息后,群發(fā)給所有channel MyChannelHandlerPool.channelGroup.writeAndFlush( new TextWebSocketFrame(message)); } private static Map<String ,String> getUrlParams(String url){ Map<String,String> map = new HashMap<>(); url = url.replace("?",";"); if (!url.contains(";")){ return map; } if (url.split(";").length > 0){ String[] arr = url.split(";")[1].split("&"); for (String s : arr){ String key = s.split("=")[0]; String value = s.split("=")[1]; map.put(key,value); } return map; }else{ return map; } } }
6.創(chuàng)建NettyServerRunner 用來(lái)使用新線(xiàn)程調(diào)用NetterServer
@Slf4j public class NettyServerRunner implements Runnable { @Override public void run() { try { new NettyServer().start(); } catch (Exception e) { // 使用Logger進(jìn)行日志記錄 log.error(e.getMessage(), e); } } }
7. 最后可以在隨意注入到spring容器類(lèi)中,在項(xiàng)目啟動(dòng)時(shí)候調(diào)用,也可以在其他訪(fǎng)問(wèn)接口事件調(diào)用
// 方式一 @PostConstruct public void init(){ // 在新的線(xiàn)程中運(yùn)行Netty服務(wù)器 Thread thread = new Thread(new NettyServerRunner()); thread.start(); } // 方式二調(diào)用 @GetMapping("/test/{id}") public R getDevice(@PathVariable Long id){ // 在新的線(xiàn)程中運(yùn)行Netty服務(wù)器 Thread thread = new Thread(new NettyServerRunner()); thread.start(); return deviceService.getDeviceAlarmCount(deviceId); }
注意:
使用新線(xiàn)程時(shí)候,spring容器注入的對(duì)象為空,容易產(chǎn)生空指針異常,可以借鑒WebSocketHandler 類(lèi)中方法,重新從spring容器中獲取需要的對(duì)象。
總結(jié)
到此這篇關(guān)于利用Netty+SpringBoot實(shí)現(xiàn)定時(shí)后端向前端推送數(shù)據(jù)的文章就介紹到這了,更多相關(guān)SpringBoot定時(shí)后端向前端推送數(shù)據(jù)內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
- 解決gateway報(bào)netty堆外內(nèi)存溢出io.netty.util.internal.OutOfDirectMemoryError
- springboot接入netty實(shí)現(xiàn)在線(xiàn)統(tǒng)計(jì)人數(shù)
- Netty如何自定義編碼解碼器
- 基于Netty實(shí)現(xiàn)WebSocket的常用處理器及區(qū)別解析
- springboot整合netty實(shí)現(xiàn)心跳檢測(cè)和自動(dòng)重連
- SpringCloud整合Netty集群實(shí)現(xiàn)WebSocket的示例代碼
- io.netty項(xiàng)目UDP實(shí)現(xiàn)方式
相關(guān)文章
Java并發(fā)編程之關(guān)鍵字volatile知識(shí)總結(jié)
今天帶大家學(xué)習(xí)java的相關(guān)知識(shí),文章圍繞著Java關(guān)鍵字volatile展開(kāi),文中有非常詳細(xì)的知識(shí)總結(jié),需要的朋友可以參考下2021-06-06IDEA SpringBoot項(xiàng)目配置熱更新的步驟詳解(無(wú)需每次手動(dòng)重啟服務(wù)器)
這篇文章主要介紹了IDEA SpringBoot項(xiàng)目配置熱更新的步驟,無(wú)需每次手動(dòng)重啟服務(wù)器,本文通過(guò)圖文實(shí)例代碼相結(jié)合給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2020-04-04springboot整合Mybatis-plus的實(shí)現(xiàn)
這篇文章主要介紹了springboot整合Mybatis-plus的實(shí)現(xiàn),文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2020-09-09Spring Boot JPA訪(fǎng)問(wèn)Mysql示例
本篇文章主要介紹了Spring Boot JPA訪(fǎng)問(wèn)Mysql示例,小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧2017-03-03java web項(xiàng)目里ehcache.xml介紹
java web項(xiàng)目里ehcache.xml介紹,需要的朋友可以參考一下2013-03-03java簡(jiǎn)單實(shí)現(xiàn)桌球滾動(dòng)效果
這篇文章主要為大家詳細(xì)介紹了java簡(jiǎn)單實(shí)現(xiàn)桌球滾動(dòng)效果,文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2019-10-10SpringBoot優(yōu)雅地實(shí)現(xiàn)全局異常處理的方法詳解
這篇文章主要為大家詳細(xì)介紹了SpringBoot如何優(yōu)雅地實(shí)現(xiàn)全局異常處理,文中的示例代碼講解詳細(xì),感興趣的小伙伴可以跟隨小編一起學(xué)習(xí)一下2022-08-08Springboot通過(guò)請(qǐng)求頭獲取當(dāng)前用戶(hù)信息方法詳細(xì)示范
這篇文章主要介紹了Springboot通過(guò)請(qǐng)求頭獲取當(dāng)前用戶(hù)信息的方法,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)吧2022-11-11