spring boot Rabbit高級(jí)教程(最新推薦)
消息可靠性
生產(chǎn)者重試機(jī)制
首先第一種情況,就是生產(chǎn)者發(fā)送消息時(shí),出現(xiàn)了網(wǎng)絡(luò)故障,導(dǎo)致與MQ的連接中斷。
為了解決這個(gè)問(wèn)題,SpringAMQP提供的消息發(fā)送時(shí)的重試機(jī)制。即:當(dāng)RabbitTemplate
與MQ連接超時(shí)后,多次重試。
修改publisher
模塊的application.yaml
文件,添加下面的內(nèi)容:
spring: rabbitmq: connection-timeout: 1s # 設(shè)置MQ的連接超時(shí)時(shí)間 template: retry: enabled: true # 開啟超時(shí)重試機(jī)制 initial-interval: 1000ms # 失敗后的初始等待時(shí)間 multiplier: 1 # 失敗后下次的等待時(shí)長(zhǎng)倍數(shù),下次等待時(shí)長(zhǎng) = initial-interval * multiplier max-attempts: 3 # 最大重試次數(shù)
注意:當(dāng)網(wǎng)絡(luò)不穩(wěn)定的時(shí)候,利用重試機(jī)制可以有效提高消息發(fā)送的成功率。不過(guò)SpringAMQP提供的重試機(jī)制是阻塞式的重試,也就是說(shuō)多次重試等待的過(guò)程中,當(dāng)前線程是被阻塞的。
如果對(duì)于業(yè)務(wù)性能有要求,建議禁用重試機(jī)制。如果一定要使用,請(qǐng)合理配置等待時(shí)長(zhǎng)和重試次數(shù),當(dāng)然也可以考慮使用異步線程來(lái)執(zhí)行發(fā)送消息的代碼。
:::
生產(chǎn)者確認(rèn)機(jī)制
一般情況下,只要生產(chǎn)者與MQ之間的網(wǎng)路連接順暢,基本不會(huì)出現(xiàn)發(fā)送消息丟失的情況,因此大多數(shù)情況下我們無(wú)需考慮這種問(wèn)題。
不過(guò),在少數(shù)情況下,也會(huì)出現(xiàn)消息發(fā)送到MQ之后丟失的現(xiàn)象,比如:
- MQ內(nèi)部處理消息的進(jìn)程發(fā)生了異常
- 生產(chǎn)者發(fā)送消息到達(dá)MQ后未找到
Exchange
- 生產(chǎn)者發(fā)送消息到達(dá)MQ的
Exchange
后,未找到合適的Queue
,因此無(wú)法路由
針對(duì)上述情況,RabbitMQ提供了生產(chǎn)者消息確認(rèn)機(jī)制,包括Publisher Confirm
和Publisher Return
兩種。在開啟確認(rèn)機(jī)制的情況下,當(dāng)生產(chǎn)者發(fā)送消息給MQ后,MQ會(huì)根據(jù)消息處理的情況返回不同的回執(zhí)。
- 當(dāng)消息投遞到MQ,但是路由失敗時(shí),通過(guò)Publisher Return返回異常信息,同時(shí)返回ack的確認(rèn)信息,代表投遞成功
- 臨時(shí)消息投遞到了MQ,并且入隊(duì)成功,返回ACK,告知投遞成功
- 持久消息投遞到了MQ,并且入隊(duì)完成持久化,返回ACK ,告知投遞成功
- 其它情況都會(huì)返回NACK,告知投遞失敗
其中ack
和nack
屬于Publisher Confirm機(jī)制,ack
是投遞成功;nack
是投遞失敗。而return
則屬于Publisher Return機(jī)制。
默認(rèn)兩種機(jī)制都是關(guān)閉狀態(tài),需要通過(guò)配置文件來(lái)開啟。
在publisher模塊的application.yaml
中添加配置:
spring: rabbitmq: publisher-confirm-type: correlated # 開啟publisher confirm機(jī)制,并設(shè)置confirm類型 publisher-returns: true # 開啟publisher return機(jī)制
這里publisher-confirm-type
有三種模式可選:
none
:關(guān)閉confirm機(jī)制simple
:同步阻塞等待MQ的回執(zhí)correlated
:MQ異步回調(diào)返回回執(zhí)
一般我們推薦使用correlated
,回調(diào)機(jī)制。
定義ReturnCallback
每個(gè)RabbitTemplate
只能配置一個(gè)ReturnCallback
,因此我們可以在配置類中統(tǒng)一設(shè)置。我們?cè)趐ublisher模塊定義一個(gè)配置類:
內(nèi)容如下:
package com.itheima.publisher.config; import lombok.AllArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.ReturnedMessage; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.context.annotation.Configuration; import javax.annotation.PostConstruct; @Slf4j @AllArgsConstructor @Configuration public class MqConfig { private final RabbitTemplate rabbitTemplate; @PostConstruct public void init(){ rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() { @Override public void returnedMessage(ReturnedMessage returned) { log.error("觸發(fā)return callback,"); log.debug("exchange: {}", returned.getExchange()); log.debug("routingKey: {}", returned.getRoutingKey()); log.debug("message: {}", returned.getMessage()); log.debug("replyCode: {}", returned.getReplyCode()); log.debug("replyText: {}", returned.getReplyText()); } }); } }
定義ConfirmCallback
由于每個(gè)消息發(fā)送時(shí)的處理邏輯不一定相同,因此ConfirmCallback需要在每次發(fā)消息時(shí)定義。具體來(lái)說(shuō),是在調(diào)用RabbitTemplate中的convertAndSend方法時(shí),多傳遞一個(gè)參數(shù):
這里的CorrelationData中包含兩個(gè)核心的東西:
id
:消息的唯一標(biāo)示,MQ對(duì)不同的消息的回執(zhí)以此做判斷,避免混淆SettableListenableFuture
:回執(zhí)結(jié)果的Future對(duì)象
將來(lái)MQ的回執(zhí)就會(huì)通過(guò)這個(gè)Future
來(lái)返回,我們可以提前給CorrelationData
中的Future
添加回調(diào)函數(shù)來(lái)處理消息回執(zhí):
我們新建一個(gè)測(cè)試,向系統(tǒng)自帶的交換機(jī)發(fā)送消息,并且添加ConfirmCallback
:
@Test void testPublisherConfirm() { // 1.創(chuàng)建CorrelationData CorrelationData cd = new CorrelationData(); // 2.給Future添加ConfirmCallback cd.getFuture().addCallback(new ListenableFutureCallback<CorrelationData.Confirm>() { @Override public void onFailure(Throwable ex) { // 2.1.Future發(fā)生異常時(shí)的處理邏輯,基本不會(huì)觸發(fā) log.error("send message fail", ex); } @Override public void onSuccess(CorrelationData.Confirm result) { // 2.2.Future接收到回執(zhí)的處理邏輯,參數(shù)中的result就是回執(zhí)內(nèi)容 if(result.isAck()){ // result.isAck(),boolean類型,true代表ack回執(zhí),false 代表 nack回執(zhí) log.debug("發(fā)送消息成功,收到 ack!"); }else{ // result.getReason(),String類型,返回nack時(shí)的異常描述 log.error("發(fā)送消息失敗,收到 nack, reason : {}", result.getReason()); } } }); // 3.發(fā)送消息 rabbitTemplate.convertAndSend("hmall.direct", "q", "hello", cd); }
執(zhí)行結(jié)果如下:
可以看到,由于傳遞的RoutingKey
是錯(cuò)誤的,路由失敗后,觸發(fā)了return callback
,同時(shí)也收到了ack。
當(dāng)我們修改為正確的RoutingKey
以后,就不會(huì)觸發(fā)return callback
了,只收到ack。
而如果連交換機(jī)都是錯(cuò)誤的,則只會(huì)收到nack。
注意:
開啟生產(chǎn)者確認(rèn)比較消耗MQ性能,一般不建議開啟。而且大家思考一下觸發(fā)確認(rèn)的幾種情況:
- 路由失敗:一般是因?yàn)镽outingKey錯(cuò)誤導(dǎo)致,往往是編程導(dǎo)致
- 交換機(jī)名稱錯(cuò)誤:同樣是編程錯(cuò)誤導(dǎo)致
- MQ內(nèi)部故障:這種需要處理,但概率往往較低。因此只有對(duì)消息可靠性要求非常高的業(yè)務(wù)才需要開啟,而且僅僅需要開啟ConfirmCallback處理nack就可以了。
數(shù)據(jù)持久化
為了提升性能,默認(rèn)情況下MQ的數(shù)據(jù)都是在內(nèi)存存儲(chǔ)的臨時(shí)數(shù)據(jù),重啟后就會(huì)消失。為了保證數(shù)據(jù)的可靠性,必須配置數(shù)據(jù)持久化,包括:
- 交換機(jī)持久化
- 隊(duì)列持久化
- 消息持久化
我們以控制臺(tái)界面為例來(lái)說(shuō)明。
交換機(jī)持久化
在控制臺(tái)的Exchanges
頁(yè)面,添加交換機(jī)時(shí)可以配置交換機(jī)的Durability
參數(shù):
設(shè)置為Durable
就是持久化模式,Transient
就是臨時(shí)模式。
隊(duì)列持久化
在控制臺(tái)的Queues頁(yè)面,添加隊(duì)列時(shí),同樣可以配置隊(duì)列的Durability
參數(shù):
除了持久化以外。
消息持久化
在控制臺(tái)發(fā)送消息的時(shí)候,可以添加很多參數(shù),而消息的持久化是要配置一個(gè)properties
:
說(shuō)明:在開啟持久化機(jī)制以后,如果同時(shí)還開啟了生產(chǎn)者確認(rèn),那么MQ會(huì)在消息持久化以后才發(fā)送ACK回執(zhí),進(jìn)一步確保消息的可靠性。
不過(guò)出于性能考慮,為了減少IO次數(shù),發(fā)送到MQ的消息并不是逐條持久化到數(shù)據(jù)庫(kù)的,而是每隔一段時(shí)間批量持久化。一般間隔在100毫秒左右,這就會(huì)導(dǎo)致ACK有一定的延遲,因此建議生產(chǎn)者確認(rèn)全部采用異步方式。
LazyQueue
在默認(rèn)情況下,RabbitMQ會(huì)將接收到的信息保存在內(nèi)存中以降低消息收發(fā)的延遲。但在某些特殊情況下,這會(huì)導(dǎo)致消息積壓,比如:
- 消費(fèi)者宕機(jī)或出現(xiàn)網(wǎng)絡(luò)故障
- 消息發(fā)送量激增,超過(guò)了消費(fèi)者處理速度
- 消費(fèi)者處理業(yè)務(wù)發(fā)生阻塞
一旦出現(xiàn)消息堆積問(wèn)題,RabbitMQ的內(nèi)存占用就會(huì)越來(lái)越高,直到觸發(fā)內(nèi)存預(yù)警上限。此時(shí)RabbitMQ會(huì)將內(nèi)存消息刷到磁盤上,這個(gè)行為成為PageOut
. PageOut
會(huì)耗費(fèi)一段時(shí)間,并且會(huì)阻塞隊(duì)列進(jìn)程。因此在這個(gè)過(guò)程中RabbitMQ不會(huì)再處理新的消息,生產(chǎn)者的所有請(qǐng)求都會(huì)被阻塞。
為了解決這個(gè)問(wèn)題,從RabbitMQ的3.6.0版本開始,就增加了Lazy Queues的模式,也就是惰性隊(duì)列。惰性隊(duì)列的特征如下:
- 接收到消息后直接存入磁盤而非內(nèi)存
- 消費(fèi)者要消費(fèi)消息時(shí)才會(huì)從磁盤中讀取并加載到內(nèi)存(也就是懶加載)
- 支持?jǐn)?shù)百萬(wàn)條的消息存儲(chǔ)
而在3.12版本之后,LazyQueue已經(jīng)成為所有隊(duì)列的默認(rèn)格式。因此官方推薦升級(jí)MQ為3.12版本或者所有隊(duì)列都設(shè)置為L(zhǎng)azyQueue模式。
控制臺(tái)配置Lazy模式
在添加隊(duì)列的時(shí)候,添加x-queue-mod=lazy
參數(shù)即可設(shè)置隊(duì)列為L(zhǎng)azy模式:
代碼配置Lazy模式
在利用SpringAMQP聲明隊(duì)列的時(shí)候,添加x-queue-mod=lazy
參數(shù)也可設(shè)置隊(duì)列為L(zhǎng)azy模式:
@Bean public Queue lazyQueue(){ return QueueBuilder .durable("lazy.queue") .lazy() // 開啟Lazy模式 .build(); }
這里是通過(guò)QueueBuilder
的lazy()
函數(shù)配置Lazy模式。
當(dāng)然,我們也可以基于注解來(lái)聲明隊(duì)列并設(shè)置為L(zhǎng)azy模式:
@RabbitListener(queuesToDeclare = @Queue( name = "lazy.queue", durable = "true", arguments = @Argument(name = "x-queue-mode", value = "lazy") )) public void listenLazyQueue(String msg){ log.info("接收到 lazy.queue的消息:{}", msg); }
更新已有隊(duì)列為lazy模式
對(duì)于已經(jīng)存在的隊(duì)列,也可以配置為lazy模式,但是要通過(guò)設(shè)置policy實(shí)現(xiàn)。
可以基于命令行設(shè)置policy:
rabbitmqctl set_policy Lazy "^lazy-queue$" '{"queue-mode":"lazy"}' --apply-to queues
命令解讀:
rabbitmqctl
:RabbitMQ的命令行工具set_policy
:添加一個(gè)策略Lazy
:策略名稱,可以自定義"^lazy-queue$"
:用正則表達(dá)式匹配隊(duì)列的名字'{"queue-mode":"lazy"}'
:設(shè)置隊(duì)列模式為lazy模式--apply-to queues
:策略的作用對(duì)象,是所有的隊(duì)列
當(dāng)然,也可以在控制臺(tái)配置policy,進(jìn)入在控制臺(tái)的Admin
頁(yè)面,點(diǎn)擊Policies
,即可添加配置:
消費(fèi)者的可靠性
當(dāng)RabbitMQ向消費(fèi)者投遞消息以后,需要知道消費(fèi)者的處理狀態(tài)如何。因?yàn)橄⑼哆f給消費(fèi)者并不代表就一定被正確消費(fèi)了,可能出現(xiàn)的故障有很多,比如:
- 消息投遞的過(guò)程中出現(xiàn)了網(wǎng)絡(luò)故障
- 消費(fèi)者接收到消息后突然宕機(jī)
- 消費(fèi)者接收到消息后,因處理不當(dāng)導(dǎo)致異常
一旦發(fā)生上述情況,消息也會(huì)丟失。因此,RabbitMQ必須知道消費(fèi)者的處理狀態(tài),一旦消息處理失敗才能重新投遞消息。
但問(wèn)題來(lái)了:RabbitMQ如何得知消費(fèi)者的處理狀態(tài)呢?
本章我們就一起研究一下消費(fèi)者處理消息時(shí)的可靠性解決方案。
消費(fèi)者確認(rèn)機(jī)制
為了確認(rèn)消費(fèi)者是否成功處理消息,RabbitMQ提供了消費(fèi)者確認(rèn)機(jī)制(Consumer Acknowledgement)。即:當(dāng)消費(fèi)者處理消息結(jié)束后,應(yīng)該向RabbitMQ發(fā)送一個(gè)回執(zhí),告知RabbitMQ自己消息處理狀態(tài)?;貓?zhí)有三種可選值:
- ack:成功處理消息,RabbitMQ從隊(duì)列中刪除該消息
- nack:消息處理失敗,RabbitMQ需要再次投遞消息
- reject:消息處理失敗并拒絕該消息,RabbitMQ從隊(duì)列中刪除該消息
一般reject方式用的較少,除非是消息格式有問(wèn)題,那就是開發(fā)問(wèn)題了。因此大多數(shù)情況下我們需要將消息處理的代碼通過(guò)try catch
機(jī)制捕獲,消息處理成功時(shí)返回ack,處理失敗時(shí)返回nack.
由于消息回執(zhí)的處理代碼比較統(tǒng)一,因此SpringAMQP幫我們實(shí)現(xiàn)了消息確認(rèn)。并允許我們通過(guò)配置文件設(shè)置ACK處理方式,有三種模式:
**none**:不處理。即消息投遞給消費(fèi)者后立刻ack,消息會(huì)立刻從MQ刪除。非常不安全,不建議使用
**manual**:手動(dòng)模式。需要自己在業(yè)務(wù)代碼中調(diào)用api,發(fā)送ack或reject,存在業(yè)務(wù)入侵,但更靈活
**auto**:自動(dòng)模式。SpringAMQP利用AOP對(duì)我們的消息處理邏輯做了環(huán)繞增強(qiáng),當(dāng)業(yè)務(wù)正常執(zhí)行時(shí)則自動(dòng)返回ack. 當(dāng)業(yè)務(wù)出現(xiàn)異常時(shí),根據(jù)異常判斷返回不同結(jié)果:
如果是業(yè)務(wù)異常,會(huì)自動(dòng)返回nack;
如果是消息處理或校驗(yàn)異常,自動(dòng)返回reject;
————————————————
版權(quán)聲明:本文為CSDN博主「過(guò)去日記」的原創(chuàng)文章,遵循CC 4.0 BY-SA版權(quán)協(xié)議,轉(zhuǎn)載請(qǐng)附上原文出處鏈接及本聲明。
原文鏈接:https://blog.csdn.net/studycodeday/article/details/133839942
**none**
:不處理。即消息投遞給消費(fèi)者后立刻ack,消息會(huì)立刻從MQ刪除。非常不安全,不建議使用**manual**
:手動(dòng)模式。需要自己在業(yè)務(wù)代碼中調(diào)用api,發(fā)送ack
或reject
,存在業(yè)務(wù)入侵,但更靈活**auto**
:自動(dòng)模式。SpringAMQP利用AOP對(duì)我們的消息處理邏輯做了環(huán)繞增強(qiáng),當(dāng)業(yè)務(wù)正常執(zhí)行時(shí)則自動(dòng)返回ack
. 當(dāng)業(yè)務(wù)出現(xiàn)異常時(shí),根據(jù)異常判斷返回不同結(jié)果:- 如果是業(yè)務(wù)異常,會(huì)自動(dòng)返回
nack
; - 如果是消息處理或校驗(yàn)異常,自動(dòng)返回
reject
;
- 如果是業(yè)務(wù)異常,會(huì)自動(dòng)返回
通過(guò)下面的配置可以修改SpringAMQP的ACK處理方式:
spring: rabbitmq: listener: simple: acknowledge-mode: none # 不做處理
修改consumer服務(wù)的SpringRabbitListener類中的方法,模擬一個(gè)消息處理的異常:
@RabbitListener(queues = "simple.queue") public void listenSimpleQueueMessage(String msg) throws InterruptedException { log.info("spring 消費(fèi)者接收到消息:【" + msg + "】"); if (true) { throw new MessageConversionException("故意的"); } log.info("消息處理完成"); }
測(cè)試可以發(fā)現(xiàn):當(dāng)消息處理發(fā)生異常時(shí),消息依然被RabbitMQ刪除了。
我們?cè)俅伟汛_認(rèn)機(jī)制修改為auto:
spring: rabbitmq: listener: simple: acknowledge-mode: auto # 自動(dòng)ack
在異常位置打斷點(diǎn),再次發(fā)送消息,程序卡在斷點(diǎn)時(shí),可以發(fā)現(xiàn)此時(shí)消息狀態(tài)為unacked
(未確定狀態(tài)):
放行以后,由于拋出的是消息轉(zhuǎn)換異常,因此Spring會(huì)自動(dòng)返回reject
,所以消息依然會(huì)被刪除
我們將異常改為RuntimeException類型:
@RabbitListener(queues = "simple.queue") public void listenSimpleQueueMessage(String msg) throws InterruptedException { log.info("spring 消費(fèi)者接收到消息:【" + msg + "】"); if (true) { throw new RuntimeException("故意的"); } log.info("消息處理完成"); }
在異常位置打斷點(diǎn),然后再次發(fā)送消息測(cè)試,程序卡在斷點(diǎn)時(shí),可以發(fā)現(xiàn)此時(shí)消息狀態(tài)為unacked
(未確定狀態(tài))以后,由于拋出的是業(yè)務(wù)異常,所以Spring返回ack
,最終消息恢復(fù)至Ready
狀態(tài),并且沒(méi)有被RabbitMQ刪除
當(dāng)我們把配置改為auto
時(shí),消息處理失敗后,會(huì)回到RabbitMQ,并重新投遞到消費(fèi)者。
失敗重試機(jī)制
當(dāng)消費(fèi)者出現(xiàn)異常后,消息會(huì)不斷requeue(重入隊(duì))到隊(duì)列,再重新發(fā)送給消費(fèi)者。如果消費(fèi)者再次執(zhí)行依然出錯(cuò),消息會(huì)再次requeue到隊(duì)列,再次投遞,直到消息處理成功為止。
極端情況就是消費(fèi)者一直無(wú)法執(zhí)行成功,那么消息requeue就會(huì)無(wú)限循環(huán),導(dǎo)致mq的消息處理飆升,帶來(lái)不必要的壓力:
當(dāng)然,上述極端情況發(fā)生的概率還是非常低的,不過(guò)不怕一萬(wàn)就怕萬(wàn)一。為了應(yīng)對(duì)上述情況Spring又提供了消費(fèi)者失敗重試機(jī)制:在消費(fèi)者出現(xiàn)異常時(shí)利用本地重試,而不是無(wú)限制的requeue到mq隊(duì)列。
修改consumer服務(wù)的application.yml文件,添加內(nèi)容:
spring: rabbitmq: listener: simple: retry: enabled: true # 開啟消費(fèi)者失敗重試 initial-interval: 1000ms # 初識(shí)的失敗等待時(shí)長(zhǎng)為1秒 multiplier: 1 # 失敗的等待時(shí)長(zhǎng)倍數(shù),下次等待時(shí)長(zhǎng) = multiplier * last-interval max-attempts: 3 # 最大重試次數(shù) stateless: true # true無(wú)狀態(tài);false有狀態(tài)。如果業(yè)務(wù)中包含事務(wù),這里改為false
重啟consumer服務(wù),重復(fù)之前的測(cè)試??梢园l(fā)現(xiàn):
- 消費(fèi)者在失敗后消息沒(méi)有重新回到MQ無(wú)限重新投遞,而是在本地重試了3次
- 本地重試3次以后,拋出了
AmqpRejectAndDontRequeueException
異常。查看RabbitMQ控制臺(tái),發(fā)現(xiàn)消息被刪除了,說(shuō)明最后SpringAMQP返回的是reject
結(jié)論:
- 開啟本地重試時(shí),消息處理過(guò)程中拋出異常,不會(huì)requeue到隊(duì)列,而是在消費(fèi)者本地重試
- 重試達(dá)到最大次數(shù)后,Spring會(huì)返回reject,消息會(huì)被丟棄
失敗處理策略
在之前的測(cè)試中,本地測(cè)試達(dá)到最大重試次數(shù)后,消息會(huì)被丟棄。這在某些對(duì)于消息可靠性要求較高的業(yè)務(wù)場(chǎng)景下,顯然不太合適了。
因此Spring允許我們自定義重試次數(shù)耗盡后的消息處理策略,這個(gè)策略是由MessageRecovery
接口來(lái)定義的,它有3個(gè)不同實(shí)現(xiàn):
RejectAndDontRequeueRecoverer
:重試耗盡后,直接reject
,丟棄消息。默認(rèn)就是這種方式ImmediateRequeueMessageRecoverer
:重試耗盡后,返回nack
,消息重新入隊(duì)RepublishMessageRecoverer
:重試耗盡后,將失敗消息投遞到指定的交換機(jī)
比較優(yōu)雅的一種處理方案是RepublishMessageRecoverer
,失敗后將消息投遞到一個(gè)指定的,專門存放異常消息的隊(duì)列,后續(xù)由人工集中處理。
1)在consumer服務(wù)中定義處理失敗消息的交換機(jī)和隊(duì)列
@Bean public DirectExchange errorMessageExchange(){ return new DirectExchange("error.direct"); } @Bean public Queue errorQueue(){ return new Queue("error.queue", true); } @Bean public Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){ return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error"); }
2)定義一個(gè)RepublishMessageRecoverer,關(guān)聯(lián)隊(duì)列和交換機(jī)
@Bean public DirectExchange errorMessageExchange(){ return new DirectExchange("error.direct"); } @Bean public Queue errorQueue(){ return new Queue("error.queue", true); } @Bean public Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){ return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error"); }
完整代碼如下:
package com.itheima.consumer.config; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.DirectExchange; import org.springframework.amqp.core.Queue; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.retry.MessageRecoverer; import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer; import org.springframework.context.annotation.Bean; @Configuration @ConditionalOnProperty(name = "spring.rabbitmq.listener.simple.retry.enabled", havingValue = "true") public class ErrorMessageConfig { @Bean public DirectExchange errorMessageExchange(){ return new DirectExchange("error.direct"); } @Bean public Queue errorQueue(){ return new Queue("error.queue", true); } @Bean public Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){ return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error"); } @Bean public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){ return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error"); } }
業(yè)務(wù)冪等性
冪等是一個(gè)數(shù)學(xué)概念,用函數(shù)表達(dá)式來(lái)描述是這樣的:f(x) = f(f(x))
,例如求絕對(duì)值函數(shù)。
在程序開發(fā)中,則是指同一個(gè)業(yè)務(wù),執(zhí)行一次或多次對(duì)業(yè)務(wù)狀態(tài)的影響是一致的。例如:
- 根據(jù)id刪除數(shù)據(jù)
- 查詢數(shù)據(jù)
- 新增數(shù)據(jù)
但數(shù)據(jù)的更新往往不是冪等的,如果重復(fù)執(zhí)行可能造成不一樣的后果。比如:
- 取消訂單,恢復(fù)庫(kù)存的業(yè)務(wù)。如果多次恢復(fù)就會(huì)出現(xiàn)庫(kù)存重復(fù)增加的情況
- 退款業(yè)務(wù)。重復(fù)退款對(duì)商家而言會(huì)有經(jīng)濟(jì)損失。
所以,我們要盡可能避免業(yè)務(wù)被重復(fù)執(zhí)行。
然而在實(shí)際業(yè)務(wù)場(chǎng)景中,由于意外經(jīng)常會(huì)出現(xiàn)業(yè)務(wù)被重復(fù)執(zhí)行的情況,例如:
- 頁(yè)面卡頓時(shí)頻繁刷新導(dǎo)致表單重復(fù)提交
- 服務(wù)間調(diào)用的重試
- MQ消息的重復(fù)投遞
我們?cè)谟脩糁Ц冻晒髸?huì)發(fā)送MQ消息到交易服務(wù),修改訂單狀態(tài)為已支付,就可能出現(xiàn)消息重復(fù)投遞的情況。如果消費(fèi)者不做判斷,很有可能導(dǎo)致消息被消費(fèi)多次,出現(xiàn)業(yè)務(wù)故障。
舉例:
- 假如用戶剛剛支付完成,并且投遞消息到交易服務(wù),交易服務(wù)更改訂單為已支付狀態(tài)。
- 由于某種原因,例如網(wǎng)絡(luò)故障導(dǎo)致生產(chǎn)者沒(méi)有得到確認(rèn),隔了一段時(shí)間后重新投遞給交易服務(wù)。
- 但是,在新投遞的消息被消費(fèi)之前,用戶選擇了退款,將訂單狀態(tài)改為了已退款狀態(tài)。
- 退款完成后,新投遞的消息才被消費(fèi),那么訂單狀態(tài)會(huì)被再次改為已支付。業(yè)務(wù)異常。
因此,我們必須想辦法保證消息處理的冪等性。這里給出兩種方案:
- 唯一消息ID
- 業(yè)務(wù)狀態(tài)判斷
唯一消息ID
這個(gè)思路非常簡(jiǎn)單:
- 每一條消息都生成一個(gè)唯一的id,與消息一起投遞給消費(fèi)者。
- 消費(fèi)者接收到消息后處理自己的業(yè)務(wù),業(yè)務(wù)處理成功后將消息ID保存到數(shù)據(jù)庫(kù)
- 如果下次又收到相同消息,去數(shù)據(jù)庫(kù)查詢判斷是否存在,存在則為重復(fù)消息放棄處理。
我們?cè)撊绾谓o消息添加唯一ID呢?
其實(shí)很簡(jiǎn)單,SpringAMQP的MessageConverter自帶了MessageID的功能,我們只要開啟這個(gè)功能即可。
以Jackson的消息轉(zhuǎn)換器為例:
@Bean public MessageConverter messageConverter(){ // 1.定義消息轉(zhuǎn)換器 Jackson2JsonMessageConverter jjmc = new Jackson2JsonMessageConverter(); // 2.配置自動(dòng)創(chuàng)建消息id,用于識(shí)別不同消息,也可以在業(yè)務(wù)中基于ID判斷是否是重復(fù)消息 jjmc.setCreateMessageIds(true); return jjmc; }
業(yè)務(wù)判斷
業(yè)務(wù)判斷就是基于業(yè)務(wù)本身的邏輯或狀態(tài)來(lái)判斷是否是重復(fù)的請(qǐng)求或消息,不同的業(yè)務(wù)場(chǎng)景判斷的思路也不一樣。
處理消息的業(yè)務(wù)邏輯是把訂單狀態(tài)從未支付修改為已支付。因此我們就可以在執(zhí)行業(yè)務(wù)時(shí)判斷訂單狀態(tài)是否是未支付,如果不是則證明訂單已經(jīng)被處理過(guò),無(wú)需重復(fù)處理。
相比較而言,消息ID的方案需要改造原有的數(shù)據(jù)庫(kù),所以我更推薦使用業(yè)務(wù)判斷的方案。
以支付修改訂單的業(yè)務(wù)為例,我們需要修改OrderServiceImpl
中的markOrderPaySuccess
方法:
@Override public void markOrderPaySuccess(Long orderId) { // 1.查詢訂單 Order old = getById(orderId); // 2.判斷訂單狀態(tài) if (old == null || old.getStatus() != 1) { // 訂單不存在或者訂單狀態(tài)不是1,放棄處理 return; } // 3.嘗試更新訂單 Order order = new Order(); order.setId(orderId); order.setStatus(2); order.setPayTime(LocalDateTime.now()); updateById(order); }
上述代碼邏輯上符合了冪等判斷的需求,但是由于判斷和更新是兩步動(dòng)作,因此在極小概率下可能存在線程安全問(wèn)題。
我們可以合并上述操作為這樣:
@Override public void markOrderPaySuccess(Long orderId) { // UPDATE `order` SET status = ? , pay_time = ? WHERE id = ? AND status = 1 lambdaUpdate() .set(Order::getStatus, 2) .set(Order::getPayTime, LocalDateTime.now()) .eq(Order::getId, orderId) .eq(Order::getStatus, 1) .update(); }
注意看,上述代碼等同于這樣的SQL語(yǔ)句:
UPDATE `order` SET status = ? , pay_time = ? WHERE id = ? AND status = 1
我們?cè)趙here條件中除了判斷id以外,還加上了status必須為1的條件。如果條件不符(說(shuō)明訂單已支付),則SQL匹配不到數(shù)據(jù),根本不會(huì)執(zhí)行。
兜底方案
其實(shí)思想很簡(jiǎn)單:既然MQ通知不一定發(fā)送到交易服務(wù),那么交易服務(wù)就必須自己主動(dòng)去查詢支付狀態(tài)。這樣即便支付服務(wù)的MQ通知失敗,我們依然能通過(guò)主動(dòng)查詢來(lái)保證訂單狀態(tài)的一致。
流程如下:
圖中黃色線圈起來(lái)的部分就是MQ通知失敗后的兜底處理方案,由交易服務(wù)自己主動(dòng)去查詢支付狀態(tài)。
不過(guò)需要注意的是,交易服務(wù)并不知道用戶會(huì)在什么時(shí)候支付,如果查詢的時(shí)機(jī)不正確(比如查詢的時(shí)候用戶正在支付中),可能查詢到的支付狀態(tài)也不正確。
那么問(wèn)題來(lái)了,我們到底該在什么時(shí)間主動(dòng)查詢支付狀態(tài)呢?
這個(gè)時(shí)間是無(wú)法確定的,因此,通常我們采取的措施就是利用定時(shí)任務(wù)定期查詢.
- 首先,支付服務(wù)會(huì)正在用戶支付成功以后利用MQ消息通知交易服務(wù),完成訂單狀態(tài)同步。
- 其次,為了保證MQ消息的可靠性,我們采用了生產(chǎn)者確認(rèn)機(jī)制、消費(fèi)者確認(rèn)、消費(fèi)者失敗重試等策略,確保消息投遞的可靠性
- 最后,我們還在交易服務(wù)設(shè)置了定時(shí)任務(wù),定期查詢訂單支付狀態(tài)。這樣即便MQ通知失敗,還可以利用定時(shí)任務(wù)作為兜底方案,確保訂單支付狀態(tài)的最終一致性。
延遲消息
在電商的支付業(yè)務(wù)中,對(duì)于一些庫(kù)存有限的商品,為了更好的用戶體驗(yàn),通常都會(huì)在用戶下單時(shí)立刻扣減商品庫(kù)存。例如電影院購(gòu)票、高鐵購(gòu)票,下單后就會(huì)鎖定座位資源,其他人無(wú)法重復(fù)購(gòu)買。
但是這樣就存在一個(gè)問(wèn)題,假如用戶下單后一直不付款,就會(huì)一直占有庫(kù)存資源,導(dǎo)致其他客戶無(wú)法正常交易,最終導(dǎo)致商戶利益受損!
因此,電商中通常的做法就是:對(duì)于超過(guò)一定時(shí)間未支付的訂單,應(yīng)該立刻取消訂單并釋放占用的庫(kù)存。
例如,訂單支付超時(shí)時(shí)間為30分鐘,則我們應(yīng)該在用戶下單后的第30分鐘檢查訂單支付狀態(tài),如果發(fā)現(xiàn)未支付,應(yīng)該立刻取消訂單,釋放庫(kù)存。
但問(wèn)題來(lái)了:如何才能準(zhǔn)確的實(shí)現(xiàn)在下單后第30分鐘去檢查支付狀態(tài)呢?
像這種在一段時(shí)間以后才執(zhí)行的任務(wù),我們稱之為延遲任務(wù),而要實(shí)現(xiàn)延遲任務(wù),最簡(jiǎn)單的方案就是利用MQ的延遲消息了。
在RabbitMQ中實(shí)現(xiàn)延遲消息也有兩種方案:
- 死信交換機(jī)+TTL
- 延遲消息插件
這一章我們就一起研究下這兩種方案的實(shí)現(xiàn)方式,以及優(yōu)缺點(diǎn)。
死信交換機(jī)和延遲消息
死信交換機(jī)
當(dāng)一個(gè)隊(duì)列中的消息滿足下列情況之一時(shí),可以成為死信(dead letter):
- 消費(fèi)者使用
basic.reject
或basic.nack
聲明消費(fèi)失敗,并且消息的requeue
參數(shù)設(shè)置為false - 消息是一個(gè)過(guò)期消息,超時(shí)無(wú)人消費(fèi)
- 要投遞的隊(duì)列消息滿了,無(wú)法投遞
如果一個(gè)隊(duì)列中的消息已經(jīng)成為死信,并且這個(gè)隊(duì)列通過(guò)**dead-letter-exchange**
屬性指定了一個(gè)交換機(jī),那么隊(duì)列中的死信就會(huì)投遞到這個(gè)交換機(jī)中,而這個(gè)交換機(jī)就稱為死信交換機(jī)(Dead Letter Exchange)。而此時(shí)加入有隊(duì)列與死信交換機(jī)綁定,則最終死信就會(huì)被投遞到這個(gè)隊(duì)列中。
死信交換機(jī)有什么作用呢?
- 收集那些因處理失敗而被拒絕的消息
- 收集那些因隊(duì)列滿了而被拒絕的消息
- 收集因TTL(有效期)到期的消息
延遲消息
前面兩種作用場(chǎng)景可以看做是把死信交換機(jī)當(dāng)做一種消息處理的最終兜底方案,與消費(fèi)者重試時(shí)講的RepublishMessageRecoverer
作用類似。
而最后一種場(chǎng)景,大家設(shè)想一下這樣的場(chǎng)景:
如圖,有一組綁定的交換機(jī)(ttl.fanout
)和隊(duì)列(ttl.queue
)。但是ttl.queue
沒(méi)有消費(fèi)者監(jiān)聽,而是設(shè)定了死信交換機(jī)hmall.direct
,而隊(duì)列direct.queue1
則與死信交換機(jī)綁定,RoutingKey是blue:
假如我們現(xiàn)在發(fā)送一條消息到ttl.fanout
,RoutingKey為blue,并設(shè)置消息的有效期為5000毫秒:
注意:盡管這里的ttl.fanout
不需要RoutingKey,但是當(dāng)消息變?yōu)樗佬挪⑼哆f到死信交換機(jī)時(shí),會(huì)沿用之前的RoutingKey,這樣hmall.direct
才能正確路由消息。
消息肯定會(huì)被投遞到ttl.queue
之后,由于沒(méi)有消費(fèi)者,因此消息無(wú)人消費(fèi)。5秒之后,消息的有效期到期,成為死信:
死信被再次投遞到死信交換機(jī)hmall.direct
,并沿用之前的RoutingKey,也就是blue
:
由于direct.queue1
與hmall.direct
綁定的key是blue,因此最終消息被成功路由到direct.queue1
,如果此時(shí)有消費(fèi)者與direct.queue1
綁定, 也就能成功消費(fèi)消息了。但此時(shí)已經(jīng)是5秒鐘以后了:
也就是說(shuō),publisher發(fā)送了一條消息,但最終consumer在5秒后才收到消息。我們成功實(shí)現(xiàn)了延遲消息。
總結(jié)
注意:
RabbitMQ的消息過(guò)期是基于追溯方式來(lái)實(shí)現(xiàn)的,也就是說(shuō)當(dāng)一個(gè)消息的TTL到期以后不一定會(huì)被移除或投遞到死信交換機(jī),而是在消息恰好處于隊(duì)首時(shí)才會(huì)被處理。
當(dāng)隊(duì)列中消息堆積很多的時(shí)候,過(guò)期消息可能不會(huì)被按時(shí)處理,因此你設(shè)置的TTL時(shí)間不一定準(zhǔn)確。
:::
DelayExchange插件
基于死信隊(duì)列雖然可以實(shí)現(xiàn)延遲消息,但是太麻煩了。因此RabbitMQ社區(qū)提供了一個(gè)延遲消息插件來(lái)實(shí)現(xiàn)相同的效果。
官方文檔說(shuō)明:
Scheduling Messages with RabbitMQ | RabbitMQ - Blog
下載
插件下載地址:
GitHub - rabbitmq/rabbitmq-delayed-message-exchange: Delayed Messaging for RabbitMQ
安裝
因?yàn)槲覀兪腔贒ocker安裝,所以需要先查看RabbitMQ的插件目錄對(duì)應(yīng)的數(shù)據(jù)卷。
docker volume inspect mq-plugins
結(jié)果如下:
[
{
"CreatedAt": "2024-06-19T09:22:59+08:00",
"Driver": "local",
"Labels": null,
"Mountpoint": "/var/lib/docker/volumes/mq-plugins/_data",
"Name": "mq-plugins",
"Options": null,
"Scope": "local"
}
]
插件目錄被掛載到了/var/lib/docker/volumes/mq-plugins/_data
這個(gè)目錄,我們上傳插件到該目錄下。
接下來(lái)執(zhí)行命令,安裝插件:
docker exec -it mq rabbitmq-plugins enable rabbitmq_delayed_message_exchange
運(yùn)行結(jié)果如下:
聲明延遲交換機(jī)
基于注解方式:
@RabbitListener(bindings = @QueueBinding( value = @Queue(name = "delay.queue", durable = "true"), exchange = @Exchange(name = "delay.direct", delayed = "true"), key = "delay" )) public void listenDelayMessage(String msg){ log.info("接收到delay.queue的延遲消息:{}", msg); }
基于@Bean
的方式:
package com.itheima.consumer.config; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.*; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Slf4j @Configuration public class DelayExchangeConfig { @Bean public DirectExchange delayExchange(){ return ExchangeBuilder .directExchange("delay.direct") // 指定交換機(jī)類型和名稱 .delayed() // 設(shè)置delay的屬性為true .durable(true) // 持久化 .build(); } @Bean public Queue delayedQueue(){ return new Queue("delay.queue"); } @Bean public Binding delayQueueBinding(){ return BindingBuilder.bind(delayedQueue()).to(delayExchange()).with("delay"); } }
發(fā)送延遲消息
發(fā)送消息時(shí),必須通過(guò)x-delay屬性設(shè)定延遲時(shí)間:
@Test void testPublisherDelayMessage() { // 1.創(chuàng)建消息 String message = "hello, delayed message"; // 2.發(fā)送消息,利用消息后置處理器添加消息頭 rabbitTemplate.convertAndSend("delay.direct", "delay", message, new MessagePostProcessor() { @Override public Message postProcessMessage(Message message) throws AmqpException { // 添加延遲消息屬性 message.getMessageProperties().setDelay(5000); return message; } }); }
:::warning
注意:
延遲消息插件內(nèi)部會(huì)維護(hù)一個(gè)本地?cái)?shù)據(jù)庫(kù)表,同時(shí)使用Elang Timers功能實(shí)現(xiàn)計(jì)時(shí)。如果消息的延遲時(shí)間設(shè)置較長(zhǎng),可能會(huì)導(dǎo)致堆積的延遲消息非常多,會(huì)帶來(lái)較大的CPU開銷,同時(shí)延遲消息的時(shí)間會(huì)存在誤差。
因此,不建議設(shè)置延遲時(shí)間過(guò)長(zhǎng)的延遲消息。
到此這篇關(guān)于spring boot Rabbit高級(jí)教程的文章就介紹到這了,更多相關(guān)spring boot Rabbit內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
SpringBoot接收接口入?yún)⒌姆绞叫〗Y(jié)
這篇文章主要給大家介紹了SpringBoot接收接口入?yún)⒌膸追N方式,我們從調(diào)用方的視角去看待這個(gè)問(wèn)題,對(duì)調(diào)用方來(lái)說(shuō),它在調(diào)用接口時(shí)有好幾種傳參方式,下面,將會(huì)依次對(duì)這幾種參數(shù)方式進(jìn)行講解和代碼示例,需要的朋友可以參考下2024-01-01SpringBoot的HTTPS配置實(shí)現(xiàn)
本文主要介紹了SpringBoot的HTTPS配置實(shí)現(xiàn),文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2023-04-04Java并發(fā)編程之常用的多線程實(shí)現(xiàn)方式分析
這篇文章主要介紹了Java并發(fā)編程之常用的多線程實(shí)現(xiàn)方式,結(jié)合實(shí)例形式分析了java并發(fā)編程中多線程的相關(guān)原理、實(shí)現(xiàn)方法與操作注意事項(xiàng),需要的朋友可以參考下2020-02-02bootstrap.yml如何讀取nacos配置中心的配置文件
這篇文章主要介紹了bootstrap.yml讀取nacos配置中心的配置文件問(wèn)題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2022-12-12JAVA日常開發(fā)中一些常見(jiàn)問(wèn)題歸納講解
這篇文章主要給大家介紹了JAVA日常開發(fā)中一些常見(jiàn)問(wèn)題的相關(guān)資料,包括語(yǔ)法錯(cuò)誤、數(shù)據(jù)類型問(wèn)題、面向?qū)ο缶幊虇?wèn)題、集合類問(wèn)題以及文件操作問(wèn)題,通過(guò)詳細(xì)的分析和示例,幫助程序員提高代碼的健壯性和可維護(hù)性,需要的朋友可以參考下2024-12-12關(guān)于HashMap的put方法執(zhí)行全過(guò)程
這篇文章主要介紹了關(guān)于HashMap的put方法執(zhí)行全過(guò)程,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2024-06-06java實(shí)現(xiàn)小i機(jī)器人api接口調(diào)用示例
這篇文章主要介紹了java實(shí)現(xiàn)小i機(jī)器人api接口調(diào)用示例,需要的朋友可以參考下2014-04-04java調(diào)用webservice的.asmx接口的使用步驟
這篇文章主要介紹了java調(diào)用webservice的.asmx接口的使用步驟,本文給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2021-09-09