消息隊列-kafka消費(fèi)異常問題
概述
在kafka中,或者是說在任何消息隊列中都有個消費(fèi)順序的問題。為了保證一個隊列順序消費(fèi),當(dāng)當(dāng)中一個消息消費(fèi)異常時,必將影響后續(xù)隊列消息的消費(fèi),這樣業(yè)務(wù)豈不是卡住了。比如筆者舉個最簡單的例子:我發(fā)送1-100的消息,在我的處理邏輯當(dāng)中 msg%5==0我就進(jìn)行 int i=1/0操作,這必將拋異常,一直阻塞在msg=5上,后面6-100無法消費(fèi)。下面筆者給出解決方案。
重試一定次數(shù)(消息丟失)
@KafkaHandler @KafkaListener(topics = {"quickstart-events"},groupId = "test-consumer-group-2", concurrency = "1") public void test6(String msg){ businessProcess(msg); } private void businessProcess(String msg){ System.out.println("接收到消息:" + msg + "--" + System.currentTimeMillis() + "---" + Thread.currentThread().hashCode()); if (Integer.valueOf(msg) % 5 == 0) { int i = 1 / 0; } }
說明:如果讀者使用的是java客戶端,也就是spring進(jìn)行實現(xiàn),那么在不做任何處理的情況下,會自動重試10次,然后消息會被直接處理掉。也就是說如果你的業(yè)務(wù)允許消息丟失,那么你不需要額外的編碼處理
加入到死訊隊列(消息不丟失)
消費(fèi)端代碼:
//1.啟用手動提交offset //2.配置errorHandler,用來加入到死訊隊列 //3.不管業(yè)務(wù)處理是否處理異常還是正常都提交offset @KafkaHandler @KafkaListener(topics = {"quickstart-events"},groupId = "test-consumer-group-2", errorHandler ="kafkaListenerErrorHandler", concurrency = "1") public void test6(String msg,Acknowledgment ack){ try { businessProcess(msg); }finally { //手動提交 ack.acknowledge(); } } //1.專門處理死訊隊列消息,都是topicName+.DLT的主題 //2.死訊隊列里,只有消費(fèi)成功的才提交offset,否則等待bug修復(fù)完上線,繼續(xù)處理 @KafkaHandler @KafkaListener(topics = {"quickstart-events.DLT"},groupId = "test-consumer-group-2", concurrency = "1") public void test7(String msg,Acknowledgment ack){ try { businessProcess(msg); ack.acknowledge(); }catch (Exception e){ e.printStackTrace(); } } //業(yè)務(wù)代碼 private void businessProcess(String msg){ System.out.println("接收到消息:" + msg + "--" + System.currentTimeMillis() + "---" + Thread.currentThread().hashCode()); if (Integer.valueOf(msg) % 5 == 0) { int i = 1 / 0; } }
異常處理器
//1.向容器注冊一個KafkaListenerErrorHandler類型的bean //2.該bean就是當(dāng)處理消息異常的時候,將消息加入到.DLT主題中 @Component("kafkaListenerErrorHandler") public class KafkaListenerErrorHandlerTest implements KafkaListenerErrorHandler { @Autowired private KafkaTemplate<String, Object> kafkaTemplate; private static final String TOPIC_DLT=".DLT"; @Override public Object handleError(Message<?> message, ListenerExecutionFailedException exception) { System.out.println("消費(fèi)失敗消息:"+message.toString()); //獲取消息處理異常主題 MessageHeaders headers = message.getHeaders(); String topic=headers.get("kafka_receivedTopic")+TOPIC_DLT; //放入死訊隊列 kafkaTemplate.send(topic,message.getPayload()); return message; } }
效果圖:
說明:以上基本上就是使用死訊隊列的方案,也許讀者會覺得這樣編碼復(fù)雜度很高,但其實不用擔(dān)心,其實上面這些代碼基本上是使用死訊隊列的模板代碼,在成熟一點(diǎn)的公司,一般會使用上述代碼進(jìn)行簡單封裝,這里筆者給個思路,有興趣同學(xué)可以實現(xiàn)一下。我們其實可以使用aop思想,進(jìn)行自定義一個@EnableDLT這樣的注解去實現(xiàn),這樣上面這個方案使用起來是不是就簡單優(yōu)雅了。之前筆者在開發(fā)過程中使用過亞馬遜的消息隊列服務(wù),也不過是這樣實現(xiàn)罷了。
總結(jié)
本篇文章就到這里了,希望可以給你帶來一些幫助,也希望您能夠多多關(guān)注腳本之家的更多內(nèi)容!
相關(guān)文章
SpringBoot中使用MyBatis-Plus實現(xiàn)分頁接口的詳細(xì)教程
MyBatis-Plus是一個MyBatis的增強(qiáng)工具,在MyBatis的基礎(chǔ)上只做增強(qiáng)不做改變,為簡化開發(fā)、提高效率而生,在SpringBoot項目中使用MyBatis-Plus可以大大簡化分頁邏輯的編寫,本文將介紹如何在 SpringBoot項目中使用MyBatis-Plus實現(xiàn)分頁接口2024-03-03解決idea中java出現(xiàn)無效的源發(fā)行版問題
這篇文章主要給大家介紹了關(guān)于解決idea中java出現(xiàn)無效的源發(fā)行版問題的相關(guān)資料,無效的源發(fā)行版是指IntelliJ IDEA無法正確識別和處理的源代碼版本,這可能是由于錯誤的配置、缺少依賴項、不兼容的插件或其他問題導(dǎo)致的,需要的朋友可以參考下2024-01-01java使用FastJson解析Json數(shù)據(jù)
本篇文章主要介紹了java使用FastJson解析Json數(shù)據(jù),fastjson 是一個性能極好的用 Java 語言實現(xiàn)的 JSON 解析器和生成器,有興趣的可以了解一下。2017-02-02Mybatis中強(qiáng)大的resultMap功能介紹
這篇文章主要給大家介紹了關(guān)于Mybatis中強(qiáng)大的resultMap功能的相關(guān)資料,文中通過示例代碼介紹的非常詳細(xì),對大家學(xué)習(xí)或者使用Mybatis具有一定的參考學(xué)習(xí)價值,需要的朋友們下面來一起學(xué)習(xí)學(xué)習(xí)吧2019-06-06MyBatis查詢數(shù)據(jù),賦值給List集合時,數(shù)據(jù)缺少的問題及解決
這篇文章主要介紹了MyBatis查詢數(shù)據(jù),賦值給List集合時,數(shù)據(jù)缺少的問題及解決方案,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2022-01-01