亚洲乱码中文字幕综合,中国熟女仑乱hd,亚洲精品乱拍国产一区二区三区,一本大道卡一卡二卡三乱码全集资源,又粗又黄又硬又爽的免费视频

Spring?Boot整合Kafka教程詳解

 更新時間:2023年03月10日 14:22:49   作者:qianmoq  
這篇文章主要為大家介紹了Spring?Boot整合Kafka教程詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪

正文

本教程將介紹如何在 Spring Boot 應用程序中使用 Kafka。Kafka 是一個分布式的發(fā)布-訂閱消息系統(tǒng),它可以處理大量數(shù)據(jù)并提供高吞吐量。

在本教程中,我們將使用 Spring Boot 2.5.4Kafka 2.8.0

步驟一:添加依賴項

在 pom.xml 中添加以下依賴項:

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.8.0</version>
</dependency>

步驟二:配置 Kafka

application.yml 文件中添加以下配置:

sping:
  kafka:
    bootstrap-servers: localhost:9092
    consumer:
      group-id: my-group
      auto-offset-reset: earliest
    producer:
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      key-serializer: org.apache.kafka.common.serialization.StringSerializer

這里我們配置了 Kafka 的服務地址為 localhost:9092,配置了一個消費者組 ID 為 my-group,并設(shè)置了一個最早的偏移量來讀取消息。在生產(chǎn)者方面,我們配置了消息序列化程序為 StringSerializer。

步驟三:創(chuàng)建一個生產(chǎn)者

現(xiàn)在,我們將創(chuàng)建一個 Kafka 生產(chǎn)者,用于發(fā)送消息到 Kafka 服務器。在這里,我們將創(chuàng)建一個 RESTful 端點,用于接收 POST 請求并將消息發(fā)送到 Kafka。

首先,我們將創(chuàng)建一個 KafkaProducerConfig 類,用于配置 Kafka 生產(chǎn)者:

@Configuration
public class KafkaProducerConfig {
    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;
    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }
    @Bean
    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }
    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}

在上面的代碼中,我們使用 @Configuration 注解將 KafkaProducerConfig 類聲明為配置類。然后,我們使用 @Value 注解注入配置文件中的 bootstrap-servers 屬性。

接下來,我們創(chuàng)建了一個 producerConfigs 方法,用于設(shè)置 Kafka 生產(chǎn)者的配置。在這里,我們設(shè)置了 BOOTSTRAP_SERVERS_CONFIG、KEY_SERIALIZER_CLASS_CONFIGVALUE_SERIALIZER_CLASS_CONFIG 三個屬性。

然后,我們創(chuàng)建了一個 producerFactory 方法,用于創(chuàng)建 Kafka 生產(chǎn)者工廠。在這里,我們使用了 DefaultKafkaProducerFactory 類,并傳遞了我們的配置。

最后,我們創(chuàng)建了一個 kafkaTemplate 方法,用于創(chuàng)建 KafkaTemplate 實例。在這里,我們使用了剛剛創(chuàng)建的生產(chǎn)者工廠作為參數(shù),然后返回 KafkaTemplate 實例。

接下來,我們將創(chuàng)建一個 RESTful 端點,用于接收 POST 請求并將消息發(fā)送到 Kafka。在這里,我們將使用 @RestController 注解創(chuàng)建一個 RESTful 控制器:

@RestController
public class KafkaController {
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;
    @PostMapping("/send")
    public void sendMessage(@RequestBody String message) {
        kafkaTemplate.send("my-topic", message);
    }
}

在上面的代碼中,我們使用 @Autowired 注解將 KafkaTemplate 實例注入到 KafkaController 類中。然后,我們創(chuàng)建了一個 sendMessage 方法,用于發(fā)送消息到 Kafka。

在這里,我們使用 kafkaTemplate.send 方法發(fā)送消息到 my-topic 主題。send 方法返回一個 ListenableFuture 對象,用于異步處理結(jié)果。

步驟四:創(chuàng)建一個消費者

現(xiàn)在,我們將創(chuàng)建一個 Kafka 消費者,用于從 Kafka 服務器接收消息。在這里,我們將創(chuàng)建一個消費者組,并將其配置為從 my-topic 主題讀取消息。

首先,我們將創(chuàng)建一個 KafkaConsumerConfig 類,用于配置 Kafka 消費者:

@Configuration
@EnableKafka
public class KafkaConsumerConfig {
    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;
    @Value("${spring.kafka.consumer.group-id}")
    private String groupId;
    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return props;
    }
    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}

在上面的代碼中,我們使用 @Configuration 注解將 KafkaConsumerConfig 類聲明為配置類,并使用 @EnableKafka 注解啟用 Kafka。

然后,我們使用 @Value 注解注入配置文件中的 bootstrap-serversconsumer.group-id 屬性。

接下來,我們創(chuàng)建了一個 consumerConfigs 方法,用于設(shè)置 Kafka 消費者的配置。在這里,我們設(shè)置了 BOOTSTRAP_SERVERS_CONFIG、GROUP_ID_CONFIG、AUTO_OFFSET_RESET_CONFIG、KEY_DESERIALIZER_CLASS_CONFIGVALUE_DESERIALIZER_CLASS_CONFIG 五個屬性。

