Springboot+Stomp協(xié)議實(shí)現(xiàn)聊天功能
前端代碼
這里我對(duì)Stomp.js進(jìn)行了一個(gè)簡(jiǎn)單的封裝,寫在stomp-client.js里面
/** * 對(duì) stomp 客戶端進(jìn)行封裝 */ var client; var subscribes = []; var errorTimes = 0; var endpoint = "/ws"; /** * 建立websocket連接 * @param {Function} onConnecting 開(kāi)始連接時(shí)的回調(diào) * @param {Function} onConnected 連接成功回調(diào) * @param {Function} onError 連接異常或斷開(kāi)回調(diào) */ function connect(onConnecting, onConnected, onError) { onConnecting instanceof Function && onConnecting(); var sock = new SockJS(endpoint); client = Stomp.over(sock); console.log("ws: start connect to " + endpoint); client.connect({}, function (frame) { errorTimes = 0; console.log('connected: ' + frame); // 連接成功后重新訂閱 subscribes.forEach(function (item) { client.subscribe(item.destination, function (resp) { console.debug("ws收到消息: ", resp); item.cb(JSON.parse(resp.body)); }); }); onConnected instanceof Function && onConnected(); }, function (err) { errorTimes = errorTimes > 8 ? 0 : errorTimes; var nextTime = ++errorTimes * 3000; console.warn("與服務(wù)器斷開(kāi)連接," + nextTime + " 秒后重新連接", err); setTimeout(function () { console.log("嘗試重連……"); connect(onConnecting, onConnected, onError); }, nextTime); onError instanceof Function && onError(); }); } /** * 訂閱消息,若當(dāng)前未連接,則會(huì)在連接成功后自動(dòng)訂閱 * * 注意,為防止重連導(dǎo)致重復(fù)訂閱,請(qǐng)勿使用匿名函數(shù)做回調(diào) * * @param {String} destination 目標(biāo) * @param {Function} cb 回調(diào) */ function subscribe(destination, cb) { var exist = subscribes.filter(function (sub) { return sub.destination === destination && sub.cb === cb }); // 防止重復(fù)訂閱 if (exist && exist.length) { return; } // 記錄所有訂閱,在連接成功時(shí)統(tǒng)一處理 subscribes.push({ destination: destination, cb: cb }); if (client && client.connected) { client.subscribe(destination, function (resp) { console.debug("ws收到消息: ", resp); cb instanceof Function && cb(JSON.parse(resp.body)); }); } else { console.warn("ws未連接,暫時(shí)無(wú)法訂閱:" + destination) } } /** * 發(fā)送消息 * @param {String} destination 目標(biāo) * @param {Object} msg 消息體對(duì)象 */ function send(destination, msg) { if (!client) { console.error("客戶端未連接,無(wú)法發(fā)送消息!") } client.send(destination, {}, JSON.stringify(msg)); } window.onbeforeunload = function () { // 當(dāng)窗口關(guān)閉時(shí)斷開(kāi)連接 if (client && client.connected) { client.disconnect(function () { console.log("websocket disconnected "); }); } };
前端的html頁(yè)面index.html如下:
<!DOCTYPE html> <html lang="en"> <head> <meta charset="UTF-8"> <title>STOMP</title> </head> <body> <h1 id="tip">Welcome!</h1> <p>狀態(tài): <span id="status"></span></p> <input type="text" id="content" placeholder="請(qǐng)輸入要發(fā)送的消息"> <br> <button onclick="sendTextMsg()">發(fā)送</button> <ul id="ul"> </ul> <script th:src="@{lib/sockjs.min.js}"></script> <script th:src="@{lib/stomp.min.js}"></script> <script th:src="@{stomp-client.js}"></script> <script> connect(function () { statusChange("連接中..."); }, function () { statusChange("在線"); // 注意,為防止重連導(dǎo)致重復(fù)訂閱,請(qǐng)勿使用匿名函數(shù)做回調(diào) subscribe("/user/topic/subNewMsg", onNewMsg); }, function () { statusChange("離線"); }); function onNewMsg(msg) { var li = document.createElement("li"); li.innerText = msg.content; document.getElementById("ul").appendChild(li); } function sendTextMsg() { var content = document.getElementById("content").value; var msg = { msgType: 1, content: content }; send("/app/echo", msg); } function statusChange(status) { document.getElementById("status").innerText = status; } </script> </body> </html>
后端代碼
依賴引入,主要引入下面的包,其它的包略過(guò)
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-websocket</artifactId> </dependency>
配置類
@Slf4j @Setter @Configuration @EnableWebSocketMessageBroker @ConfigurationProperties(prefix = "websocket") @RequiredArgsConstructor(onConstructor_ = {@Autowired}) public class WebSocketConfig implements WebSocketMessageBrokerConfigurer, ApplicationListener<BrokerAvailabilityEvent> { private final BrokerConfig brokerConfig; private String[] allowOrigins; @Override public void registerStompEndpoints(StompEndpointRegistry registry) { // 繼承DefaultHandshakeHandler并重寫determineUser方法,可以自定義如何確定用戶 // 添加方法:registry.addEndpoint("/ws").setHandshakeHandler(handshakeHandler) registry.addEndpoint("/ws") .setAllowedOrigins(allowOrigins) .withSockJS(); } /** * 配置消息代理 */ @Override public void configureMessageBroker(MessageBrokerRegistry registry) { registry.setApplicationDestinationPrefixes("/app"); if (brokerConfig.isUseSimpleBroker()) { // 使用 SimpleBroker // 配置前綴, 有這些前綴的消息會(huì)路由到broker registry.enableSimpleBroker("/topic", "/queue") //配置stomp協(xié)議里, server返回的心跳 .setHeartbeatValue(new long[]{10000L, 10000L}) //配置發(fā)送心跳的scheduler .setTaskScheduler(new DefaultManagedTaskScheduler()); } else { // 使用外部 Broker // 指定前綴,有這些前綴的消息會(huì)路由到broker registry.enableStompBrokerRelay("/topic", "/queue") // 廣播用戶目標(biāo),如果要推送的用戶不在本地,則通過(guò) broker 廣播給集群的其他成員 .setUserDestinationBroadcast("/topic/log-unresolved-user") // 用戶注冊(cè)廣播,一旦有用戶登錄,則廣播給集群中的其他成員 .setUserRegistryBroadcast("/topic/log-user-registry") // 虛擬地址 .setVirtualHost(brokerConfig.getVirtualHost()) // 用戶密碼 .setSystemLogin(brokerConfig.getUsername()) .setSystemPasscode(brokerConfig.getPassword()) .setClientLogin(brokerConfig.getUsername()) .setClientPasscode(brokerConfig.getPassword()) // 心跳間隔 .setSystemHeartbeatSendInterval(10000) .setSystemHeartbeatReceiveInterval(10000) // 使用 setTcpClient 以配置多個(gè) broker 地址,setRelayHost/Port 只能配置一個(gè) .setTcpClient(createTcpClient()); } } /** * 創(chuàng)建 TcpClient 工廠,用于配置多個(gè) broker 地址 */ private ReactorNettyTcpClient<byte[]> createTcpClient() { return new ReactorNettyTcpClient<>( // BrokerAddressSupplier 用于獲取中繼地址,一次只使用一個(gè),如果該中繼出錯(cuò),則會(huì)獲取下一個(gè) client -> client.addressSupplier(brokerConfig.getBrokerAddressSupplier()), new StompReactorNettyCodec()); } @Override public void onApplicationEvent(BrokerAvailabilityEvent event) { if (!event.isBrokerAvailable()) { log.warn("stomp broker is not available!!!!!!!!"); } else { log.info("stomp broker is available"); } } }
消息處理
@Slf4j @Controller @RequiredArgsConstructor(onConstructor_ = {@Autowired}) public class StompController { private final SimpMessageSendingOperations msgOperations; private final SimpUserRegistry simpUserRegistry; /** * 回音消息,將用戶發(fā)來(lái)的消息內(nèi)容加上 Echo 前綴后推送回客戶端 */ @MessageMapping("/echo") public void echo(Principal principal, Msg msg) { String username = principal.getName(); msg.setContent("Echo: " + msg.getContent()); msgOperations.convertAndSendToUser(username, "/topic/subNewMsg", msg); int userCount = simpUserRegistry.getUserCount(); int sessionCount = simpUserRegistry.getUser(username).getSessions().size(); log.info("當(dāng)前本系統(tǒng)總在線人數(shù): {}, 當(dāng)前用戶: {}, 該用戶的客戶端連接數(shù): {}", userCount, username, sessionCount); } }
實(shí)現(xiàn)效果
報(bào)文分析
開(kāi)啟調(diào)試模式,我們根據(jù)報(bào)文來(lái)分析一下前后端互通的報(bào)文
握手
客戶端請(qǐng)求報(bào)文如下
GET ws://localhost:8025/ws/035/5hy4avgm/websocket HTTP/1.1 Host: localhost:8025 Connection: Upgrade Pragma: no-cache Cache-Control: no-cache User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.5735.289 Safari/537.36 Upgrade: websocket Origin: http://localhost:8025 Sec-WebSocket-Version: 13 Accept-Encoding: gzip, deflate, br Accept-Language: zh-CN,zh;q=0.9 Cookie: 略 Sec-WebSocket-Key: PlMHmdl2JRzDAVk3feOaeA== Sec-WebSocket-Extensions: permessage-deflate; client_max_window_bits
服務(wù)端響應(yīng)握手請(qǐng)求
HTTP/1.1 101 Upgrade: websocket Connection: upgrade Sec-WebSocket-Accept: 9CKY8n1j/cHoKsWmpmX4pNlQuZg= Sec-WebSocket-Extensions: permessage-deflate;client_max_window_bits=15 X-Content-Type-Options: nosniff X-XSS-Protection: 1; mode=block Cache-Control: no-cache, no-store, max-age=0, must-revalidate Pragma: no-cache Expires: 0 X-Frame-Options: DENY Date: Thu, 08 Feb 2024 06:58:28 GMT
stomp報(bào)文分析
在瀏覽器消息一欄,我們可以看到長(zhǎng)連接過(guò)程中通信的報(bào)文
下面來(lái)簡(jiǎn)單分析一下stomp的報(bào)文
客戶端請(qǐng)求連接
其中\(zhòng)n表示換行
[ "CONNECT\naccept-version:1.1,1.0\nheart-beat:10000,10000\n\n\u0000" ]
可以看到請(qǐng)求連接的命令是CONNECT,連接報(bào)文里面還包含了心跳的信息
服務(wù)端返回連接成功
[ "CONNECTED\nversion:1.1\nheart-beat:10000,10000\nuser-name:admin\n\n\u0000" ]
CONNECTED是服務(wù)端連接成功的命令,報(bào)文中也包含了心跳的信息
客戶端訂閱
訂閱的目的地是:/user/topic/subNewMsg
["SUBSCRIBE\nid:sub-0\ndestination:/user/topic/subNewMsg\n\n\u0000"]
客戶端發(fā)送消息
發(fā)送的目的地是:/app/echo
[ "SEND\ndestination:/app/echo\ncontent-length:35\n\n{\"msgType\":1,\"content\":\"你好啊\"}\u0000" ]
服務(wù)端響應(yīng)消息
響應(yīng)的目的地是:/user/topic/subNewMsg,當(dāng)訂閱了這個(gè)目的地的,方法,將會(huì)被回調(diào)
[ "MESSAGE\ndestination:/user/topic/subNewMsg\ncontent-type:application/json;charset=UTF-8\nsubscription:sub-0\nmessage-id:5hy4avgm-1\ncontent-length:41\n\n{\"content\":\"Echo: 你好啊\",\"msgType\":1}\u0000" ]
心跳報(bào)文
可以看到,約每隔10S,客戶端和服務(wù)端都有一次心跳報(bào)文,發(fā)送的報(bào)文內(nèi)容為一個(gè)回車。
[\n]
項(xiàng)目鏈接:https://gitee.com/syk1234/stomp-demo.git
以上就是Springboot+Stomp協(xié)議實(shí)現(xiàn)聊天功能的詳細(xì)內(nèi)容,更多關(guān)于Springboot+Stomp聊天的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
詳解SpringBoot項(xiàng)目整合Vue做一個(gè)完整的用戶注冊(cè)功能
本文主要介紹了SpringBoot項(xiàng)目整合Vue做一個(gè)完整的用戶注冊(cè)功能,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2022-07-07Mybatis-plus foreach拼接字符串查詢無(wú)數(shù)據(jù)返回問(wèn)題
這篇文章主要介紹了Mybatis-plus foreach拼接字符串查詢無(wú)數(shù)據(jù)返回問(wèn)題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2022-03-03Netty + ZooKeeper 實(shí)現(xiàn)簡(jiǎn)單的服務(wù)注冊(cè)與發(fā)現(xiàn)
服務(wù)注冊(cè)和發(fā)現(xiàn)一直是分布式的核心組件。本文介紹了借助 ZooKeeper 做注冊(cè)中心,如何實(shí)現(xiàn)一個(gè)簡(jiǎn)單的服務(wù)注冊(cè)和發(fā)現(xiàn)。,需要的朋友可以參考下2019-06-06Java異或運(yùn)算應(yīng)用場(chǎng)景詳解
這篇文章主要給大家介紹了關(guān)于Java異或運(yùn)算應(yīng)用場(chǎng)景的相關(guān)資料,異或運(yùn)算會(huì)應(yīng)用在很多算法題中,這里整理了幾個(gè)最常見(jiàn)的應(yīng)用場(chǎng)景,文中通過(guò)代碼示例介紹的非常詳細(xì),需要的朋友可以參考下2023-07-07SpringBoot實(shí)現(xiàn)多租戶架構(gòu)
在SpringBoot中可以通過(guò)多數(shù)據(jù)源和動(dòng)態(tài)路由來(lái)實(shí)現(xiàn)多租戶機(jī)制,本文主要介紹了SpringBoot實(shí)現(xiàn)多租戶架構(gòu),具有一定的參考價(jià)值,感興趣的可以里哦啊接一下2024-03-03springboot集成Feign的實(shí)現(xiàn)示例
Feign是聲明式HTTP客戶端,用于簡(jiǎn)化微服務(wù)之間的REST調(diào)用,本文就來(lái)介紹一下springboot集成Feign的實(shí)現(xiàn)示例,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2024-11-11