RabbitMq消息防丟失功能實現(xiàn)方式講解
1.概述
1.1.數(shù)據(jù)丟失的原因
在消息中有三種可能性造成數(shù)據(jù)丟失:
- 消費者消費消息失敗
- 生產(chǎn)者生產(chǎn)消息失敗
- MQ數(shù)據(jù)丟失
消費者消費消息失?。?/p>
RabbitMq存在應(yīng)答機制,默認為自動應(yīng)答,MQ向消費者推送一條消息,消費者收到這條消息后會返回一個ack(應(yīng)答)給MQ,MQ收到應(yīng)答后會刪除這條消息。
自動應(yīng)答存在一個問題,就是消費者收到消息后立馬就會給MQ返回ack,如果消費者返回完ack但還沒來的及真正處理這條消息時,消費者斷電宕機了,那么這條消息就丟失了。
這就是由于消費者消費消息失敗造成的數(shù)據(jù)丟失。
生產(chǎn)者生產(chǎn)數(shù)據(jù)失?。?/p>
生產(chǎn)者向MQ推送了一條消息,但是由于由于諸如網(wǎng)絡(luò)故障等原因mq并沒有收到該條消息,這樣就造成了這條消息的丟失。
MQ數(shù)據(jù)丟失:
MQ的數(shù)據(jù)是存在內(nèi)存中的,諸如斷電等原因可能會造成數(shù)據(jù)的丟失。
1.2.如何防止數(shù)據(jù)丟失
解決以上列舉的數(shù)據(jù)丟失問題的辦法有三種:
- 手動應(yīng)答
- 消息確認機制
- 持久化
手動應(yīng)答:
RabbitMQ默認是自動應(yīng)答,消費者收到消息后就會自動返回ack給MQ,可以將應(yīng)答模式改為手動應(yīng)答,在消費者一側(cè)消息的消費動作完成后手動來返回ack給MQ,用來解決“消費者消費消息失敗”問題。
消息確認機制:
當消息隊列收到消息后,告知生產(chǎn)者,讓生產(chǎn)者感知到自己生產(chǎn)的消息,消息隊列已經(jīng)接收到,用來解決“生產(chǎn)者生產(chǎn)消息失敗”問題。消息確認機制有兩種實現(xiàn)方式:
- AMQP事務(wù)
- confirm
持久化:
消息隊列的消息持久化到磁盤上,用來解決“MQ數(shù)據(jù)丟失”問題。
2.手動應(yīng)答
手動應(yīng)答是通過設(shè)置channel來實現(xiàn)的,以下為一個完整代碼示例。
配置類:
@Configuration public class config { @Bean public Queue queue(){ return new Queue("queue_01",false); } }
生產(chǎn)者:
@SpringBootTest(classes = Main.class) public class Producer { @Autowired RabbitTemplate rabbitTemplate; @Test public void producerMsg(){ rabbitTemplate.convertAndSend("queue_01","hello_world"); } }
消費者:
@Component @Slf4j public class Consumer { @RabbitListener(queues = {"queue_01"}) public void consumerMsg(String msg, Message message,Channel channel){ try { log.info("消費者消費消息: "+msg); /** * 沒有異常就確認消息 * basicAck(long deliveryTag, boolean multiple) * deliveryTag:當前消息在隊列中的的索引; * multiple:為true的話就是批量確認 */ channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); } catch (Exception e) { /** * 有異常就拒收消息 * basicNack(long deliveryTag, boolean multiple, boolean requeue) * requeue:true為將消息重返當前消息隊列,重新發(fā)送給消費者; * false將消息丟棄 */ try { channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true); } catch (Exception ex) { log.error(ex.getMessage()); } } } }
3.消息確認機制
AQMP事務(wù)、confirm其實都是基于channel的。
3.1.AMQP事務(wù)
AMQP事務(wù)和數(shù)據(jù)庫事務(wù)類似,定義一組對MQ的操作,統(tǒng)一提交,成功則全部一起執(zhí)行,失敗則全部回滾。AMQP事務(wù)在spring boot中的使用很簡單,和數(shù)據(jù)庫的事務(wù)一樣,一個注解就可以搞定。
@GetMapping("/direct/wx/transactional") @Transactional(rollbackFor = Exception.class) public String sendDirectMessageTransactional() { rabbitTemplate.convertAndSend("direct_exchange", "wx","hello world!"); log.info("開啟事務(wù)消息機制"); try { Thread.sleep(5000); } catch (Exception e) { e.printStackTrace(); } return "ok"; }
3.2.confirm
confirm是基于channel的,一旦channel進入confirm模式,所有在該channel上發(fā)布的消息都會被指派一個唯一的ID(從1開始),消息被投遞道匹配隊列后broker會發(fā)送一個確認消息給生產(chǎn)者。如果消息和隊列是可持久化的(durable為true),那么確認消息會在消息被寫入磁盤后發(fā)出。
confirm最大的好處在于異步,生產(chǎn)者在等待上一條消息的確認消息的時候可以繼續(xù)往下發(fā)送。
confirm在spring boot中的使用很簡單,在配置文件中開啟即可,并且支持自定義回調(diào)函數(shù):
配置文件:
spring.rabbitmq.publisher-confirms: true
spring.rabbitmq.publisher-returns: true
生產(chǎn)者:
@Slf4j @Component public class RabbitmqService implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback { @Autowired private RabbitTemplate rabbitTemplate; public void sendMessage(String exchange,String routingKey,Object msg) { // 設(shè)置交換機處理失敗消息的模式 true 表示消息由交換機 到達不了隊列時,會將消息重新返回給生產(chǎn)者 // 如果不設(shè)置這個指令,則交換機向隊列推送消息失敗后,不會觸發(fā) setReturnCallback rabbitTemplate.setMandatory(true); //消息消費者確認收到消息后,手動ack回執(zhí) rabbitTemplate.setConfirmCallback(this); // 暫時關(guān)閉 return 配置 //rabbitTemplate.setReturnCallback(this); //發(fā)送消息 rabbitTemplate.convertAndSend(exchange,routingKey,msg); } /** * 交換機并未將數(shù)據(jù)丟入指定的隊列中時,觸發(fā) * channel.basicPublish(exchange_name,next.getKey(), true, properties,next.getValue().getBytes()); * 參數(shù)三:true 表示如果消息無法正常投遞,則return給生產(chǎn)者 ;false 表示直接丟棄 * @param message 消息對象 * @param replyCode 錯誤碼 * @param replyText 錯誤信息 * @param exchange 交換機 * @param routingKey 路由鍵 */ @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { log.info("---- returnedMessage ----replyCode="+replyCode+" replyText="+replyText+" "); } /** * 消息生產(chǎn)者發(fā)送消息至交換機時觸發(fā),用于判斷交換機是否成功收到消息 * @param correlationData 相關(guān)配置信息 * @param ack exchange 交換機,判斷交換機是否成功收到消息 true 表示交換機收到 * @param cause 失敗原因 */ @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { log.info("---- confirm ----ack="+ack+" cause="+String.valueOf(cause)); log.info("correlationData -->"+correlationData.toString()); if(ack){ // 交換機接收到 log.info("---- confirm ----ack==true cause="+cause); }else{ // 沒有接收到 log.info("---- confirm ----ack==false cause="+cause); } } }
到此這篇關(guān)于RabbitMq消息防丟失功能實現(xiàn)方式講解的文章就介紹到這了,更多相關(guān)RabbitMq消息防丟失內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Java基于elasticsearch實現(xiàn)集群管理
這篇文章主要介紹了java基于elasticsearch實現(xiàn)集群管理,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友可以參考下2020-02-02SpringCloud Gateway 利用 Mysql 實現(xiàn)動態(tài)路由的方法
這篇文章主要介紹了SpringCloud Gateway 利用 Mysql 實現(xiàn)動態(tài)路由的方法,本文給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下2021-02-02解決mybatis-plus新增數(shù)據(jù)自增ID變無序問題
這篇文章主要介紹了解決mybatis-plus新增數(shù)據(jù)自增ID變無序問題,具有很好的參考價值,希望對大家有所幫助。2023-07-07Java之InputStreamReader類的實現(xiàn)
這篇文章主要介紹了Java之InputStreamReader類的實現(xiàn),文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧2020-11-11BCryptPasswordEncoder加密與MD5加密的區(qū)別及說明
這篇文章主要介紹了BCryptPasswordEncoder加密與MD5加密的區(qū)別及說明,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教2023-08-08