SpringBoot整合Netty+Websocket實現(xiàn)消息推送的示例代碼
前言
Netty是一個高性能、異步事件驅(qū)動的網(wǎng)絡(luò)應用框架,用于快速開發(fā)可維護的高性能協(xié)議服務(wù)器和客戶端。以下是Netty的主要優(yōu)勢:
- 高性能:Netty基于NIO(非阻塞IO)模型,采用事件驅(qū)動的設(shè)計,具有高性能的特點。它通過零拷貝技術(shù)、內(nèi)存池化技術(shù)等手段,進一步提高了IO性能,降低了資源消耗。
- 易用性:Netty提供了豐富的API和功能,如對TCP、UDP和文件傳輸?shù)闹С?,以及對SSL/TLS、壓縮、編解碼等功能的內(nèi)置實現(xiàn)。這些功能簡化了網(wǎng)絡(luò)應用的開發(fā),降低了學習和使用的難度。
- 可擴展性:Netty采用了模塊化設(shè)計,各個模塊之間耦合度低,易于擴展。開發(fā)者可以根據(jù)需要定制和擴展Netty的功能,如添加新的編解碼器、處理器或協(xié)議。
- 穩(wěn)定性:Netty經(jīng)過了大規(guī)模生產(chǎn)環(huán)境的驗證,具有高穩(wěn)定性。它通過合理的線程模型、資源管理和錯誤處理機制,保證了系統(tǒng)的穩(wěn)定性和可靠性。
- 社區(qū)活躍:Netty擁有一個活躍的開源社區(qū),不斷有新的功能和優(yōu)化被貢獻出來。這為開發(fā)者提供了強大的支持,也促進了Netty的發(fā)展和完善。
- 跨平臺性:Netty可以在多種操作系統(tǒng)和平臺上運行,如Windows、Linux和Mac OS等。這一特性使得開發(fā)者可以輕松地在不同環(huán)境下部署和維護網(wǎng)絡(luò)應用。
WebSocket 是一種網(wǎng)絡(luò)通信協(xié)議,相比傳統(tǒng)的HTTP協(xié)議,它具有以下優(yōu)勢:
- 實時性:WebSocket 允許服務(wù)器主動向客戶端推送數(shù)據(jù),從而實現(xiàn)實時通信,這對于需要即時反饋的應用(如在線游戲、聊天應用等)至關(guān)重要。
- 全雙工通信:WebSocket 支持雙向通信,服務(wù)器和客戶端可以同時發(fā)送和接收數(shù)據(jù),提高了通信的靈活性。
- 節(jié)省帶寬:由于 WebSocket 在單個 TCP 連接上運行,避免了為每個消息創(chuàng)建新連接所需的額外開銷,減少了數(shù)據(jù)傳輸量。
- 更好的二進制支持:WebSocket 定義了二進制幀,可以更輕松地處理二進制內(nèi)容,如圖片、音視頻等。
- 跨域通信:WebSocket 支持跨域通信,使得客戶端可以與不同域名的服務(wù)器進行通信,增加了應用的靈活性和可訪問性。
- 可擴展性:WebSocket 定義了擴展機制,用戶可以根據(jù)需要擴展協(xié)議或?qū)崿F(xiàn)自定義的子協(xié)議。
- 更好的支持實時應用:由于 WebSocket 的實時性和全雙工通信特性,它特別適合用于需要實時反饋的應用,例如在線游戲、實時音視頻聊天等。
- 更快的傳輸速度:由于 WebSocket 減少了不必要的連接和狀態(tài)轉(zhuǎn)換,通信速度更快。
- 更低的延遲:由于 WebSocket 建立的是持久連接,減少了建立和關(guān)閉連接的開銷,從而降低了通信延遲。
- 更強的兼容性:雖然 WebSocket 協(xié)議并未在所有瀏覽器中得到完全支持,但有各種庫和框架可以幫助實現(xiàn)兼容性,例如通過 polyfill 技術(shù)。
說明:以下為SpringBoot整合Netty+Websocket實現(xiàn)實時的消息通訊
一、引入Netty依賴
<!--netty-->
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.25.Final</version>
</dependency>二、 使用步驟
1.配置服務(wù)啟動類
package com.pzg.chat.communication;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
@Slf4j
@Component
public class WebSocketNettyServer {
@Autowired
WebSocketChannelInitializer webSocketChannelInitializer;
/**
* Netty服務(wù)器啟動對象
*/
private final ServerBootstrap serverBootstrap = new ServerBootstrap();;
@PostConstruct
public void WebSocketNettyServerInit() {
// 初始化服務(wù)器啟動對象
// 主線程池
NioEventLoopGroup mainGrp = new NioEventLoopGroup();
// 從線程池
NioEventLoopGroup subGrp = new NioEventLoopGroup();
serverBootstrap
// 指定使用上面創(chuàng)建的兩個線程池
.group(mainGrp, subGrp)
// 指定Netty通道類型
.channel(NioServerSocketChannel.class)
// 指定通道初始化器用來加載當Channel收到事件消息后
.childHandler(webSocketChannelInitializer);
}
public void start() throws InterruptedException {
// 綁定服務(wù)器端口,以異步的方式啟動服務(wù)器
ChannelFuture future = serverBootstrap.bind("0.0.0.0",8089).sync();
if (future.isSuccess()){
log.info("netty初始化完成,端口8088");
}
}
}
說明:@PostConstruct用來保證容器初始化后觸發(fā)該注解下的方法
2.Netty服務(wù)初始化器
package com.pzg.chat.communication;
import com.pzg.chat.handler.ChatHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.stream.ChunkedWriteHandler;
import io.netty.handler.timeout.IdleStateHandler;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class WebSocketChannelInitializer extends ChannelInitializer<SocketChannel> {
@Autowired
private ChatHandler chatHandler;
@Override
protected void initChannel(SocketChannel socketChannel) {
//獲取對應的管道
ChannelPipeline pipeline = socketChannel.pipeline();
pipeline
//添加HTTP編碼解碼器
.addLast(new HttpServerCodec())
//添加對大數(shù)據(jù)流的支持
.addLast(new ChunkedWriteHandler())
//添加聚合器
.addLast(new HttpObjectAggregator(1024 * 64))
//設(shè)置websocket連接前綴前綴
//心跳檢查(8秒)
.addLast(new IdleStateHandler(8,0,0))
//添加自定義處理器
.addLast(new WebSocketServerProtocolHandler("/ws",null,true))
.addLast(chatHandler);
}
}
3.自定義處理器類
package com.pzg.chat.handler;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.pzg.chat.communication.context.impl.WebSocketContext;
import com.pzg.chat.service.ChannelService;
import io.netty.channel.*;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
import io.netty.handler.codec.http.websocketx.PingWebSocketFrame;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.util.concurrent.GlobalEventExecutor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
@ChannelHandler.Sharable
@SuppressWarnings("all")
@Component
@Slf4j
public class ChatHandler extends SimpleChannelInboundHandler<WebSocketFrame> {
private static ChatHandler chatHandler;
@Autowired
private ChannelService channelService;
@Autowired
private WebSocketContext webSocketContext;
@PostConstruct
public void init() {
chatHandler = this;
}
/**
* 創(chuàng)建ChannelGroup對象存儲所有連接的用戶
*/
private static final ChannelGroup clients = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
/**
* 有新消息時會調(diào)用這個方法
*
* @param channelHandlerContext 上下文處理器
* @param textWebSocketFrame 文本
* @throws Exception
*/
@Override
@SuppressWarnings("all")
public void channelRead0(ChannelHandlerContext ctx, WebSocketFrame frame) throws Exception {
System.out.println(frame.getClass());
if (frame instanceof FullHttpRequest) {
}
//判斷是否為關(guān)閉事件
if (frame instanceof CloseWebSocketFrame) {
ctx.channel().close();
return;
}
if (frame instanceof PingWebSocketFrame) {
return;
}
if (frame instanceof TextWebSocketFrame) {
TextWebSocketFrame textWebSocketFrame = (TextWebSocketFrame) frame;
JSONObject jsonObject = JSON.parseObject(textWebSocketFrame.text());
webSocketContext.executeWebSocketContext(jsonObject,ctx.channel());
//類型轉(zhuǎn)為(前后端達成的消息體)
}
//遍歷出所有連接的通道
}
/**
* 有新的連接建立時
*
* @param ctx
* @throws Exception
*/
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
//加入通道組
clients.add(ctx.channel());
}
/**
* 不活躍時會調(diào)用這個方法
*
* @param ctx
* @throws Exception
*/
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
//移除出通道組
try {
channelService.deleteBindUserIdAndIdChannel(ctx.channel().id().asShortText());
channelService.deleteChannelBindUserId(ctx.channel());
}catch (Exception e){
}
clients.remove(ctx.channel());
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
// 獲取參數(shù)
super.channelActive(ctx);
}
//檢查客戶端寫心跳事件
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
Channel channel = ctx.channel();
if (evt instanceof IdleStateEvent) {
IdleStateEvent idleStateEvent = (IdleStateEvent) evt;
if (idleStateEvent.state() == IdleState.READER_IDLE) {
try {
channelService.deleteBindUserIdAndIdChannel(ctx.channel().id().asShortText());
channelService.deleteChannelBindUserId(ctx.channel());
}catch (Exception e){
}
clients.remove(channel);
channel.close();
}
} else {
super.userEventTriggered(ctx, evt);
}
}
}
說明:webSocketContext.executeWebSocketContext(jsonObject,ctx.channel()); 為自己定義的處理消息的類,textWebSocketFrame.text()為對應的消息,可自行處理
4.緩存用戶Channel接口和對應實現(xiàn)類
1.接口
package com.pzg.chat.service;
import io.netty.channel.Channel;
public interface ChannelService {
void setUserIdBindChannel(Channel channel);
void setIdBindChannel(String id,Channel channel);
void setChannelBindUserId(Channel channel);
String getChannelBindUserId(Channel channel);
void deleteChannelBindUserId(Channel channel);
void deleteBindUserIdChannel();
void deleteBindIdChannel(String id);
void setUserIdAndIdBindChannel(String id ,Channel channel);
void deleteBindUserIdAndIdChannel(String id);
Channel getUserIdChannel(String userId);
Channel getIdBindChannel(String Id);
}
2.實現(xiàn)類
package com.pzg.chat.service.impl;
import com.pzg.chat.service.ChannelService;
import com.pzg.chat.utils.UserUtil;
import io.netty.channel.Channel;
import org.springframework.stereotype.Service;
import java.util.*;
@Service
public class ChannelServiceImpl implements ChannelService {
//保存用戶id和channel的映射
public static HashMap<String,Channel> userIdChannel = new HashMap<>();
//保存channelId和channel映射關(guān)系
public static HashMap<String,Channel> idChannel = new HashMap<>();
//保存channel和userID映射關(guān)系
public static HashMap<Channel,String> ChannelUserId = new HashMap<>();
@Override
public void setUserIdBindChannel(Channel channel) {
String userId = String.valueOf(UserUtil.getUserDetailsDTO().getId());
userIdChannel.put(userId,channel);
}
@Override
public void setIdBindChannel(String id, Channel channel) {
idChannel.put(id,channel);
}
@Override
public void setChannelBindUserId(Channel channel) {
String userId = String.valueOf(UserUtil.getUserDetailsDTO().getId());
System.out.println("----------------------->"+userId);
ChannelUserId.put(channel,userId);
}
@Override
public String getChannelBindUserId(Channel channel) {
return ChannelUserId.get(channel);
}
@Override
public void deleteChannelBindUserId(Channel channel) {
ChannelUserId.remove(channel);
}
@Override
public void deleteBindUserIdChannel() {
String userId = String.valueOf(UserUtil.getUserDetailsDTO().getId());
userIdChannel.remove(userId);
}
@Override
public void deleteBindIdChannel(String id) {
idChannel.remove(id);
}
@Override
public void setUserIdAndIdBindChannel(String id, Channel channel) {
setUserIdBindChannel(channel);
setIdBindChannel(id,channel);
}
@Override
public void deleteBindUserIdAndIdChannel(String id) {
deleteBindIdChannel(id);
deleteBindUserIdChannel();
}
@Override
public Channel getUserIdChannel(String userId) {
return userIdChannel.get(userId);
}
@Override
public Channel getIdBindChannel(String Id) {
return idChannel.get(Id);
}
}
說明:緩存Channel主要保證消息能發(fā)送到對應的Channel中,消息可攜帶用戶id通過id查找Channel,將信息存入即可 ,通過channel.writeAndFlush(new TextWebSocketFrame("消息內(nèi)容"));刷出消息。
5.調(diào)用start()啟動Netty服務(wù)
package com.pzg.chat.listener;
import com.pzg.chat.communication.WebSocketNettyServer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.stereotype.Component;
@Component
public class NettyStartListener implements ApplicationListener<ContextRefreshedEvent> {
/**
* 注入啟動器
*/
@Autowired
private WebSocketNettyServer webSocketNettyServer;
@Override
public void onApplicationEvent(ContextRefreshedEvent event) {
//判斷event上下文中的父級是否為空
if (event.getApplicationContext().getParent() == null) {
try {
//為空則調(diào)用start方法
webSocketNettyServer.start();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
6.Websocket配置
// 導出socket對象
import {getToken} from '@/utils/auth';
export {
socket
}
import { Message } from 'element-ui'
import {header} from "../../listening/header";
import {asidefriend} from "../../listening/asidefriend";
import {chatbox} from "../../listening/chatbox";
import {chatcontent} from "../../listening/chatcontent";
import {videocalls} from "../../listening/videocalls";
import {voicecalls} from "../../listening/voicecalls";
// socket主要對象
var socket = {
websock: null,
ws_url: "ws://localhost:8089/ws",
/**
* 開啟標識
* */
socket_open: false,
/**
* 心跳timer
* */
hearbeat_timer: null,
/**
* 心跳發(fā)送頻率
* */
hearbeat_interval: 5000,
/**
* 是否開啟重連
* */
is_reonnect: true,
/**
* 重新連接的次數(shù)
* */
reconnect_count: 3,
/**
* 當前重新連接的次數(shù),默認為:1
* */
reconnect_current: 1,
/**
* 重新連接的時間類型
* */
reconnect_timer: null,
/**
* 重新連接的間隔
* */
reconnect_interval: 3000,
i : 0,
timer:null,
/**
* 初始化連接
*/
init: () => {
if (!("WebSocket" in window)) {
Message({
message: '當前瀏覽器與網(wǎng)站不兼容丫',
type: 'error',
});
return null
}
if (socket.websock && socket.websock.readyState===1) {
return socket.websock
}
socket.websock = new WebSocket(socket.ws_url)
//接收消息
socket.websock.onmessage = function (e) {
//調(diào)用處理消息的方法
socket.receive(e)
}
// 關(guān)閉連接
socket.websock.onclose = function (e) {
clearInterval(socket.hearbeat_interval);
socket.socket_open = false;
if (socket.websock!=null){
header.getWebsocketStatus(socket.websock.readyState);
}
// 需要重新連接
if (socket.is_reonnect) {
socket.reconnect_timer = setTimeout(() => {
// 超過重連次數(shù)
if (socket.reconnect_current > socket.reconnect_count) {
clearTimeout(socket.reconnect_timer)
return
}
// 記錄重連次數(shù)
socket.reconnect_current++
socket.reconnect()
}, socket.reconnect_interval)
}
}
// 連接成功
socket.websock.onopen = function () {
// Message({
// message: '連接成功',
// type: 'success',
// });
header.getWebsocketStatus(socket.websock.readyState);
let data = {
"action": 10002,
"token":getToken(),
"chatMsg": null,
"extend": 1,
};
socket.send(data);
socket.socket_open = true;
socket.is_reonnect = true;
//重修刷新好友內(nèi)容
window.dispatchEvent(new CustomEvent('connectInit'));
// 開啟心跳
socket.heartbeat()
};
// 連接發(fā)生錯誤
socket.websock.onerror = function (err) {
Message({
message: '服務(wù)連接發(fā)送錯誤!',
type: 'error',
});
}
},
/**
* 獲取websocket對象
* */
getSocket:()=>{
//創(chuàng)建了直接返回,反之重來
if (socket.websock) {
return socket.websock
}else {
socket.init();
}
},
getStatus:()=> {
if (socket.websock.readyState === 0) {
return "未連接";
} else if (socket.websock.readyState === 1) {
return "已連接";
} else if (socket.websock.readyState === 2) {
return "連接正在關(guān)閉";
} else if (socket.websock.readyState === 3) {
return "連接已關(guān)閉";
}
},
/**
* 發(fā)送消息
* @param {*} data 發(fā)送數(shù)據(jù)
* @param {*} callback 發(fā)送后的自定義回調(diào)函數(shù)
*/
send: (data, callback = null) => {
// 開啟狀態(tài)直接發(fā)送
if (socket.websock!=null && socket.websock.readyState === socket.websock.OPEN) {
try {
socket.websock.send(JSON.stringify(data));
}catch (e) {
if (socket.timer !=null){
return
}
socket.timer = setInterval(()=>{
if (i>=6){
clearInterval(socket.timer);
socket.timer = null;
socket.i = 0;
return
}
socket.websock.send(JSON.stringify(data));
socket.i++;
},2000)
}
if (callback) {
callback()
}
// 正在開啟狀態(tài),則等待1s后重新調(diào)用
} else if (socket.websock!=null && socket.websock.readyState === socket.websock.CONNECTING) {
setTimeout(function () {
socket.send(data, callback)
}, 1000)
// 未開啟,則等待1s后重新調(diào)用
} else if (socket.websock!=null){
socket.init();
setTimeout(function () {
socket.send(data, callback)
}, 1000)
}
},
/**
* 接收消息
* @param {*} message 接收到的消息
*/
receive: (message) => {
var recData = JSON.parse(message.data);
/**
*這部分是我們具體的對消息的處理
* */
console.log(recData)
// 自行擴展其他業(yè)務(wù)處理...
},
/**
* 心跳
*/
heartbeat: () => {
if (socket.hearbeat_timer) {
clearInterval(socket.hearbeat_timer)
}
socket.hearbeat_timer = setInterval(() => {
//發(fā)送心跳包
let data = {
"action": 10000,
"token":getToken(),
"chatMsg": null,
"extend": null,
};
socket.send(data)
}, socket.hearbeat_interval)
},
/**
* 主動關(guān)閉連接
*/
close: () => {
if (socket.websock==null){
return
}
let data = {
"action": 10002,
"token":getToken(),
"chatMsg": null,
"extend": 0,
};
socket.send(data);
clearInterval(socket.hearbeat_interval);
socket.is_reonnect = false;
socket.websock.close();
header.getWebsocketStatus(socket.websock.readyState);
socket.websock=null
},
/**
* 重新連接
*/
reconnect: () => {
if (socket.websock && socket.socket_open) {
socket.websock.close()
}
socket.init()
},
}
說明:通過登入后,在某個全局頁面中調(diào)用socket.start()即可連接netty服務(wù)器,通過socket.send("消息")來發(fā)送消息。
三、結(jié)束語
到此這篇關(guān)于SpringBoot整合Netty+Websocket實現(xiàn)消息推送的示例代碼的文章就介紹到這了,更多相關(guān)SpringBoot Netty Websocket消息推送內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Mybatis 中Mapper使用package方式配置報錯的解決方案
這篇文章主要介紹了Mybatis 中Mapper使用package方式配置報錯的解決方案,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2021-07-07
java使用this調(diào)用構(gòu)造函數(shù)的實現(xiàn)方法示例
這篇文章主要介紹了java使用this調(diào)用構(gòu)造函數(shù)的實現(xiàn)方法,結(jié)合實例形式分析了java面向?qū)ο蟪绦蛟O(shè)計中函數(shù)調(diào)用相關(guān)操作技巧,需要的朋友可以參考下2019-08-08
Java實現(xiàn)二叉樹的建立、計算高度與遞歸輸出操作示例
這篇文章主要介紹了Java實現(xiàn)二叉樹的建立、計算高度與遞歸輸出操作,結(jié)合實例形式分析了Java二叉樹的創(chuàng)建、遍歷、計算等相關(guān)算法實現(xiàn)技巧,需要的朋友可以參考下2019-03-03
java 使用HttpURLConnection發(fā)送數(shù)據(jù)簡單實例
這篇文章主要介紹了java 使用HttpURLConnection發(fā)送數(shù)據(jù)簡單實例的相關(guān)資料,需要的朋友可以參考下2017-06-06
Mybatis批量插入數(shù)據(jù)的兩種方式總結(jié)與對比
批量插入功能是我們?nèi)粘9ぷ髦斜容^常見的業(yè)務(wù)功能之一,下面這篇文章主要給大家介紹了關(guān)于Mybatis批量插入數(shù)據(jù)的兩種方式總結(jié)與對比的相關(guān)資料,文中通過實例代碼介紹的非常詳細,需要的朋友可以參考下2023-01-01
Java語言實現(xiàn)非遞歸實現(xiàn)樹的前中后序遍歷總結(jié)
今天小編就為大家分享一篇關(guān)于Java語言實現(xiàn)非遞歸實現(xiàn)樹的前中后序遍歷總結(jié),小編覺得內(nèi)容挺不錯的,現(xiàn)在分享給大家,具有很好的參考價值,需要的朋友一起跟隨小編來看看吧2019-01-01

