Springboot Websocket Stomp 消息訂閱推送
需求背景
閑話不扯,直奔主題。需要和web前端建立長鏈接,互相實時通訊,因此想到了websocket,后面隨著需求的變更,需要用戶訂閱主題,實現(xiàn)消息的精準(zhǔn)推送,發(fā)布訂閱等,則想到了STOMP(Simple Text-Orientated Messaging Protocol) 面向消息的簡單文本協(xié)議。
websocket協(xié)議
想到了之前寫的一個websocket長鏈接的demo,也貼上代碼供大家參考。
pom文件
直接引入spring-boot-starter-websocket即可。
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
聲明websocket endpoint
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;
/**
* @ClassName WebSocketConfig
* @Author scott
* @Date 2021/6/16
* @Version V1.0
**/
@Configuration
public class WebSocketConfig {
/**
* 注入一個ServerEndpointExporter,該Bean會自動注冊使用@ServerEndpoint注解申明的websocket endpoint
*/
@Bean
public ServerEndpointExporter serverEndpointExporter() {
return new ServerEndpointExporter();
}
}
websocket實現(xiàn)類,其中通過注解監(jiān)聽了各種事件,實現(xiàn)了推送消息等相關(guān)邏輯
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.ruoyi.common.core.domain.AjaxResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
/**
* @ClassName: DataTypePushWebSocket
* @Author: scott
* @Date: 2021/6/16
**/
@ServerEndpoint(value = "/ws/dataType/push/{token}")
@Component
public class DataTypePushWebSocket {
private static final Logger log = LoggerFactory.getLogger(DataTypePushWebSocket.class);
/**
* 記錄當(dāng)前在線連接數(shù)
*/
private static AtomicInteger onlineCount = new AtomicInteger(0);
private static Cache<String, Session> SESSION_CACHE = CacheBuilder.newBuilder()
.initialCapacity(10)
.maximumSize(300)
.expireAfterWrite(10, TimeUnit.MINUTES)
.build();
/**
* 連接建立成功調(diào)用的方法
*/
@OnOpen
public void onOpen(Session session, @PathParam("token")String token) {
String sessionId = session.getId();
onlineCount.incrementAndGet(); // 在線數(shù)加1
this.sendMessage("sessionId:" + sessionId +",已經(jīng)和server建立連接", session);
SESSION_CACHE.put(sessionId,session);
log.info("有新連接加入:{},當(dāng)前在線連接數(shù)為:{}", session.getId(), onlineCount.get());
}
/**
* 連接關(guān)閉調(diào)用的方法
*/
@OnClose
public void onClose(Session session,@PathParam("token")String token) {
onlineCount.decrementAndGet(); // 在線數(shù)減1
SESSION_CACHE.invalidate(session.getId());
log.info("有一連接關(guān)閉:{},當(dāng)前在線連接數(shù)為:{}", session.getId(), onlineCount.get());
}
/**
* 收到客戶端消息后調(diào)用的方法
*
* @param message 客戶端發(fā)送過來的消息
*/
@OnMessage
public void onMessage(String message, Session session,@PathParam("token")String token) {
log.info("服務(wù)端收到客戶端[{}]的消息:{}", session.getId(), message);
this.sendMessage("服務(wù)端已收到推送消息:" + message, session);
}
@OnError
public void onError(Session session, Throwable error) {
log.error("發(fā)生錯誤");
error.printStackTrace();
}
/**
* 服務(wù)端發(fā)送消息給客戶端
*/
private static void sendMessage(String message, Session toSession) {
try {
log.info("服務(wù)端給客戶端[{}]發(fā)送消息{}", toSession.getId(), message);
toSession.getBasicRemote().sendText(message);
} catch (Exception e) {
log.error("服務(wù)端發(fā)送消息給客戶端失?。簕}", e);
}
}
public static AjaxResult sendMessage(String message, String sessionId){
Session session = SESSION_CACHE.getIfPresent(sessionId);
if(Objects.isNull(session)){
return AjaxResult.error("token已失效");
}
sendMessage(message,session);
return AjaxResult.success();
}
public static AjaxResult sendBroadcast(String message){
long size = SESSION_CACHE.size();
if(size <=0){
return AjaxResult.error("當(dāng)前沒有在線客戶端,無法推送消息");
}
ConcurrentMap<String, Session> sessionConcurrentMap = SESSION_CACHE.asMap();
Set<String> keys = sessionConcurrentMap.keySet();
for (String key : keys) {
Session session = SESSION_CACHE.getIfPresent(key);
DataTypePushWebSocket.sendMessage(message,session);
}
return AjaxResult.success();
}
}
至此websocket服務(wù)端代碼已經(jīng)完成。
stomp協(xié)議
前端代碼.這個是在某個vue工程中寫的js,各位大佬自己動手改改即可。其中Settings.wsPath是后端定義的ws地址例如ws://localhost:9003/ws
import Stomp from 'stompjs'
import Settings from '@/settings.js'
export default {
// 是否啟用日志 默認(rèn)啟用
debug:true,
// 客戶端連接信息
stompClient:{},
// 初始化
init(callBack){
this.stompClient = Stomp.client(Settings.wsPath)
this.stompClient.hasDebug = this.debug
this.stompClient.connect({},suce =>{
this.console("連接成功,信息如下 ↓")
this.console(this.stompClient)
if(callBack){
callBack()
}
},err => {
if(err) {
this.console("連接失敗,信息如下 ↓")
this.console(err)
}
})
},
// 訂閱
sub(address,callBack){
if(!this.stompClient.connected){
this.console("沒有連接,無法訂閱")
return
}
// 生成 id
let timestamp= new Date().getTime() + address
this.console("訂閱成功 -> "+address)
this.stompClient.subscribe(address,message => {
this.console(address+" 訂閱消息通知,信息如下 ↓")
this.console(message)
let data = message.body
callBack(data)
},{
id: timestamp
})
},
unSub(address){
if(!this.stompClient.connected){
this.console("沒有連接,無法取消訂閱 -> "+address)
return
}
let id = ""
for(let item in this.stompClient.subscriptions){
if(item.endsWith(address)){
id = item
break
}
}
this.stompClient.unsubscribe(id)
this.console("取消訂閱成功 -> id:"+ id + " address:"+address)
},
// 斷開連接
disconnect(callBack){
if(!this.stompClient.connected){
this.console("沒有連接,無法斷開連接")
return
}
this.stompClient.disconnect(() =>{
console.log("斷開成功")
if(callBack){
callBack()
}
})
},
// 單位 秒
reconnect(time){
setInterval(() =>{
if(!this.stompClient.connected){
this.console("重新連接中...")
this.init()
}
},time * 1000)
},
console(msg){
if(this.debug){
console.log(msg)
}
},
// 向訂閱發(fā)送消息
send(address,msg) {
this.stompClient.send(address,{},msg)
}
}
后端stomp config,里面都有注釋,寫的很詳細(xì),并且我加入了和前端的心跳ping pong。
package com.cn.scott.config;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer;
/**
* @ClassName: WebSocketStompConfig
* @Author: scott
* @Date: 2021/7/8
**/
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketStompConfig implements WebSocketMessageBrokerConfigurer {
private static long HEART_BEAT=10000;
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
//允許使用socketJs方式訪問,訪問點為webSocket,允許跨域
//在網(wǎng)頁上我們就可以通過這個鏈接
//ws://127.0.0.1:port/ws來和服務(wù)器的WebSocket連接
registry.addEndpoint("/ws").setAllowedOrigins("*");
}
@Override
public void configureMessageBroker(MessageBrokerRegistry registry) {
ThreadPoolTaskScheduler te = new ThreadPoolTaskScheduler();
te.setPoolSize(1);
te.setThreadNamePrefix("wss-heartbeat-thread-");
te.initialize();
//基于內(nèi)存的STOMP消息代理來代替mq的消息代理
//訂閱Broker名稱,/user代表點對點即發(fā)指定用戶,/topic代表發(fā)布廣播即群發(fā)
//setHeartbeatValue 設(shè)置心跳及心跳時間
registry.enableSimpleBroker("/user", "/topic").setHeartbeatValue(new long[]{HEART_BEAT,HEART_BEAT}).setTaskScheduler(te);
//點對點使用的訂閱前綴,不設(shè)置的話,默認(rèn)也是/user/
registry.setUserDestinationPrefix("/user/");
}
}
后端stomp協(xié)議接受、訂閱等動作通知
package com.cn.scott.ws;
import com.alibaba.fastjson.JSON;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.handler.annotation.DestinationVariable;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.messaging.simp.SimpMessagingTemplate;
import org.springframework.messaging.simp.annotation.SubscribeMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* @ClassName StompSocketHandler
* @Author scott
* @Date 2021/6/30
* @Version V1.0
**/
@RestController
public class StompSocketHandler {
@Autowired
private SimpMessagingTemplate simpMessagingTemplate;
/**
* @MethodName: subscribeMapping
* @Description: 訂閱成功通知
* @Param: [id]
* @Return: void
* @Author: scott
* @Date: 2021/6/30
**/
@SubscribeMapping("/user/{id}/listener")
public void subscribeMapping(@DestinationVariable("id") final long id) {
System.out.println(">>>>>>用戶:"+id +",已訂閱");
SubscribeMsg param = new SubscribeMsg(id,String.format("用戶【%s】已訂閱成功", id));
sendToUser(param);
}
/**
* @MethodName: test
* @Description: 接收訂閱topic消息
* @Param: [id, msg]
* @Return: void
* @Author: scott
* @Date: 2021/6/30
**/
@MessageMapping(value = "/user/{id}/listener")
public void UserSubListener(@DestinationVariable long id, String msg) {
System.out.println("收到客戶端:" +id+",的消息");
SubscribeMsg param = new SubscribeMsg(id,String.format("已收到用戶【%s】發(fā)送消息【%s】", id,msg));
sendToUser(param);
}
@GetMapping("/refresh/{userId}")
public void refresh(@PathVariable Long userId, String msg) {
StompSocketHandler.SubscribeMsg param = new StompSocketHandler.SubscribeMsg(userId,String.format("服務(wù)端向用戶【%s】發(fā)送消息【%s】", userId,msg));
sendToUser(param);
}
/**
* @MethodName: sendToUser
* @Description: 推送消息給訂閱用戶
* @Param: [userId]
* @Return: void
* @Author: scott
* @Date: 2021/6/30
**/
public void sendToUser(SubscribeMsg screenChangeMsg){
//這里可以控制權(quán)限等。。。
simpMessagingTemplate.convertAndSendToUser(screenChangeMsg.getUserId().toString(),"/listener", JSON.toJSONString(screenChangeMsg));
}
/**
* @MethodName: sendBroadCast
* @Description: 發(fā)送廣播,需要用戶事先訂閱廣播
* @Param: [topic, msg]
* @Return: void
* @Author: scott
* @Date: 2021/6/30
**/
public void sendBroadCast(String topic,String msg){
simpMessagingTemplate.convertAndSend(topic,msg);
}
/**
* @ClassName: SubMsg
* @Author: scott
* @Date: 2021/6/30
**/
public static class SubscribeMsg {
private Long userId;
private String msg;
public SubscribeMsg(Long UserId, String msg){
this.userId = UserId;
this.msg = msg;
}
public Long getUserId() {
return userId;
}
public String getMsg() {
return msg;
}
}
}
連接展示
建立連接成功,這里可以看出是基于websocket協(xié)議

