亚洲乱码中文字幕综合,中国熟女仑乱hd,亚洲精品乱拍国产一区二区三区,一本大道卡一卡二卡三乱码全集资源,又粗又黄又硬又爽的免费视频

springboot + rabbitmq 如何實(shí)現(xiàn)消息確認(rèn)機(jī)制(踩坑經(jīng)驗(yàn))

 更新時間:2020年07月01日 10:53:49   作者:程序員內(nèi)點(diǎn)事  
這篇文章主要介紹了springboot + rabbitmq 如何實(shí)現(xiàn)消息確認(rèn)機(jī)制,本文給大家分享小編實(shí)際開發(fā)中的一點(diǎn)踩坑經(jīng)驗(yàn),內(nèi)容簡單易懂,需要的朋友可以參考下

本文收錄在個人博客:www.chengxy-nds.top,技術(shù)資源共享,一起進(jìn)步

最近部門號召大伙多組織一些技術(shù)分享會,說是要活躍公司的技術(shù)氛圍,但早就看穿一切的我知道,這 T M 就是為了刷KPI。不過,話說回來這的確是件好事,與其開那些沒味的扯皮會,多做技術(shù)交流還是很有助于個人成長的。

于是乎我主動報名參加了分享,咳咳咳~ ,真的不是為了那點(diǎn)KPI,就是想和大伙一起學(xué)習(xí)學(xué)習(xí)!

這次我分享的是 springboot + rabbitmq 如何實(shí)現(xiàn)消息確認(rèn)機(jī)制,以及在實(shí)際開發(fā)中的一點(diǎn)踩坑經(jīng)驗(yàn),其實(shí)整體的內(nèi)容比較簡單,有時候事情就是這么神奇,越是簡單的東西就越容易出錯。

