RabbitMQ死信機(jī)制實現(xiàn)延遲隊列的實戰(zhàn)
延遲隊列
延遲隊列存儲的對象肯定是對應(yīng)的延時消息,所謂”延時消息”是指當(dāng)消息被發(fā)送以后,并不想讓消費者立即拿到消息,而是等待指定時間后,消費者才拿到這個消息進(jìn)行消費。
應(yīng)用場景
三方支付,掃碼支付調(diào)用上游的掃碼接口,當(dāng)掃碼有效期過后去調(diào)用查詢接口查詢結(jié)果。實現(xiàn)方式:每當(dāng)一筆掃碼支付請求后,立即將此訂單號放入延遲隊列中(RabbitMQ),隊列過期時間為二維碼有效期,此隊列沒有設(shè)置消費者,過了有效期后消息會重新路由到指定的的隊列,有消費者去執(zhí)行訂單查詢。
RabbitMQ本身沒有直接支持延遲隊列功能,但是可以通過以下特性模擬出延遲隊列的功能。 但是我們可以通過RabbitMQ的兩個特性來曲線實現(xiàn)延遲隊列:Time To Live(TTL) 和 Dead Letter Exchanges(DLX)
Time To Live(TTL)
RabbitMQ可以針對Queue和Message設(shè)置 x-message-tt,來控制消息的生存時間,如果超時,則消息變?yōu)閐ead letter(死信)RabbitMQ針對隊列中的消息過期時間有兩種方法可以設(shè)置。
A: 通過隊列屬性設(shè)置,隊列中所有消息都有相同的過期時間。
<!-- 將消息放入此隊列里,此隊列設(shè)置過期時間,不制造消費者讓其過期,過期后變成死信,消息會放入指定的新隊列里,實現(xiàn)消息的延遲消費 --> <rabbit:queue name="paycenter.scanpay.orderquery.delay.icbc" durable="true" auto-delete="false" exclusive="false" > <rabbit:queue-arguments> <entry key="x-message-ttl"> <value type="java.lang.Long">${qrcode.expire.icbc}</value> </entry> <!--消息過期根據(jù)重新路由 --> <entry key="x-dead-letter-exchange" value="directExchange"/> <entry key="x-dead-letter-routing-key" value="paycenter.scanpay.orderquery"/> </rabbit:queue-arguments> </rabbit:queue>
B: 對消息進(jìn)行單獨設(shè)置,每條消息TTL可以不同。
<!-- 將消息放入此隊列里,次隊列設(shè)置過期時間,不制造消費者讓其過期,過期后變成死信,消息會放入指定的新隊列里,實現(xiàn)消息的延遲消費 --> <rabbit:queue name="paycenter.scanpay.orderquery.delay.icbc" durable="true" auto-delete="false" exclusive="false" > <rabbit:queue-arguments> <!--消息過期根據(jù)重新路由 --> <entry key="x-dead-letter-exchange" value="directExchange"/> <entry key="x-dead-letter-routing-key" value="paycenter.scanpay.orderquery"/> </rabbit:queue-arguments> </rabbit:queue>
amqpTemplate.convertAndSend(mqMessage.getExchange(), mqMessage.getRoutingKey(), result, new ExpirationMessagePostProcessor(expireTime));
package com.emax.paycenter.mq.pruducer; import org.springframework.amqp.AmqpException; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessagePostProcessor; public class ExpirationMessagePostProcessor implements MessagePostProcessor { private final Long ttl; public ExpirationMessagePostProcessor(Long ttl) { this.ttl = ttl; } @Override public Message postProcessMessage(Message message) throws AmqpException { message.getMessageProperties().setExpiration(ttl.toString()); return message; } }
如果同時使用,則消息的過期時間以兩者之間TTL較小的那個數(shù)值為準(zhǔn)。消息在隊列的生存時間一旦超過設(shè)置的TTL值,就成為dead letter
Dead Letter Exchanges(DLX)
RabbitMQ的Queue可以配置x-dead-letter-exchange 和x-dead-letter-routing-key(可選)兩個參數(shù),如果隊列內(nèi)出現(xiàn)了dead letter,則按照這兩個參數(shù)重新路由。
x-dead-letter-exchange:出現(xiàn)dead letter之后將dead letter重新發(fā)送到指定exchange
x-dead-letter-routing-key:指定routing-key發(fā)送
隊列出現(xiàn)dead letter的情況有:
消息或者隊列的TTL過期
隊列達(dá)到最大長度
消息被消費端拒絕(basic.reject or basic.nack)并且requeue=false
利用DLX,當(dāng)消息在一個隊列中變成死信后,它能被重新publish到另一個Exchange。這時候消息就可以重新被消費。
注意一:ttl設(shè)置之后,下次修改時間,會報錯,這時候,需要先刪除該隊列,重啟項目。否則會報錯。
注意二:消費者中,拋異常了沒處理,會一直重復(fù)消費
注意三:下面的代碼我模擬了1-10號消息,消息的內(nèi)容里面是1-10。過期的時間是10-1秒。這里要注意,雖然10是第一個發(fā)送,但是它過期的時間最長。
過了10s以后,消費者開始收到數(shù)據(jù),但是它是一次性收到如下結(jié)果:10、9 、8 、7 、6、5 、4 、3 、2 、1
Consumer第一個收到的還是10。10是第一個放進(jìn)隊列,但是它的過期時間最長。所以由此可見,即使一個消息比在同一隊列中的其他消息提前過期,提前過期的也不會優(yōu)先進(jìn)入死信隊列,它們還是按照入庫的順序讓消費者消費。如果第一進(jìn)去的消息過期時間是1小時,那么死信隊列的消費者也許等1小時才能收到第一個消息。參考官方文檔發(fā)現(xiàn)“Only when expired messages reach the head of a queue will they actually be discarded (or dead-lettered).”只有當(dāng)過期的消息到了隊列的頂端(隊首),才會被真正的丟棄或者進(jìn)入死信隊列。
所以在考慮使用RabbitMQ來實現(xiàn)延遲任務(wù)隊列的時候,需要確保業(yè)務(wù)上每個任務(wù)的延遲時間是一致的。如果遇到不同的任務(wù)類型需要不同的延時的話,需要為每一種不同延遲時間的消息建立單獨的消息隊列。(也可以考慮緩存隊列,DelayQueue實現(xiàn)定時延遲執(zhí)行任務(wù),但是也有缺點:就是項目重啟緩存里的數(shù)據(jù)就會丟失,DelayQueue的使用詳見其他博文)
for(int i = 10; i>0; i-- ){ amqpTemplate.convertAndSend(mqMessage.getExchange(), mqMessage.getRoutingKey(), result, new ExpirationMessagePostProcessor(expireTime)); }
到此這篇關(guān)于RabbitMQ死信機(jī)制實現(xiàn)延遲隊列的實戰(zhàn)的文章就介紹到這了,更多相關(guān)RabbitMQ 延遲隊列內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Jenkins Pipeline 部署 SpringBoot 應(yīng)用的教程詳解
這篇文章主要介紹了Jenkins Pipeline 部署 SpringBoot 應(yīng)用的詳細(xì)教程,本文給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友可以參考下2020-07-07Java基礎(chǔ)教程之final關(guān)鍵字淺析
這篇文章主要給大家介紹了關(guān)于Java基礎(chǔ)教程之final關(guān)鍵字的相關(guān)資料,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面來一起學(xué)習(xí)學(xué)習(xí)吧2019-06-06Jpa 實現(xiàn)自動更新表中的創(chuàng)建日期和修改時間
這篇文章主要介紹了Jpa 實現(xiàn)自動更新表中的創(chuàng)建日期和修改時間,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2021-01-01Java的MyBatis框架中關(guān)鍵的XML字段映射的配置參數(shù)詳解
將XML文件的schema字段映射到數(shù)據(jù)庫的schema是我們操作數(shù)據(jù)庫的常用手段,這里我們就來整理一些Java的MyBatis框架中關(guān)鍵的XML字段映射的配置參數(shù)詳解,需要的朋友可以參考下2016-06-06