亚洲乱码中文字幕综合,中国熟女仑乱hd,亚洲精品乱拍国产一区二区三区,一本大道卡一卡二卡三乱码全集资源,又粗又黄又硬又爽的免费视频

springboot+kafka中@KafkaListener動態(tài)指定多個topic問題

 更新時間:2022年12月27日 11:35:56   作者:Forward233  
這篇文章主要介紹了springboot+kafka中@KafkaListener動態(tài)指定多個topic問題,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教

說明

本項目為springboot+kafak的整合項目,故其用了springboot中對kafak的消費注解@KafkaListener

首先,application.properties中配置用逗號隔開的多個topic。

方法:利用Spring的SpEl表達式,將topics 配置為:@KafkaListener(topics = “#{’${topics}’.split(’,’)}”)

運行程序,console打印的效果如下:

因為只開了一條消費者線程,所以所有的topic和分區(qū)都分配給這條線程。

如果你想開多條消費者線程去消費這些topic,添加@KafkaListener注解的參數(shù)concurrency的值為自己想要的消費者個數(shù)即可(注意,消費者數(shù)要小于等于你開的所有topic的分區(qū)數(shù)總和)

運行程序,console打印的效果如下:

總結(jié)一下大家問的最多的一個問題

如何在程序運行的過程中,改變topic,消費者能夠消費修改后的topic?

ans: 經(jīng)過嘗試,使用@KafkaListener注解實現(xiàn)不了此需求,在程序啟動的時候,程序就會根據(jù)@KafkaListener的注解信息初始化好消費者去消費指定好的topic。如果在程序運行的過程中,修改topic,不會讓此消費者修改消費者的配置再重新訂閱topic的。

不過我們可以有個折中的辦法,就是利用@KafkaListener的topicPattern參數(shù)來進行topic匹配。

具體如何操作的可以看下這篇文章:

http://chabaoo.cn/article/271098.htm

終極方法

思路

不使用@KafkaListener,使用kafka原生客戶端依賴,手動初始化消費者,開啟消費者線程。

在消費者線程中,每次循環(huán)都從配置、數(shù)據(jù)庫或者其他配置源獲取最新的topic信息,與之前的topic比較,如果發(fā)生變化,重新訂閱topic或者初始化消費者。

實現(xiàn)

加入kafka客戶端依賴(本次測試服務(wù)端kafka版本:2.12-2.4.0)

<dependency>
	<groupId>org.apache.kafka</groupId>
	<artifactId>kafka-clients</artifactId>
	<version>2.3.0</version>
</dependency>

代碼

@Service
@Slf4j
public class KafkaConsumers implements InitializingBean {

    /**
     * 消費者
     */
    private static KafkaConsumer<String, String> consumer;
    /**
     * topic
     */
    private List<String> topicList;

    public static String getNewTopic() {
        try {
            return org.apache.commons.io.FileUtils.readLines(new File("D:/topic.txt"), "utf-8").get(0);
        } catch (IOException e) {
            e.printStackTrace();
        }
        return null;
    }

    /**
     * 初始化消費者(配置寫死是為了快速測試,請大家使用配置文件)
     *
     * @param topicList
     * @return
     */
    public KafkaConsumer<String, String> getInitConsumer(List<String> topicList) {
        //配置信息
        Properties props = new Properties();
        //kafka服務(wù)器地址
        props.put("bootstrap.servers", "192.168.9.185:9092");
        //必須指定消費者組
        props.put("group.id", "haha");
        //設(shè)置數(shù)據(jù)key和value的序列化處理類
        props.put("key.deserializer", StringDeserializer.class);
        props.put("value.deserializer", StringDeserializer.class);
        //創(chuàng)建消息者實例
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        //訂閱topic的消息
        consumer.subscribe(topicList);
        return consumer;
    }

    /**
     * 開啟消費者線程
     * 異常請自己根據(jù)需求自己處理
     */
    @Override
    public void afterPropertiesSet() {
        // 初始化topic
        topicList = Splitter.on(",").splitToList(Objects.requireNonNull(getNewTopic()));
        if (org.apache.commons.collections.CollectionUtils.isNotEmpty(topicList)) {
            consumer = getInitConsumer(topicList);
            // 開啟一個消費者線程
            new Thread(() -> {
                while (true) {
                    // 模擬從配置源中獲取最新的topic(字符串,逗號隔開)
                    final List<String> newTopic = Splitter.on(",").splitToList(Objects.requireNonNull(getNewTopic()));
                    // 如果topic發(fā)生變化
                    if (!topicList.equals(newTopic)) {
                        log.info("topic 發(fā)生變化:newTopic:{},oldTopic:{}-------------------------", newTopic, topicList);
                        // method one:重新訂閱topic:
                        topicList = newTopic;
                        consumer.subscribe(newTopic);
                        // method two:關(guān)閉原來的消費者,重新初始化一個消費者
                        //consumer.close();
                        //topicList = newTopic;
                        //consumer = getInitConsumer(newTopic);
                        continue;
                    }
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                    for (ConsumerRecord<String, String> record : records) {
                        System.out.println("key:" + record.key() + "" + ",value:" + record.value());
                    }
                }
            }).start();
        }
    }
}

說一下第72行代碼:

ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));