可以看到使用了 RabbitMQ 以后,我們的業(yè)務(wù)鏈路明顯變長了,雖然做到了系統(tǒng)間的解耦,但可能造成消息丟失的場景也增加了。例如:

  • 消息生產(chǎn)者 - > rabbitmq服務(wù)器(消息發(fā)送失?。?/li>
  • rabbitmq服務(wù)器自身故障導(dǎo)致消息丟失
  • 消息消費(fèi)者 - > rabbitmq服務(wù)(消費(fèi)消息失?。?/li>

所以說能不使用中間件就盡量不要用,如果為了用而用只會徒增煩惱。開啟消息確認(rèn)機(jī)制以后,盡管很大程度上保證了消息的準(zhǔn)確送達(dá),但由于頻繁的確認(rèn)交互,rabbitmq 整體效率變低,吞吐量下降嚴(yán)重,不是非常重要的消息真心不建議你用消息確認(rèn)機(jī)制。

下邊我們先來實(shí)現(xiàn)springboot + rabbitmq消息確認(rèn)機(jī)制,再對遇到的問題做具體分析。

一、準(zhǔn)備環(huán)境

1、引入 rabbitmq 依賴包

<dependency>
 <groupId>org.springframework.boot</groupId>
 <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

2、修改 application.properties 配置

配置中需要開啟 發(fā)送端消費(fèi)端 的消息確認(rèn)。

spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest

# 發(fā)送者開啟 confirm 確認(rèn)機(jī)制
spring.rabbitmq.publisher-confirms=true
# 發(fā)送者開啟 return 確認(rèn)機(jī)制
spring.rabbitmq.publisher-returns=true
####################################################
# 設(shè)置消費(fèi)端手動 ack
spring.rabbitmq.listener.simple.acknowledge-mode=manual
# 是否支持重試
spring.rabbitmq.listener.simple.retry.enabled=true

3、定義 Exchange 和 Queue

定義交換機(jī) confirmTestExchange 和隊列 confirm_test_queue ,并將隊列綁定在交換機(jī)上。

@Configuration
public class QueueConfig {

 @Bean(name = "confirmTestQueue")
 public Queue confirmTestQueue() {
 return new Queue("confirm_test_queue", true, false, false);
 }

 @Bean(name = "confirmTestExchange")
 public FanoutExchange confirmTestExchange() {
 return new FanoutExchange("confirmTestExchange");
 }

 @Bean
 public Binding confirmTestFanoutExchangeAndQueue(
 @Qualifier("confirmTestExchange") FanoutExchange confirmTestExchange,
 @Qualifier("confirmTestQueue") Queue confirmTestQueue) {
 return BindingBuilder.bind(confirmTestQueue).to(confirmTestExchange);
 }
}

rabbitmq 的消息確認(rèn)分為兩部分:發(fā)送消息確認(rèn) 和 消息接收確認(rèn)。

二、消息發(fā)送確認(rèn)

發(fā)送消息確認(rèn):用來確認(rèn)生產(chǎn)者 producer 將消息發(fā)送到 broker ,broker 上的交換機(jī) exchange 再投遞給隊列 queue的過程中,消息是否成功投遞。

消息從 producerrabbitmq broker有一個 confirmCallback 確認(rèn)模式。

消息從 exchangequeue 投遞失敗有一個 returnCallback 退回模式。

我們可以利用這兩個Callback來確保消的100%送達(dá)。

1、 ConfirmCallback確認(rèn)模式

消息只要被 rabbitmq broker 接收到就會觸發(fā) confirmCallback 回調(diào) 。

@Slf4j
@Component
public class ConfirmCallbackService implements RabbitTemplate.ConfirmCallback {
 
 @Override
 public void confirm(CorrelationData correlationData, boolean ack, String cause) {

 if (!ack) {
 log.error("消息發(fā)送異常!");
 } else {
 log.info("發(fā)送者爸爸已經(jīng)收到確認(rèn),correlationData={} ,ack={}, cause={}", correlationData.getId(), ack, cause);
 }
 }
}

實(shí)現(xiàn)接口 ConfirmCallback ,重寫其confirm()方法,方法內(nèi)有三個參數(shù)correlationData、ack、cause。

  • correlationData:對象內(nèi)部只有一個 id 屬性,用來表示當(dāng)前消息的唯一性。
  • ack:消息投遞到broker 的狀態(tài),true表示成功。
  • cause:表示投遞失敗的原因。

但消息被 broker 接收到只能表示已經(jīng)到達(dá) MQ服務(wù)器,并不能保證消息一定會被投遞到目標(biāo) queue 里。所以接下來需要用到 returnCallback 。

2、 ReturnCallback 退回模式

如果消息未能投遞到目標(biāo) queue 里將觸發(fā)回調(diào) returnCallback ,一旦向 queue 投遞消息未成功,這里一般會記錄下當(dāng)前消息的詳細(xì)投遞數(shù)據(jù),方便后續(xù)做重發(fā)或者補(bǔ)償?shù)炔僮鳌?/p>

@Slf4j
@Component
public class ReturnCallbackService implements RabbitTemplate.ReturnCallback {

 @Override
 public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
 log.info("returnedMessage ===> replyCode={} ,replyText={} ,exchange={} ,routingKey={}", replyCode, replyText, exchange, routingKey);
 }
}

實(shí)現(xiàn)接口ReturnCallback,重寫 returnedMessage() 方法,方法有五個參數(shù)message(消息體)、replyCode(響應(yīng)code)、replyText(響應(yīng)內(nèi)容)、exchange(交換機(jī))、routingKey(隊列)。

下邊是具體的消息發(fā)送,在rabbitTemplate中設(shè)置 ConfirmReturn 回調(diào),我們通過setDeliveryMode()對消息做持久化處理,為了后續(xù)測試創(chuàng)建一個 CorrelationData對象,添加一個id10000000000。

