亚洲乱码中文字幕综合,中国熟女仑乱hd,亚洲精品乱拍国产一区二区三区,一本大道卡一卡二卡三乱码全集资源,又粗又黄又硬又爽的免费视频

解決kafka消息堆積及分區(qū)不均勻的問題

 更新時間:2021年09月11日 16:02:25   作者:筏鏡  
這篇文章主要介紹了解決kafka消息堆積及分區(qū)不均勻的問題,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教

kafka消息堆積及分區(qū)不均勻的解決

我在環(huán)境中發(fā)現(xiàn)代碼里面的kafka有所延遲,查看kafka消息發(fā)現(xiàn)堆積嚴(yán)重,經(jīng)過檢查發(fā)現(xiàn)是kafka消息分區(qū)不均勻造成的,消費速度過慢。這里由自己在虛擬機上演示相關(guān)問題,給大家提供相應(yīng)問題的參考思路。

這篇文章有點遺憾并沒重現(xiàn)分區(qū)不均衡的樣例和Warning: Consumer group ‘testGroup1' is rebalancing. 這里僅將正確的方式展示,等后續(xù)重現(xiàn)了在進行補充。

主要有兩個要點:

  • 1、一個消費者組只消費一個topic.
  • 2、factory.setConcurrency(concurrency);這里設(shè)置監(jiān)聽并發(fā)數(shù)為 部署單元節(jié)點*concurrency=分區(qū)數(shù)量

1、先在kafka消息中創(chuàng)建

對應(yīng)分區(qū)數(shù)目的topic(testTopic2,testTopic3)testTopic1由代碼創(chuàng)建

./kafka-topics.sh --create --zookeeper 192.168.25.128:2181 --replication-factor 1 --partitions 2 --topic testTopic2

2、添加配置文件application.properties

kafka.test.topic1=testTopic1
kafka.test.topic2=testTopic2
kafka.test.topic3=testTopic3
kafka.broker=192.168.25.128:9092
auto.commit.interval.time=60000
#kafka.test.group=customer-test
kafka.test.group1=testGroup1
kafka.test.group2=testGroup2
kafka.test.group3=testGroup3
kafka.offset=earliest
kafka.auto.commit=false

session.timeout.time=10000
kafka.concurrency=2

3、創(chuàng)建kafka工廠

package com.yin.customer.config;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.AbstractMessageListenerContainer;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.stereotype.Component;
import java.util.HashMap;
import java.util.Map;

/**
 * @author yin
 * @Date 2019/11/24 15:54
 * @Method
 */
@Configuration
@Component
public class KafkaConfig {
    @Value("${kafka.broker}")
    private String broker;
    @Value("${kafka.auto.commit}")
    private String autoCommit;

   // @Value("${kafka.test.group}")
    //private String testGroup;

    @Value("${session.timeout.time}")
    private String sessionOutTime;

    @Value("${auto.commit.interval.time}")
    private String autoCommitTime;

    @Value("${kafka.offset}")
    private String offset;
    @Value("${kafka.concurrency}")
    private Integer concurrency;

   @Bean
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory(){
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        //監(jiān)聽設(shè)置兩個個分區(qū)
        factory.setConcurrency(concurrency);
        //打開批量拉取數(shù)據(jù)
        factory.setBatchListener(true);
        //這里設(shè)置的是心跳時間也是拉的時間,也就說每間隔max.poll.interval.ms我們就調(diào)用一次poll,kafka默認是300s,心跳只能在poll的時候發(fā)出,如果連續(xù)兩次poll的時候超過
        //max.poll.interval.ms 值就會導(dǎo)致rebalance
        //心跳導(dǎo)致GroupCoordinator以為本地consumer節(jié)點掛掉了,引發(fā)了partition在consumerGroup里的rebalance。
        // 當(dāng)rebalance后,之前該consumer擁有的分區(qū)和offset信息就失效了,同時導(dǎo)致不斷的報auto offset commit failed。
        factory.getContainerProperties().setPollTimeout(3000);
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        return factory;
    }

