亚洲乱码中文字幕综合,中国熟女仑乱hd,亚洲精品乱拍国产一区二区三区,一本大道卡一卡二卡三乱码全集资源,又粗又黄又硬又爽的免费视频

關(guān)于kafka-consumer-offset位移問(wèn)題

 更新時(shí)間:2023年03月07日 10:58:13   作者:SeaDhdhdhdhdh  
這篇文章主要介紹了關(guān)于kafka-consumer-offset位移問(wèn)題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教

1 offset的默認(rèn)維護(hù)位置

_consumer_offsets主題里面采用key和 value的方式存儲(chǔ)數(shù)據(jù)。

key是 group.id+topic+分區(qū)號(hào),value 就是當(dāng)前offset的值。

每隔一段時(shí)間,kafka 內(nèi)部會(huì)對(duì)這個(gè)topic進(jìn)行compact(壓縮),也就是每個(gè)group.id+topic+分區(qū)號(hào)就保留最新數(shù)據(jù)。

Kafka0.9版本之前,consumer黑認(rèn)將offset保存在Zookeeper中。0.9版本開(kāi)始,consumer默認(rèn)將offset保存在Kafka一個(gè)內(nèi)置的topic中,該topic為_(kāi)consumer_offsets。

將offset信息存儲(chǔ)在zk中的不足:如果將offset信息存儲(chǔ)在zk中,那么所有的consumer都會(huì)訪問(wèn)zk,會(huì)消耗大量的網(wǎng)絡(luò)資源,消費(fèi)速度慢。

1.1 消費(fèi)offset案例

思想:_consumer_offsets為Kafka中的 topic,那就可以通過(guò)消費(fèi)者進(jìn)行消費(fèi)。

在配置文件 config/consumer.properties中添加配置exclude.internal.topics = false,默認(rèn)是 true,表示不能消費(fèi)系統(tǒng)主題。為了查看該系統(tǒng)主題數(shù)據(jù),所以該參數(shù)修改為false。修改以后執(zhí)行分發(fā)命令:xsync consumer.properties。

采用命令行方式,創(chuàng)建一個(gè)新的topic。

[atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --create --topic atguigu --partitions 2 --replication-factor 2

啟動(dòng)生產(chǎn)者往atguigu生產(chǎn)數(shù)據(jù)。

[atguigu@hadoop102 kafka] $ bin/kafka-console-producer.sh --topic atguigu --bootstrap-server hadoop102:9092

啟動(dòng)消費(fèi)者消費(fèi)atguigu數(shù)據(jù)。

[atguigu@hadoop104 kafka]$ bin/kafka-console-consumer.sh bootstrap-server hadoop102:9092--topic atguigu --group test

注意:指定消費(fèi)者組名稱,更好觀察數(shù)據(jù)存儲(chǔ)位置(key是 group.id+topic+分區(qū)號(hào))。查看消費(fèi)者消費(fèi)主題_consumer_offsets。

[atguigu@hadoop102 kafka]$ bin/kafka-console-consumer.sh --topic _consumer_offsets --bootstrap-server hadoop102:9092 --consumer.config config/consumer.properties --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --from-beginning

2 自動(dòng)提交offset

為了使我們能夠?qū)W⒂谧约旱臉I(yè)務(wù)邏輯,Kafka提供了自動(dòng)提交offset的功能。自動(dòng)提交offset的相關(guān)參數(shù):

  • enable.auto.commit:是否開(kāi)啟自動(dòng)提交offset功能,默認(rèn)是true
  • auto.commit.interval.ms:自動(dòng)提交offset的時(shí)間間隔,默認(rèn)是5s

消費(fèi)者配置代碼:

//配置是否是自動(dòng)提交
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,true);
//提交時(shí)間間隔,單位是ms
properties.put(ConsumerConfig.AUTO_COMNIT_INTERVAL_NS_CONFI6,1000);

3 手動(dòng)提交offset

3.1 原理

