亚洲乱码中文字幕综合,中国熟女仑乱hd,亚洲精品乱拍国产一区二区三区,一本大道卡一卡二卡三乱码全集资源,又粗又黄又硬又爽的免费视频

spring?kafka?@KafkaListener詳解與使用過程

 更新時(shí)間:2023年02月20日 09:20:06   作者:石臻臻的雜貨鋪  
這篇文章主要介紹了spring-kafka?@KafkaListener詳解與使用,本文結(jié)合實(shí)例代碼給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下

說明

  • 從2.2.4版開始,您可以直接在注釋上指定Kafka使用者屬性,這些屬性將覆蓋在使用者工廠中配置的具有相同名稱的所有屬性。您不能通過這種方式指定group.id和client.id屬性。他們將被忽略;
  • 可以使用#{…?}或?qū)傩哉嘉环?{…?})在SpEL上配置注釋上的大多數(shù)屬性。

比如:

   @KafkaListener(id = "consumer-id",topics = "SHI_TOPIC1",concurrency = "${listen.concurrency:3}",
            clientIdPrefix = "myClientId")

屬性concurrency將會(huì)從容器中獲取listen.concurrency的值,如果不存在就默認(rèn)用3

@KafkaListener詳解

id 監(jiān)聽器的id

①. 消費(fèi)者線程命名規(guī)則

填寫:

2020-11-19 14:24:15 c.d.b.k.KafkaListeners 120 [INFO] 線程:Thread[consumer-id5-1-C-1,5,main]-groupId:BASE-DEMO consumer-id5 消費(fèi)

沒有填寫ID:

2020-11-19 10:41:26 c.d.b.k.KafkaListeners 137 [INFO] 線程:Thread[org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1,5,main] consumer-id7

②.在相同容器中的監(jiān)聽器ID不能重復(fù)

否則會(huì)報(bào)錯(cuò)

Caused by: java.lang.IllegalStateException: Another endpoint is already registered with id

③.會(huì)覆蓋消費(fèi)者工廠的消費(fèi)組GroupId

假如配置文件屬性配置了消費(fèi)組kafka.consumer.group-id=BASE-DEMO
正常情況它是該容器中的默認(rèn)消費(fèi)組
但是如果設(shè)置了 @KafkaListener(id = "consumer-id7", topics = {"SHI_TOPIC3"})
那么當(dāng)前消費(fèi)者的消費(fèi)組就是consumer-id7 ;

當(dāng)然如果你不想要他作為groupId的話 可以設(shè)置屬性idIsGroup = false;那么還是會(huì)使用默認(rèn)的GroupId;

④. 如果配置了屬性groupId,則其優(yōu)先級(jí)最高

 @KafkaListener(id = "consumer-id5",idIsGroup = false,topics = "SHI_TOPIC3",groupId = "groupId-test")

例如上面代碼中最終這個(gè)消費(fèi)者的消費(fèi)組GroupId是 “groupId-test”

該id屬性(如果存在)將用作Kafka消費(fèi)者group.id屬性,并覆蓋消費(fèi)者工廠中的已配置屬性(如果存在)您還可以groupId顯式設(shè)置或?qū)⑵湓O(shè)置idIsGroup為false,以恢復(fù)使用使用者工廠的先前行為group.id。

groupId 消費(fèi)組名

指定該消費(fèi)組的消費(fèi)組名; 關(guān)于消費(fèi)組名的配置可以看看上面的 id 監(jiān)聽器的id

如何獲取消費(fèi)者 group.id

在監(jiān)聽器中調(diào)用KafkaUtils.getConsumerGroupId()可以獲得當(dāng)前的groupId; 可以在日志中打印出來; 可以知道是哪個(gè)客戶端消費(fèi)的;

topics 指定要監(jiān)聽哪些topic(與topicPattern、topicPartitions 三選一)

可以同時(shí)監(jiān)聽多個(gè)
topics = {"SHI_TOPIC3","SHI_TOPIC4"}

topicPattern 匹配Topic進(jìn)行監(jiān)聽(與topics、topicPartitions 三選一) topicPartitions 顯式分區(qū)分配

可以為監(jiān)聽器配置明確的主題和分區(qū)(以及可選的初始偏移量)

@KafkaListener(id = "thing2", topicPartitions =
        { @TopicPartition(topic = "topic1", partitions = { "0", "1" }),
          @TopicPartition(topic = "topic2", partitions = "0",
             partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "100"))
        })
public void listen(ConsumerRecord<?, ?> record) {
    ...
}

上面例子意思是 監(jiān)聽topic1的0,1分區(qū);監(jiān)聽topic2的第0分區(qū),并且第1分區(qū)從offset為100的開始消費(fèi);

errorHandler 異常處理

實(shí)現(xiàn)KafkaListenerErrorHandler; 然后做一些異常處理;

@Component
public class KafkaDefaultListenerErrorHandler implements KafkaListenerErrorHandler {
    @Override
    public Object handleError(Message<?> message, ListenerExecutionFailedException exception) {
        return null;
    }

