亚洲乱码中文字幕综合,中国熟女仑乱hd,亚洲精品乱拍国产一区二区三区,一本大道卡一卡二卡三乱码全集资源,又粗又黄又硬又爽的免费视频

Spring?Boot整合Kafka+SSE實(shí)現(xiàn)實(shí)時(shí)數(shù)據(jù)展示

 更新時(shí)間:2024年06月14日 08:28:10   作者:艾迪的技術(shù)之路  
本文主要介紹了Spring?Boot整合Kafka+SSE實(shí)現(xiàn)實(shí)時(shí)數(shù)據(jù)展示

為什么使用Kafka?

不使用Rabbitmq或者Rocketmq是因?yàn)镵afka是Hadoop集群下的組成部分,對于大數(shù)據(jù)的相關(guān)開發(fā)適應(yīng)性好,且當(dāng)前業(yè)務(wù)場景下不需要使用死信隊(duì)列,不過要注意Kafka對于更新時(shí)間慢的數(shù)據(jù)拉取也較慢,因此對與實(shí)時(shí)性要求高可以選擇其他MQ。

使用消息隊(duì)列是因?yàn)樵撝虚g件具有實(shí)時(shí)性,且可以作為廣播進(jìn)行消息分發(fā)。

為什么使用SSE?

使用Websocket傳輸信息的時(shí)候,會(huì)轉(zhuǎn)成二進(jìn)制數(shù)據(jù),產(chǎn)生一定的時(shí)間損耗,SSE直接傳輸文本,不存在這個(gè)問題

由于Websocket是雙向的,讀取日志的時(shí)候,如果有人連接ws日志,會(huì)發(fā)送大量異常信息,會(huì)給使用段和日志段造成問題;SSE是單向的,不需要考慮這個(gè)問題,提高了安全性
另外就是SSE支持?jǐn)嗑€重連;Websocket協(xié)議本身并沒有提供心跳機(jī)制,所以長時(shí)間沒有數(shù)據(jù)發(fā)送時(shí),會(huì)將這個(gè)連接斷掉,因此需要手寫心跳機(jī)制進(jìn)行實(shí)現(xiàn)。

此外,由于是長連接的一個(gè)實(shí)現(xiàn)方式,所以SSE也可以替代Websocket實(shí)現(xiàn)掃碼登陸(比如通過SSE的超時(shí)組件在實(shí)現(xiàn)二維碼的超時(shí)功能,具體實(shí)現(xiàn)我可以整理一下)

另外,如果是普通項(xiàng)目,不需要過高的實(shí)時(shí)性,則不需要使用Websocket,使用SSE即可

代碼實(shí)現(xiàn)

pom.xml引入SSE和Kafka

<!-- SSE,一般springboot開發(fā)web應(yīng)用的都有 -->
       <dependency>
           <groupId>org.springframework.boot</groupId>
           <artifactId>spring-boot-starter-web</artifactId>
       </dependency>
<!-- kafka,最主要的是第一個(gè),剩下兩個(gè)是測試用的 -->
       <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
        <dependency
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>3.4.0</version>
        </dependency>

application.properties增加Kafka配置信息

# KafkaProperties
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=community-consumer-group
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer

配置Kafka信息

@Configuration
public class KafkaProducerConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

}

配置controller,通過web方式開啟效果

@RestController
@RequestMapping(path = "sse")
public class KafkaSSEController {

    private static final Map<String, SseEmitter> sseCache = new ConcurrentHashMap<>();

    @Resource
    private KafkaTemplate<String, String> kafkaTemplate;

    /**
     * @param message
     * @apiNote 發(fā)送信息到Kafka主題中
     */
    @PostMapping("/send")
    public void sendMessage(@RequestBody String message) {
        kafkaTemplate.send("my-topic", message);
    }

    /**
     * 監(jiān)聽Kafka數(shù)據(jù)
     *
     * @param message
     */
    @KafkaListener(topics = "my-topic", groupId = "my-group-id")
    public void consume(String message) {
        System.out.println("Received message: " + message);
    }

