SpringBoot SSE服務(wù)端主動(dòng)推送事件的實(shí)現(xiàn)
一、SSE概述
1、SSE簡(jiǎn)介
SSE(Server Sent Event),直譯為服務(wù)器發(fā)送事件,也就是服務(wù)器主動(dòng)發(fā)送事件,客戶端可以獲取到服務(wù)器發(fā)送的事件。
我們常見的 http 交互方式是客戶端發(fā)起請(qǐng)求,服務(wù)端響應(yīng),然后一次請(qǐng)求完畢。但是在SSE的使用場(chǎng)景下,客戶端發(fā)起請(qǐng)求,然后建立SEE連接一直保持,服務(wù)端就可以返回?cái)?shù)據(jù)給客戶端。
SSE簡(jiǎn)單來說就是服務(wù)器主動(dòng)向前端推送數(shù)據(jù)的一種技術(shù),它是單向的。SSE適用于消息推送,監(jiān)控等只需要服務(wù)器推送數(shù)據(jù)的場(chǎng)景中。比如:文件下載時(shí),后端可以推送下載進(jìn)度條信息。
2、特點(diǎn)
SSE (Server Send Event)服務(wù)端主動(dòng)推送:
- html5新標(biāo)準(zhǔn),用來從服務(wù)端實(shí)時(shí)推送數(shù)據(jù)到瀏覽器端
- 直接建立在當(dāng)前http連接上,本質(zhì)上是保持一個(gè)http長連接,輕量協(xié)議
簡(jiǎn)單的服務(wù)器數(shù)據(jù)推送的場(chǎng)景,使用服務(wù)器推送事件還是很方便的。
3、SSE和WebScoket的區(qū)別
SSE 是單通道,只能服務(wù)端向客戶端發(fā)消息;而WebScoket 是雙通道。
| SE | WebScoket |
|---|---|
| http 協(xié)議 | 獨(dú)立的 websocket 協(xié)議 |
| 輕量,使用簡(jiǎn)單 | 相對(duì)復(fù)雜 |
| 默認(rèn)支持?jǐn)嗑€重連 | 需要自己實(shí)現(xiàn)斷線重連 |
| 文本傳輸 | 二進(jìn)制傳輸 |
| 支持自定義發(fā)送的消息類型 | - |
4、SSE規(guī)范
在 html5 的定義中,服務(wù)端SSE,一般需要遵循以下要求:
1)請(qǐng)求頭開啟長連接 + 流方式傳遞:
Content-Type: text/event-stream;charset=UTF-8 Cache-Control: no-cache Connection: keep-alive
2)數(shù)據(jù)格式服務(wù)端發(fā)送的消息,由 message 組成,其格式如下:
field:value
其中 field 有五種可能:
- 空:即以:開頭,表示注釋,可以理解為服務(wù)端向客戶端發(fā)送的心跳,確保連接不中斷
- data:數(shù)據(jù)
- event:事件,默認(rèn)值
- id:數(shù)據(jù)標(biāo)識(shí)符用 id 字段表示,相當(dāng)于每一條數(shù)據(jù)的編號(hào)
- retry:重連時(shí)間
二、SSE實(shí)戰(zhàn)
使用 SpringBoot簡(jiǎn)單實(shí)現(xiàn)一個(gè)SSE服務(wù)端主動(dòng)推送數(shù)據(jù)為前端,前端頁面接受后展示進(jìn)度條。
1、SseEmitter類簡(jiǎn)介
SpringBoot 利用 SseEmitter 來支持SSE,并對(duì)SSE規(guī)范做了一些封裝,使用起來非常簡(jiǎn)單。我們操作SseEmitter對(duì)象,關(guān)注消息文本即可。
SseEmitter類的幾個(gè)方法:
- send():發(fā)送數(shù)據(jù),如果傳入的是一個(gè)非SseEventBuilder對(duì)象,那么傳遞參數(shù)會(huì)被封裝到 data 中。
- complete():表示執(zhí)行完畢,會(huì)斷開連接。
- onTimeout():連接超時(shí)時(shí)回調(diào)觸發(fā)。
- onCompletion():結(jié)束之后的回調(diào)觸發(fā)。
- onError():報(bào)錯(cuò)時(shí)的回調(diào)觸發(fā)。
2、示例實(shí)戰(zhàn)
創(chuàng)建 SpringBoot項(xiàng)目,引入依賴:
<!-- 里面包含了 spring-webmvc-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<version>2.7.12</version>
</dependency>2.1 創(chuàng)建 SseServer
我們創(chuàng)建一個(gè) SseServer來簡(jiǎn)單封裝一下業(yè)務(wù)操作SSE的方法。
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
/**
* SseServer業(yè)務(wù)封裝類來操作SEE
*/
@Slf4j
public class SseServer {
/**
* 當(dāng)前連接總數(shù)
*/
private static AtomicInteger currentConnectTotal = new AtomicInteger(0);
/**
* messageId的 SseEmitter對(duì)象映射集
*/
private static Map<String, SseEmitter> sseEmitterMap = new ConcurrentHashMap<>();
/**
* 創(chuàng)建sse連接
*
* @param messageId - 消息id(唯一)
* @return
*/
public static SseEmitter createConnect(String messageId) {
/**
* 設(shè)置連接超時(shí)時(shí)間。0表示不過期,默認(rèn)是30秒,超過時(shí)間未完成會(huì)拋出異常
*/
SseEmitter sseEmitter = new SseEmitter(0L);
/*
// 超時(shí)時(shí)間設(shè)置為3s,設(shè)置前端的重試時(shí)間為1s。重連時(shí),注意總數(shù)的統(tǒng)計(jì)
SseEmitter sseEmitter = new SseEmitter(3_000L);
try {
sseEmitter.send(
SseEmitter.event()
.reconnectTime(1000L)
//.data("前端重連成功") // 重連成功的提示信息
);
} catch (IOException e) {
log.error("前端重連異常 ==> messageId={}, 異常信息:", messageId, e.getMessage());
e.printStackTrace();
}*/
// 注冊(cè)回調(diào)
sseEmitter.onCompletion(completionCallBack(messageId));
sseEmitter.onTimeout(timeOutCallBack(messageId));
sseEmitter.onError(errorCallBack(messageId));
sseEmitterMap.put(messageId, sseEmitter);
//記錄一下連接總數(shù)。數(shù)量+1
int count = currentConnectTotal.incrementAndGet();
log.info("創(chuàng)建sse連接成功 ==> 當(dāng)前連接總數(shù)={}, messageId={}", count, messageId);
return sseEmitter;
}
/**
* 給指定 messageId發(fā)消息
*
* @param messageId - 消息id(唯一)
* @param message - 消息文本
*/
public static void sendMessage(String messageId, String message) {
if (sseEmitterMap.containsKey(messageId)) {
try {
sseEmitterMap.get(messageId).send(message);
} catch (IOException e) {
log.error("發(fā)送消息異常 ==> messageId={}, 異常信息:", messageId, e.getMessage());
e.printStackTrace();
}
} else {
throw new RuntimeException("連接不存在或者超時(shí), messageId=" + messageId);
}
}
/**
* 給所有 messageId廣播發(fā)送消息
*
* @param message
*/
public static void batchAllSendMessage(String message) {
sseEmitterMap.forEach((messageId, sseEmitter) -> {
try {
sseEmitter.send(message, MediaType.APPLICATION_JSON);
} catch (IOException e) {
log.error("廣播發(fā)送消息異常 ==> messageId={}, 異常信息:", messageId, e.getMessage());
removeMessageId(messageId);
}
});
}
/**
* 給指定 messageId集合群發(fā)消息
*
* @param messageIds
* @param message
*/
public static void batchSendMessage(List<String> messageIds, String message) {
if (CollectionUtils.isEmpty(messageIds)) {
return;
}
// 去重
messageIds = messageIds.stream().distinct().collect(Collectors.toList());
messageIds.forEach(userId -> sendMessage(userId, message));
}
/**
* 給指定組群發(fā)消息(即組播,我們讓 messageId滿足我們的組命名確定即可)
*
* @param groupId
* @param message
*/
public static void groupSendMessage(String groupId, String message) {
if (MapUtils.isEmpty(sseEmitterMap)) {
return;
}
sseEmitterMap.forEach((messageId, sseEmitter) -> {
try {
// 這里 groupId作為前綴
if (messageId.startsWith(groupId)) {
sseEmitter.send(message, MediaType.APPLICATION_JSON);
}
} catch (IOException e) {
log.error("組播發(fā)送消息異常 ==> groupId={}, 異常信息:", groupId, e.getMessage());
removeMessageId(messageId);
}
});
}
/**
* 移除 MessageId
*
* @param messageId
*/
public static void removeMessageId(String messageId) {
sseEmitterMap.remove(messageId);
//數(shù)量-1
currentConnectTotal.getAndDecrement();
log.info("remove messageId={}", messageId);
}
/**
* 獲取所有的 MessageId集合
*
* @return
*/
public static List<String> getMessageIds() {
return new ArrayList<>(sseEmitterMap.keySet());
}
/**
* 獲取當(dāng)前連接總數(shù)
*
* @return
*/
public static int getConnectTotal() {
return currentConnectTotal.intValue();
}
/**
* 斷開SSE連接時(shí)的回調(diào)
*
* @param messageId
* @return
*/
private static Runnable completionCallBack(String messageId) {
return () -> {
log.info("結(jié)束連接 ==> messageId={}", messageId);
removeMessageId(messageId);
};
}
/**
* 連接超時(shí)時(shí)回調(diào)觸發(fā)
*
* @param messageId
* @return
*/
private static Runnable timeOutCallBack(String messageId) {
return () -> {
log.info("連接超時(shí) ==> messageId={}", messageId);
removeMessageId(messageId);
};
}
/**
* 連接報(bào)錯(cuò)時(shí)回調(diào)觸發(fā)。
*
* @param messageId
* @return
*/
private static Consumer<Throwable> errorCallBack(String messageId) {
return throwable -> {
log.error("連接異常 ==> messageId={}", messageId);
removeMessageId(messageId);
};
}
}2.2 業(yè)務(wù)controller
@RestController
@CrossOrigin
@RequestMapping("/sse")
public class SseDemoController {
/**
* 用戶SSE連接
* 它返回一個(gè)SseEmitter實(shí)例,這時(shí)候連接就已經(jīng)創(chuàng)建了.
*
* @return
*/
@GetMapping("/userConnect")
public SseEmitter connect() {
/**
* 一般取登錄用戶賬號(hào)作為 messageId。分組的話需要約定 messageId的格式。
* 這里模擬創(chuàng)建一個(gè)用戶連接
*/
String userId = "userId-" + RandomUtils.nextInt(1, 10);
return SseServer.createConnect(userId);
}
/**
* 模擬實(shí)例:下載進(jìn)度條顯示。 前端訪問下載接口之前,先建立用戶SSE連接,然后訪問下載接口,服務(wù)端推送消息。
* http://localhost:8080/sse/downLoad/userId-1
*
* @throws InterruptedException
*/
@GetMapping("/downLoad/{userId}")
public void pushOne(@PathVariable("userId") String userId) throws InterruptedException {
for (int i = 0; i <= 100; i++) {
if (i > 50 && i < 70) {
Thread.sleep(500L);
} else {
Thread.sleep(100L);
}
System.out.println("sendMessage --> 消息=" + i);
SseServer.sendMessage(userId, String.valueOf(i));
}
System.out.println("下載成功");
}
/**
* 廣播發(fā)送。http://localhost:8080/sse/pushAllUser
*
* @throws InterruptedException
*/
@GetMapping("/pushAllUser")
public void pushAllUser() throws InterruptedException {
for (int i = 0; i <= 100; i++) {
if (i > 50 && i < 70) {
Thread.sleep(500L);
} else {
Thread.sleep(100L);
}
System.out.println("batchAllSendMessage --> 消息=" + i);
SseServer.batchAllSendMessage(String.valueOf(i));
}
}
}2.3 前端html
簡(jiǎn)單寫一個(gè)html來演示效果。
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>Home</title>
<script>
var sseSource = new EventSource("http://localhost:8080/sse/userConnect");
<!-- 添加一個(gè)信息回調(diào) -->
sseSource.onmessage = function(event){
console.log("test=>",event)
document.getElementById("result").innerText = event.data+'%';
document.getElementById("my-progress").value = event.data;
}
// 使用vue交互事件,可以添加一些SSE的回調(diào)
// sseSource.dispatchEvent();
// sseSource.close();
</script>
</head>
<body>
<div id="result"></div>
下載進(jìn)度條:<progress style="width: 300px" id="my-progress" value="0" max="100"></progress>
</body>
</html>3、演示效果

