RabbitMQ進(jìn)階之消息可靠性詳解
消息的可靠性
Rabbitmq消息的投遞過程中,怎么確保消息能不丟失,這是一個(gè)很重要的問題。哪怕我們做了Rabbitmq持久化,也不能保證我們的業(yè)務(wù)消息不會(huì)被丟失。
我們可以從消息的收發(fā)過程中來分析,消息首先要從生產(chǎn)者producer發(fā)送到broker,再從broker把消息發(fā)送給消費(fèi)者consumer。

所以我們總的可以從發(fā)送方(生產(chǎn)者)確認(rèn)和接收方(消費(fèi)者)確認(rèn)來保證消息的可靠性。

異常捕獲機(jī)制
先執(zhí)行業(yè)務(wù)操作,業(yè)務(wù)操作成功后執(zhí)行行消息發(fā)送,消息發(fā)送過程通過try catch 方式捕獲異常, 在異常處理理的代碼塊中執(zhí)行回滾業(yè)務(wù)操作或者執(zhí)行重發(fā)操作等。
這是一種最大努力確保的方式, 并無法保證100%絕對(duì)可靠,因?yàn)檫@里沒有異常并不代表消息就一定投遞成功。

另外,可以通過spring.rabbitmq.template.retry.enabled=true 配置開啟發(fā)送端的重試。
AMQP/RabbitMQ的事務(wù)機(jī)制
沒有捕獲到異常并不能代表消息就一定投遞成功了。 一直到事務(wù)提交后都沒有異常,確實(shí)就說明消息是投遞成功了。
但是,這種方式在性能方面的開銷 比較大,一般也不推薦使用。
- 事務(wù)實(shí)現(xiàn)
channel.txSelect(): 將當(dāng)前信道設(shè)置成事務(wù)模式 channel.txCommit(): 用于提交事務(wù) channel.txRollback(): 用于回滾事務(wù)

發(fā)送端確認(rèn)機(jī)制
RabbitMQ后來引入了一種輕量量級(jí)的方式,叫發(fā)送方確認(rèn)(publisher confirm)機(jī)制。生產(chǎn)者將信 道設(shè)置成confirm(確認(rèn))模式,一旦信道進(jìn)入confirm 模式,所有在該信道上?面發(fā)布的消息都會(huì)被指派 一個(gè)唯一的ID(從1 開始),一旦消息被投遞到所有匹配的隊(duì)列之后(如果消息和隊(duì)列是持久化的,那么 確認(rèn)消息會(huì)在消息持久化后發(fā)出),RabbitMQ 就會(huì)發(fā)送一個(gè)確認(rèn)(Basic.Ack)給生產(chǎn)者(包含消息的唯一 ID),這樣生產(chǎn)者就知道消息已經(jīng)正確送達(dá)了。

