Spring?Boot整合Kafka+SSE實(shí)現(xiàn)實(shí)時(shí)數(shù)據(jù)展示
為什么使用Kafka?
不使用Rabbitmq或者Rocketmq是因?yàn)镵afka是Hadoop集群下的組成部分,對于大數(shù)據(jù)的相關(guān)開發(fā)適應(yīng)性好,且當(dāng)前業(yè)務(wù)場景下不需要使用死信隊(duì)列,不過要注意Kafka對于更新時(shí)間慢的數(shù)據(jù)拉取也較慢,因此對與實(shí)時(shí)性要求高可以選擇其他MQ。
使用消息隊(duì)列是因?yàn)樵撝虚g件具有實(shí)時(shí)性,且可以作為廣播進(jìn)行消息分發(fā)。
為什么使用SSE?
使用Websocket傳輸信息的時(shí)候,會(huì)轉(zhuǎn)成二進(jìn)制數(shù)據(jù),產(chǎn)生一定的時(shí)間損耗,SSE直接傳輸文本,不存在這個(gè)問題
由于Websocket是雙向的,讀取日志的時(shí)候,如果有人連接ws日志,會(huì)發(fā)送大量異常信息,會(huì)給使用段和日志段造成問題;SSE是單向的,不需要考慮這個(gè)問題,提高了安全性
另外就是SSE支持?jǐn)嗑€重連;Websocket協(xié)議本身并沒有提供心跳機(jī)制,所以長時(shí)間沒有數(shù)據(jù)發(fā)送時(shí),會(huì)將這個(gè)連接斷掉,因此需要手寫心跳機(jī)制進(jìn)行實(shí)現(xiàn)。
此外,由于是長連接的一個(gè)實(shí)現(xiàn)方式,所以SSE也可以替代Websocket實(shí)現(xiàn)掃碼登陸(比如通過SSE的超時(shí)組件在實(shí)現(xiàn)二維碼的超時(shí)功能,具體實(shí)現(xiàn)我可以整理一下)
另外,如果是普通項(xiàng)目,不需要過高的實(shí)時(shí)性,則不需要使用Websocket,使用SSE即可
代碼實(shí)現(xiàn)
pom.xml引入SSE和Kafka
<!-- SSE,一般springboot開發(fā)web應(yīng)用的都有 --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <!-- kafka,最主要的是第一個(gè),剩下兩個(gè)是測試用的 --> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency> <dependency <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>3.4.0</version> </dependency>
application.properties增加Kafka配置信息
# KafkaProperties spring.kafka.bootstrap-servers=localhost:9092 spring.kafka.consumer.group-id=community-consumer-group spring.kafka.consumer.auto-offset-reset=earliest spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
配置Kafka信息
@Configuration public class KafkaProducerConfig { @Value("${spring.kafka.bootstrap-servers}") private String bootstrapServers; @Bean public Map<String, Object> producerConfigs() { Map<String, Object> props = new HashMap<>(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); return props; } @Bean public ProducerFactory<String, String> producerFactory() { return new DefaultKafkaProducerFactory<>(producerConfigs()); } @Bean public KafkaTemplate<String, String> kafkaTemplate() { return new KafkaTemplate<>(producerFactory()); } }
配置controller,通過web方式開啟效果
@RestController @RequestMapping(path = "sse") public class KafkaSSEController { private static final Map<String, SseEmitter> sseCache = new ConcurrentHashMap<>(); @Resource private KafkaTemplate<String, String> kafkaTemplate; /** * @param message * @apiNote 發(fā)送信息到Kafka主題中 */ @PostMapping("/send") public void sendMessage(@RequestBody String message) { kafkaTemplate.send("my-topic", message); } /** * 監(jiān)聽Kafka數(shù)據(jù) * * @param message */ @KafkaListener(topics = "my-topic", groupId = "my-group-id") public void consume(String message) { System.out.println("Received message: " + message); } /** * 連接sse服務(wù) * * @param id * @return * @throws IOException */ @GetMapping(path = "subscribe", produces = {MediaType.TEXT_EVENT_STREAM_VALUE}) public SseEmitter push(@RequestParam("id") String id) throws IOException { // 超時(shí)時(shí)間設(shè)置為5分鐘,用于演示客戶端自動(dòng)重連 SseEmitter sseEmitter = new SseEmitter(5_60_000L); // 設(shè)置前端的重試時(shí)間為1s // send(): 發(fā)送數(shù)據(jù),如果傳入的是一個(gè)非SseEventBuilder對象,那么傳遞參數(shù)會(huì)被封裝到 data 中 sseEmitter.send(SseEmitter.event().reconnectTime(1000).data("連接成功")); sseCache.put(id, sseEmitter); System.out.println("add " + id); sseEmitter.send("你好", MediaType.APPLICATION_JSON); SseEmitter.SseEventBuilder data = SseEmitter.event().name("finish").id("6666").data("哈哈"); sseEmitter.send(data); // onTimeout(): 超時(shí)回調(diào)觸發(fā) sseEmitter.onTimeout(() -> { System.out.println(id + "超時(shí)"); sseCache.remove(id); }); // onCompletion(): 結(jié)束之后的回調(diào)觸發(fā) sseEmitter.onCompletion(() -> System.out.println("完成?。。?)); return sseEmitter; } /** * http://127.0.0.1:8080/sse/push?id=7777&content=%E4%BD%A0%E5%93%88aaaaaa * @param id * @param content * @return * @throws IOException */ @ResponseBody @GetMapping(path = "push") public String push(String id, String content) throws IOException { SseEmitter sseEmitter = sseCache.get(id); if (sseEmitter != null) { sseEmitter.send(content); } return "over"; } @ResponseBody @GetMapping(path = "over") public String over(String id) { SseEmitter sseEmitter = sseCache.get(id); if (sseEmitter != null) { // complete(): 表示執(zhí)行完畢,會(huì)斷開連接 sseEmitter.complete(); sseCache.remove(id); } return "over"; } }
前端方式
<html> <head> <script> console.log('start') const clientId = "your_client_id_x"; // 設(shè)置客戶端ID const eventSource = new EventSource(`http://localhost:9999/v1/sse/subscribe/${clientId}`); // 訂閱服務(wù)器端的SSE eventSource.onmessage = event => { console.log(event.data) const message = JSON.parse(event.data); console.log(`Received message from server: ${message}`); }; // 發(fā)送消息給服務(wù)器端 可通過 postman 調(diào)用,所以下面 sendMessage() 調(diào)用被注釋掉了 function sendMessage() { const message = "hello sse"; fetch(`http://localhost:9999/v1/sse/publish/${clientId}`, { method: "POST", headers: { "Content-Type": "application/json" }, body: JSON.stringify(message) }); console.log('dddd'+JSON.stringify(message)) } // sendMessage() </script> </head> </html>
到此這篇關(guān)于Spring Boot整合Kafka+SSE實(shí)現(xiàn)實(shí)時(shí)數(shù)據(jù)展示的文章就介紹到這了,更多相關(guān)SpringBoot實(shí)時(shí)數(shù)據(jù)內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
MyBatis-Plus自定義SQL和復(fù)雜查詢的實(shí)現(xiàn)
MyBatis-Plus增強(qiáng)了MyBatis的功能,提供注解和XML兩種自定義SQL方式,支持復(fù)雜查詢?nèi)缍啾黻P(guān)聯(lián)、動(dòng)態(tài)分頁等,通過注解如@Select、@Insert、@Update、@Delete實(shí)現(xiàn)CRUD操作,本文就來介紹一下,感興趣的可以了解一下2024-10-10SpringBoot動(dòng)態(tài)修改yml配置文件的方法詳解
這篇文章主要為大家詳細(xì)介紹了SpringBoot動(dòng)態(tài)修改yml配置文件的方法,文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下,希望能夠給你帶來幫助2022-03-03Java數(shù)據(jù)結(jié)構(gòu)之KMP算法的實(shí)現(xiàn)
這篇文章主要為大家詳細(xì)介紹了Java數(shù)據(jù)結(jié)構(gòu)中KMP算法的原理與實(shí)現(xiàn),文中的示例代碼講解詳細(xì),對我們學(xué)習(xí)Java有一定的幫助,需要的可以參考一下2022-11-11SpringBoot自動(dòng)配置特點(diǎn)與原理詳細(xì)分析
這篇文章主要介紹了SpringBoot自動(dòng)配置原理分析,SpringBoot是我們經(jīng)常使用的框架,那么你能不能針對SpringBoot實(shí)現(xiàn)自動(dòng)配置做一個(gè)詳細(xì)的介紹。如果可以的話,能不能畫一下實(shí)現(xiàn)自動(dòng)配置的流程圖。牽扯到哪些關(guān)鍵類,以及哪些關(guān)鍵點(diǎn)2022-08-08