亚洲乱码中文字幕综合,中国熟女仑乱hd,亚洲精品乱拍国产一区二区三区,一本大道卡一卡二卡三乱码全集资源,又粗又黄又硬又爽的免费视频

消息隊列-kafka消費(fèi)異常問題

 更新時間:2021年07月02日 10:47:47   作者:禿頭披風(fēng)俠_  
這篇文章主要給大家介紹了關(guān)于kafka的相關(guān)資料,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧

概述

在kafka中,或者是說在任何消息隊列中都有個消費(fèi)順序的問題。為了保證一個隊列順序消費(fèi),當(dāng)當(dāng)中一個消息消費(fèi)異常時,必將影響后續(xù)隊列消息的消費(fèi),這樣業(yè)務(wù)豈不是卡住了。比如筆者舉個最簡單的例子:我發(fā)送1-100的消息,在我的處理邏輯當(dāng)中 msg%5==0我就進(jìn)行 int i=1/0操作,這必將拋異常,一直阻塞在msg=5上,后面6-100無法消費(fèi)。下面筆者給出解決方案。

重試一定次數(shù)(消息丟失)

@KafkaHandler
    @KafkaListener(topics = {"quickstart-events"},groupId = "test-consumer-group-2", concurrency = "1")
    public void test6(String msg){
              businessProcess(msg);
            }
           private void businessProcess(String msg){
        System.out.println("接收到消息:" + msg + "--" + System.currentTimeMillis() + "---" + Thread.currentThread().hashCode());
       if (Integer.valueOf(msg) % 5 == 0) {
            int i = 1 / 0;
        }
    }

說明:如果讀者使用的是java客戶端,也就是spring進(jìn)行實現(xiàn),那么在不做任何處理的情況下,會自動重試10次,然后消息會被直接處理掉。也就是說如果你的業(yè)務(wù)允許消息丟失,那么你不需要額外的編碼處理

加入到死訊隊列(消息不丟失)

消費(fèi)端代碼:

//1.啟用手動提交offset
//2.配置errorHandler,用來加入到死訊隊列
//3.不管業(yè)務(wù)處理是否處理異常還是正常都提交offset
@KafkaHandler
    @KafkaListener(topics = {"quickstart-events"},groupId = "test-consumer-group-2",
            errorHandler ="kafkaListenerErrorHandler", concurrency = "1")
    public void test6(String msg,Acknowledgment ack){
        try {
            businessProcess(msg);
        }finally {
            //手動提交
            ack.acknowledge();
        }
    }
//1.專門處理死訊隊列消息,都是topicName+.DLT的主題
//2.死訊隊列里,只有消費(fèi)成功的才提交offset,否則等待bug修復(fù)完上線,繼續(xù)處理
    @KafkaHandler
    @KafkaListener(topics = {"quickstart-events.DLT"},groupId = "test-consumer-group-2", concurrency = "1")
    public void test7(String msg,Acknowledgment ack){
        try {
            businessProcess(msg);
            ack.acknowledge();
        }catch (Exception e){
            e.printStackTrace();
        }
    }
//業(yè)務(wù)代碼
    private void businessProcess(String msg){
        System.out.println("接收到消息:" + msg + "--" + System.currentTimeMillis() + "---" + Thread.currentThread().hashCode());
        if (Integer.valueOf(msg) % 5 == 0) {
            int i = 1 / 0;
        }
    }

異常處理器

//1.向容器注冊一個KafkaListenerErrorHandler類型的bean
//2.該bean就是當(dāng)處理消息異常的時候,將消息加入到.DLT主題中
@Component("kafkaListenerErrorHandler")
public class KafkaListenerErrorHandlerTest implements KafkaListenerErrorHandler {
   @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;
    private static final String TOPIC_DLT=".DLT";
    @Override
    public Object handleError(Message<?> message, ListenerExecutionFailedException exception) {
        System.out.println("消費(fèi)失敗消息:"+message.toString());
        //獲取消息處理異常主題
        MessageHeaders headers = message.getHeaders();
        String topic=headers.get("kafka_receivedTopic")+TOPIC_DLT;
        //放入死訊隊列
        kafkaTemplate.send(topic,message.getPayload());
        return message;
    }
}

效果圖

image.png 

說明:以上基本上就是使用死訊隊列的方案,也許讀者會覺得這樣編碼復(fù)雜度很高,但其實不用擔(dān)心,其實上面這些代碼基本上是使用死訊隊列的模板代碼,在成熟一點(diǎn)的公司,一般會使用上述代碼進(jìn)行簡單封裝,這里筆者給個思路,有興趣同學(xué)可以實現(xiàn)一下。我們其實可以使用aop思想,進(jìn)行自定義一個@EnableDLT這樣的注解去實現(xiàn),這樣上面這個方案使用起來是不是就簡單優(yōu)雅了。之前筆者在開發(fā)過程中使用過亞馬遜的消息隊列服務(wù),也不過是這樣實現(xiàn)罷了。