    @Override
    public Object handleError(Message<?> message, ListenerExecutionFailedException exception, Consumer<?, ?> consumer) {
    	//do someting
        return null;
    }
}

調(diào)用的時(shí)候 填寫beanName;例如errorHandler="kafkaDefaultListenerErrorHandler"

containerFactory 監(jiān)聽器工廠

指定生成監(jiān)聽器的工廠類;

例如我寫一個(gè) 批量消費(fèi)的工廠類

    /**
     * 監(jiān)聽器工廠 批量消費(fèi)
     * @return
     */
    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> batchFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(kafkaConsumerFactory());
        //設(shè)置為批量消費(fèi),每個(gè)批次數(shù)量在Kafka配置參數(shù)中設(shè)置ConsumerConfig.MAX_POLL_RECORDS_CONFIG
        factory.setBatchListener(true);
        return factory;
    }

使用containerFactory = "batchFactory"

clientIdPrefix 客戶端前綴

會(huì)覆蓋消費(fèi)者工廠的kafka.consumer.client-id屬性; 最為前綴后面接 -n n是數(shù)字

concurrency并發(fā)數(shù)

會(huì)覆蓋消費(fèi)者工廠中的concurrency ,這里的并發(fā)數(shù)就是多線程消費(fèi); 比如說單機(jī)情況下,你設(shè)置了3; 相當(dāng)于就是啟動(dòng)了3個(gè)客戶端來分配消費(fèi)分區(qū);分布式情況 總線程數(shù)=concurrency*機(jī)器數(shù)量; 并不是設(shè)置越多越好,具體如何設(shè)置請(qǐng)看Java concurrency之集合

    /**
     * 監(jiān)聽器工廠 
     * @return
     */
    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> concurrencyFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(kafkaConsumerFactory());
        factory.setConcurrency(6);
        return factory;
    }
    @KafkaListener(id = "consumer-id5",idIsGroup = false,topics = "SHI_TOPIC3", containerFactory = "concurrencyFactory",concurrency = "1)

雖然使用的工廠是concurrencyFactory(concurrency配置了6); 但是他最終生成的監(jiān)聽器數(shù)量 是1;

properties 配置其他屬性

kafka中的屬性看org.apache.kafka.clients.consumer.ConsumerConfig ;
同名的都可以修改掉;

用法

    @KafkaListener(id = "consumer-id5",idIsGroup = false,topics = "SHI_TOPIC3", containerFactory = "concurrencyFactory",concurrency = "1"
            , clientIdPrefix = "myClientId5",groupId = "groupId-test",
            properties = {
                    "enable.auto.commit:false","max.poll.interval.ms:6000" },errorHandler="kafkaDefaultListenerErrorHandler")

@KafkaListener使用

KafkaListenerEndpointRegistry

    @Autowired
    private KafkaListenerEndpointRegistry registry;
       //.... 獲取所有注冊(cè)的監(jiān)聽器
        registry.getAllListenerContainers();

設(shè)置入?yún)Ⅱ?yàn)證器

當(dāng)您將Spring Boot與驗(yàn)證啟動(dòng)器一起使用時(shí),將LocalValidatorFactoryBean自動(dòng)配置:如下

@Configuration
@EnableKafka
public class Config implements KafkaListenerConfigurer {

    @Autowired
    private LocalValidatorFactoryBean validator;
    ...

    @Override
    public void configureKafkaListeners(KafkaListenerEndpointRegistrar registrar) {
      registrar.setValidator(this.validator);
    }
}

使用

@KafkaListener(id="validated", topics = "annotated35", errorHandler = "validationErrorHandler",
      containerFactory = "kafkaJsonListenerContainerFactory")
public void validatedListener(@Payload @Valid ValidatedClass val) {
    ...
}

@Bean
public KafkaListenerErrorHandler validationErrorHandler() {
    return (m, e) -> {
        ...
    };
}

spring-kafka官方文檔

擴(kuò)展:Spring for Apache Kafka @KafkaListener使用及注意事項(xiàng)

官方文檔:   https://docs.spring.io/spring-kafka/reference/html/

 @KafkaListener

The @KafkaListener annotation is used to designate a bean method as a listener for a listener container. The bean is wrapped in a MessagingMessageListenerAdapter configured with various features, such as converters to convert the data, if necessary, to match the method parameters.

If, say, six TopicPartition instances are provided and the concurrency is 3; each container gets two partitions. For five TopicPartition instances, two containers get two partitions, and the third gets one. If the concurrency is greater than the number of TopicPartitions, the concurrency is adjusted down such that each container gets one partition.

You can now configure a KafkaListenerErrorHandler to handle exceptions. See Handling Exceptions for more information.

