RabbitMQ消息隊(duì)列實(shí)現(xiàn)延遲任務(wù)示例
一、序言
延遲任務(wù)應(yīng)用廣泛,延遲任務(wù)典型應(yīng)用場(chǎng)景有訂單超時(shí)自動(dòng)取消;支付回調(diào)重試。其中訂單超時(shí)取消具有冪等性屬性,無(wú)需考慮重復(fù)消費(fèi)問(wèn)題;支付回調(diào)重試需要考慮重復(fù)消費(fèi)問(wèn)題。
延遲任務(wù)具有如下特點(diǎn):在未來(lái)的某個(gè)時(shí)間點(diǎn)執(zhí)行;一般僅執(zhí)行一次。
1、實(shí)現(xiàn)原理
生產(chǎn)者將帶有延遲信息的消息發(fā)送到RabbitMQ交換機(jī)中,等待延遲時(shí)間結(jié)束方將消息轉(zhuǎn)發(fā)到綁定的隊(duì)列中,消費(fèi)者通過(guò)監(jiān)聽(tīng)隊(duì)列消費(fèi)消息。延遲任務(wù)的關(guān)鍵在消息在交換機(jī)中停留。
顯而易見(jiàn),基于RabbitMQ實(shí)現(xiàn)延遲任務(wù)對(duì)服務(wù)器的可靠性要求極高,交換機(jī)內(nèi)部消息無(wú)持久化機(jī)制,比如單機(jī)模式服務(wù)重啟,未開(kāi)始的延遲任務(wù)均丟失。
2、組件選型
二、方案設(shè)計(jì)
(一)服務(wù)器
RabbitMQ服務(wù)需要安裝x-delayed-message
插件以處理延遲消息。
(二)生產(chǎn)者
延遲任務(wù)的實(shí)現(xiàn)對(duì)生產(chǎn)者的要求是將消息可靠的投遞到交換機(jī),因此使用confirm確認(rèn)機(jī)制即可。
訂單生成之后,先入庫(kù),然后以訂單ID為key將訂單詳情存入Redis中(持久化),向RabbitMQ發(fā)送異步confirm確定請(qǐng)求。如果收到正常投遞返回,則刪除Redis中訂單ID為key的數(shù)據(jù),回收內(nèi)存,否則以訂單ID為key,從Redis中查詢(xún)出訂單數(shù)據(jù),重新發(fā)送。
(三)消費(fèi)者
延遲任務(wù)的實(shí)現(xiàn)對(duì)消費(fèi)者的要求是以信息不丟失的方式消費(fèi)消息,具體表現(xiàn)在:手動(dòng)確認(rèn)消息的消費(fèi),防止消息丟失;消費(fèi)端持續(xù)穩(wěn)定,防止消息堆積;消息消費(fèi)失敗有重試機(jī)制。
考慮到訂單延遲取消屬于冪等性操作,因此無(wú)需考慮消息的重復(fù)消費(fèi)問(wèn)題。
三、SpringBoot實(shí)現(xiàn)
實(shí)現(xiàn)部分僅貼一部分核心源碼,完整項(xiàng)目請(qǐng)?jiān)L問(wèn)GitHub。
(一)生產(chǎn)者
考慮到下單是極為重要的操作,因此首先將訂單落庫(kù)、存盤(pán),然后進(jìn)行后續(xù)操作。
for (long i = 1; i <= 10; i++) { /* 1.模擬生成訂單 */ BuOrder order = createOrder(i); /* 2.訂單入庫(kù) */ orderService.removeById(order); orderService.saveOrUpdate(order); /* 3.將訂單存入信息Redis */ RedisUtils.setObject(RabbitTemplateConfig.ORDER_PREFIX + i, order); /* 4.向RabbitMQ異步投遞消息 */ rabbitTemplate.convertAndSend(RabbitmqConfig.DELAY_EXCHANGE_NAME, RabbitmqConfig.DELAY_KEY, order, RabbitUtils.setDelay(30000), RabbitUtils.correlationData(order.getOrderId())); }
生產(chǎn)者可靠投遞消息
public void confirm(CorrelationData correlationData, boolean ack, String cause) { if (correlationData == null) { return; } String key = ORDER_PREFIX + correlationData.getId(); if (ack) { /* 如果消息投遞成功,則刪除Redis中訂單數(shù)據(jù),回收內(nèi)存 */ RedisUtils.deleteObject(key); } else { /* 從Redis中讀取訂單數(shù)據(jù),重新投遞 */ BuOrder order = RedisUtils.getObject(key, BuOrder.class); /* 重新投遞消息 */ rabbitTemplate.convertAndSend(RabbitmqConfig.DELAY_EXCHANGE_NAME, RabbitmqConfig.DELAY_KEY, order, RabbitUtils.setDelay(30000), RabbitUtils.correlationData(order.getOrderId())); } }
(二)消費(fèi)者
消費(fèi)者端手動(dòng)確認(rèn),避免消息丟失;失敗自動(dòng)重試。
@RabbitListener(queues = RabbitmqConfig.DELAY_QUEUE_NAME) public void consumeNode01(Channel channel, Message message, BuOrder order) throws IOException { if (Objects.equals(0, order.getOrderStatus())) { /* 修改訂單狀態(tài),設(shè)置為關(guān)閉狀態(tài) */ orderService.updateById(new BuOrder(order.getOrderId(), -1)); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); log.info(String.format("消費(fèi)者節(jié)點(diǎn)01消費(fèi)編號(hào)為【%s】的消息", order.getOrderId())); } }
消費(fèi)者可靠消費(fèi)應(yīng)至少開(kāi)啟兩個(gè)及以上應(yīng)用,確保消息隊(duì)列中不積壓消息。
(三)通用工具包
上述代碼涉及一個(gè)工具類(lèi)RabbitUtils
,存在于如下依賴(lài)中,主要封裝RabbitMQ極常用的工具方法。
<dependency> <groupId>xin.altitude.cms</groupId> <artifactId>ucode-cms-common</artifactId> <version>1.4.3.1</version> </dependency>
以上就是RabbitMQ消息隊(duì)列實(shí)現(xiàn)延遲任務(wù)示例的詳細(xì)內(nèi)容,更多關(guān)于RabbitMQ消息隊(duì)列延遲任務(wù)的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
Spring高級(jí)之注解@PropertySource的原理
這篇文章主要介紹了Spring高級(jí)之注解@PropertySource的原理,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2022-03-03List集合多線(xiàn)程并發(fā)條件下不安全如何解決
List是我們常用的集合,但是在多線(xiàn)程并發(fā)的條件下,會(huì)出現(xiàn)安全問(wèn)題嗎?下面我們就來(lái)測(cè)試一下,如果出現(xiàn)安全問(wèn)題,該如何解決,感興趣的可以了解一下2021-12-12Netty分布式FastThreadLocal的set方法實(shí)現(xiàn)邏輯剖析
這篇文章主要為大家介紹了Netty分布式FastThreadLocal的set方法實(shí)現(xiàn)邏輯剖析,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2022-03-03java 取交集方法retainAll的實(shí)現(xiàn)
這篇文章主要介紹了java 取交集方法retainAll的實(shí)現(xiàn)操作,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-06-06使用Java如何對(duì)復(fù)雜的數(shù)據(jù)類(lèi)型排序和比大小
我相信大家在第一次接觸算法的時(shí)候,最先接觸的肯定也是從排序算法開(kāi)始的,下面這篇文章主要給大家介紹了關(guān)于使用Java如何對(duì)復(fù)雜的數(shù)據(jù)類(lèi)型排序和比大小的相關(guān)資料,需要的朋友可以參考下2023-12-12RabbitMQ的ACK確認(rèn)機(jī)制保障消費(fèi)端消息的可靠性詳解
這篇文章主要介紹了RabbitMQ的ACK確認(rèn)機(jī)制保障消費(fèi)端消息的可靠性詳解,簡(jiǎn)單來(lái)說(shuō),就是你必須關(guān)閉 RabbitMQ 的自動(dòng)ack ,可以通過(guò)一個(gè) api 來(lái)調(diào)用就行,然后每次你自己代碼里確保處理完的時(shí)候,再在程序里 ack 一把,需要的朋友可以參考下2023-12-12Mybatis實(shí)現(xiàn)動(dòng)態(tài)增刪改查功能的示例代碼
這篇文章主要介紹了Mybatis實(shí)現(xiàn)動(dòng)態(tài)增刪改查功能的示例代碼,本文給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2021-04-04解析Java中所有錯(cuò)誤和異常的父類(lèi)java.lang.Throwable
這篇文章主要介紹了Java中所有錯(cuò)誤和異常的父類(lèi)java.lang.Throwable,文章中簡(jiǎn)單地分析了其源碼,說(shuō)明在代碼注釋中,需要的朋友可以參考下2016-03-03