亚洲乱码中文字幕综合,中国熟女仑乱hd,亚洲精品乱拍国产一区二区三区,一本大道卡一卡二卡三乱码全集资源,又粗又黄又硬又爽的免费视频

@KafkaListener 如何使用

 更新時(shí)間:2023年02月20日 09:23:32   作者:DemonHunter211  
這篇文章主要介紹了@KafkaListener 如何使用,本文通過(guò)圖文實(shí)例代碼相結(jié)合給大家詳細(xì)講解,文末給大家介紹了kafka的消費(fèi)者分區(qū)分配策略,需要的朋友可以參考下

@KafkaListener 如何使用

spring-kafka使用基于@KafkaListener注解,@KafkaListener使用方式如下

@KafkaListener(topics = "topic1")
public void? ?kafkaListen(List<ConsumerRecord<xxx, xxx>> records) {
? ? ...
}

在注解內(nèi)指定topic名稱(chēng),當(dāng)對(duì)應(yīng)的topic內(nèi)有新的消息時(shí),testListen方法會(huì)被調(diào)用,參數(shù)就是topic內(nèi)新的消息。這個(gè)過(guò)程是異步進(jìn)行的。

@KafkaListener工作流程主要有以下幾步:

解析;解析@KafkaListener注解。
注冊(cè);解析后的數(shù)據(jù)注冊(cè)到spring-kafka。
監(jiān)聽(tīng);開(kāi)始監(jiān)聽(tīng)topic變更。
調(diào)用;調(diào)用注解標(biāo)識(shí)的方法,將監(jiān)聽(tīng)到的數(shù)據(jù)作為參數(shù)傳入。
下面我們一步一步分析

解析

@KafkaListener注解由KafkaListenerAnnotationBeanPostProcessor類(lèi)解析,后者實(shí)現(xiàn)了BeanPostProcessor接口,這個(gè)接口如下

public interface BeanPostProcessor {

    Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException;

    Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException;
}

接口內(nèi)部有2個(gè)方法,分別在bean初始化前后被調(diào)用。

KafkaListenerAnnotationBeanPostProcessor內(nèi)會(huì)在postProcessAfterInitialization方法內(nèi)解析@KafkaListener注解。

注冊(cè)
解析步驟里,我們可以獲取到所有含有@KafkaListener注解的類(lèi),之后這些類(lèi)的相關(guān)信息會(huì)被注冊(cè)到 KafkaListenerEndpointRegistry內(nèi),包括注解所在的方法,當(dāng)前的bean等。KafkaListenerEndpointRegistry這個(gè)類(lèi)內(nèi)部會(huì)維護(hù)多個(gè)Listener Container,每一個(gè)@KafkaListener都會(huì)對(duì)應(yīng)一個(gè)Listener Container。并且每個(gè)Container對(duì)應(yīng)一個(gè)線程。

監(jiān)聽(tīng)
注冊(cè)完成之后,每個(gè)Listener Container會(huì)開(kāi)始工作,會(huì)新啟一個(gè)新的線程,初始化KafkaConsumer,監(jiān)聽(tīng)topic變更等。

調(diào)用
監(jiān)聽(tīng)到數(shù)據(jù)之后,container會(huì)組織消息的格式,隨后調(diào)用解析得到的@KafkaListener注解標(biāo)識(shí)的方法,將組織后的消息作為參數(shù)傳入方法,執(zhí)行用戶邏輯。

@KafkaListener和@KafkaListners

@KafkaListeners是@KafkaListener的Container Annotation,這也是jdk8的新特性之一,注解可以重復(fù)標(biāo)注。

@KafkaListeners({@KafkaListener(topics="topic1"), @KafkaListener(topics="topic2")})
public void listen(ConsumerRecord<Integer, String> msg) {}
 
等同于
 
@KafkaListener(topics="topic1")
@KafkaListener(topics="topic2")
public void listen(ConsumerRecord<Integer, String> msg) {}

擴(kuò)展:kafka的消費(fèi)者分區(qū)分配策略

kafka有三種分區(qū)分配策略

1. RoundRobin

2. Range

3. Sticky

1. RoundRobin

(1)把所有topic的分區(qū)partition放入一個(gè)隊(duì)列中,按照name的hashcode進(jìn)行排序;

(2)把consumer放在一個(gè)循環(huán)隊(duì)列,按照name的hashcode進(jìn)行排序;

(3)循環(huán)遍歷consumer,從partition隊(duì)列pop出一個(gè)partition,分配給當(dāng)前consumer;以此類(lèi)推,取下一個(gè)consumer,繼續(xù)從partition隊(duì)列pop出來(lái)分配給當(dāng)前consumer;直到partition隊(duì)列中的元素被分配完;

2. Range

(1)假設(shè)topicA有4個(gè)分區(qū),topicB有5個(gè)分區(qū),topicC有6個(gè)分區(qū);一共有3個(gè)consumer;

(2)遍歷3個(gè)topic的分區(qū)集合,先取topicA的分區(qū)集合,然后準(zhǔn)備依次給3個(gè)consumer分配分區(qū);對(duì)于第1個(gè)consumer,所分配的分區(qū)數(shù)量根據(jù)以下公式:假設(shè)消費(fèi)者數(shù)量為N,當(dāng)前主題剩下的分區(qū)數(shù)量為M,則當(dāng)前消費(fèi)者應(yīng)該分配的分區(qū)數(shù)量 = M%N==0? M/N +1 : M/N ;按照公式,3個(gè)消費(fèi)者應(yīng)該分配的分區(qū)數(shù)量依次為:2/1/1,即topicA-partition-0/1分配給consumer-0,topicA-partition-2分配給consumer-1,topicA-partition-3分配給consumer-2;

(3)按照上述規(guī)則按序把topicB和topicC的分區(qū)分配給3個(gè)consumer;依次為:2/2/1,2/2/2;

3. Sticky

kafka在0.11版本引入了Sticky分區(qū)分配策略,它的兩個(gè)主要目的是:

1. 分區(qū)的分配要盡可能的均勻,分配給消費(fèi)者者的主題分區(qū)數(shù)最多相差一個(gè);

2. 分區(qū)的分配盡可能的與上次分配的保持相同;

當(dāng)兩者發(fā)生沖突時(shí),第一個(gè)目標(biāo)優(yōu)先于第二個(gè)目標(biāo);

粘性分區(qū)是由Kafka從0.11x版本開(kāi)始引入的分配策略,首先會(huì)盡量均衡的分配分區(qū)到消費(fèi)者上面,在出現(xiàn)同一消費(fèi)組內(nèi)消費(fèi)者出現(xiàn)問(wèn)題的時(shí)候,會(huì)盡量保持原來(lái)的分配的分區(qū)不變;

Sticky分區(qū)初始分配分區(qū)的方法與Range相似,但是不同;拿7個(gè)分區(qū)3個(gè)消費(fèi)者為例,消費(fèi)者消費(fèi)的分區(qū)依舊是3/2/2,但是不同與Range的是Range分區(qū)是排好序的,但是Sticky分區(qū)是隨機(jī)的;

到此這篇關(guān)于@KafkaListener 如何使用方式的文章就介紹到這了,更多相關(guān)@KafkaListener 使用內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

最新評(píng)論