@Autowired
 private RabbitTemplate rabbitTemplate;

 @Autowired
 private ConfirmCallbackService confirmCallbackService;

 @Autowired
 private ReturnCallbackService returnCallbackService;

 public void sendMessage(String exchange, String routingKey, Object msg) {

 /**
 * 確保消息發(fā)送失敗后可以重新返回到隊列中
 * 注意:yml需要配置 publisher-returns: true
 */
 rabbitTemplate.setMandatory(true);

 /**
 * 消費(fèi)者確認(rèn)收到消息后,手動ack回執(zhí)回調(diào)處理
 */
 rabbitTemplate.setConfirmCallback(confirmCallbackService);

 /**
 * 消息投遞到隊列失敗回調(diào)處理
 */
 rabbitTemplate.setReturnCallback(returnCallbackService);

 /**
 * 發(fā)送消息
 */
 rabbitTemplate.convertAndSend(exchange, routingKey, msg,
 message -> {
  message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
  return message;
 },
 new CorrelationData(UUID.randomUUID().toString()));
 }

三、消息接收確認(rèn)

消息接收確認(rèn)要比消息發(fā)送確認(rèn)簡單一點(diǎn),因?yàn)橹挥幸粋€消息回執(zhí)(ack)的過程。使用@RabbitHandler注解標(biāo)注的方法要增加 channel(信道)、message 兩個參數(shù)。

@Slf4j
@Component
@RabbitListener(queues = "confirm_test_queue")
public class ReceiverMessage1 {
 
 @RabbitHandler
 public void processHandler(String msg, Channel channel, Message message) throws IOException {

 try {
 log.info("小富收到消息:{}", msg);

 //TODO 具體業(yè)務(wù)
 
 channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);

 } catch (Exception e) {
 
 if (message.getMessageProperties().getRedelivered()) {
 
 log.error("消息已重復(fù)處理失敗,拒絕再次接收...");
 
 channel.basicReject(message.getMessageProperties().getDeliveryTag(), false); // 拒絕消息
 } else {
 
 log.error("消息即將再次返回隊列處理...");
 
 channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true); 
 }
 }
 }
}

消費(fèi)消息有三種回執(zhí)方法,我們來分析一下每種方法的含義。

1、basicAck

basicAck:表示成功確認(rèn),使用此回執(zhí)方法后,消息會被rabbitmq broker 刪除。

void basicAck(long deliveryTag, boolean multiple) 

deliveryTag:表示消息投遞序號,每次消費(fèi)消息或者消息重新投遞后,deliveryTag都會增加。手動消息確認(rèn)模式下,我們可以對指定deliveryTag的消息進(jìn)行acknack、reject等操作。

multiple:是否批量確認(rèn),值為 true 則會一次性 ack所有小于當(dāng)前消息 deliveryTag 的消息。

舉個栗子: 假設(shè)我先發(fā)送三條消息deliveryTag分別是5、6、7,可它們都沒有被確認(rèn),當(dāng)我發(fā)第四條消息此時deliveryTag為8,multiple設(shè)置為 true,會將5、6、7、8的消息全部進(jìn)行確認(rèn)。

2、basicNack

basicNack :表示失敗確認(rèn),一般在消費(fèi)消息業(yè)務(wù)異常時用到此方法,可以將消息重新投遞入隊列。

void basicNack(long deliveryTag, boolean multiple, boolean requeue)

deliveryTag:表示消息投遞序號。

multiple:是否批量確認(rèn)。

requeue:值為 true 消息將重新入隊列。

3、basicReject

basicReject:拒絕消息,與basicNack區(qū)別在于不能進(jìn)行批量操作,其他用法很相似。

void basicReject(long deliveryTag, boolean requeue)

deliveryTag:表示消息投遞序號。

requeue:值為 true 消息將重新入隊列。

四、測試

發(fā)送消息測試一下消息確認(rèn)機(jī)制是否生效,從執(zhí)行結(jié)果上看發(fā)送者發(fā)消息后成功回調(diào),消費(fèi)端成功的消費(fèi)了消息。

