RabbitMQ進階之消息可靠性詳解
消息的可靠性
Rabbitmq消息的投遞過程中,怎么確保消息能不丟失,這是一個很重要的問題。哪怕我們做了Rabbitmq持久化,也不能保證我們的業(yè)務消息不會被丟失。
我們可以從消息的收發(fā)過程中來分析,消息首先要從生產(chǎn)者producer發(fā)送到broker,再從broker把消息發(fā)送給消費者consumer。
所以我們總的可以從發(fā)送方(生產(chǎn)者)確認和接收方(消費者)確認來保證消息的可靠性。
異常捕獲機制
先執(zhí)行業(yè)務操作,業(yè)務操作成功后執(zhí)行行消息發(fā)送,消息發(fā)送過程通過try catch 方式捕獲異常, 在異常處理理的代碼塊中執(zhí)行回滾業(yè)務操作或者執(zhí)行重發(fā)操作等。
這是一種最大努力確保的方式, 并無法保證100%絕對可靠,因為這里沒有異常并不代表消息就一定投遞成功。
另外,可以通過spring.rabbitmq.template.retry.enabled=true 配置開啟發(fā)送端的重試。
AMQP/RabbitMQ的事務機制
沒有捕獲到異常并不能代表消息就一定投遞成功了。 一直到事務提交后都沒有異常,確實就說明消息是投遞成功了。
但是,這種方式在性能方面的開銷 比較大,一般也不推薦使用。
- 事務實現(xiàn)
channel.txSelect(): 將當前信道設置成事務模式 channel.txCommit(): 用于提交事務 channel.txRollback(): 用于回滾事務
發(fā)送端確認機制
RabbitMQ后來引入了一種輕量量級的方式,叫發(fā)送方確認(publisher confirm)機制。生產(chǎn)者將信 道設置成confirm(確認)模式,一旦信道進入confirm 模式,所有在該信道上?面發(fā)布的消息都會被指派 一個唯一的ID(從1 開始),一旦消息被投遞到所有匹配的隊列之后(如果消息和隊列是持久化的,那么 確認消息會在消息持久化后發(fā)出),RabbitMQ 就會發(fā)送一個確認(Basic.Ack)給生產(chǎn)者(包含消息的唯一 ID),這樣生產(chǎn)者就知道消息已經(jīng)正確送達了。
RabbitMQ 回傳給生產(chǎn)者的確認消息中的deliveryTag 字段包含了確認消息的序號,另外,通過設置channel.basicAck方法中的multiple參數(shù),表示到這個序號之前的所有消息是否都已經(jīng)得到了處理了。生產(chǎn)者投遞消息后并不需要一直阻塞著,可以繼續(xù)投遞下一條消息并通過回調方式處理理ACK響應。
如果 RabbitMQ 因為自身內部錯誤導致消息丟失等異常情況發(fā)生,就會響應一條nack(Basic.Nack)命令,生產(chǎn)者應用程序同樣可以在回調方法中處理理該 nack 命令。
package confirm; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import util.ConnectionUtil; import java.io.IOException; import java.util.concurrent.TimeoutException; public class PublisherConfirmsProducer { public static void main(String[] args) throws Exception{ Connection connection = ConnectionUtil.getConnection(); final Channel channel = connection.createChannel(); // 向RabbitMQ服務器發(fā)送AMQP命令,將當前通道標記為發(fā)送方確認通道 final AMQP.Confirm.SelectOk selectOk = channel.confirmSelect(); channel.queueDeclare("queue.pc", true, false, false, null); channel.exchangeDeclare("ex.pc", "direct", true, false, null); channel.queueBind("queue.pc", "ex.pc", "key.pc"); try { // 發(fā)送消息 for (int i = 1 ; i < 10000 ; i++){ channel.basicPublish("ex.pc", "key.pc", null, "hello world".getBytes()); } // 同步的方式等待RabbitMQ的確認消息 channel.waitForConfirmsOrDie(5000); System.out.println("發(fā)送的消息已經(jīng)得到確認"); } catch (IOException ex) { System.out.println("消息被拒收"); } catch (IllegalStateException ex) { System.out.println("發(fā)送消息的通道不是PublisherConfirms通道"); } catch (TimeoutException ex) { System.out.println("等待消息確認超時"); } channel.close(); connection.close(); } }
waitForConfirm方法有個重載的,可以自定義timeout超時時間,超時后會拋TimeoutException。類似的有幾個waitForConfirmsOrDie方法,Broker端在返回nack(Basic.Nack)之后該方法會拋出java.io.IOException。
需要根據(jù)異常類型來做區(qū)別處理理, TimeoutException超時是屬于第三狀態(tài)(無法確定成功還是失敗),而返回Basic.Nack拋出IOException這種是明確的失敗。上面的代碼主要只是演示confirm機制,實際上還是同步阻塞模式的,性能并不不是太好。
實際上,我們也可以通過“批處理理”的方式來改善整體的性能(即批量量發(fā)送消息后僅調用一次 waitForConfirms方法)。正常情況下這種批量處理的方式效率會高很多,但是如果發(fā)生了超時或者nack(失?。┖竽蔷托枰苛恐匕l(fā)消息或者通知上游業(yè)務批量回滾(因為我們只知道這個批次中有消息沒投遞成功,而并不知道具體是那條消息投遞失敗了,所以很難針對性處理),如此看來,批量重發(fā)消息肯定會造成部分消息重復。
另外,我們可以通過異步回調的方式來處理Broker的響應。addConfirmListener 方法可以添加ConfirmListener 這個回調接口,這個 ConfirmListener 接口包含兩個方法:handleAck 和handleNack,分別用來處理 RabbitMQ 回傳的 Basic.Ack 和 Basic.Nack。
package confirm; /** * 創(chuàng)建者: 魏紅 * 創(chuàng)建時間: 2023-02-28 * 描述: */ import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import util.ConnectionUtil; public class PublisherConfirmsProducer2 { public static void main(String[] args) throws Exception { //獲取連接 Connection connection = ConnectionUtil.getConnection(); final Channel channel = connection.createChannel(); // 向RabbitMQ服務器發(fā)送AMQP命令,將當前通道標記為發(fā)送方確認通道 final AMQP.Confirm.SelectOk selectOk = channel.confirmSelect(); channel.queueDeclare("queue.pc", true, false, false, null); channel.exchangeDeclare("ex.pc", "direct", true, false, null); channel.queueBind("queue.pc", "ex.pc", "key.pc"); String message = "hello-"; // 批處理的大小 int batchSize = 10; // 用于對需要等待確認消息的計數(shù) int outstrandingConfirms = 0; for (int i = 0; i < 10000; i++) { channel.basicPublish("ex.pc", "key.pc", null, (message + i).getBytes()); outstrandingConfirms++; if (outstrandingConfirms == batchSize) { // 此時已經(jīng)有一個批次的消息需要同步等待broker的確認消息 // 同步等待 channel.waitForConfirmsOrDie(5000); System.out.println("消息已經(jīng)被確認了"); outstrandingConfirms = 0; } } if (outstrandingConfirms > 0) { channel.waitForConfirmsOrDie(5000); System.out.println("剩余消息已經(jīng)被確認了"); } channel.close(); connection.close(); } }
還可以使用異步方法:
package confirm; /** * 創(chuàng)建者: 魏紅 * 創(chuàng)建時間: 2023-02-28 * 描述: */ import com.rabbitmq.client.*; import util.ConnectionUtil; import javax.management.loading.MLet; import java.io.IOException; import java.util.concurrent.ConcurrentNavigableMap; import java.util.concurrent.ConcurrentSkipListMap; public class PublisherConfirmsProducer3 { public static void main(String[] args) throws Exception { Connection connection = ConnectionUtil.getConnection(); final Channel channel = connection.createChannel(); // 向RabbitMQ服務器發(fā)送AMQP命令,將當前通道標記為發(fā)送方確認通道 final AMQP.Confirm.SelectOk selectOk = channel.confirmSelect(); channel.queueDeclare("queue.pc", true, false, false, null); channel.exchangeDeclare("ex.pc", "direct", true, false, null); channel.queueBind("queue.pc", "ex.pc", "key.pc"); // ConfirmCallback clearOutstandingConfirms = new ConfirmCallback() { // @Override // public void handle(long deliveryTag, boolean multiple) throws IOException { // if (multiple) { // System.out.println("編號小于等于 " + deliveryTag + " 的消息都已經(jīng)被確認了"); // } else { // System.out.println("編號為:" + deliveryTag + " 的消息被確認"); // } // } // }; ConcurrentNavigableMap<Long, String> outstandingConfirms = new ConcurrentSkipListMap<>(); ConfirmCallback clearOutstandingConfirms = (deliveryTag, multiple) -> { if (multiple) { System.out.println("編號小于等于 " + deliveryTag + " 的消息都已經(jīng)被確認了"); final ConcurrentNavigableMap<Long, String> headMap = outstandingConfirms.headMap(deliveryTag, true); // 清空outstandingConfirms中已經(jīng)被確認的消息信息 headMap.clear(); } else { // 移除已經(jīng)被確認的消息 outstandingConfirms.remove(deliveryTag); System.out.println("編號為:" + deliveryTag + " 的消息被確認"); } }; ConfirmCallback confirmCallback = (deliveryTag, multiple) -> { if (multiple) { // 將沒有確認的消息記錄到一個集合中 // 此處省略實現(xiàn) System.out.println("消息編號小于等于:" + deliveryTag + " 的消息 不確認"); } else { System.out.println("編號為:" + deliveryTag + " 的消息不確認"); } }; // 設置channel的監(jiān)聽器,處理確認的消息和不確認的消息 channel.addConfirmListener(clearOutstandingConfirms, confirmCallback); String message = "hello-"; for (int i = 0; i < 500000; i++) { // 獲取下一條即將發(fā)送的消息的消息ID final long nextPublishSeqNo = channel.getNextPublishSeqNo(); channel.basicPublish("ex.pc", "key.pc", null, (message + i).getBytes()); System.out.println("編號為:" + nextPublishSeqNo + " 的消息已經(jīng)發(fā)送成功,尚未確認"); outstandingConfirms.put(nextPublishSeqNo, (message + i)); } // 等待消息被確認 Thread.sleep(10000); channel.close(); connection.close(); } }
持久化存儲機制
持久化是提高RabbitMQ可靠性的基礎,否則當RabbitMQ遇到異常時(如:重啟、斷電、停機等)數(shù)據(jù)將會丟失。主要從以下幾個方面來保障消息的持久性:
- Exchange的持久化。通過定義時設置durable 參數(shù)為ture來保證Exchange相關的元數(shù)據(jù)不不丟失。
- Queue的持久化。也是通過定義時設置durable 參數(shù)為ture來保證Queue相關的元數(shù)據(jù)不不丟失。
- 消息的持久化。通過將消息的投遞模式 (BasicProperties 中的 deliveryMode 屬性)設置為 2即可實現(xiàn)消息的持久化,保證消息自身不丟失。
接收端確認機制
如何保證消息被消費者成功消費?
前面我們講了生產(chǎn)者發(fā)送確認機制和消息的持久化存儲機制,然而這依然無法完全保證整個過程的 可靠性,因為如果消息被消費過程中業(yè)務處理失敗了但是消息卻已經(jīng)出列了(被標記為已消費了),我 們又沒有任何重試,那結果跟消息丟失沒什么分別。
RabbitMQ在消費端會有Ack機制,即消費端消費消息后需要發(fā)送Ack確認報文給Broker端,告知自 己是否已消費完成,否則可能會一直重發(fā)消息直到消息過期(AUTO模式)。這也是我們之前一直在講的“最終一致性”、“可恢復性” 的基礎。
一般而言,我們有如下處理手段:
- 采用NONE模式,消費的過程中自行捕獲異常,引發(fā)異常后直接記錄日志并落到異?;謴捅恚偻ㄟ^后臺定時任務掃描異?;謴捅韲L試做重試動作。如果業(yè)務不自行處理則有丟失數(shù)據(jù)的風險
- 采用AUTO(自動Ack)模式,不主動捕獲異常,當消費過程中出現(xiàn)異常時會將消息放回Queue中,然后消息會被重新分配到其他消費者節(jié)點(如果沒有則還是選擇當前節(jié)點)重新被消費,默認會一直重發(fā)消息并直到消費完成返回Ack或者一直到過期
- 采用MANUAL(手動Ack)模式,消費者自行控制流程并手動調用channel相關的方法返回Ack
package workmode; import com.rabbitmq.client.*; import util.ConnectionUtil; import java.io.IOException; /** * NONE模式,則只要收到消息后就立即確認(消息出列,標記已消費),有丟失數(shù)據(jù)的風險 * AUTO模式,看情況確認,如果此時消費者拋出異常則消息會返回到隊列中 * MANUAL模式,需要顯式的調用當前channel的basicAck方法 */ public class Recer2 { public static void main(String[] args) throws Exception { // 1.獲得連接 Connection connection = ConnectionUtil.getConnection(); // 2.獲得通道(信道) final Channel channel = connection.createChannel(); channel.queueDeclare("work_queue",false,false,false,null); // 3.從信道中獲得消息 DefaultConsumer consumer = new DefaultConsumer(channel){ @Override //交付處理(收件人信息,包裹上的快遞標簽,協(xié)議的配置,消息) public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String s = new String(body); // System.out.println("【顧客2】吃掉 " + s+" ! 總共吃【"+i+++"】串!"); System.out.println("【消費者2】得到 " + s); // 模擬網(wǎng)絡延遲 try{ Thread.sleep(400); }catch (Exception e){ } // 手動確認(收件人信息,是否同時確認多個消息) channel.basicAck(envelope.getDeliveryTag(),false); } }; // 4.監(jiān)聽隊列 false:手動消息確認 channel.basicConsume("work_queue", false,consumer); } }
本小節(jié)的內容總結起來就如圖所示,本質上就是“請求/應答”確認模式
到此這篇關于RabbitMQ進階之消息可靠性詳解的文章就介紹到這了,更多相關RabbitMQ消息可靠性內容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!
相關文章
一次排查@CacheEvict注解失效的經(jīng)歷及解決
這篇文章主要介紹了一次排查@CacheEvict注解失效的經(jīng)歷及解決方案,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2021-12-12Spring Boot 2 實戰(zhàn):自定義啟動運行邏輯實例詳解
這篇文章主要介紹了Spring Boot 2 實戰(zhàn):自定義啟動運行邏輯,結合實例形式詳細分析了Spring Boot 2自定義啟動運行邏輯詳細操作技巧與注意事項,需要的朋友可以參考下2020-05-05java多線程之wait(),notify(),notifyAll()的詳解分析
本篇文章是對java多線程 wait(),notify(),notifyAll()進行了詳細的分析介紹,需要的朋友參考下2013-06-06JAVA集成Freemarker生成靜態(tài)html過程解析
這篇文章主要介紹了JAVA集成Freemarker生成靜態(tài)html過程解析,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友可以參考下2020-06-06