SpringBoot+RabbitMQ實現(xiàn)消息可靠傳輸詳解
環(huán)境配置
SpringBoot
整合 RabbitMQ
實現(xiàn)消息的發(fā)送。
1.添加 maven
依賴
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
2.添加 application.yml 配置文件
spring: rabbitmq: host: 192.168.3.19 port: 5672 username: admin password: xxxx
3.配置交換機(jī)、隊列以及綁定
@Bean public DirectExchange myExchange() { DirectExchange directExchange = new DirectExchange("myExchange"); return directExchange; } @Bean public Queue myQueue() { Queue queue = new Queue("myQueue"); return queue; } @Bean public Binding binding() { return BindingBuilder.bind(myQueue()).to(myExchange()).with("myRoutingKey"); }
4.生產(chǎn)發(fā)送消息
@Autowired private RabbitTemplate rabbitTemplate; @GetMapping("/send") public String send(String message) { rabbitTemplate.convertAndSend("myExchange","myRoutingKey",message); System.out.println("【發(fā)送消息】" + message) return "【send message】" + message; }
5.消費(fèi)者接收消息
@RabbitListener(queuesToDeclare = @Queue("myQueue")) public void process(String msg, Channel channel, Message message) { SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); Date date = new Date(); String time = sdf.format(date); System.out.println("【接收信息】" + msg + " 當(dāng)前時間" + time);
6.調(diào)用生產(chǎn)端發(fā)送消息 hello
,控制臺輸出:
【發(fā)送消息】hello
【接收信息】hello 當(dāng)前時間2022-05-12 10:21:14
說明消息已經(jīng)被成功接收。
消息丟失分析
一條消息的從生產(chǎn)到消費(fèi),消息丟失可能發(fā)生在以下幾個階段:
- 生產(chǎn)端丟失: 生產(chǎn)者無法傳輸?shù)?nbsp;
RabbitMQ
- 存儲端丟失:
RabbitMQ
存儲自身掛了 - 消費(fèi)端丟失:存儲由于網(wǎng)絡(luò)問題,無法發(fā)送到消費(fèi)端,或者消費(fèi)掛了,無法發(fā)送正常消費(fèi)
RabbitMQ
從生產(chǎn)端、儲存端、消費(fèi)端都對可靠性傳輸做很好的支持。
生產(chǎn)階段
生產(chǎn)階段通過請求確認(rèn)機(jī)制,來確保消息的可靠傳輸。當(dāng)發(fā)送消息到 RabbitMQ 服務(wù)器 之后,RabbitMQ 收到消息之后,給發(fā)送返回一個請求確認(rèn),表示RabbitMQ 服務(wù)器已成功的接收到了消息。
配置application.yml
spring: rabbitmq: # 消息確認(rèn)機(jī)制 生產(chǎn)者 -> 交換機(jī) publisher-confirms: true # 消息返回機(jī)制 交換機(jī) -> 隊列 publisher-returns: true
配置
@Configuration @Slf4j public class RabbitConfig { @Autowired private ConnectionFactory connectionFactory; @Bean public RabbitTemplate rabbitTemplate() { RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() { @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { log.info("【correlationData】:" + correlationData); log.info("【ack】" + ack); log.info("【cause】" + cause); if (ack) { log.info("【發(fā)送成功】"); } else { log.info("【發(fā)送失敗】correlationData:" + correlationData + " cause:" + cause); } } }); rabbitTemplate.setMandatory(true); rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() { @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { log.warn("【消息發(fā)送失敗】"); log.info("【message】" + message); log.info("【replyCode】" + replyCode); } }); return rabbitTemplate; } }
消息從 生產(chǎn)者 到 交換機(jī), 有confirmCallback
確認(rèn)模式。發(fā)送消息成功后消息會調(diào)用方法confirm(CorrelationData correlationData, boolean ack, String cause)
,根據(jù) ack
判斷消息是否發(fā)送成功。
消息從 交換機(jī) 到 隊列,有returnCallback
退回模式。
發(fā)送消息 product message
控制臺輸出如下:
【發(fā)送消息】product message
【接收信息】product message 當(dāng)前時間2022-05-12 11:27:56
【correlationData】:null
【ack】true
【cause】null
【發(fā)送成功】
生產(chǎn)端模擬消息丟失
這里有兩個方案:
- 發(fā)送消息后立馬關(guān)閉 broke,后者把網(wǎng)絡(luò)關(guān)閉,但是broker關(guān)閉之后控制臺一直就會報錯,發(fā)送消息也報500錯誤。
- 發(fā)送不存在的交換機(jī):
// myExchange 修改成 myExchangexxxxx rabbitTemplate.convertAndSend("myExchangexxxxx","myRoutingKey",message);
結(jié)果:
【correlationData】:null
【ack】false
【cause】channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'myExchangexxxxx' in vhost '/', class-id=60, method-id=40)
【發(fā)送失敗】
當(dāng)發(fā)送失敗可以對消息進(jìn)行重試
交換機(jī)正確,發(fā)送不存在的隊列:
交換機(jī)接收到消息,返回成功通知,控制臺輸出:
【correlationData】:CorrelationData [id=7d468b47-b422-4523-b2a2-06b14aef073c]
【ack】true
【cause】null
【發(fā)送成功】
交換機(jī)沒有找到隊列,返回失敗信息:
【消息發(fā)送失敗】
【message】product message
【replyCode】312
RabbitMQ
開啟隊列持久化,創(chuàng)建的隊列和交換機(jī)默認(rèn)配置是持久化的。首先把隊列和交換機(jī)設(shè)置正確,修改消費(fèi)監(jiān)聽的隊列,使得消息存放在隊列里。
修改隊列的持久化,修改成非持久化:
@Bean public Queue myQueue() { Queue queue = new Queue("myQueue",false); return queue; }
發(fā)送消息之后,消息存放在隊列中,然后重啟 RabbitMQ
,消息不存在了。
設(shè)置隊列持久化:
@Bean public Queue myQueue() { Queue queue = new Queue("myQueue",true); return queue; }
重啟之后,隊列的消息還存在。
消費(fèi)端
消費(fèi)端默認(rèn)開始 ack
自動確認(rèn)模式,當(dāng)隊列消息被消費(fèi)者接收,不管有沒有被消費(fèi)端消息,都自動刪除隊列中的消息。所以為了確保消費(fèi)端能成功消費(fèi)消息,將自動模式改成手動確認(rèn)模式:
修改application.yml 文件
spring: rabbitmq: # 手動消息確認(rèn) listener: simple: acknowledge-mode: manual
消費(fèi)接收消息之后需要手動確認(rèn):
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
@RabbitListener(queuesToDeclare = @Queue("myQueue")) public void process(String msg, Channel channel, Message message) { SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); Date date = new Date(); String time = sdf.format(date); System.out.println("【接收信息】" + msg + " 當(dāng)前時間" + time); System.out.println(message.getMessageProperties().getDeliveryTag()); try { channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); } catch (IOException e) { e.printStackTrace(); } }
如果不添加:
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
發(fā)送兩條消息
消息被接收后,沒有確認(rèn),重新放到隊列中:
重啟項目,之后,隊列的消息會發(fā)送到消費(fèi)者,但是沒有 ack 確認(rèn),還是繼續(xù)會放回隊列中。
加上 channel.basicAck
之后,再重啟項目
隊列消息就被刪除了
basicAck
方法最后一個參數(shù) multiple
表示是刪除之前的隊列。
multiple
設(shè)置成 true
,把后面的隊列都清理掉了
到此這篇關(guān)于SpringBoot+RabbitMQ實現(xiàn)消息可靠傳輸詳解的文章就介紹到這了,更多相關(guān)SpringBoot RabbitMQ消息可靠傳輸內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
java.io.File的renameTo方法移動文件失敗的解決方案
這篇文章主要介紹了java.io.File的renameTo方法移動文件失敗的解決方案,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2021-07-07用html css javascript打造自己的RIA圖文教程
用html&css&javascript打造自己的RIA之一,包括了配置等2009-07-07關(guān)于Java中如何實現(xiàn)文件的讀寫操作
在Java中,可以使用File和FileInputStream、FileOutputStream、BufferedReader、PrintWriter等類來進(jìn)行文件讀寫操作,需要的朋友可以參考下2023-05-05Java基于裝飾者模式實現(xiàn)的圖片工具類實例【附demo源碼下載】
這篇文章主要介紹了Java基于裝飾者模式實現(xiàn)的圖片工具類,結(jié)合完整實例形式分析了裝飾者模式實現(xiàn)圖片的判斷、水印、縮放、復(fù)制等功能,并附帶demo源碼供讀者下載參考,需要的朋友可以參考下2017-09-09MyBatis動態(tài)SQL如何實現(xiàn)前端指定返回字段
這篇文章主要介紹了MyBatis動態(tài)SQL如何實現(xiàn)前端指定返回字段,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2022-01-01