用抓包工具Wireshark 觀察一下rabbitmq amqp協(xié)議交互的變化,也多了 ack 的過程。

五、踩坑日志

1、不消息確認(rèn)

這是一個非常沒技術(shù)含量的坑,但卻是非常容易犯錯的地方。

開啟消息確認(rèn)機(jī)制,消費(fèi)消息別忘了channel.basicAck,否則消息會一直存在,導(dǎo)致重復(fù)消費(fèi)。

2、消息無限投遞

在我最開始接觸消息確認(rèn)機(jī)制的時候,消費(fèi)端代碼就像下邊這樣寫的,思路很簡單:處理完業(yè)務(wù)邏輯后確認(rèn)消息, int a = 1 / 0 發(fā)生異常后將消息重新投入隊列。

@RabbitHandler
 public void processHandler(String msg, Channel channel, Message message) throws IOException {

 try {
 log.info("消費(fèi)者 2 號收到:{}", msg);

 int a = 1 / 0;

 channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);

 } catch (Exception e) {

 channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
 }
 }

但是有個問題是,業(yè)務(wù)代碼一旦出現(xiàn) bug 99.9%的情況是不會自動修復(fù),一條消息會被無限投遞進(jìn)隊列,消費(fèi)端無限執(zhí)行,導(dǎo)致了死循環(huán)。

本地的CPU被瞬間打滿了,大家可以想象一下當(dāng)時在生產(chǎn)環(huán)境導(dǎo)致服務(wù)死機(jī),我是有多慌。

而且rabbitmq management 只有一條未被確認(rèn)的消息。

經(jīng)過測試分析發(fā)現(xiàn),當(dāng)消息重新投遞到消息隊列時,這條消息不會回到隊列尾部,仍是在隊列頭部。

消費(fèi)者會立刻消費(fèi)這條消息,業(yè)務(wù)處理再拋出異常,消息再重新入隊,如此反復(fù)進(jìn)行。導(dǎo)致消息隊列處理出現(xiàn)阻塞,導(dǎo)致正常消息也無法運(yùn)行。

而我們當(dāng)時的解決方案是,先將消息進(jìn)行應(yīng)答,此時消息隊列會刪除該條消息,同時我們再次發(fā)送該消息到消息隊列,異常消息就放在了消息隊列尾部,這樣既保證消息不會丟失,又保證了正常業(yè)務(wù)的進(jìn)行。

channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
// 重新發(fā)送消息到隊尾
channel.basicPublish(message.getMessageProperties().getReceivedExchange(),
  message.getMessageProperties().getReceivedRoutingKey(), MessageProperties.PERSISTENT_TEXT_PLAIN,
  JSON.toJSONBytes(msg));

但這種方法并沒有解決根本問題,錯誤消息還是會時不時報錯,后面優(yōu)化設(shè)置了消息重試次數(shù),達(dá)到了重試上限以后,手動確認(rèn),隊列刪除此消息,并將消息持久化入MySQL并推送報警,進(jìn)行人工處理和定時任務(wù)做補(bǔ)償。

3、重復(fù)消費(fèi)

如何保證 MQ 的消費(fèi)是冪等性,這個需要根據(jù)具體業(yè)務(wù)而定,可以借助MySQL、或者redis 將消息持久化,通過再消息中的唯一性屬性校驗(yàn)。

demoGitHub 地址 https://github.com/chengxy-nds/Springboot-Notebook/tree/master/springboot-rabbitmq-confirm

總結(jié)