然后,我們創(chuàng)建了一個 consumerFactory 方法,用于創(chuàng)建 Kafka 消費者工廠。在這里,我們使用了 DefaultKafkaConsumerFactory 類,并傳遞了我們的配置。

最后,我們創(chuàng)建了一個 kafkaListenerContainerFactory 方法,用于創(chuàng)建一個 ConcurrentKafkaListenerContainerFactory 實例。在這里,我們將消費者工廠注入到 kafkaListenerContainerFactory 實例中。

接下來,我們將創(chuàng)建一個 Kafka 消費者類 KafkaConsumer,用于監(jiān)聽 my-topic 主題并接收消息:

@Service
public class KafkaConsumer {
    @KafkaListener(topics = "my-topic", groupId = "my-group-id")
    public void consume(String message) {
        System.out.println("Received message: " + message);
    }
}

在上面的代碼中,我們使用 @KafkaListener 注解聲明了一個消費者方法,用于接收從 my-topic 主題中讀取的消息。在這里,我們將消費者組 ID 設(shè)置為 my-group-id

現(xiàn)在,我們已經(jīng)完成了 Kafka 生產(chǎn)者和消費者的設(shè)置。我們可以使用 mvn spring-boot:run 命令啟動應用程序,并使用 curl 命令發(fā)送 POST 請求到 http://localhost:8080/send 端點,以將消息發(fā)送到 Kafka。然后,我們可以在控制臺上查看消費者接收到的消息。

這就是使用 Spring Boot 和 Kafka 的基本設(shè)置。我們可以根據(jù)需要進行更改和擴展,以滿足特定的需求。

以上就是Spring Boot整合Kafka教程詳解的詳細內(nèi)容,更多關(guān)于Spring Boot整合Kafka的資料請關(guān)注腳本之家其它相關(guān)文章!

相關(guān)文章

  • Spring?Cloud?Eureka基礎(chǔ)應用及原理

    Spring?Cloud?Eureka基礎(chǔ)應用及原理

    這篇文章主要介紹了Spring?Cloud?Eureka基礎(chǔ)應用,Eureka?Client中內(nèi)置一個負載均衡器,用來進行基本的負載均衡,下面我們將通過搭建一個簡單的Eureka例子來了解Eureka的運作原理,感興趣的朋友一起看看吧
    2022-05-05
  • Mybatis不啟動項目直接測試Mapper的實現(xiàn)方法

    Mybatis不啟動項目直接測試Mapper的實現(xiàn)方法

    在項目開發(fā)中,測試單個Mybatis Mapper方法通常需要啟動整個SpringBoot項目,消耗大量時間,本文介紹通過Main方法和Mybatis配置類,快速測試Mapper功能,無需啟動整個項目,這方法使用AnnotationConfigApplicationContext容器
    2024-09-09
  • Java使用OpenCV3.2實現(xiàn)視頻讀取與播放

    Java使用OpenCV3.2實現(xiàn)視頻讀取與播放

    這篇文章主要為大家詳細介紹了Java使用OpenCV3.2實現(xiàn)視頻讀取與播放,具有一定的參考價值,感興趣的小伙伴們可以參考一下
    2019-07-07
  • Spring Cloud Gateway 記錄請求應答數(shù)據(jù)日志操作

    Spring Cloud Gateway 記錄請求應答數(shù)據(jù)日志操作

    這篇文章主要介紹了Spring Cloud Gateway 記錄請求應答數(shù)據(jù)日志操作,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧
    2020-12-12
  • java基于websocket實現(xiàn)im聊天功能

    java基于websocket實現(xiàn)im聊天功能

    這篇文章主要為大家介紹了java基于websocket實現(xiàn)im聊天功能示例詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪
    2023-11-11
  • MyBatis通用Mapper中的通用example(排序)詳解

    MyBatis通用Mapper中的通用example(排序)詳解

    這篇文章主要介紹了MyBatis通用Mapper中的通用example(排序)詳解,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2021-12-12
  • MapReduce實現(xiàn)TopN效果示例解析

    MapReduce實現(xiàn)TopN效果示例解析

    這篇文章主要為大家介紹了MapReduce實現(xiàn)TopN效果示例解析,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪
    2023-07-07
  • Spring Boot集成Thymeleaf的方法

    Spring Boot集成Thymeleaf的方法

    這篇文章主要介紹了Spring Boot集成Thymeleaf的方法,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友可以參考下
    2019-10-10
  • 深入理解java動態(tài)代理機制

    深入理解java動態(tài)代理機制

    本篇文章主要介紹了深入理解java動態(tài)代理機制,詳細的介紹動態(tài)代理有哪些應用場景,什么是動態(tài)代理,怎樣使用,它的局限性在什么地方?有興趣的可以了解一下。
    2017-02-02
  • Java手寫線程池之向JDK線程池進發(fā)

    Java手寫線程池之向JDK線程池進發(fā)

    在前面的文章自己動手寫乞丐版線程池中,我們寫了一個非常簡單的線程池實現(xiàn),這個只是一個非常簡單的實現(xiàn),在本篇文章當中我們將要實現(xiàn)一個和JDK內(nèi)部實現(xiàn)的線程池非常相似的線程池,需要的可以了解一下
    2022-10-10

最新評論