總結(jié)

本篇文章就到這里了,希望可以給你帶來一些幫助,也希望您能夠多多關(guān)注腳本之家的更多內(nèi)容!

相關(guān)文章

  • SpringBoot中使用MyBatis-Plus實現(xiàn)分頁接口的詳細(xì)教程

    SpringBoot中使用MyBatis-Plus實現(xiàn)分頁接口的詳細(xì)教程

    MyBatis-Plus是一個MyBatis的增強(qiáng)工具,在MyBatis的基礎(chǔ)上只做增強(qiáng)不做改變,為簡化開發(fā)、提高效率而生,在SpringBoot項目中使用MyBatis-Plus可以大大簡化分頁邏輯的編寫,本文將介紹如何在 SpringBoot項目中使用MyBatis-Plus實現(xiàn)分頁接口
    2024-03-03
  • SpringMVC自定義日期轉(zhuǎn)換器方式

    SpringMVC自定義日期轉(zhuǎn)換器方式

    這篇文章主要介紹了SpringMVC如何自定義日期轉(zhuǎn)換器問題,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教
    2024-07-07
  • Java的Jackson自定義序列化詳解

    Java的Jackson自定義序列化詳解

    這篇文章主要介紹了Java的Jackson自定義序列化詳解,對比序列化器,可以看到,使用@JsonValue注解已經(jīng)將Leader類的序列化方式改變了,進(jìn)而影響了Country類,再來執(zhí)行test7()測試反序列化,結(jié)果與之前是一致的,需要的朋友可以參考下
    2023-11-11
  • 解決idea中java出現(xiàn)無效的源發(fā)行版問題

    解決idea中java出現(xiàn)無效的源發(fā)行版問題

    這篇文章主要給大家介紹了關(guān)于解決idea中java出現(xiàn)無效的源發(fā)行版問題的相關(guān)資料,無效的源發(fā)行版是指IntelliJ IDEA無法正確識別和處理的源代碼版本,這可能是由于錯誤的配置、缺少依賴項、不兼容的插件或其他問題導(dǎo)致的,需要的朋友可以參考下
    2024-01-01
  • java使用FastJson解析Json數(shù)據(jù)

    java使用FastJson解析Json數(shù)據(jù)

    本篇文章主要介紹了java使用FastJson解析Json數(shù)據(jù),fastjson 是一個性能極好的用 Java 語言實現(xiàn)的 JSON 解析器和生成器,有興趣的可以了解一下。
    2017-02-02
  • SpringBoot中的自動裝配原理解析

    SpringBoot中的自動裝配原理解析

    這篇文章主要介紹了SpringBoot中的自動裝配原理解析,自動裝配就是指 Spring 容器在不使用<constructor-arg>和<property>標(biāo)簽的情況下,可以自動裝配(autowire)相互協(xié)作的Bean之間的關(guān)聯(lián)關(guān)系,將一個 Bean注入其他Bean的Property中,需要的朋友可以參考下
    2023-08-08
  • Mybatis中強(qiáng)大的resultMap功能介紹

    Mybatis中強(qiáng)大的resultMap功能介紹

    這篇文章主要給大家介紹了關(guān)于Mybatis中強(qiáng)大的resultMap功能的相關(guān)資料,文中通過示例代碼介紹的非常詳細(xì),對大家學(xué)習(xí)或者使用Mybatis具有一定的參考學(xué)習(xí)價值,需要的朋友們下面來一起學(xué)習(xí)學(xué)習(xí)吧
    2019-06-06
  • MyBatis查詢數(shù)據(jù),賦值給List集合時,數(shù)據(jù)缺少的問題及解決

    MyBatis查詢數(shù)據(jù),賦值給List集合時,數(shù)據(jù)缺少的問題及解決

    這篇文章主要介紹了MyBatis查詢數(shù)據(jù),賦值給List集合時,數(shù)據(jù)缺少的問題及解決方案,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2022-01-01
  • Springboot接口參數(shù)校驗的方法

    Springboot接口參數(shù)校驗的方法

    在設(shè)計接口時我們通常需要對接口中的非法參數(shù)做校驗,以降低在程序運(yùn)行時因為一些非法參數(shù)而導(dǎo)致程序發(fā)生異常的風(fēng)險,這篇文章給大家介紹Springboot接口參數(shù)校驗的方法,感興趣的朋友一起看看吧
    2024-03-03
  • 淺談Java中的克隆close()和賦值引用的區(qū)別

    淺談Java中的克隆close()和賦值引用的區(qū)別

    下面小編就為大家?guī)硪黄獪\談Java中的克隆close()和賦值引用的區(qū)別。小編覺得挺不錯的,現(xiàn)在就分享給大家,也給大家做個參考。一起跟隨小編過來看看吧
    2016-09-09

最新評論