RabbitMQ 回傳給生產(chǎn)者的確認(rèn)消息中的deliveryTag 字段包含了確認(rèn)消息的序號(hào),另外,通過設(shè)置channel.basicAck方法中的multiple參數(shù),表示到這個(gè)序號(hào)之前的所有消息是否都已經(jīng)得到了處理了。生產(chǎn)者投遞消息后并不需要一直阻塞著,可以繼續(xù)投遞下一條消息并通過回調(diào)方式處理理ACK響應(yīng)。
如果 RabbitMQ 因?yàn)樽陨韮?nèi)部錯(cuò)誤導(dǎo)致消息丟失等異常情況發(fā)生,就會(huì)響應(yīng)一條nack(Basic.Nack)命令,生產(chǎn)者應(yīng)用程序同樣可以在回調(diào)方法中處理理該 nack 命令。
package confirm;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import util.ConnectionUtil;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class PublisherConfirmsProducer {
public static void main(String[] args) throws Exception{
Connection connection = ConnectionUtil.getConnection();
final Channel channel = connection.createChannel();
// 向RabbitMQ服務(wù)器發(fā)送AMQP命令,將當(dāng)前通道標(biāo)記為發(fā)送方確認(rèn)通道
final AMQP.Confirm.SelectOk selectOk = channel.confirmSelect();
channel.queueDeclare("queue.pc", true, false, false, null);
channel.exchangeDeclare("ex.pc", "direct", true, false, null);
channel.queueBind("queue.pc", "ex.pc", "key.pc");
try {
// 發(fā)送消息
for (int i = 1 ; i < 10000 ; i++){
channel.basicPublish("ex.pc", "key.pc", null, "hello world".getBytes());
}
// 同步的方式等待RabbitMQ的確認(rèn)消息
channel.waitForConfirmsOrDie(5000);
System.out.println("發(fā)送的消息已經(jīng)得到確認(rèn)");
} catch (IOException ex) {
System.out.println("消息被拒收");
} catch (IllegalStateException ex) {
System.out.println("發(fā)送消息的通道不是PublisherConfirms通道");
} catch (TimeoutException ex) {
System.out.println("等待消息確認(rèn)超時(shí)");
}
channel.close();
connection.close();
}
}waitForConfirm方法有個(gè)重載的,可以自定義timeout超時(shí)時(shí)間,超時(shí)后會(huì)拋TimeoutException。類似的有幾個(gè)waitForConfirmsOrDie方法,Broker端在返回nack(Basic.Nack)之后該方法會(huì)拋出java.io.IOException。
需要根據(jù)異常類型來做區(qū)別處理理, TimeoutException超時(shí)是屬于第三狀態(tài)(無法確定成功還是失?。?,而返回Basic.Nack拋出IOException這種是明確的失敗。上面的代碼主要只是演示confirm機(jī)制,實(shí)際上還是同步阻塞模式的,性能并不不是太好。
實(shí)際上,我們也可以通過“批處理理”的方式來改善整體的性能(即批量量發(fā)送消息后僅調(diào)用一次 waitForConfirms方法)。正常情況下這種批量處理的方式效率會(huì)高很多,但是如果發(fā)生了超時(shí)或者nack(失敗)后那就需要批量量重發(fā)消息或者通知上游業(yè)務(wù)批量回滾(因?yàn)槲覀冎恢肋@個(gè)批次中有消息沒投遞成功,而并不知道具體是那條消息投遞失敗了,所以很難針對(duì)性處理),如此看來,批量重發(fā)消息肯定會(huì)造成部分消息重復(fù)。
另外,我們可以通過異步回調(diào)的方式來處理Broker的響應(yīng)。addConfirmListener 方法可以添加ConfirmListener 這個(gè)回調(diào)接口,這個(gè) ConfirmListener 接口包含兩個(gè)方法:handleAck 和handleNack,分別用來處理 RabbitMQ 回傳的 Basic.Ack 和 Basic.Nack。
package confirm;
/**
* 創(chuàng)建者: 魏紅
* 創(chuàng)建時(shí)間: 2023-02-28
* 描述:
*/
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import util.ConnectionUtil;
public class PublisherConfirmsProducer2 {
public static void main(String[] args) throws Exception {
//獲取連接
Connection connection = ConnectionUtil.getConnection();
final Channel channel = connection.createChannel();
// 向RabbitMQ服務(wù)器發(fā)送AMQP命令,將當(dāng)前通道標(biāo)記為發(fā)送方確認(rèn)通道
final AMQP.Confirm.SelectOk selectOk = channel.confirmSelect();
channel.queueDeclare("queue.pc", true, false, false, null);
channel.exchangeDeclare("ex.pc", "direct", true, false, null);
channel.queueBind("queue.pc", "ex.pc", "key.pc");
String message = "hello-";
// 批處理的大小
int batchSize = 10;
// 用于對(duì)需要等待確認(rèn)消息的計(jì)數(shù)
int outstrandingConfirms = 0;
for (int i = 0; i < 10000; i++) {
channel.basicPublish("ex.pc", "key.pc", null, (message + i).getBytes());
outstrandingConfirms++;
if (outstrandingConfirms == batchSize) {
// 此時(shí)已經(jīng)有一個(gè)批次的消息需要同步等待broker的確認(rèn)消息
// 同步等待
channel.waitForConfirmsOrDie(5000);
System.out.println("消息已經(jīng)被確認(rèn)了");
outstrandingConfirms = 0;
}
}
if (outstrandingConfirms > 0) {
channel.waitForConfirmsOrDie(5000);
System.out.println("剩余消息已經(jīng)被確認(rèn)了");
}
channel.close();
connection.close();
}
}還可以使用異步方法:
package confirm;
/**
* 創(chuàng)建者: 魏紅
* 創(chuàng)建時(shí)間: 2023-02-28
* 描述:
*/
import com.rabbitmq.client.*;
import util.ConnectionUtil;
import javax.management.loading.MLet;
import java.io.IOException;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
public class PublisherConfirmsProducer3 {
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
final Channel channel = connection.createChannel();
// 向RabbitMQ服務(wù)器發(fā)送AMQP命令,將當(dāng)前通道標(biāo)記為發(fā)送方確認(rèn)通道
final AMQP.Confirm.SelectOk selectOk = channel.confirmSelect();
channel.queueDeclare("queue.pc", true, false, false, null);
channel.exchangeDeclare("ex.pc", "direct", true, false, null);
channel.queueBind("queue.pc", "ex.pc", "key.pc");
// ConfirmCallback clearOutstandingConfirms = new ConfirmCallback() {
// @Override
// public void handle(long deliveryTag, boolean multiple) throws IOException {
// if (multiple) {
// System.out.println("編號(hào)小于等于 " + deliveryTag + " 的消息都已經(jīng)被確認(rèn)了");
// } else {
// System.out.println("編號(hào)為:" + deliveryTag + " 的消息被確認(rèn)");
// }
// }
// };
ConcurrentNavigableMap<Long, String> outstandingConfirms = new ConcurrentSkipListMap<>();
ConfirmCallback clearOutstandingConfirms = (deliveryTag, multiple) -> {
if (multiple) {
System.out.println("編號(hào)小于等于 " + deliveryTag + " 的消息都已經(jīng)被確認(rèn)了");
final ConcurrentNavigableMap<Long, String> headMap
= outstandingConfirms.headMap(deliveryTag, true);
// 清空outstandingConfirms中已經(jīng)被確認(rèn)的消息信息
headMap.clear();
} else {
// 移除已經(jīng)被確認(rèn)的消息
outstandingConfirms.remove(deliveryTag);
System.out.println("編號(hào)為:" + deliveryTag + " 的消息被確認(rèn)");
}
};
ConfirmCallback confirmCallback = (deliveryTag, multiple) -> {
if (multiple) {
// 將沒有確認(rèn)的消息記錄到一個(gè)集合中
// 此處省略實(shí)現(xiàn)
System.out.println("消息編號(hào)小于等于:" + deliveryTag + " 的消息 不確認(rèn)");
} else {
System.out.println("編號(hào)為:" + deliveryTag + " 的消息不確認(rèn)");
}
};
// 設(shè)置channel的監(jiān)聽器,處理確認(rèn)的消息和不確認(rèn)的消息
channel.addConfirmListener(clearOutstandingConfirms, confirmCallback);
String message = "hello-";
for (int i = 0; i < 500000; i++) {
// 獲取下一條即將發(fā)送的消息的消息ID
final long nextPublishSeqNo = channel.getNextPublishSeqNo();
channel.basicPublish("ex.pc", "key.pc", null, (message + i).getBytes());
System.out.println("編號(hào)為:" + nextPublishSeqNo + " 的消息已經(jīng)發(fā)送成功,尚未確認(rèn)");
outstandingConfirms.put(nextPublishSeqNo, (message + i));
}
// 等待消息被確認(rèn)
Thread.sleep(10000);
channel.close();
connection.close();
}
}持久化存儲(chǔ)機(jī)制
持久化是提高RabbitMQ可靠性的基礎(chǔ),否則當(dāng)RabbitMQ遇到異常時(shí)(如:重啟、斷電、停機(jī)等)數(shù)據(jù)將會(huì)丟失。主要從以下幾個(gè)方面來保障消息的持久性:
- Exchange的持久化。通過定義時(shí)設(shè)置durable 參數(shù)為ture來保證Exchange相關(guān)的元數(shù)據(jù)不不丟失。
- Queue的持久化。也是通過定義時(shí)設(shè)置durable 參數(shù)為ture來保證Queue相關(guān)的元數(shù)據(jù)不不丟失。
- 消息的持久化。通過將消息的投遞模式 (BasicProperties 中的 deliveryMode 屬性)設(shè)置為 2即可實(shí)現(xiàn)消息的持久化,保證消息自身不丟失。