雖然自動(dòng)提交offset十分簡(jiǎn)單便利,但由于其是基于時(shí)間提交的,開(kāi)發(fā)人員難以把握offset提交的時(shí)機(jī)。因此Kafka還提供了手動(dòng)提交offset的API。

手動(dòng)提交offset的方法有兩種:分別是commitSync(同步提交)commitAsync(異步提交)。

兩者的相同點(diǎn)是,都會(huì)將本次提交的一批數(shù)據(jù)最高的偏移量提交;不同點(diǎn)是,同步提交阻塞當(dāng)前線程,一直到提交成功,并且會(huì)自動(dòng)失敗重試(由不可控因素導(dǎo)致,也會(huì)出現(xiàn)提交失?。?;而異步提交則沒(méi)有失敗重試機(jī)制,故有可能提交失敗。

  • commitSync(同步提交):必須等待offset提交完畢,再去消費(fèi)下一批數(shù)據(jù)。
  • commitAsync(異步提交):發(fā)送完提交offset請(qǐng)求后,就開(kāi)始消費(fèi)下一批數(shù)據(jù)了

3.2 代碼示例

3.2.1 同步提交

//手動(dòng)提交屬性配置
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG ,false);
//消費(fèi)代碼邏輯
XXX
XXX
XXX
//手動(dòng)提交代碼(處理完數(shù)據(jù)以后,這里為了方便,只展示關(guān)鍵代碼)
//手動(dòng)提交offset
kafkaConsumer.commitsync();

3.2.2 異步提交(生產(chǎn)常用)

//手動(dòng)提交屬性配置
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG ,false);
//消費(fèi)代碼邏輯
XXX
XXX
XXX
//手動(dòng)提交代碼(處理完數(shù)據(jù)以后,這里為了方便,只展示關(guān)鍵代碼)
//手動(dòng)提交offset
kafkaConsumer.commitAsync();

4 指定offset消費(fèi)

auto.offset.reset = earliest | latest | none 默認(rèn)是latest。

當(dāng)Kafka 中沒(méi)有初始偏移量(消費(fèi)者組第一次消費(fèi))或服務(wù)器上不再存在當(dāng)前偏移量時(shí)(例如該數(shù)據(jù)已被刪除),該怎么辦?

  • earliest:自動(dòng)將偏移量重置為最早的偏移量,--from-beginning。
  • latest(默認(rèn)值):自動(dòng)將偏移量重置為最新偏移量。
  • none:如果未找到消費(fèi)者組的先前偏移量,則向消費(fèi)者拋出異常。

任意指定offset位移開(kāi)始消費(fèi)。

