SpringBoot+WebSocket實現(xiàn)IM及時通訊的代碼示例
1、Springboot集成Websocket
集成分為三步:添加依賴、增加配置類和消息核心類、前端集成。
1.1、添加依賴
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-websocket</artifactId> <version>2.1.13.RELEASE</version> </dependency>
1.2、增加WebSocket配置類
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.web.socket.server.standard.ServerEndpointExporter; /** * WebSocket配置 */ @Configuration public class WebSocketConfig { @Bean public ServerEndpointExporter serverEndpointExporter() { return new ServerEndpointExporter(); } }
1.3、增加消息核心類WebSocketServer
@ServerEndpoint("/trtc/websocket/{userId}") @Component @Slf4j public class WebSocketServer { // 消息存儲 private static MessageStore messageStore; // 消息發(fā)送 private static MessageSender messageSender; public static void setMessageStore(MessageStore messageStore) { WebSocketServer.messageStore = messageStore; } public static void setMessageSender(MessageSender messageSender) { WebSocketServer.messageSender = messageSender; } /** * 連接建立成功調(diào)用的方法 */ @OnOpen public void onOpen(Session session, @PathParam("userId") String userId) { messageStore.saveSession(session); } /** * 連接關(guān)閉調(diào)用的方法 */ @OnClose public void onClose(Session session, @PathParam("userId") String userId) { messageStore.deleteSession(session); } /** * 收到客戶端消息后調(diào)用的方法 * * @ Param message 客戶端發(fā)送過來的消息 */ @OnMessage public void onMessage(String message, Session session) throws Exception { log.warn("=========== 收到來自窗口" + session.getId() + "的信息:" + message); handleTextMessage(session, new TextMessage(message)); } /** * @param session * @param error */ @OnError public void onError(Session session, @PathParam("userId") String userId, Throwable error) { log.error("=========== 發(fā)生錯誤"); error.printStackTrace(); // msgStore.deleteSession(session); } public void handleTextMessage(Session session, TextMessage message) throws Exception { log.warn("=========== Received message: {}", message.getPayload()); } }
1.4、前端頁面加入socket
<!DOCTYPE html> <html xmlns="http://www.w3.org/1999/html"> <head> <title>WebSocket Example</title> </head> <body> 登錄用戶ID:<input type="text" id="sendUserId" /></br> 接受用戶ID:<input type="text" id="receivedUserId" /></br> 發(fā)送消息內(nèi)容:<input type="text" id="messageInput" /></br> 接受消息內(nèi)容:<input type="text" id="messageReceive" /></br> <button onclick="sendMessage()">Send</button> <script> var socket = new WebSocket("ws://localhost:8080/trtc/websocket/aaa"); var roomId = "123456"; // 隨機產(chǎn)出六位數(shù)字 var sendUserId = Math.floor(Math.random() * 1000000); document.getElementById("sendUserId").value = sendUserId; var messageReceive = document.getElementById("messageReceive"); socket.onopen = function (event) { console.log("WebSocket is open now."); let loginInfo = { msgType: 2, //登錄消息 sendUserId: sendUserId, bizType: 1, //業(yè)務(wù)類型 bizOptModule: 1, //業(yè)務(wù)模塊 roomId: roomId, msgBody: {}, }; socket.send(JSON.stringify(loginInfo)); }; socket.onmessage = function (event) { var message = event.data; console.log("Received message: " + message); messageReceive.value = message; }; socket.onclose = function (event) { console.log("WebSocket is closed now."); }; function sendMessage() { var message = document.getElementById("messageInput").value; var receivedUserId = document.getElementById("receivedUserId").value; let operateInfo = { msgType: 100, //業(yè)務(wù)消息 sendUserId: sendUserId, bizType: 1, //業(yè)務(wù)類型 bizOptModule: 1, //業(yè)務(wù)模塊 roomId: roomId, receivedUserId: receivedUserId, msgBody: { operateType: 1, //操作類型:禁言 operateContent: "1", }, }; socket.send(JSON.stringify(operateInfo)); } setInterval(() => { socket.send("ping"); }, 30000); </script> </body> </html>
2、小型及時通訊包含的模塊
以上只是集成了websocket框架,實現(xiàn)了基本的全雙工通信,服務(wù)器和客戶端都可以同時發(fā)送和接收數(shù)據(jù)。要想實現(xiàn)一些小型完整的及時通訊,還需要具備以下幾個核心模塊。也通過一張流程圖展示了其中過程。
2.1、消息對象模型
組織消息內(nèi)容,比如消息類型、發(fā)送者用戶ID、接受者用戶ID、具體的消息體等。比如:
public class SocketMsg<T> { /** * 消息類型:1心跳 2登錄 3業(yè)務(wù)操作 */ private Integer msgType; /** * 發(fā)送者用戶ID */ private String sendUserId; /** * 接受者用戶ID */ private String receivedUserId; /** * 業(yè)務(wù)類型 */ private Integer bizType; /** * 業(yè)務(wù)操作模塊 */ private Integer bizOptModule; /** * 消息內(nèi)容 */ private T msgBody; }
2.2、消息存儲模塊
負(fù)責(zé)存儲消息內(nèi)容、用戶ID和sessionID的關(guān)系,防止數(shù)據(jù)丟失或者服務(wù)器重啟等。
2.3、消息發(fā)送模塊
功能開發(fā)完畢,一般部署到分布式集群環(huán)境,所以通訊時session會分布在多臺服務(wù)器。比如用戶A的session在機器1,用戶B的session在機器2,此時A發(fā)送給B,就無法單獨通過機器1完成。
因為機器1拿不到機器2里的session,所以消息發(fā)不過去。此時只能借助別的中間件來實現(xiàn),比如借助消息中間件kafka實現(xiàn)。
機器1將消息發(fā)送給kafka,然后機器1和機器2都監(jiān)聽kafka,然后查看用戶對應(yīng)的session是否在本機,如果在本機則發(fā)送出去。
2.4、消息推送模塊
模塊3提到的消息發(fā)送流程中,消息發(fā)送給 消息中間件,然后服務(wù)器消費到消費,在通過本機的session推送給客戶端。
2.5、架構(gòu)圖
3、遇到的幾個問題
3.1、連接自動斷開
webSocket連接之后,發(fā)現(xiàn)一個問題:就是每隔一段時間如果不傳送數(shù)據(jù)的話,與前端的連接就會自動斷開。采用心跳消息的方式,就可以解決這個問題。比如客服端每隔30秒自動發(fā)送ping消息給服務(wù)端,服務(wù)端返回pong。
3.2、Session無法被序列化
分布式場景會存在這樣一個問題,當(dāng)一次請求負(fù)載到第一臺服務(wù)器時,session在第一臺服務(wù)器線程上,第二次請求,負(fù)載到第二臺服務(wù)器上,此時通過userId查找當(dāng)前用戶的session時,是查找不到的。
本來想著把session存入到redis中,就可以從redis獲取用戶的session,希望用這種方式來解決分布式場景下消息發(fā)送的問題。結(jié)果出現(xiàn)如下錯誤:
The remote endpoint was in state [STREAM_WRITING] which is an invalid state for called method
翻看了session源碼,發(fā)現(xiàn)session無法被序列化。所以這個方案只能放棄。解決方案請看下面的問題5
或者上面的架構(gòu)圖
。
3.3、對象無法自動注入
使用了@ServerEndpoint
注解的類中使用@Resource
或@Autowired
注入對象都會失敗,并且報空指針異常。
原因是WebSocket
服務(wù)是線程安全的,那么當(dāng)我們?nèi)グl(fā)起一個ws
連接時,就會創(chuàng)建一個端點對象。WebSocket
服務(wù)是多對象的,不是單例的。而我們的Spring
的Bean
默認(rèn)就是單例的,在非單例類中注入一個單例的Bean
是沖突的。
或者說:
Spring管理采用單例模式(singleton
),而 WebSocket 是多對象的,即每個客戶端對應(yīng)后臺的一個 WebSocket 對象,也可以理解成 new 了一個 WebSocket,這樣當(dāng)然是不能獲得自動注入的對象了,因為這兩者剛好沖突。
@Autowired
注解注入對象操作是在啟動時執(zhí)行的,而不是在使用時,而 WebSocket 是只有連接使用時才實例化對象,且有多個連接就有多個對象。所以我們可以得出結(jié)論,這個 Service 根本就沒有注入到 WebSocket 當(dāng)中。
如何解決呢?
使用靜態(tài)對象,并且對外暴露set方法,這樣在對象初始化的時候,將其注入到WebSocketServer
中。比如說這樣:
@ServerEndpoint("/trtc/websocket/{userId}") @Component @Slf4j public class WebSocketServer { private static MessageStore messageStore; private static MessageSender messageSender; public static void setMessageStore(MessageStore messageStore) { WebSocketServer.messageStore = messageStore; } public static void setMessageSender(MessageSender messageSender) { WebSocketServer.messageSender = messageSender; } } @Slf4j @Service public class MessageStore { @Autowired private RedisTemplate<String, Object> redisTemplate; @PostConstruct public void init() { WebSocketServer.setMessageStore(this); } }
3.4、分布式場景消息如何發(fā)給客戶端
問題2
中提到了分布式場景下存在的session不在本機的問題,這種場景可以通過發(fā)送消息中間件的方式解決。具體這樣解決:
每次連接時,都將userId和對應(yīng)的session存入到本機,發(fā)送消息時,直接發(fā)送給MQ-Broker,然后每臺應(yīng)用負(fù)載都去消費這個消息,拿到消息之后,判斷在本機能根據(jù)userId是否能找到session,找到則推送到客戶端。
3.5、部署時Nginx配置問題
代碼開發(fā)完畢之后,本機跑通后,然后部署到服務(wù)器之后,還差很重要的一步:配置nginx代理。
3.5.1、給后端應(yīng)用部署獨立域名
要給后端應(yīng)用部署獨立域名,nginx代理直接轉(zhuǎn)發(fā)到應(yīng)用的獨立域名,不要走微服務(wù)的gateway網(wǎng)關(guān)轉(zhuǎn)發(fā)過去。
3.5.2、多層nginx轉(zhuǎn)發(fā)問題
當(dāng)只有一層nginx的時候,配置比較簡單,如下:
location ~* ^/api/websocket/* { proxy_pass http://mangodwsstest.mangod.top; proxy_read_timeout 300s; proxy_send_timeout 300s; proxy_set_header Host mangodwsstest.mangod.top; proxy_http_version 1.1; proxy_set_header Upgrade $http_upgrade; proxy_set_header Connection "Upgrade"; proxy_set_header X-Real-IP $remote_addr; }
但是,當(dāng)有兩層nginx轉(zhuǎn)發(fā)的時候,問題就出現(xiàn)了。
在最外層的nginx需要使用如下配置,不能照抄后面一層的配置。proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for
和proxy_set_header X-Forwarded-Proto $scheme
這兩個配置不能少,用來將協(xié)議和真實IP傳遞給后面一層的nginx。
location ~* ^/api/websocket/* { proxy_pass http://mangodwsstest.mangod.top; proxy_read_timeout 300s; proxy_send_timeout 300s; proxy_set_header Host $http_host; proxy_set_header X-Real-IP $remote_addr; proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; proxy_set_header X-Forwarded-Proto $scheme; proxy_http_version 1.1; proxy_set_header Upgrade $http_upgrade; proxy_set_header Connection $connection_upgrade; }
4、完整代碼
頁面效果如下
開啟兩個web頁面,用戶1輸入用戶2的用戶ID,輸入發(fā)送消息內(nèi)容,點擊發(fā)送。在用戶2的頁面的接受消息內(nèi)容可以看到發(fā)送的消息。
代碼結(jié)構(gòu)
代碼地址
https://github.com/yclxiao/spring-websocket.git
總結(jié):本文介紹springboot如何集成websocket、IM及時通訊需要哪些模塊、開發(fā)和部署過程中遇到的問題、以及實現(xiàn)小型IM及時通訊的代碼。
以上就是SpringBoot+WebSocket實現(xiàn)IM及時通訊的代碼示例的詳細(xì)內(nèi)容,更多關(guān)于SpringBoot+WebSocket IM及時通訊的資料請關(guān)注腳本之家其它相關(guān)文章!
- Springboot實現(xiàn)人臉識別與WebSocket長連接的實現(xiàn)代碼
- SpringBoot+websocket實現(xiàn)消息對話功能
- SpringBoot 整合WebSocket 前端 uniapp 訪問的詳細(xì)方法
- Springboot+WebSocket+Netty實現(xiàn)在線聊天/群聊系統(tǒng)
- SpringBoot集成WebSocket的兩種方式(JDK內(nèi)置版和Spring封裝版)
- SpringBoot中使用WebSocket的教程分享
- SpringBoot實現(xiàn)WebSocket全雙工通信的項目實踐
- 使用WebSocket+SpringBoot+Vue搭建簡易網(wǎng)頁聊天室的實現(xiàn)代碼
- SpringBoot整合WebSocket實現(xiàn)后端向前端發(fā)送消息的實例代碼
- Springboot+WebSocket實現(xiàn)在線聊天功能
- springboot中websocket簡單實現(xiàn)
- Spring Boot中的WebSocketMessageBrokerConfigurer接口使用
相關(guān)文章
基于springboot和redis實現(xiàn)單點登錄
這篇文章主要為大家詳細(xì)介紹了基于springboot和redis實現(xiàn)單點登錄,具有一定的參考價值,感興趣的小伙伴們可以參考一下2019-06-06Spring使用AspectJ注解和XML配置實現(xiàn)AOP
這篇文章主要介紹了Spring使用AspectJ注解和XML配置實現(xiàn)AOP的相關(guān)資料,具有一定的參考價值,感興趣的小伙伴們可以參考一下2016-10-10spring和quartz整合,并簡單調(diào)用(實例講解)
下面小編就為大家?guī)硪黄猻pring和quartz整合,并簡單調(diào)用(實例講解)。小編覺得挺不錯的,現(xiàn)在就分享給大家,也給大家做個參考。一起跟隨小編過來看看吧2017-07-07基于序列化存取實現(xiàn)java對象深度克隆的方法詳解
本篇文章是對序列化存取實現(xiàn)java對象深度克隆的方法進(jìn)行了詳細(xì)的分析介紹,需要的朋友參考下2013-05-05Spring?Boot?內(nèi)置工具類ReflectionUtils的實現(xiàn)
ReflectionUtils是一個反射工具類,它封裝了Java反射的操作,使得我們能夠更輕松地操作和訪問類的方法、字段,本文主要介紹了Spring?Boot?內(nèi)置工具類ReflectionUtils的實現(xiàn),感興趣的可以了解一下2023-11-11