連接信息

ping pong

調(diào)用接口向訂閱用戶1發(fā)送消息,http://localhost:9003/refresh/1?msg=HelloStomp,可以在客戶端控制臺查看已經(jīng)收到了消息。這個時候不同用戶通過自己的userId可以區(qū)分訂閱的主題,可以做到通過userId精準(zhǔn)的往客戶端推送消息。

還記得我們在后端配置的時候還指定了廣播的訂閱主題/topic,這時我們前端通過js只要訂閱了這個主題,那么后端在像這個主題推送消息時,所有訂閱的客戶端都能收到,感興趣的小伙伴可以自己試試,api我都寫好了。

至此,實戰(zhàn)完畢,喜歡的小伙伴麻煩關(guān)注加點贊。
springboot + stomp后端源碼地址:https://gitee.com/ErGouGeSiBaKe/stomp-server
到此這篇關(guān)于Springboot Websocket Stomp 消息訂閱推送的文章就介紹到這了,更多相關(guān)Springboot Websocket Stomp 消息訂閱推送內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
java實現(xiàn)從網(wǎng)上下載圖片到本地的方法
這篇文章主要介紹了java實現(xiàn)從網(wǎng)上下載圖片到本地的方法,涉及java針對文件操作的相關(guān)技巧,非常簡單實用,需要的朋友可以參考下2015-07-07
Java SpringBoot實現(xiàn)帶界面的代碼生成器詳解
這篇文章主要介紹了Java SpringBoot如何實現(xiàn)帶界面的代碼生成器,幫助大家更好的理解和使用Java SpringBoot編程語言,感興趣的朋友可以了解下2021-09-09
hadoop?詳解如何實現(xiàn)數(shù)據(jù)排序
在很多業(yè)務(wù)場景下,需要對原始的數(shù)據(jù)讀取分析后,將輸出的結(jié)果按照指定的業(yè)務(wù)字段進(jìn)行排序輸出,方便上層應(yīng)用對結(jié)果數(shù)據(jù)進(jìn)行展示或使用,減少二次排序的成本2022-02-02
淺析Bean?Searcher?與?MyBatis?Plus?區(qū)別介紹
Bean?Searcher號稱任何復(fù)雜的查詢都可以一行代碼搞定,但?Mybatis?Plus?似乎也有類似的動態(tài)查詢功能,最近火起的?Bean?Searcher?與?MyBatis?Plus?倒底有啥區(qū)別?帶著這個問題一起通過本文學(xué)習(xí)下吧2022-05-05
Java基礎(chǔ)詳解之集合框架工具Collections
這篇文章主要介紹了Java基礎(chǔ)詳解之集合框架工具Collections,文中有非常詳細(xì)的代碼示例,對正在學(xué)習(xí)java的小伙伴們有很好地幫助,需要的朋友可以參考下2021-04-04
解決Lombok使用@Builder無法build父類屬性的問題
這篇文章主要介紹了解決Lombok使用@Builder無法build父類屬性的問題,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2022-09-09
Eclipse連接Mysql數(shù)據(jù)庫操作總結(jié)
這篇文章主要介紹了Eclipse連接Mysql數(shù)據(jù)庫操作總結(jié)的相關(guān)資料,非常不錯,具有參考借鑒價值,需要的朋友可以參考下2016-08-08
EasyUi+Spring Data 實現(xiàn)按條件分頁查詢的實例代碼
這篇文章主要介紹了EasyUi+Spring Data 實現(xiàn)按條件分頁查詢的實例代碼,非常具有實用價值,需要的朋友可以參考下2017-07-07
Spring 源碼解析CommonAnnotationBeanPostProcessor
這篇文章主要為大家介紹了Spring 源碼解析CommonAnnotationBeanPostProcessor示例,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2022-10-10

