亚洲乱码中文字幕综合,中国熟女仑乱hd,亚洲精品乱拍国产一区二区三区,一本大道卡一卡二卡三乱码全集资源,又粗又黄又硬又爽的免费视频

springboot連接kafka集群的使用示例

 更新時(shí)間:2023年09月14日 10:57:35   作者:timi先生  
在項(xiàng)目中使用kafka的場景有很多,尤其是實(shí)時(shí)產(chǎn)生的數(shù)據(jù)流,本文主要介紹了springboot連接kafka集群的使用示例,具有一定的參考價(jià)值,感興趣的可以了解一下

一、環(huán)境搭建

1.1 springboot 環(huán)境

  • JDK 11+
  • Maven 3.8.x+
  • springboot 2.5.4 +

1.2 kafka 依賴

springboot的pom文件導(dǎo)入

       <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>3.4.0</version>
        </dependency>

二、 kafka 配置類

2.1 發(fā)布者

2.1.1 配置

發(fā)布者我們使用 KafkaTemplate 來進(jìn)行消息發(fā)布,所以需要先對其進(jìn)行一些必要的配置。

@Configuration
@EnableKafka
public class KafkaConfig {
     /***** 發(fā)布者 *****/
    //生產(chǎn)者工廠
    @Bean
    public ProducerFactory<Integer, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }
    //生產(chǎn)者配置
    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.2.83:9092,192.168.2.84:9092,192.168.2.86:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }
    //生產(chǎn)者模板
    @Bean
    public KafkaTemplate<Integer, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}

2.1.2 構(gòu)建發(fā)布者類

配置完發(fā)布者,下來就是發(fā)布消息,我們需要繼承 ProducerListener<K, V> 接口,該接口完整信息如下:

public interface ProducerListener<K, V> {
    void onSuccess(ProducerRecord<K, V> producerRecord, RecordMetadata recordMetadata);
    void onError(ProducerRecord<K, V> producerRecord, RecordMetadata recordMetadata,
            Exception exception);
}

實(shí)現(xiàn)該接口的方法,我們可以獲取包含發(fā)送結(jié)果(成功或失?。┑漠惒交卣{(diào),也就是可以在這個(gè)接口的實(shí)現(xiàn)中獲取發(fā)送結(jié)果。

我們簡單的實(shí)現(xiàn)構(gòu)建一個(gè)發(fā)布者類,接收主題和發(fā)布消息參數(shù),并打印發(fā)布結(jié)果。

@Component
public class KafkaProducer implements ProducerListener<Object,Object> {
    private static final Logger producerlog = LoggerFactory.getLogger(KafkaProducer.class);
    private final KafkaTemplate<Integer, String> kafkaTemplate;
    public KafkaProducer(KafkaTemplate<Integer, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }
    public void producer (String msg,String topic){
        ListenableFuture<SendResult<Integer, String>> future = kafkaTemplate.send(topic,0, msg);
        future.addCallback(new KafkaSendCallback<Integer, String>() {
            @Override
            public void onSuccess(SendResult<Integer, String> result) {
                producerlog.info("發(fā)送成功 {}", result);
            }
            @Override
            public void onFailure(KafkaProducerException ex) {
                ProducerRecord<Integer, String> failed = ex.getFailedProducerRecord();
                producerlog.info("發(fā)送失敗 {}",failed);
            }
        });
    }
}

2.1.3 發(fā)布消息

寫一個(gè)controller類來測試我們構(gòu)建的發(fā)布者類,這個(gè)類中打印接收到的消息,來確保信息接收不出問題。

@RestController
public class KafkaTestController {
    private static final Logger kafkaTestLog = LoggerFactory.getLogger(KafkaTestController.class);
    @Resource
    private KafkaProducer kafkaProducer;
    @GetMapping("/kafkaTest")
    public void kafkaTest(String msg,String topic){
        kafkaProducer.producer(msg,topic);
        kafkaTestLog.info("接收到消息 {} {}",msg,topic);
    }
}

一切準(zhǔn)備就緒,我們啟動程序利用postman來進(jìn)行簡單的測試。

進(jìn)行消息發(fā)布:

發(fā)布結(jié)果:

可以看到消息發(fā)送成功。

我們再看看kafka消費(fèi)者有沒有接收到消息:

看以看到,kakfa的消費(fèi)者也接收到了消息。

2.2 消費(fèi)者

2.2.1 配置

消息的接受有多種方式,我們這里選擇的是使用 @KafkaListener 注解來進(jìn)行消息接收。它的使用像下面這樣:

public class Listener {
    @KafkaListener(id = "foo", topics = "myTopic", clientIdPrefix = "myClientId")
    public void listen(String data) {
        ...
    }
}

看起來不是太難吧,但使用這個(gè)注解,我們需要配置底層 ConcurrentMessageListenerContainer.kafkaListenerContainerFactor。

我們在原來的kafka配置類 KafkaConfig 中,繼續(xù)配置消費(fèi)者,大概就像下面這樣

@Configuration
@EnableKafka
public class KafkaConfig {
     /***** 發(fā)布者 *****/
    //生產(chǎn)者工廠
    @Bean
    public ProducerFactory<Integer, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }
    //生產(chǎn)者配置
    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.2.83:9092,192.168.2.84:9092,192.168.2.86:9092");
           props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }
    //生產(chǎn)者模板
    @Bean
    public KafkaTemplate<Integer, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
    /***** 消費(fèi)者 *****/
    //容器監(jiān)聽工廠
    @Bean
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>
    kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(3);
        factory.getContainerProperties().setPollTimeout(3000);
        return factory;
    }
    //消費(fèi)者工廠
    @Bean
    public ConsumerFactory<Integer, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }
    //消費(fèi)者配置
    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.2.83:9092,192.168.2.84:9092,192.168.2.86:9092");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, JsonDeserializer.class);
        props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class.getName());
        props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG,3000);
        return props;
    }
}