接收端確認(rèn)機(jī)制
如何保證消息被消費(fèi)者成功消費(fèi)?
前面我們講了生產(chǎn)者發(fā)送確認(rèn)機(jī)制和消息的持久化存儲(chǔ)機(jī)制,然而這依然無法完全保證整個(gè)過程的 可靠性,因?yàn)槿绻⒈幌M(fèi)過程中業(yè)務(wù)處理失敗了但是消息卻已經(jīng)出列了(被標(biāo)記為已消費(fèi)了),我 們又沒有任何重試,那結(jié)果跟消息丟失沒什么分別。
RabbitMQ在消費(fèi)端會(huì)有Ack機(jī)制,即消費(fèi)端消費(fèi)消息后需要發(fā)送Ack確認(rèn)報(bào)文給Broker端,告知自 己是否已消費(fèi)完成,否則可能會(huì)一直重發(fā)消息直到消息過期(AUTO模式)。這也是我們之前一直在講的“最終一致性”、“可恢復(fù)性” 的基礎(chǔ)。
一般而言,我們有如下處理手段:
- 采用NONE模式,消費(fèi)的過程中自行捕獲異常,引發(fā)異常后直接記錄日志并落到異常恢復(fù)表,再通過后臺(tái)定時(shí)任務(wù)掃描異常恢復(fù)表嘗試做重試動(dòng)作。如果業(yè)務(wù)不自行處理則有丟失數(shù)據(jù)的風(fēng)險(xiǎn)
- 采用AUTO(自動(dòng)Ack)模式,不主動(dòng)捕獲異常,當(dāng)消費(fèi)過程中出現(xiàn)異常時(shí)會(huì)將消息放回Queue中,然后消息會(huì)被重新分配到其他消費(fèi)者節(jié)點(diǎn)(如果沒有則還是選擇當(dāng)前節(jié)點(diǎn))重新被消費(fèi),默認(rèn)會(huì)一直重發(fā)消息并直到消費(fèi)完成返回Ack或者一直到過期
- 采用MANUAL(手動(dòng)Ack)模式,消費(fèi)者自行控制流程并手動(dòng)調(diào)用channel相關(guān)的方法返回Ack
package workmode;
import com.rabbitmq.client.*;
import util.ConnectionUtil;
import java.io.IOException;
/**
* NONE模式,則只要收到消息后就立即確認(rèn)(消息出列,標(biāo)記已消費(fèi)),有丟失數(shù)據(jù)的風(fēng)險(xiǎn)
* AUTO模式,看情況確認(rèn),如果此時(shí)消費(fèi)者拋出異常則消息會(huì)返回到隊(duì)列中
* MANUAL模式,需要顯式的調(diào)用當(dāng)前channel的basicAck方法
*/
public class Recer2 {
public static void main(String[] args) throws Exception {
// 1.獲得連接
Connection connection = ConnectionUtil.getConnection();
// 2.獲得通道(信道)
final Channel channel = connection.createChannel();
channel.queueDeclare("work_queue",false,false,false,null);
// 3.從信道中獲得消息
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override //交付處理(收件人信息,包裹上的快遞標(biāo)簽,協(xié)議的配置,消息)
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String s = new String(body);
// System.out.println("【顧客2】吃掉 " + s+" ! 總共吃【"+i+++"】串!");
System.out.println("【消費(fèi)者2】得到 " + s);
// 模擬網(wǎng)絡(luò)延遲
try{
Thread.sleep(400);
}catch (Exception e){
}
// 手動(dòng)確認(rèn)(收件人信息,是否同時(shí)確認(rèn)多個(gè)消息)
channel.basicAck(envelope.getDeliveryTag(),false);
}
};
// 4.監(jiān)聽隊(duì)列 false:手動(dòng)消息確認(rèn)
channel.basicConsume("work_queue", false,consumer);
}
}本小節(jié)的內(nèi)容總結(jié)起來就如圖所示,本質(zhì)上就是“請(qǐng)求/應(yīng)答”確認(rèn)模式

