SpringBoot集成RabbitMQ的方法(死信隊(duì)列)
介紹
死信隊(duì)列:沒有被及時(shí)消費(fèi)的消息存放的隊(duì)列,消息沒有被及時(shí)消費(fèi)有以下幾點(diǎn)原因:
1.有消息被拒絕(basic.reject/ basic.nack)并且requeue=false
2.隊(duì)列達(dá)到最大長度
3.消息TTL過期
場景
1.小時(shí)進(jìn)入初始隊(duì)列,等待30分鐘后進(jìn)入5分鐘隊(duì)列
2.消息等待5分鐘后進(jìn)入執(zhí)行隊(duì)列
3.執(zhí)行失敗后重新回到5分鐘隊(duì)列
4.失敗5次后,消息進(jìn)入2小時(shí)隊(duì)列
5.消息等待2小時(shí)進(jìn)入執(zhí)行隊(duì)列
6.失敗5次后,將消息丟棄或做其他處理
使用
安裝MQ
使用docker方式安裝,選擇帶mangement的版本
docker pull rabbitmq:management docker run -d -p 5672:5672 -p 15672:15672 --name rabbitmq rabbitmq:management
訪問 localhost: 15672,默認(rèn)賬號密碼guest/guest
項(xiàng)目配置
(1)創(chuàng)建springboot項(xiàng)目
(2)在application.properties配置文件中配置mq連接信息
spring.rabbitmq.host=localhost spring.rabbitmq.port=5672 spring.rabbitmq.username=guest spring.rabbitmq.password=guest
(3)隊(duì)列配置
package com.df.ps.mq; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.Queue; import org.springframework.amqp.core.TopicExchange; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer; import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter; import org.springframework.beans.factory.annotation.Autowire; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.util.HashMap; import java.util.Map; @Configuration public class MqConfig { //time @Value("${spring.df.buffered.min:120}") private int springdfBufferedTime; @Value("${spring.df.high-buffered.min:5}") private int springdfHighBufferedTime; @Value("${spring.df.low-buffered.min:120}") private int springdfLowBufferedTime; // 30min Buffered Queue @Value("${spring.df.queue:spring-df-buffered-queue}") private String springdfBufferedQueue; @Value("${spring.df.topic:spring-df-buffered-topic}") private String springdfBufferedTopic; @Value("${spring.df.route:spring-df-buffered-route}") private String springdfBufferedRouteKey; // 5M Buffered Queue @Value("${spring.df.high-buffered.queue:spring-df-high-buffered-queue}") private String springdfHighBufferedQueue; @Value("${spring.df.high-buffered.topic:spring-df-high-buffered-topic}") private String springdfHighBufferedTopic; @Value("${spring.df.high-buffered.route:spring-df-high-buffered-route}") private String springdfHighBufferedRouteKey; // High Queue @Value("${spring.df.high.queue:spring-df-high-queue}") private String springdfHighQueue; @Value("${spring.df.high.topic:spring-df-high-topic}") private String springdfHighTopic; @Value("${spring.df.high.route:spring-df-high-route}") private String springdfHighRouteKey; // 2H Low Buffered Queue @Value("${spring.df.low-buffered.queue:spring-df-low-buffered-queue}") private String springdfLowBufferedQueue; @Value("${spring.df.low-buffered.topic:spring-df-low-buffered-topic}") private String springdfLowBufferedTopic; @Value("${spring.df.low-buffered.route:spring-df-low-buffered-route}") private String springdfLowBufferedRouteKey; // Low Queue @Value("${spring.df.low.queue:spring-df-low-queue}") private String springdfLowQueue; @Value("${spring.df.low.topic:spring-df-low-topic}") private String springdfLowTopic; @Value("${spring.df.low.route:spring-df-low-route}") private String springdfLowRouteKey; @Bean(autowire = Autowire.BY_NAME, value = "springdfBufferedQueue") Queue springdfBufferedQueue() { int bufferedTime = 1000 * 60 * springdfBufferedTime; return createBufferedQueue(springdfBufferedQueue, springdfHighBufferedTopic, springdfHighBufferedRouteKey, bufferedTime); } @Bean(autowire = Autowire.BY_NAME, value = "springdfHighBufferedQueue") Queue springdfHighBufferedQueue() { int highBufferedTime = 1000 * 60 * springdfHighBufferedTime; return createBufferedQueue(springdfHighBufferedQueue, springdfHighTopic, springdfHighRouteKey, highBufferedTime); } @Bean(autowire = Autowire.BY_NAME, value = "springdfHighQueue") Queue springdfHighQueue() { return new Queue(springdfHighQueue, true); } @Bean(autowire = Autowire.BY_NAME, value = "springdfLowBufferedQueue") Queue springdfLowBufferedQueue() { int lowBufferedTime = 1000 * 60 * springdfLowBufferedTime; return createBufferedQueue(springdfLowBufferedQueue, springdfLowTopic, springdfLowRouteKey, lowBufferedTime); } @Bean(autowire = Autowire.BY_NAME, value = "springdfLowQueue") Queue springdfLowQueue() { return new Queue(springdfLowQueue, true); } @Bean(autowire = Autowire.BY_NAME, value = "springdfBufferedTopic") TopicExchange springdfBufferedTopic() { return new TopicExchange(springdfBufferedTopic); } @Bean Binding springBuffereddf(Queue springdfBufferedQueue, TopicExchange springdfBufferedTopic) { return BindingBuilder.bind(springdfBufferedQueue).to(springdfBufferedTopic).with(springdfBufferedRouteKey); } @Bean(autowire = Autowire.BY_NAME, value = "springdfHighBufferedTopic") TopicExchange springdfHighBufferedTopic() { return new TopicExchange(springdfHighBufferedTopic); } @Bean Binding springHighBuffereddf(Queue springdfHighBufferedQueue, TopicExchange springdfHighBufferedTopic) { return BindingBuilder.bind(springdfHighBufferedQueue).to(springdfHighBufferedTopic).with(springdfHighBufferedRouteKey); } @Bean(autowire = Autowire.BY_NAME, value = "springdfHighTopic") TopicExchange springdfHighTopic() { return new TopicExchange(springdfHighTopic); } @Bean Binding springHighdf(Queue springdfHighQueue, TopicExchange springdfHighTopic) { return BindingBuilder.bind(springdfHighQueue).to(springdfHighTopic).with(springdfHighRouteKey); } @Bean(autowire = Autowire.BY_NAME, value = "springdfLowBufferedTopic") TopicExchange springdfLowBufferedTopic() { return new TopicExchange(springdfLowBufferedTopic); } @Bean Binding springLowBuffereddf(Queue springdfLowBufferedQueue, TopicExchange springdfLowBufferedTopic) { return BindingBuilder.bind(springdfLowBufferedQueue).to(springdfLowBufferedTopic).with(springdfLowBufferedRouteKey); } @Bean(autowire = Autowire.BY_NAME, value = "springdfLowTopic") TopicExchange springdfLowTopic() { return new TopicExchange(springdfLowTopic); } @Bean Binding springLowdf(Queue springdfLowQueue, TopicExchange springdfLowTopic) { return BindingBuilder.bind(springdfLowQueue).to(springdfLowTopic).with(springdfLowRouteKey); } @Bean SimpleMessageListenerContainer container(ConnectionFactory connectionFactory, MessageListenerAdapter listenerAdapter) { SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(); container.setConnectionFactory(connectionFactory); container.setQueueNames(springdfHighQueue, springdfLowQueue); container.setMessageListener(listenerAdapter); return container; } @Bean MessageListenerAdapter listenerAdapter(IntegrationReceiver receiver) { MessageListenerAdapter adapter = new MessageListenerAdapter(receiver); adapter.setDefaultListenerMethod("receive"); Map<String, String> queueOrTagToMethodName = new HashMap<>(); queueOrTagToMethodName.put(springdfHighQueue, "springdfHighReceive"); queueOrTagToMethodName.put(springdfLowQueue, "springdfLowReceive"); adapter.setQueueOrTagToMethodName(queueOrTagToMethodName); return adapter; } private Queue createBufferedQueue(String queueName, String topic, String routeKey, int bufferedTime) { Map<String, Object> args = new HashMap<>(); args.put("x-dead-letter-exchange", topic); args.put("x-dead-letter-routing-key", routeKey); args.put("x-message-ttl", bufferedTime); // 是否持久化 boolean durable = true; // 僅創(chuàng)建者可以使用的私有隊(duì)列,斷開后自動(dòng)刪除 boolean exclusive = false; // 當(dāng)所有消費(fèi)客戶端連接斷開后,是否自動(dòng)刪除隊(duì)列 boolean autoDelete = false; return new Queue(queueName, durable, exclusive, autoDelete, args); } }
消費(fèi)者配置
package com.df.ps.mq; import com.fasterxml.jackson.databind.ObjectMapper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import java.util.Map; public class MqReceiver { private static Logger logger = LoggerFactory.getLogger(MqReceiver.class); @Value("${high-retry:5}") private int highRetry; @Value("${low-retry:5}") private int lowRetry; @Value("${spring.df.high-buffered.topic:spring-df-high-buffered-topic}") private String springdfHighBufferedTopic; @Value("${spring.df.high-buffered.route:spring-df-high-buffered-route}") private String springdfHighBufferedRouteKey; @Value("${spring.df.low-buffered.topic:spring-df-low-buffered-topic}") private String springdfLowBufferedTopic; @Value("${spring.df.low-buffered.route:spring-df-low-buffered-route}") private String springdfLowBufferedRouteKey; private final RabbitTemplate rabbitTemplate; @Autowired public MqReceiver(RabbitTemplate rabbitTemplate) { this.rabbitTemplate = rabbitTemplate; } public void receive(Object message) { if (logger.isInfoEnabled()) { logger.info("default receiver: " + message); } } /** * 消息從初始隊(duì)列進(jìn)入5分鐘的高速緩沖隊(duì)列 * @param message */ public void highReceiver(Object message){ ObjectMapper mapper = new ObjectMapper(); Map msg = mapper.convertValue(message, Map.class); try{ logger.info("這里做消息處理..."); }catch (Exception e){ int times = msg.get("times") == null ? 0 : (int) msg.get("times"); if (times < highRetry) { msg.put("times", times + 1); rabbitTemplate.convertAndSend(springdfHighBufferedTopic,springdfHighBufferedRouteKey,message); } else { msg.put("times", 0); rabbitTemplate.convertAndSend(springdfLowBufferedTopic,springdfLowBufferedRouteKey,message); } } } /** * 消息從5分鐘緩沖隊(duì)列進(jìn)入2小時(shí)緩沖隊(duì)列 * @param message */ public void lowReceiver(Object message){ ObjectMapper mapper = new ObjectMapper(); Map msg = mapper.convertValue(message, Map.class); try { logger.info("這里做消息處理..."); }catch (Exception e){ int times = msg.get("times") == null ? 0 : (int) msg.get("times"); if (times < lowRetry) { rabbitTemplate.convertAndSend(springdfLowBufferedTopic,springdfLowBufferedRouteKey,message); }else{ logger.info("消息無法被消費(fèi)..."); } } } }
以上就是本文的全部內(nèi)容,希望對大家的學(xué)習(xí)有所幫助,也希望大家多多支持腳本之家。
- springboot整合rabbitmq的示例代碼
- springboot集成rabbitMQ之對象傳輸?shù)姆椒?/a>
- springboot實(shí)現(xiàn)rabbitmq的隊(duì)列初始化和綁定
- Springboot 配置RabbitMQ文檔的方法步驟
- SpringBoot+RabbitMq具體使用的幾種姿勢
- SpringBoot中RabbitMQ集群的搭建詳解
- SpringBoot中連接多個(gè)RabbitMQ的方法詳解
- 一文掌握Springboot集成RabbitMQ的方法
- SpringBoot實(shí)現(xiàn)RabbitMQ監(jiān)聽消息的四種方式
- SpringBoot 整合 RabbitMQ 的使用方式(代碼示例)
- SpringBoot整合RabbitMQ實(shí)現(xiàn)通配符模式
相關(guān)文章
學(xué)習(xí)Java之Java中的異常處理機(jī)制詳解
在本文中,小編將帶領(lǐng)大家來學(xué)習(xí)Java的異常處理機(jī)制,包括異常機(jī)制、異常類型、如何捕獲異常、如何拋出異常以及如何創(chuàng)建自定義異常等核心內(nèi)容,感興趣的同學(xué)跟著小編一起來看看吧2023-08-08Java服務(wù)中的大文件上傳和下載優(yōu)化技巧分享
在Java服務(wù)中處理大文件的上傳和下載是一項(xiàng)常見但復(fù)雜的任務(wù),為了提供優(yōu)秀的用戶體驗(yàn)和高效的系統(tǒng)性能,我們將探索多種策略和技術(shù),并在每一點(diǎn)上都提供代碼示例以便實(shí)戰(zhàn)應(yīng)用,需要的朋友可以參考下2023-10-10Java中用于SMB/CIFS網(wǎng)絡(luò)的JCIFS庫的用法詳解
JCIFS是一個(gè)強(qiáng)大的庫,允許Java應(yīng)用程序無縫地與SMB/CIFS資源進(jìn)行交互,本文將探討JCIFS的概念和工作原理以及如何在?Java?應(yīng)用程序中有效使用它,希望對大家有所幫助2024-12-12