By default, the @KafkaListener id property is now used as the group.id property, overriding the property configured in the consumer factory (if present). Further, you can explicitly configure the groupId on the annotation. Previously, you would have needed a separate container factory (and consumer factory) to use different group.id values for listeners. To restore the previous behavior of using the factory configured group.id, set the idIsGroup property on the annotation to false.

示例:

   demo類:

public class Listener {

    @KafkaListener(id = "foo", topics = "myTopic", clientIdPrefix = "myClientId")
    public void listen(String data) {
        ...
    }

}</code>

配置類及注解:
@Configuration
@EnableKafka
public class KafkaConfig {

    @Bean
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>
                        kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
                                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(3);
        factory.getContainerProperties().setPollTimeout(3000);
        return factory;
    }

    @Bean
    public ConsumerFactory<Integer, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafka.getBrokersAsString());
        ...
        return props;
    }
}

到此這篇關(guān)于spring-kafka @KafkaListener詳解與使用的文章就介紹到這了,更多相關(guān)spring-kafka使用內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • SpringBoot @FixMethodOrder 如何調(diào)整單元測(cè)試順序

    SpringBoot @FixMethodOrder 如何調(diào)整單元測(cè)試順序

    這篇文章主要介紹了SpringBoot @FixMethodOrder 調(diào)整單元測(cè)試順序方式,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2021-09-09
  • java web實(shí)現(xiàn)自動(dòng)登錄功能

    java web實(shí)現(xiàn)自動(dòng)登錄功能

    這篇文章主要為大家詳細(xì)介紹了java web實(shí)現(xiàn)自動(dòng)登錄功能,文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下
    2017-10-10
  • SpringBoot項(xiàng)目中連接SQL Server的三種方式

    SpringBoot項(xiàng)目中連接SQL Server的三種方式

    連接SQL Server是許多Spring Boot項(xiàng)目中常見的需求之一,本文主要介紹了SpringBoot項(xiàng)目中連接SQL Server的三種方式,具有一定的參考價(jià)值 ,感興趣的可以了解一下
    2023-09-09
  • Java基于Javafaker生成測(cè)試數(shù)據(jù)

    Java基于Javafaker生成測(cè)試數(shù)據(jù)

    這篇文章主要介紹了Java基于Javafaker生成測(cè)試數(shù)據(jù)的方法,幫助大家更好的理解和使用Java,感興趣的朋友可以了解下
    2020-12-12
  • Hibernate核心類和接口的詳細(xì)介紹

    Hibernate核心類和接口的詳細(xì)介紹

    今天小編就為大家分享一篇關(guān)于Hibernate核心類和接口的詳細(xì)介紹,小編覺得內(nèi)容挺不錯(cuò)的,現(xiàn)在分享給大家,具有很好的參考價(jià)值,需要的朋友一起跟隨小編來看看吧
    2019-03-03
  • java long轉(zhuǎn)String +Codeforces110A案例

    java long轉(zhuǎn)String +Codeforces110A案例

    這篇文章主要介紹了java long轉(zhuǎn)String +Codeforces110A案例,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過來看看吧
    2020-09-09
  • XFire構(gòu)建web service客戶端的五種方式

    XFire構(gòu)建web service客戶端的五種方式

    本篇文章主要介紹了XFire構(gòu)建web service客戶端的五種方式。具有很好的參考價(jià)值,下面跟著小編一起來看下吧
    2017-01-01
  • Java?MyBatis實(shí)戰(zhàn)之QueryWrapper中and和or拼接技巧大全

    Java?MyBatis實(shí)戰(zhàn)之QueryWrapper中and和or拼接技巧大全

    在Java中QueryWrapper是MyBatis-Plus框架中的一個(gè)查詢構(gòu)造器,它提供了豐富的查詢方法,其中包括and和or方法,可以用于構(gòu)建復(fù)雜的查詢條件,這篇文章主要給大家介紹了關(guān)于Java?MyBatis實(shí)戰(zhàn)之QueryWrapper中and和or拼接技巧的相關(guān)資料,需要的朋友可以參考下
    2024-07-07
  • Lombok在idea中的使用教程

    Lombok在idea中的使用教程

    Lombok是一個(gè)可以通過簡(jiǎn)單的注解形式,來幫助我們簡(jiǎn)化消除一些必須有但顯得很臃腫(如果getter、setter方法)的Java代碼的工具,通過使用對(duì)應(yīng)的注解,可以在編譯源碼的時(shí)候生成對(duì)應(yīng)的方法,這篇文章主要介紹了Lombok在idea中的使用,需要的朋友可以參考下
    2023-03-03
  • Java應(yīng)用服務(wù)器之tomcat會(huì)話復(fù)制集群配置的示例詳解

    Java應(yīng)用服務(wù)器之tomcat會(huì)話復(fù)制集群配置的示例詳解

    這篇文章主要介紹了Java應(yīng)用服務(wù)器之tomcat會(huì)話復(fù)制集群配置的相關(guān)知識(shí),本文通過圖文并茂的形式給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下
    2020-07-07

最新評(píng)論