參考文章:
HTML- server-sent-events:https://html.spec.whatwg.org/multipage/server-sent-events.html
到此這篇關(guān)于SpringBoot SSE服務(wù)端主動(dòng)推送事件的實(shí)現(xiàn)的文章就介紹到這了,更多相關(guān)SpringBoot SSE服務(wù)端主動(dòng)推送內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
idea中解決maven包沖突的問題(maven helper)
這篇文章主要介紹了idea中解決maven包沖突的問題(maven helper),小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過來看看吧2018-12-12
Spring實(shí)現(xiàn)控制反轉(zhuǎn)和依賴注入的示例詳解
這篇文章主要為大家詳細(xì)介紹IoC(控制反轉(zhuǎn))和DI(依賴注入)的概念,以及如何在Spring框架中實(shí)現(xiàn)它們,感興趣的小伙伴可以跟隨小編一起學(xué)習(xí)一下2023-08-08
springboot 啟動(dòng)如何修改application.properties的參數(shù)
這篇文章主要介紹了springboot 啟動(dòng)如何修改application.properties的參數(shù)方法,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-08-08
Springboot使用sharedingjdbc實(shí)現(xiàn)分庫分表
這篇文章主要介紹了Springboot使用sharedingjdbc實(shí)現(xiàn)分庫分表,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2024-07-07
SpringBoot創(chuàng)建線程池的六種方式小結(jié)
本文主要介紹了SpringBoot創(chuàng)建線程池的六種方式小結(jié),包括自定義線程池,固定長度線程池,單一線程池,共享線程池,定時(shí)線程池,SpringBoot中注入異步線程池,感興趣的可以了解一下2023-11-11
springboot~nexus項(xiàng)目打包要注意的地方示例代碼詳解
這篇文章主要介紹了springboot~nexus項(xiàng)目打包要注意的地方,本文通過實(shí)例代碼給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2020-07-07
javafx tableview鼠標(biāo)觸發(fā)更新屬性詳解
這篇文章主要為大家詳細(xì)介紹了javafx tableview鼠標(biāo)觸發(fā)更新屬性的相關(guān)資料,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2017-08-08
Springboot 使用 JSR 303 對(duì) Controller 控制層校驗(yàn)及 Service 服務(wù)層 AOP 校驗(yàn)
這篇文章主要介紹了Springboot 使用 JSR 303 對(duì) Controller 控制層校驗(yàn)及 Service 服務(wù)層 AOP 校驗(yàn) 使用消息資源文件對(duì)消息國際化的相關(guān)知識(shí),需要的朋友可以參考下2017-12-12