//1創(chuàng)建消費(fèi)者
KafkaConsumer<String,String> kafkaConsumer = new KafkaConsumer<>(properties);
// 2訂閱主題
ArrayList<String> topics = new ArrayList<>(;topics.add( "first");
kafkaConsumer.subscribe(topics);
 
//指定位置進(jìn)行消費(fèi)
set<TopicPartition> assignment = kafkaConsumer.assignment();//獲取所有分區(qū)信息
//保證分區(qū)分配方案已經(jīng)制定完畢,因?yàn)橛捎趌eader消費(fèi)者制定分配方案會(huì)消耗一定時(shí)間,有可能此時(shí)獲取不到分區(qū)信息,所以加一層分區(qū)空間判斷
while (assignment.size() == 0){
    //促使獲取的分區(qū)數(shù)量不為0
    kafkaConsumer.poll(Duration.ofSeconds(1));
    assignment = kafkaConsumer.assignment();
}
 
//遍歷所有分區(qū),指定消費(fèi)的offset
for (TopicPartition topicPartition : assignment) {
    kafkaConsumer.seek(topicPartition, 100);
}
 
// 3消費(fèi)數(shù)據(jù)
while (true){

5 指定時(shí)間消費(fèi)

需求:在生產(chǎn)環(huán)境中,會(huì)遇到最近消費(fèi)的幾個(gè)小時(shí)數(shù)據(jù)異常,想重新按照時(shí)間消費(fèi)。

例如要求按照時(shí)間消費(fèi)前一天的數(shù)據(jù),怎么處理?

//1創(chuàng)建消費(fèi)者
KafkaConsumer<String,String> kafkaConsumer = new KafkaConsumer<>(properties);
// 2訂閱主題
ArrayList<String> topics = new ArrayList<>(;topics.add( "first");
kafkaConsumer.subscribe(topics);
 
//指定位置進(jìn)行消費(fèi)
set<TopicPartition> assignment = kafkaConsumer.assignment();//獲取所有分區(qū)信息
//保證分區(qū)分配方案已經(jīng)制定完畢,因?yàn)橛捎趌eader消費(fèi)者制定分配方案會(huì)消耗一定時(shí)間,有可能此時(shí)獲取不到分區(qū)信息,所以加一層分區(qū)空間判斷
while (assignment.size() == 0){
    //促使獲取的分區(qū)數(shù)量不為0
    kafkaConsumer.poll(Duration.ofSeconds(1));
    assignment = kafkaConsumer.assignment();
}
//希望把時(shí)間轉(zhuǎn)換為對(duì)應(yīng)的offset
HashMap<TopicPartition,Long> topicPartitionLongHashMap = new HashMap<>();
//封裝對(duì)應(yīng)集合
for (TopicPartition topicPartition : assignment) {
    //希望獲取當(dāng)前系統(tǒng)時(shí)間一天前的數(shù)據(jù)。
    topicPartitionLongHashMap.put(topicPartition, System.currentTimeMillis() - 1 * 24 * 3600 * 1000);
}
Nap<TopicPartition,OffsetAnd imestamp> topioPartitionffsetAndrtimestampMep = karfiaConsumer.offsetsForTines(topicPartitionL ongHashiap);
 
 
//遍歷所有分區(qū),指定消費(fèi)的offset
//指定消費(fèi)的offset
for (TopicPartition topicPartition : assignment) {
    OffsetAndTimestamp offsetAndTimestamp = topicPartition0ffsetAndTimestampHap.get(topicPartition);
    kafkaConsumer.seek(topicPartition,offsetAndTimestamp.offset());
}
 
// 3消費(fèi)數(shù)據(jù)
while (true){

6 漏消費(fèi)和重復(fù)消費(fèi)分析

6.1 重復(fù)消費(fèi)

場(chǎng)景1:重復(fù)消費(fèi)。自動(dòng)提交offset引起。

6.2 漏消費(fèi)

場(chǎng)景1:漏消費(fèi)。設(shè)置offset為手動(dòng)提交,當(dāng)offset被提交時(shí),數(shù)據(jù)還在內(nèi)存中未落盤,此時(shí)剛好消費(fèi)者線程被kill掉,那么offset已經(jīng)提交,但是數(shù)據(jù)未處理,導(dǎo)致這部分內(nèi)存中的數(shù)據(jù)丟失。

6.3 消費(fèi)者事務(wù)

如果想完成Consumer端的精準(zhǔn)一次性消費(fèi),那么需要Kafka消費(fèi)端將消費(fèi)過(guò)程和提交offset過(guò)程做原子綁定

此時(shí)我們需要將Kafka的offset保存到支持事務(wù)的自定義介質(zhì)(比如MySQL)。這部分知識(shí)會(huì)在后續(xù)項(xiàng)目部分涉及。

7 數(shù)據(jù)積壓

方案1:如果是Kafka消費(fèi)能力不足,則可以考慮增加Topic的分區(qū)數(shù),并且同時(shí)提升消費(fèi)組的消費(fèi)者數(shù)量,消費(fèi)者數(shù)=分區(qū)數(shù)。(兩者缺一不可)

方案2:如果是下游的數(shù)據(jù)處理不及時(shí):提高每批次拉取的數(shù)量。批次拉取數(shù)據(jù)過(guò)少(拉取數(shù)據(jù)/處理時(shí)間<生產(chǎn)速度),使處理的數(shù)據(jù)小于生產(chǎn)的數(shù)據(jù),也會(huì)造成數(shù)據(jù)積壓。

總結(jié)

以上為個(gè)人經(jīng)驗(yàn),希望能給大家一個(gè)參考,也希望大家多多支持腳本之家。

相關(guān)文章

  • java數(shù)獨(dú)游戲完整版分享

    java數(shù)獨(dú)游戲完整版分享

    這篇文章主要為大家分享了java數(shù)獨(dú)游戲的完整版,文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下
    2017-12-12
  • java基于數(shù)據(jù)庫(kù)實(shí)現(xiàn)全局唯一ID的示例

    java基于數(shù)據(jù)庫(kù)實(shí)現(xiàn)全局唯一ID的示例

    本文主要介紹了java基于數(shù)據(jù)庫(kù)實(shí)現(xiàn)全局唯一ID的示例,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧
    2023-04-04
  • Java?中?hashCode()?與?equals()?的關(guān)系(面試)

    Java?中?hashCode()?與?equals()?的關(guān)系(面試)

    這篇文章主要介紹了Java中hashCode()與equals()的關(guān)系,ava中hashCode()和equals()的關(guān)系是面試中的??键c(diǎn),文章對(duì)hashCode與equals的關(guān)系做出詳解,需要的小伙伴可以參考一下
    2022-09-09
  • 淺談Spring Boot 整合ActiveMQ的過(guò)程

    淺談Spring Boot 整合ActiveMQ的過(guò)程

    本篇文章主要介紹了淺談Spring Boot 整合ActiveMQ的過(guò)程,小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧
    2017-12-12
  • java8從list集合中取出某一屬性的值的集合案例

    java8從list集合中取出某一屬性的值的集合案例

    這篇文章主要介紹了java8從list集合中取出某一屬性的值的集合案例,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧
    2020-08-08
  • 引入mybatis-plus報(bào) Invalid bound statement錯(cuò)誤問(wèn)題的解決方法

    引入mybatis-plus報(bào) Invalid bound statement錯(cuò)誤問(wèn)題的解決方法

    這篇文章主要介紹了引入mybatis-plus報(bào) Invalid bound statement錯(cuò)誤問(wèn)題的解決方法,需要的朋友可以參考下
    2020-05-05
  • 關(guān)于JDK8中的字符串拼接示例詳解

    關(guān)于JDK8中的字符串拼接示例詳解

    字符串拼接問(wèn)題應(yīng)該是每個(gè)Java程序員都熟知的事情了,幾乎每個(gè)Java程序員都讀過(guò)關(guān)于StringBuffer/StringBuilder來(lái)拼接字符串。下面這篇文章主要給大家介紹了關(guān)于JDK8中的字符串拼接的相關(guān)資料,需要的朋友可以參考下。
    2018-04-04
  • 基于自定義BufferedReader中的read和readLine方法

    基于自定義BufferedReader中的read和readLine方法

    下面小編就為大家分享一篇基于自定義BufferedReader中的read和readLine方法,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧
    2017-12-12
  • Mybatis日志模塊的適配器模式詳解

    Mybatis日志模塊的適配器模式詳解

    這篇文章主要介紹了Mybatis日志模塊的適配器模式詳解,,mybatis用了適配器模式來(lái)兼容這些框架,適配器模式就是通過(guò)組合的方式,將需要適配的類轉(zhuǎn)為使用者能夠使用的接口
    2022-08-08
  • Spring框架基于AOP實(shí)現(xiàn)簡(jiǎn)單日志管理步驟解析

    Spring框架基于AOP實(shí)現(xiàn)簡(jiǎn)單日志管理步驟解析

    這篇文章主要介紹了Spring框架基于AOP實(shí)現(xiàn)簡(jiǎn)單日志管理步驟解析,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下
    2020-06-06

最新評(píng)論