    private ConsumerFactory<String,String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<String, String>(consumerConfigs());
    }

   @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> propsMap = new HashMap<>();
        //kafka的地址
        propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, broker);
        //是否自動提交 Offset
        propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, autoCommit);
        // enable.auto.commit 設(shè)置成 false,那么 auto.commit.interval.ms 也就不被再考慮
        //默認5秒鐘,一個 Consumer 將會提交它的 Offset 給 Kafka
        propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,  5000);

        //這個值必須設(shè)置在broker configuration中的group.min.session.timeout.ms 與 group.max.session.timeout.ms之間。
        //zookeeper.session.timeout.ms 默認值:6000
        //ZooKeeper的session的超時時間,如果在這段時間內(nèi)沒有收到ZK的心跳,則會被認為該Kafka server掛掉了。
        // 如果把這個值設(shè)置得過低可能被誤認為掛掉,如果設(shè)置得過高,如果真的掛了,則需要很長時間才能被server得知。
        propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionOutTime);
        propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        //組與組間的消費者是沒有關(guān)系的。
        //topic中已有分組消費數(shù)據(jù),新建其他分組ID的消費者時,之前分組提交的offset對新建的分組消費不起作用。
        //propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, testGroup);

        //當(dāng)創(chuàng)建一個新分組的消費者時,auto.offset.reset值為latest時,
        // 表示消費新的數(shù)據(jù)(從consumer創(chuàng)建開始,后生產(chǎn)的數(shù)據(jù)),之前產(chǎn)生的數(shù)據(jù)不消費。
        // https://blog.csdn.net/u012129558/article/details/80427016

        //earliest 當(dāng)分區(qū)下有已提交的offset時,從提交的offset開始消費;無提交的offset時,從頭開始消費。
       // latest 當(dāng)分區(qū)下有已提交的offset時,從提交的offset開始消費;無提交的offset時,消費新產(chǎn)生的該分區(qū)下的數(shù)據(jù)。

        propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, offset);
        //不是指每次都拉50條數(shù)據(jù),而是一次最多拉50條數(shù)據(jù)()
        propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 5);
        return propsMap;
    }
}

4、展示kafka消費者

@Component
public class KafkaConsumer {
    private static final Logger logger = LoggerFactory.getLogger(KafkaConsumer.class);