    /**
     * 連接sse服務(wù)
     *
     * @param id
     * @return
     * @throws IOException
     */
    @GetMapping(path = "subscribe", produces = {MediaType.TEXT_EVENT_STREAM_VALUE})
    public SseEmitter push(@RequestParam("id") String id) throws IOException {
        // 超時(shí)時(shí)間設(shè)置為5分鐘,用于演示客戶端自動(dòng)重連
        SseEmitter sseEmitter = new SseEmitter(5_60_000L);
        // 設(shè)置前端的重試時(shí)間為1s
        // send(): 發(fā)送數(shù)據(jù),如果傳入的是一個(gè)非SseEventBuilder對象,那么傳遞參數(shù)會(huì)被封裝到 data 中
        sseEmitter.send(SseEmitter.event().reconnectTime(1000).data("連接成功"));
        sseCache.put(id, sseEmitter);
        System.out.println("add " + id);
        sseEmitter.send("你好", MediaType.APPLICATION_JSON);
        SseEmitter.SseEventBuilder data = SseEmitter.event().name("finish").id("6666").data("哈哈");
        sseEmitter.send(data);
        // onTimeout(): 超時(shí)回調(diào)觸發(fā)
        sseEmitter.onTimeout(() -> {
            System.out.println(id + "超時(shí)");
            sseCache.remove(id);
        });
        // onCompletion(): 結(jié)束之后的回調(diào)觸發(fā)
        sseEmitter.onCompletion(() -> System.out.println("完成?。。?));
        return sseEmitter;
    }
    /**
     * http://127.0.0.1:8080/sse/push?id=7777&content=%E4%BD%A0%E5%93%88aaaaaa
     * @param id
     * @param content
     * @return
     * @throws IOException
     */
    @ResponseBody
    @GetMapping(path = "push")
    public String push(String id, String content) throws IOException {
        SseEmitter sseEmitter = sseCache.get(id);
        if (sseEmitter != null) {
            sseEmitter.send(content);
        }
        return "over";
    }

    @ResponseBody
    @GetMapping(path = "over")
    public String over(String id) {
        SseEmitter sseEmitter = sseCache.get(id);
        if (sseEmitter != null) {
            // complete(): 表示執(zhí)行完畢,會(huì)斷開連接
            sseEmitter.complete();
            sseCache.remove(id);
        }
        return "over";
    }

}

前端方式

<html>
  <head>
    <script>
      console.log('start')
      const clientId = "your_client_id_x"; // 設(shè)置客戶端ID
      const eventSource = new EventSource(`http://localhost:9999/v1/sse/subscribe/${clientId}`); // 訂閱服務(wù)器端的SSE

      eventSource.onmessage = event => {
        console.log(event.data)
        const message = JSON.parse(event.data);
        console.log(`Received message from server: ${message}`);
      };

      // 發(fā)送消息給服務(wù)器端 可通過 postman 調(diào)用,所以下面 sendMessage() 調(diào)用被注釋掉了
      function sendMessage() {
        const message = "hello sse";
        fetch(`http://localhost:9999/v1/sse/publish/${clientId}`, {
          method: "POST",
          headers: { "Content-Type": "application/json" },
          body: JSON.stringify(message)
        });
        console.log('dddd'+JSON.stringify(message))
      }
      // sendMessage()
    </script>
  </head>
</html>

到此這篇關(guān)于Spring Boot整合Kafka+SSE實(shí)現(xiàn)實(shí)時(shí)數(shù)據(jù)展示的文章就介紹到這了,更多相關(guān)SpringBoot實(shí)時(shí)數(shù)據(jù)內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • springboot集成tkmapper及基本使用教程

    springboot集成tkmapper及基本使用教程

    tk.mybatis可以節(jié)省程序員的大部分時(shí)間,對于程序員來說關(guān)于一張表的操作無非就是增刪改查,tk.mybatis提供了一些基本操作的SQL語句,比如說按表的主鍵查詢、刪除等基本操作,我們接下來就來介紹一些springboot集成tkmapper及基本使用
    2022-11-11
  • 關(guān)于Java中的 JSP 詳解

    關(guān)于Java中的 JSP 詳解

    JSP 代表 Java 服務(wù)器頁面。它是一種在應(yīng)用服務(wù)器端使用的編程工具。JSP 基本上用于支持平臺(tái)–獨(dú)立和動(dòng)態(tài)的方法來構(gòu)建 Web 依賴的應(yīng)用程序。JSP 頁面類似于 ASP 頁面,因?yàn)樗鼈兪窃诜?wù)器上編譯的,而不是在用戶的 Web 瀏覽器上進(jìn)行編譯。下面來看看文章的詳細(xì)介紹內(nèi)容
    2021-11-11
  • 用Java進(jìn)行zip文件壓縮與解壓縮

    用Java進(jìn)行zip文件壓縮與解壓縮

    這篇文章主要介紹了用Java進(jìn)行zip文件壓縮與解壓縮的方法,幫助大家更好的理解和使用Java,感興趣的朋友可以了解下
    2020-12-12
  • MyBatis-Plus自定義SQL和復(fù)雜查詢的實(shí)現(xiàn)

    MyBatis-Plus自定義SQL和復(fù)雜查詢的實(shí)現(xiàn)

    MyBatis-Plus增強(qiáng)了MyBatis的功能,提供注解和XML兩種自定義SQL方式,支持復(fù)雜查詢?nèi)缍啾黻P(guān)聯(lián)、動(dòng)態(tài)分頁等,通過注解如@Select、@Insert、@Update、@Delete實(shí)現(xiàn)CRUD操作,本文就來介紹一下,感興趣的可以了解一下
    2024-10-10
  • Java Enum的簡單使用

    Java Enum的簡單使用

    這篇文章主要為大家詳細(xì)介紹了Java Enum的簡單使用,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下
    2017-09-09
  • SpringBoot動(dòng)態(tài)修改yml配置文件的方法詳解

    SpringBoot動(dòng)態(tài)修改yml配置文件的方法詳解

    這篇文章主要為大家詳細(xì)介紹了SpringBoot動(dòng)態(tài)修改yml配置文件的方法,文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下,希望能夠給你帶來幫助
    2022-03-03
  • Java數(shù)據(jù)結(jié)構(gòu)之KMP算法的實(shí)現(xiàn)

    Java數(shù)據(jù)結(jié)構(gòu)之KMP算法的實(shí)現(xiàn)

    這篇文章主要為大家詳細(xì)介紹了Java數(shù)據(jù)結(jié)構(gòu)中KMP算法的原理與實(shí)現(xiàn),文中的示例代碼講解詳細(xì),對我們學(xué)習(xí)Java有一定的幫助,需要的可以參考一下
    2022-11-11
  • SpringBoot自動(dòng)配置特點(diǎn)與原理詳細(xì)分析

    SpringBoot自動(dòng)配置特點(diǎn)與原理詳細(xì)分析

    這篇文章主要介紹了SpringBoot自動(dòng)配置原理分析,SpringBoot是我們經(jīng)常使用的框架,那么你能不能針對SpringBoot實(shí)現(xiàn)自動(dòng)配置做一個(gè)詳細(xì)的介紹。如果可以的話,能不能畫一下實(shí)現(xiàn)自動(dòng)配置的流程圖。牽扯到哪些關(guān)鍵類,以及哪些關(guān)鍵點(diǎn)
    2022-08-08
  • SpringBoot超詳細(xì)講解@Value注解

    SpringBoot超詳細(xì)講解@Value注解

    在使用spring框架的項(xiàng)目中,@Value是經(jīng)常使用的注解之一。作用是將配置文件中的鍵對應(yīng)的值分配給某類內(nèi)帶注解的屬性。本文使您系統(tǒng)地了解@Value的用法。在使用Spring框架的項(xiàng)目中@Value是經(jīng)常使用的注解之一,其作用是將配置文件中的鍵對應(yīng)的值分配給某類內(nèi)帶注解的屬性
    2022-07-07
  • java性能優(yōu)化之分代回收

    java性能優(yōu)化之分代回收

    這篇文章主要介紹了java性能優(yōu)化之分代回收,文章圍繞主題展開詳細(xì)的內(nèi)容介紹,具有一定的參考價(jià)值,需要的小伙伴可以參考一下
    2022-07-07

最新評論