到此這篇關(guān)于springboot + rabbitmq 如何實(shí)現(xiàn)消息確認(rèn)機(jī)制(踩坑經(jīng)驗(yàn))的文章就介紹到這了,更多相關(guān)springboot rabbitmq 消息確認(rèn)機(jī)制內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • java中CompleteFuture與Future的區(qū)別小結(jié)

    java中CompleteFuture與Future的區(qū)別小結(jié)

    本文主要介紹了java中CompleteFuture與Future的區(qū)別小結(jié),文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2023-12-12
  • sentinel?整合spring?cloud限流的過程解析

    sentinel?整合spring?cloud限流的過程解析

    這篇文章主要介紹了sentinel?整合spring?cloud限流,本文通過實(shí)例代碼給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友可以參考下
    2022-03-03
  • Spring boot事件監(jiān)聽實(shí)現(xiàn)過程解析

    Spring boot事件監(jiān)聽實(shí)現(xiàn)過程解析

    這篇文章主要介紹了,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友可以參考下
    2020-06-06
  • 詳解java實(shí)現(xiàn)簡單掃碼登錄功能(模仿微信網(wǎng)頁版掃碼)

    詳解java實(shí)現(xiàn)簡單掃碼登錄功能(模仿微信網(wǎng)頁版掃碼)

    這篇文章主要介紹了java實(shí)現(xiàn)簡單掃碼登錄功能(模仿微信網(wǎng)頁版掃碼),文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2019-05-05
  • mybatis對象List<String> List<Integer>屬性映射方式

    mybatis對象List<String> List<Integer>屬性映射方式

    這篇文章主要介紹了mybatis對象List<String> List<Integer>屬性映射方式,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教
    2023-12-12
  • SpringBoot中全局異常處理的5種實(shí)現(xiàn)方式小結(jié)

    SpringBoot中全局異常處理的5種實(shí)現(xiàn)方式小結(jié)

    在實(shí)際開發(fā)中,異常處理是一個非常重要的環(huán)節(jié),合理的異常處理機(jī)制不僅能提高系統(tǒng)的健壯性,還能大大提升用戶體驗(yàn),下面我們就來看看SpringBoot中全局異常處理的5種實(shí)現(xiàn)方式吧
    2025-03-03
  • Java?NIO?Buffer實(shí)現(xiàn)原理詳解

    Java?NIO?Buffer實(shí)現(xiàn)原理詳解

    本篇文章主要對NIO核心三件套:緩沖區(qū)(Buffer)、選擇器?(Selector)和通道(Channel),其中之一的緩沖區(qū)Buffer實(shí)現(xiàn)原理的學(xué)習(xí)總結(jié)。感興趣的小伙伴可以了解一下
    2021-11-11
  • springboot開啟Bean數(shù)據(jù)校驗(yàn)功能

    springboot開啟Bean數(shù)據(jù)校驗(yàn)功能

    這篇文章主要介紹了springboot開啟Bean數(shù)據(jù)校驗(yàn)功能,通過啟用Bean屬性校驗(yàn)導(dǎo)入JSR303與Hibernate校驗(yàn)框架坐標(biāo),使用@Validated注解啟用校驗(yàn)功能,需要的朋友可以參考下
    2023-10-10
  • SpringBoot的pom.xml文件中設(shè)置多環(huán)境配置信息方法詳解

    SpringBoot的pom.xml文件中設(shè)置多環(huán)境配置信息方法詳解

    這篇文章主要給大家介紹了關(guān)于SpringBoot的pom.xml文件中設(shè)置多環(huán)境配置信息的相關(guān)資料,Java項目通過pom.xml管理多中間件和多環(huán)境配置,結(jié)合application.yml動態(tài)替換配置文件,利用Maven切換不同環(huán)境配置,文中通過代碼介紹的非常詳細(xì),需要的朋友可以參考下
    2025-05-05
  • Java插件擴(kuò)展機(jī)制之SPI案例講解

    Java插件擴(kuò)展機(jī)制之SPI案例講解

    這篇文章主要介紹了Java插件擴(kuò)展機(jī)制之SPI案例講解,本篇文章通過簡要的案例,講解了該項技術(shù)的了解與使用,以下就是詳細(xì)內(nèi)容,需要的朋友可以參考下
    2021-07-07

最新評論