    @KafkaListener(topics = "${kafka.test.topic1}",groupId = "${kafka.test.group1}",containerFactory = "kafkaListenerContainerFactory")
    public void listenPartition1(List<ConsumerRecord<?, ?>> records,Acknowledgment ack) {
        logger.info("testTopic1 recevice a message size :{}" , records.size());

        try {
            for (ConsumerRecord<?, ?> record : records) {
                Optional<?> kafkaMessage = Optional.ofNullable(record.value());
                logger.info("received:{} " , record);
                if (kafkaMessage.isPresent()) {
                    Object message = record.value();
                    String topic = record.topic();
                    Thread.sleep(300);
                    logger.info("p1 topic is:{} received message={}",topic, message);
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            ack.acknowledge();
        }
    }

    @KafkaListener(topics = "${kafka.test.topic2}",groupId = "${kafka.test.group2}",containerFactory = "kafkaListenerContainerFactory")
    public void listenPartition2(List<ConsumerRecord<?, ?>> records,Acknowledgment ack) {
        logger.info("testTopic2 recevice a message size :{}" , records.size());

        try {
            for (ConsumerRecord<?, ?> record : records) {
                Optional<?> kafkaMessage = Optional.ofNullable(record.value());
                logger.info("received:{} " , record);
                if (kafkaMessage.isPresent()) {
                    Object message = record.value();
                    String topic = record.topic();
                    Thread.sleep(300);
                    logger.info("p2 topic :{},received message={}",topic, message);
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            ack.acknowledge();
        }
    }

    @KafkaListener(topics = "${kafka.test.topic3}",groupId = "${kafka.test.group3}",containerFactory = "kafkaListenerContainerFactory")
    public void listenPartition3(List<ConsumerRecord<?, ?>> records, Acknowledgment ack) {
        logger.info("testTopic3 recevice a message size :{}" , records.size());

        try {
            for (ConsumerRecord<?, ?> record : records) {
                Optional<?> kafkaMessage = Optional.ofNullable(record.value());
                logger.info("received:{} " , record);
                if (kafkaMessage.isPresent()) {
                    Object message = record.value();
                    String topic = record.topic();
                    logger.info("p3 topic :{},received message={}",topic, message);
                    Thread.sleep(300);
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            ack.acknowledge();
        }
    }
}

查看分區(qū)消費情況:

在這里插入圖片描述

kafka出現(xiàn)若干分區(qū)不消費的現(xiàn)象

近日,有用戶反饋kafka有topic出現(xiàn)某個消費組消費的時候,有幾個分區(qū)一直不消費消息,消息一直積壓(圖1)。除了一直積壓外,還有一個現(xiàn)象就是消費組一直在重均衡,大約每5分鐘就會重均衡一次。具體表現(xiàn)為消費分區(qū)的owner一直在改變(圖2)。

(圖1)

(圖2)

定位過程

業(yè)務(wù)側(cè)沒有報錯,同時kafka服務(wù)端日志也一切正常,同事先將消費組的機器滾動重啟,仍然還是那幾個分區(qū)沒有消費,之后將這幾個不消費的分區(qū)遷移至別的broker上,依然沒有消費。

還有一個奇怪的地方,就是每次重均衡后,不消費的那幾個分區(qū)的消費owner所在機器的網(wǎng)絡(luò)都有流量變化。按理說不消費應(yīng)該就是拉取不到分區(qū)不會有流量的。于是讓運維去拉了下不消費的consumer的jstack日志。一看果然發(fā)現(xiàn)了問題所在。

從堆??矗琧onsumer已經(jīng)拉取到消息,然后就一直卡在處理消息的業(yè)務(wù)邏輯上。這說明kafka是沒有問題的,用戶的業(yè)務(wù)邏輯有問題。

consumer在拉取完一批消息后,就一直在處理這批消息,但是這批消息中有若干條消息無法處理,而業(yè)務(wù)又沒有超時操作或者異常處理導(dǎo)致進程一直處于消費中,無法去poll下一批數(shù)據(jù)。

又由于業(yè)務(wù)采用的是autocommit的offset提交方式,而根據(jù)源碼可知,consumer只有在下一次poll中才會自動提交上次poll的offset,所以業(yè)務(wù)一直在拉取同一批消息而無法更新offset。反映的現(xiàn)象就是該consumer對應(yīng)的分區(qū)的offset一直沒有變,所以有積壓的現(xiàn)象。

至于為什么會一直在重均衡消費組的原因也很明了了,就是因為有消費者一直卡在處理消息的業(yè)務(wù)邏輯上,超過了max.poll.interval.ms(默認5min),消費組就會將該消費者踢出消費組,從而發(fā)生重均衡。

驗證

讓業(yè)務(wù)方去查證業(yè)務(wù)日志,驗證了積壓的這幾個分區(qū),總是在循環(huán)的拉取同一批消息。

解決方法

臨時解決方法就是跳過有問題的消息,將offset重置到有問題的消息之后。本質(zhì)上還是要業(yè)務(wù)側(cè)修改業(yè)務(wù)邏輯,增加超時或者異常處理機制,最好不要采用自動提交offset的方式,可以手動管理。

以上為個人經(jīng)驗,希望能給大家一個參考,也希望大家多多支持腳本之家。

相關(guān)文章

  • Java中Maven Shade插件的具體使用

    Java中Maven Shade插件的具體使用

    Maven Shade插件它可以幫助你在構(gòu)建項目時打包所有依賴項,并將其打包到一個單獨的JAR文件中,本文就介紹一下Maven Shade插件的具體使用,具有一定參考價值,感興趣的可以了解一下
    2023-08-08
  • JDK14性能管理工具之Jconsole的使用詳解

    JDK14性能管理工具之Jconsole的使用詳解

    JConsole是JDK自帶的管理工具,在JAVA_HOME/bin下面,直接命令JConsole即可開啟JConsole。接下來通過本文給大家分享JDK14性能管理工具之Jconsole的使用,感興趣的朋友一起看看吧
    2020-05-05
  • BigDecimal divide除法除不盡報錯的問題及解決

    BigDecimal divide除法除不盡報錯的問題及解決

    這篇文章主要介紹了BigDecimal divide除法除不盡報錯的問題及解決方案,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2022-06-06
  • MybatisPlus開啟二級緩存的方法詳解

    MybatisPlus開啟二級緩存的方法詳解

    這篇文章主要介紹了MybatisPlus開啟二級緩存的方法詳解,二級緩存是基于mapper文件的namespace級別,也就是說多個sqlSession可以共享一個mapper中的二級緩存區(qū)域,需要的朋友可以參考下
    2023-11-11
  • SpringMVC詳解如何映射請求數(shù)據(jù)

    SpringMVC詳解如何映射請求數(shù)據(jù)

    這篇文章主要給大家介紹了關(guān)于SpringMvc映射請求數(shù)據(jù)的相關(guān)資料,文中通過實例代碼介紹的非常詳細,對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友可以參考下
    2022-06-06
  • 解決SpringBoot掃描不到公共類的實體問題

    解決SpringBoot掃描不到公共類的實體問題

    這篇文章主要介紹了解決SpringBoot掃描不到公共類的實體問題,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2021-08-08
  • Java中線程組ThreadGroup與線程池的區(qū)別及示例

    Java中線程組ThreadGroup與線程池的區(qū)別及示例

    這篇文章主要介紹了Java中線程組與線程池的區(qū)別及示例,ThreadGroup是用來管理一組線程的,可以控制線程的執(zhí)行,查看線程的執(zhí)行狀態(tài)等操作,方便對于一組線程的統(tǒng)一管理,需要的朋友可以參考下
    2023-05-05
  • Java中雙冒號::的作用舉例詳解

    Java中雙冒號::的作用舉例詳解

    這篇文章主要給大家介紹了關(guān)于Java中雙冒號::作用的相關(guān)資料,雙冒號(::)運算符在Java?8中被用作方法引用(method?reference),方法引用是與lambda表達式相關(guān)的一個重要特性,需要的朋友可以參考下
    2023-11-11
  • java如何替換word/doc文件中的內(nèi)容

    java如何替換word/doc文件中的內(nèi)容

    docx格式的文件本質(zhì)上是一個XML文件,只要用占位符在指定的地方標(biāo)記,然后替換掉標(biāo)記出的內(nèi)容,這篇文章主要介紹了java替換word/doc文件中的內(nèi)容,需要的朋友可以參考下
    2023-06-06
  • 解析Runtime中shutdown hook的使用詳解

    解析Runtime中shutdown hook的使用詳解

    本篇文章是對解析Runtime中shutdown hook的使用進行了詳細的分析介紹,需要的朋友參考下
    2013-05-05

最新評論