RabbitMQ實(shí)現(xiàn)延時(shí)消息的兩種方法實(shí)戰(zhàn)教程
RabbitMQ實(shí)現(xiàn)延時(shí)消息的兩種方法
1、死信隊(duì)列
1.1消息什么時(shí)候變?yōu)樗佬?dead-letter)
- 消息被否定接收,消費(fèi)者使用basic.reject 或者 basic.nack并且requeue 重回隊(duì)列屬性設(shè)為false。
- 消息在隊(duì)列里得時(shí)間超過了該消息設(shè)置的過期時(shí)間(TTL)。
- 消息隊(duì)列到達(dá)了它的最大長(zhǎng)度,之后再收到的消息。
1.2死信隊(duì)列的原理
當(dāng)一個(gè)消息再隊(duì)列里變?yōu)樗佬艜r(shí),它會(huì)被重新publish到另一個(gè)exchange交換機(jī)上,這個(gè)exchange就為DLX。因此我們只需要在聲明正常的業(yè)務(wù)隊(duì)列時(shí)添加一個(gè)可選的"x-dead-letter-exchange"參數(shù),值為死信交換機(jī),死信就會(huì)被rabbitmq重新publish到配置的這個(gè)交換機(jī)上,我們接著監(jiān)聽這個(gè)交換機(jī)就可以了。
1.3 代碼實(shí)現(xiàn)
引入amqp依賴
聲明交換機(jī),隊(duì)列
package com.lank.demo.config; import org.springframework.amqp.core.*; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.util.HashMap; import java.util.Map; @Configuration public class RabbitmqConfig { //死信交換機(jī),隊(duì)列,路由相關(guān)配置 public static final String DLK_EXCHANGE = "dlk.exchange"; public static final String DLK_ROUTEKEY = "dlk.routeKey"; public static final String DLK_QUEUE = "dlk.queue"; //業(yè)務(wù)交換機(jī),隊(duì)列,路由相關(guān)配置 public static final String DEMO_EXCHANGE = "demo.exchange"; public static final String DEMO_QUEUE = "demo.queue"; public static final String DEMO_ROUTEKEY = "demo.routeKey"; //延時(shí)插件DelayedMessagePlugin的交換機(jī),隊(duì)列,路由相關(guān)配置 public static final String DMP_EXCHANGE = "dmp.exchange"; public static final String DMP_ROUTEKEY = "dmp.routeKey"; public static final String DMP_QUEUE = "dmp.queue"; @Bean public DirectExchange demoExchange(){ return new DirectExchange(DEMO_EXCHANGE,true,false); } @Bean public Queue demoQueue(){ //只需要在聲明業(yè)務(wù)隊(duì)列時(shí)添加x-dead-letter-exchange,值為死信交換機(jī) Map<String,Object> map = new HashMap<>(1); map.put("x-dead-letter-exchange",DLK_EXCHANGE); //該參數(shù)x-dead-letter-routing-key可以修改該死信的路由key,不設(shè)置則使用原消息的路由key map.put("x-dead-letter-routing-key",DLK_ROUTEKEY); return new Queue(DEMO_QUEUE,true,false,false,map); } @Bean public Binding demoBind(){ return BindingBuilder.bind(demoQueue()).to(demoExchange()).with(DEMO_ROUTEKEY); } @Bean public DirectExchange dlkExchange(){ return new DirectExchange(DLK_EXCHANGE,true,false); } @Bean public Queue dlkQueue(){ return new Queue(DLK_QUEUE,true,false,false); } @Bean public Binding dlkBind(){ return BindingBuilder.bind(dlkQueue()).to(dlkExchange()).with(DLK_ROUTEKEY); } //延遲插件使用 //1、聲明一個(gè)類型為x-delayed-message的交換機(jī) //2、參數(shù)添加一個(gè)x-delayed-type值為交換機(jī)的類型用于路由key的映射 @Bean public CustomExchange dmpExchange(){ Map<String, Object> arguments = new HashMap<>(1); arguments.put("x-delayed-type", "direct"); return new CustomExchange(DMP_EXCHANGE,"x-delayed-message",true,false,arguments); } @Bean public Queue dmpQueue(){ return new Queue(DMP_QUEUE,true,false,false); } @Bean public Binding dmpBind(){ return BindingBuilder.bind(dmpQueue()).to(dmpExchange()).with(DMP_ROUTEKEY).noargs(); } }
聲明一個(gè)類用于發(fā)送帶過期時(shí)間的消息
package com.lank.demo.rabbitmq; import com.lank.demo.config.RabbitmqConfig; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.AmqpException; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessagePostProcessor; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; /** 1. @author lank 2. @since 2020/12/14 10:33 */ @Component @Slf4j public class MessageSender { @Autowired private RabbitTemplate rabbitTemplate; //使用死信隊(duì)列發(fā)送消息方法封裝 public void send(String message,Integer time){ String ttl = String.valueOf(time*1000); //exchange和routingKey都為業(yè)務(wù)的就可以,只需要設(shè)置消息的過期時(shí)間 rabbitTemplate.convertAndSend(RabbitmqConfig.DEMO_EXCHANGE, RabbitmqConfig.DEMO_ROUTEKEY,message, new MessagePostProcessor() { @Override public Message postProcessMessage(Message message) throws AmqpException { //設(shè)置消息的過期時(shí)間,是以毫秒為單位的 message.getMessageProperties().setExpiration(ttl); return message; } }); log.info("使用死信隊(duì)列消息:{}發(fā)送成功,過期時(shí)間:{}秒。",message,time); } //使用延遲插件發(fā)送消息方法封裝 public void send2(String message,Integer time){ rabbitTemplate.convertAndSend(RabbitmqConfig.DMP_EXCHANGE, RabbitmqConfig.DMP_ROUTEKEY,message, new MessagePostProcessor() { @Override public Message postProcessMessage(Message message) throws AmqpException { //使用延遲插件只需要在消息的header中添加x-delay屬性,值為過期時(shí)間,單位毫秒 message.getMessageProperties().setHeader("x-delay",time*1000); return message; } }); log.info("使用延遲插件發(fā)送消息:{}發(fā)送成功,過期時(shí)間:{}秒。",message,time); } }
編寫一個(gè)類用于消費(fèi)消息
package com.lank.demo.rabbitmq; import com.lank.demo.config.RabbitmqConfig; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Component @Slf4j public class MessageReceiver { @RabbitHandler @RabbitListener(queues = RabbitmqConfig.DLK_QUEUE) public void onMessage(Message message){ log.info("使用死信隊(duì)列,收到消息:{}",new String(message.getBody())); } @RabbitHandler @RabbitListener(queues = RabbitmqConfig.DMP_QUEUE) public void onMessage2(Message message){ log.info("使用延遲插件,收到消息:{}",new String(message.getBody())); } }
編寫Controller調(diào)用發(fā)送消息方法測(cè)試結(jié)果
package com.lank.demo.controller; import com.lank.demo.rabbitmq.MessageSender; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController; @RestController public class MessageController { @Autowired public MessageSender messageSender; //死信隊(duì)列controller @GetMapping("/send") public String send(@RequestParam String msg,Integer time){ messageSender.send(msg,time); return "ok"; } //延遲插件controller @GetMapping("/send2") public String sendByPlugin(@RequestParam String msg,Integer time){ messageSender.send2(msg,time); return "ok"; } }
配置文件application.properties
server.port=4399 #virtual-host使用默認(rèn)的/就好,如果需要/demo需自己在控制臺(tái)添加 spring.rabbitmq.virtual-host=/demo spring.rabbitmq.host=localhost spring.rabbitmq.port=5672 spring.rabbitmq.username=guest spring.rabbitmq.password=guest
啟動(dòng)項(xiàng)目,打開rabbitmq控制臺(tái),可以看到交換機(jī)和隊(duì)列已經(jīng)創(chuàng)建好。
在瀏覽器中請(qǐng)求http://localhost:4399/send?msg=hello&time=5,從控制臺(tái)的輸出來看,剛好5s后接收到消息。
2020-12-16 22:47:28.071 INFO 13304 --- [nio-4399-exec-1] c.l.rabbitmqdlk.rabbitmq.MessageSender : 使用死信隊(duì)列消息:hello發(fā)送成功,過期時(shí)間:5秒。 2020-12-16 22:47:33.145 INFO 13304 --- [ntContainer#0-1] c.l.r.rabbitmq.MessageReceiver : 使用死信隊(duì)列,收到消息:hello
1.4死信隊(duì)列的一個(gè)小注意點(diǎn)
當(dāng)我往死信隊(duì)列中發(fā)送兩條不同過期時(shí)間的消息時(shí),如果先發(fā)送的消息A的過期時(shí)間大于后發(fā)送的消息B的過期時(shí)間時(shí),由于消息的順序消費(fèi),消息B過期后并不會(huì)立即重新publish到死信交換機(jī),而是會(huì)等到消息A過期后一起被消費(fèi)。
依次發(fā)送兩個(gè)請(qǐng)求http://localhost:4399/send?msg=消息A&time=30和http://localhost:4399/send?msg=消息B&time=10,消息A先發(fā)送,過期時(shí)間30S,消息B后發(fā)送,過期時(shí)間10S,我們想要的結(jié)果應(yīng)該是10S收到消息B,30S后收到消息A,但結(jié)果并不是,控制臺(tái)輸出如下:
2020-12-16 22:54:47.339 INFO 13304 --- [nio-4399-exec-5] c.l.rabbitmqdlk.rabbitmq.MessageSender : 使用死信隊(duì)列消息:消息A發(fā)送成功,過期時(shí)間:30秒。
2020-12-16 22:54:54.278 INFO 13304 --- [nio-4399-exec-6] c.l.rabbitmqdlk.rabbitmq.MessageSender : 使用死信隊(duì)列消息:消息B發(fā)送成功,過期時(shí)間:10秒。
2020-12-16 22:55:17.356 INFO 13304 --- [ntContainer#0-1] c.l.r.rabbitmq.MessageReceiver : 使用死信隊(duì)列,收到消息:消息A
2020-12-16 22:55:17.357 INFO 13304 --- [ntContainer#0-1] c.l.r.rabbitmq.MessageReceiver : 使用死信隊(duì)列,收到消息:消息B
消息A30S后被成功消費(fèi),緊接著消息B被消費(fèi)。因此當(dāng)我們使用死信隊(duì)列時(shí)應(yīng)該注意是否消息的過期時(shí)間都是一樣的,比如訂單超過10分鐘未支付修改其狀態(tài)。如果當(dāng)一個(gè)隊(duì)列各個(gè)消息的過期時(shí)間不一致時(shí),使用死信隊(duì)列就可能達(dá)不到延時(shí)的作用。這時(shí)候我們可以使用延時(shí)插件來實(shí)現(xiàn)這需求。
2 、延時(shí)插件
RabbitMQ Delayed Message Plugin是一個(gè)rabbitmq的插件,所以使用前需要安裝它,可以參考的GitHub地址:
https://github.com/rabbitmq/rabbitmq-delayed-message-exchange
2.1如何實(shí)現(xiàn)
- 安裝好插件后只需要聲明一個(gè)類型type為"x-delayed-message"的exchange,并且在其可選參數(shù)下配置一個(gè)key為"x-delayed-typ",值為交換機(jī)類型(topic/direct/fanout)的屬性。
- 聲明一個(gè)隊(duì)列綁定到該交換機(jī)
- 在發(fā)送消息的時(shí)候消息的header里添加一個(gè)key為"x-delay",值為過期時(shí)間的屬性,單位毫秒。
- 代碼就在上面,配置類為DMP開頭的,發(fā)送消息的方法為send2()。
- 啟動(dòng)后在rabbitmq控制臺(tái)可以看到一個(gè)類型為x-delayed-message的交換機(jī)。
繼續(xù)在瀏覽器中發(fā)送兩個(gè)請(qǐng)求http://localhost:4399/send2?msg=消息A&time=30和http://localhost:4399/send2?msg=消息B&time=10,控制臺(tái)輸出如下,不會(huì)出現(xiàn)死信隊(duì)列出現(xiàn)的問題:
2020-12-16 23:31:19.819 INFO 13304 --- [nio-4399-exec-9] c.l.rabbitmqdlk.rabbitmq.MessageSender : 使用延遲插件發(fā)送消息:消息A發(fā)送成功,過期時(shí)間:30秒。
2020-12-16 23:31:27.673 INFO 13304 --- [io-4399-exec-10] c.l.rabbitmqdlk.rabbitmq.MessageSender : 使用延遲插件發(fā)送消息:消息B發(fā)送成功,過期時(shí)間:10秒。
2020-12-16 23:31:37.833 INFO 13304 --- [ntContainer#1-1] c.l.r.rabbitmq.MessageReceiver : 使用延遲插件,收到消息:消息B
2020-12-16 23:31:49.917 INFO 13304 --- [ntContainer#1-1] c.l.r.rabbitmq.MessageReceiver : 使用延遲插件,收到消息:消息A
到此這篇關(guān)于RabbitMQ實(shí)現(xiàn)延時(shí)消息的兩種方法的文章就介紹到這了,更多相關(guān)RabbitMQ延時(shí)消息內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Java結(jié)構(gòu)型設(shè)計(jì)模式之享元模式示例詳解
享元模式(FlyWeight?Pattern),也叫蠅量模式,運(yùn)用共享技術(shù),有效的支持大量細(xì)粒度的對(duì)象,享元模式就是池技術(shù)的重要實(shí)現(xiàn)方式。本文將通過示例詳細(xì)講解享元模式,感興趣的可以了解一下2022-09-09IDEA如何修改maven的JVM啟動(dòng)內(nèi)存參數(shù)
這篇文章主要介紹了IDEA如何修改maven的JVM啟動(dòng)內(nèi)存參數(shù)問題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2024-09-09Java實(shí)現(xiàn)超市會(huì)員管理系統(tǒng)
這篇文章主要為大家詳細(xì)介紹了Java實(shí)現(xiàn)超市會(huì)員管理系統(tǒng),文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2022-03-03idea創(chuàng)建javaweb原生項(xiàng)目的實(shí)現(xiàn)示例
這篇文章主要介紹了idea創(chuàng)建javaweb原生項(xiàng)目的實(shí)現(xiàn)示例,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2019-09-09springmvc中RequestMappingHandlerAdapter與HttpMessageConverter的
今天小編就為大家分享一篇關(guān)于springmvc中RequestMappingHandlerAdapter與HttpMessageConverter的裝配講解,小編覺得內(nèi)容挺不錯(cuò)的,現(xiàn)在分享給大家,具有很好的參考價(jià)值,需要的朋友一起跟隨小編來看看吧2019-01-01SpringBoot中數(shù)據(jù)傳輸對(duì)象(DTO)的實(shí)現(xiàn)
本文主要介紹了SpringBoot中數(shù)據(jù)傳輸對(duì)象(DTO)的實(shí)現(xiàn),包括了手動(dòng)創(chuàng)建DTO、使用ModelMapper和Lombok創(chuàng)建DTO的示例,具有一定的參考價(jià)值,感興趣的可以了解一下2024-07-07Spring運(yùn)行環(huán)境Environment的解析
本文主要介紹了Spring運(yùn)行環(huán)境Environment的解析,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2023-08-08關(guān)于Mybatis與JPA的優(yōu)缺點(diǎn)說明
這篇文章主要介紹了關(guān)于Mybatis與JPA的優(yōu)缺點(diǎn)說明,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2022-06-06