到此這篇關(guān)于RabbitMQ進(jìn)階之消息可靠性詳解的文章就介紹到這了,更多相關(guān)RabbitMQ消息可靠性內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Java Set接口及常用實(shí)現(xiàn)類總結(jié)
Collection的另一個(gè)子接口就是Set,他并沒有我們List常用,并且自身也沒有一些額外的方法,全是繼承自Collection中的,因此我們還是簡單總結(jié)一下,包括他的常用實(shí)現(xiàn)類HashSet、LinkedHashSet、TreeSet的總結(jié)2023-01-01
一次排查@CacheEvict注解失效的經(jīng)歷及解決
這篇文章主要介紹了一次排查@CacheEvict注解失效的經(jīng)歷及解決方案,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-12-12
Maven多模塊及version修改的實(shí)現(xiàn)方法
這篇文章主要介紹了Maven多模塊及version修改的實(shí)現(xiàn)方法,小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過來看看吧2019-06-06
Spring Boot 2 實(shí)戰(zhàn):自定義啟動(dòng)運(yùn)行邏輯實(shí)例詳解
這篇文章主要介紹了Spring Boot 2 實(shí)戰(zhàn):自定義啟動(dòng)運(yùn)行邏輯,結(jié)合實(shí)例形式詳細(xì)分析了Spring Boot 2自定義啟動(dòng)運(yùn)行邏輯詳細(xì)操作技巧與注意事項(xiàng),需要的朋友可以參考下2020-05-05
java多線程之wait(),notify(),notifyAll()的詳解分析
本篇文章是對(duì)java多線程 wait(),notify(),notifyAll()進(jìn)行了詳細(xì)的分析介紹,需要的朋友參考下2013-06-06
JAVA集成Freemarker生成靜態(tài)html過程解析
這篇文章主要介紹了JAVA集成Freemarker生成靜態(tài)html過程解析,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2020-06-06