注意,要設(shè)置容器屬性必須使用getContainerProperties()工廠方法。它用作注入容器的實(shí)際屬性的模板

2.2.2 構(gòu)建消費(fèi)者類

配置好后,我們就可以使用這個(gè)注解了。這個(gè)注解的使用有多種方式:

1、用它來覆蓋容器工廠的concurrency和屬性

@KafkaListener(id = "myListener", topics = "myTopic",
        autoStartup = "${listen.auto.start:true}", concurrency = "${listen.concurrency:3}")
public void listen(String data) {
    ...
}

2、可以使用顯式主題和分區(qū)(以及可選的初始偏移量)

@KafkaListener(id = "thing2", topicPartitions =
        { @TopicPartition(topic = "topic1", partitions = { "0", "1" }),
          @TopicPartition(topic = "topic2", partitions = "0",
             partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "100"))
        })
public void listen(ConsumerRecord<?, ?> record) {
    ...
}

3、將初始偏移應(yīng)用于所有已分配的分區(qū)

@KafkaListener(id = "thing3", topicPartitions =
        { @TopicPartition(topic = "topic1", partitions = { "0", "1" },
             partitionOffsets = @PartitionOffset(partition = "*", initialOffset = "0"))
        })
public void listen(ConsumerRecord<?, ?> record) {
    ...
}

4、指定以逗號分隔的分區(qū)列表或分區(qū)范圍

@KafkaListener(id = "pp", autoStartup = "false",
        topicPartitions = @TopicPartition(topic = "topic1",
                partitions = "0-5, 7, 10-15"))
public void process(String in) {
    ...
}

5、可以向偵聽器提供Acknowledgment

@KafkaListener(id = "cat", topics = "myTopic",
          containerFactory = "kafkaManualAckListenerContainerFactory")
public void listen(String data, Acknowledgment ack) {
    ...
    ack.acknowledge();
}

6、添加標(biāo)頭

@KafkaListener(id = "list", topics = "myTopic", containerFactory = "batchFactory")
public void listen(List<String> list,
        @Header(KafkaHeaders.RECEIVED_KEY) List<Integer> keys,
        @Header(KafkaHeaders.RECEIVED_PARTITION) List<Integer> partitions,
        @Header(KafkaHeaders.RECEIVED_TOPIC) List<String> topics,
        @Header(KafkaHeaders.OFFSET) List<Long> offsets) {
    ...
}

我們這里寫一個(gè)簡單的,只用它來接受指定主題的數(shù)據(jù):

@Component
public class KafkaConsumer {
    private static final Logger consumerlog = LoggerFactory.getLogger(KafkaConsumer.class);
    @KafkaListener(topicPartitions  = @TopicPartition(topic = "kafka-topic-test",
            partitions = "0"))
    public void consumer (String data){
        consumerlog.info("消費(fèi)者接收數(shù)據(jù) {}",data);
    }
}

這里解釋一下,因?yàn)槲覀冞M(jìn)行了手動分配主題/分區(qū),所以 注解中g(shù)roup.id 可以為空。若要指定group.id請?jiān)谙M(fèi)者配置中加上props.put(ConsumerConfig.GROUP_ID_CONFIG, “bzt001”); 或在 @TopicPartition 注解后加上 groupId = “組id”