上面這行代碼表示:在100ms內(nèi)等待Kafka的broker返回數(shù)據(jù).超市參數(shù)指定poll在多久之后可以返回,不管有沒有可用的數(shù)據(jù)都要返回。

在修改topic后,必須等到此次poll拉取的消息處理完,while(true)循環(huán)的時候檢測topic發(fā)生變化,才能重新訂閱topic.

poll()方法一次拉取得消息數(shù)默認為:500,如下圖,kafka客戶端源碼中設(shè)置的。

如果想自定義此配置,可在初始化消費者時加入

運行結(jié)果(測試的topic中都無數(shù)據(jù))

注意:KafkaConsumer是線程不安全的,不要用一個KafkaConsumer實例開啟多個消費者,要開啟多個消費者,需要new 多個KafkaConsumer實例。

總結(jié)

以上為個人經(jīng)驗,希望能給大家一個參考,也希望大家多多支持腳本之家。

相關(guān)文章

  • Datagram Scoket雙向通信

    Datagram Scoket雙向通信

    這篇文章主要介紹了Datagram Scoket雙向通信,需要的朋友可以參考下
    2014-04-04
  • SpringMVC多個文件上傳及上傳后立即顯示圖片功能

    SpringMVC多個文件上傳及上傳后立即顯示圖片功能

    這篇文章主要介紹了SpringMVC多個文件上傳及上傳后立即顯示圖片功能,非常不錯,具有參考借鑒價值功能,需要的朋友可以參考下
    2017-10-10
  • 淺談SpringMVC請求映射handler源碼解讀

    淺談SpringMVC請求映射handler源碼解讀

    這篇文章主要介紹了淺談SpringMVC請求映射handler源碼解讀,文中通過示例代碼介紹的非常詳細,對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2021-03-03
  • Java Socket實現(xiàn)聊天室附1500行源代碼

    Java Socket實現(xiàn)聊天室附1500行源代碼

    Socket是應(yīng)用層與TCP/IP協(xié)議族通信的中間軟件抽象層,它是一組接口。本篇文章手把手帶你通過Java Socket來實現(xiàn)自己的聊天室,大家可以在過程中查缺補漏,溫故而知新
    2021-10-10
  • Java8 中的ParallelStream

    Java8 中的ParallelStream

    這篇文章主要介紹了Java8 中的并行流 ParallelStreams,Java8并行流ParallelStream和Stream的區(qū)別就是支持并行執(zhí)行,提高程序運行效率。下面就來看看文章內(nèi)容具體介紹吧
    2021-10-10
  • SpringBoot整合MyBatis-Plus3.1教程詳解

    SpringBoot整合MyBatis-Plus3.1教程詳解

    這篇文章主要介紹了SpringBoot整合MyBatis-Plus3.1詳細教程,非常不錯,具有一定的參考借鑒價值,需要的朋友可以參考下
    2019-08-08
  • MybatisPlus查詢條件為空字符串或null問題及解決

    MybatisPlus查詢條件為空字符串或null問題及解決

    這篇文章主要介紹了MybatisPlus查詢條件為空字符串或null問題及解決方案,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2022-06-06
  • Java 編程之IO流資料詳細整理

    Java 編程之IO流資料詳細整理

    這篇文章主要介紹了Java 編程之IO流資料詳細整理的相關(guān)資料,需要的朋友可以參考下
    2017-02-02
  • 在Intellij IDEA中使用Debug(圖文教程)

    在Intellij IDEA中使用Debug(圖文教程)

    下面小編就為大家?guī)硪黄贗ntellij IDEA中使用Debug(圖文教程)。小編覺得挺不錯的,現(xiàn)在就分享給大家,也給大家做個參考。一起跟隨小編過來看看吧
    2017-09-09
  • 詳解java封裝繼承多態(tài)

    詳解java封裝繼承多態(tài)

    這篇文章主要介紹了java封裝繼承多態(tài),文中通過示例代碼介紹的非常詳細,對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2019-03-03

最新評論