2.2.3 進(jìn)行消息消費(fèi)

繼續(xù)使用postman調(diào)用我們寫好的發(fā)布者發(fā)布消息,觀察控制臺的消費(fèi)者類是否有相關(guān)日志出現(xiàn)。

 到此這篇關(guān)于springboot連接kafka集群的使用示例的文章就介紹到這了,更多相關(guān)springboot連接kafka集群內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • Java算法之?dāng)?shù)組冒泡排序代碼實(shí)例講解

    Java算法之?dāng)?shù)組冒泡排序代碼實(shí)例講解

    這篇文章主要介紹了Java算法之?dāng)?shù)組冒泡排序代碼實(shí)例講解,文中用代碼舉例講解的很清晰,有感興趣的同學(xué)可以研究下
    2021-03-03
  • IDEA中application.properties的圖標(biāo)顯示不正常的問題及解決方法

    IDEA中application.properties的圖標(biāo)顯示不正常的問題及解決方法

    這篇文章主要介紹了IDEA中application.properties的圖標(biāo)顯示不正常的問題及解決方法,本文給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下
    2021-04-04
  • idea如何配置springboot熱部署

    idea如何配置springboot熱部署

    文章介紹了如何在不同版本的IntelliJ IDEA中配置靜態(tài)和動態(tài)編譯,并提供了觸發(fā)熱部署的方法
    2025-01-01
  • Java虛擬機(jī)底層原理詳細(xì)分析

    Java虛擬機(jī)底層原理詳細(xì)分析

    這篇文章主要介紹了Java虛擬機(jī)底層原理詳細(xì)分析,運(yùn)行時(shí)數(shù)據(jù)區(qū)就是俗稱的虛擬機(jī)內(nèi)存,主要包括我們熟悉的堆、棧、本地方法棧、方法區(qū)(元空間)、程序計(jì)數(shù)器,虛擬機(jī)調(diào)優(yōu)主要針對的是運(yùn)行時(shí)數(shù)據(jù)區(qū),也就是虛擬機(jī)內(nèi)存,需要的朋友可以參考下
    2024-01-01
  • Java基于NIO實(shí)現(xiàn)群聊系統(tǒng)

    Java基于NIO實(shí)現(xiàn)群聊系統(tǒng)

    這篇文章主要為大家詳細(xì)介紹了Java基于NIO實(shí)現(xiàn)群聊系統(tǒng),文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下
    2021-11-11
  • 講解Java設(shè)計(jì)模式編程中的建造者模式與原型模式

    講解Java設(shè)計(jì)模式編程中的建造者模式與原型模式

    這篇文章主要介紹了Java設(shè)計(jì)模式編程中的建造者模式與原型模式,設(shè)計(jì)模式有利于團(tuán)隊(duì)開發(fā)過程中的代碼維護(hù),需要的朋友可以參考下
    2016-02-02
  • Map如何根據(jù)key指定條件進(jìn)行過濾篩選

    Map如何根據(jù)key指定條件進(jìn)行過濾篩選

    這篇文章主要介紹了Map如何根據(jù)key指定條件進(jìn)行過濾篩選問題,具有很好的參考價(jià)值,希望對大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2023-09-09
  • SpringBoot+Spring Security基于內(nèi)存用戶認(rèn)證的實(shí)現(xiàn)

    SpringBoot+Spring Security基于內(nèi)存用戶認(rèn)證的實(shí)現(xiàn)

    本文介紹了SpringBoot+Spring Security基于內(nèi)存用戶認(rèn)證的實(shí)現(xiàn),文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2024-11-11
  • 一文詳解Spring是怎么讀取配置Xml文件的

    一文詳解Spring是怎么讀取配置Xml文件的

    這篇文章主要介紹了一文詳解Spring是怎么讀取配置Xml文件的,文章圍繞主題展開詳細(xì)的內(nèi)容介紹,具有一定的參考價(jià)值,感興趣的小伙伴可以參考一下
    2022-08-08
  • java創(chuàng)建txt文件并寫入內(nèi)容的方法代碼示例

    java創(chuàng)建txt文件并寫入內(nèi)容的方法代碼示例

    這篇文章主要介紹了java創(chuàng)建txt文件并寫入內(nèi)容的兩種方法,分別是使用java.io.FileWriter和BufferedWriter,以及使用Java7的java.nio.file包中的Files和Path類,需要的朋友可以參考下
    2025-01-01

最新評論