詳解RabbitMq如何做到消息的可靠性投遞
前言
現(xiàn)在的一些互聯(lián)網(wǎng)項(xiàng)目或者是高并發(fā)的項(xiàng)目中很少有沒有引入消息隊(duì)列的。 引入消息隊(duì)列可以給這個(gè)項(xiàng)目帶來很多的好處:比如
- 削峰
這個(gè)就很好的理解,在系統(tǒng)中的請求量是固定的,但是有的時(shí)候會(huì)多出很多的突發(fā)流量,比如在有秒殺活動(dòng)的時(shí)候,這種瞬時(shí)的高流量可能會(huì)打垮系統(tǒng),這個(gè)時(shí)候就可以很好的引入MQ,將這些請求積壓到MQ中,然后消費(fèi)端在按照自已的能力去處理這里請求
- 解耦合
比如現(xiàn)在有系統(tǒng)A,當(dāng)系統(tǒng)A執(zhí)行完成后,B、C系統(tǒng)需要拿到A系統(tǒng)的結(jié)果才可以繼續(xù)執(zhí)行,如果不引入MQ,A系統(tǒng)還要調(diào)用B、C系統(tǒng),這樣這A、B、C三個(gè)系統(tǒng)的耦合性就很大。引入MQ后A系統(tǒng)的執(zhí)行結(jié)果只需要保證將消息投遞到MQ就好,其它的兩個(gè)系統(tǒng)只需要監(jiān)聽這個(gè)MQ的某個(gè)隊(duì)列,這樣就降低了這三個(gè)系統(tǒng)之間的耦合性。
- 異步
再通過A、B、C這三個(gè)系統(tǒng)舉例。A系統(tǒng)在返回給用戶的執(zhí)行結(jié)果前需要完成B、C系統(tǒng)的調(diào)用,這個(gè)總的執(zhí)行時(shí)間是A+B+C的執(zhí)行時(shí)間,如果引入MQ,A系統(tǒng)的執(zhí)行完成后將數(shù)據(jù)投遞到MQ,直接響應(yīng)用戶。B、C再這在通過監(jiān)聽完成數(shù)據(jù)的處理。這樣也降低了用戶的等待時(shí)間
除了這些好處,當(dāng)然引入MQ還會(huì)有不好的地方:比如
- 數(shù)據(jù)一致性問題
- A系統(tǒng)執(zhí)行完將數(shù)據(jù)投遞到了MQ,B、C在消費(fèi)的時(shí)候如果出現(xiàn)了問題,是不是就導(dǎo)致了數(shù)據(jù)不一致的問題
- 可用性降低
- 一個(gè)好好的系統(tǒng),引入一個(gè)MQ,如果這個(gè)MQ拓機(jī)了呢?這個(gè)可能就需要集群來提高M(jìn)Q的高可用。
- 系統(tǒng)的復(fù)雜度提高
- 引入了MQ,我們還需要關(guān)注消息是否被成功的投遞,MQ中的消息被積壓太多怎么辦?消費(fèi)端是否成功的消費(fèi)的消息。
這些都是問題,所在是否要引入MQ還需要看業(yè)務(wù)需求
RabbitMq的投遞及消費(fèi)流程
這里有張投遞消息到消費(fèi)的流程圖
從這張圖上可看出這也是一種AMQP協(xié)議的實(shí)現(xiàn)。消息的提供者先是通過某一個(gè)信道將消息發(fā)送到交換機(jī),然后交換機(jī)通通RoutingKey來將消息分發(fā)到某一個(gè)隊(duì)列上。然后,消費(fèi)者在臨聽某一個(gè)隊(duì)列來進(jìn)行消息的消費(fèi)。
今天我們的主題是如何保證消息的投遞可靠性。那么我們來想想在這個(gè)流程中那些位置可能會(huì)影響我們消息的投遞可靠性?
從上圖中我們可以總結(jié)出有二個(gè)因素影響著消息是否被成功投遞和被成功消費(fèi)
提供者
- 提供者有沒有將消成功的發(fā)送到MQ并被處理
- 發(fā)送到MQ中的消息有沒有成功的被路由到隊(duì)列中
消費(fèi)者
- 消費(fèi)者有沒有成功的簽收消息并成功處理。
- 消費(fèi)者是否可以保證消費(fèi)者的穩(wěn)定性
提供者如何確保消息的成功投遞
解決這個(gè)問題,我們可以通過提供者的發(fā)送方確認(rèn)機(jī)制來實(shí)現(xiàn),這個(gè)發(fā)送方確認(rèn)機(jī)制又分成三種:
- 單條消息的同步確認(rèn)
- 多條消息的同步確認(rèn)
- 異步消息確認(rèn)
單條消息的同步確認(rèn)
首先要在當(dāng)前的Channel上開啟消息確認(rèn)模式,然后通過waitForConfirms()方法進(jìn)行消息確認(rèn)是否發(fā)送成功。
public static void main(String[] args) throws InterruptedException, TimeoutException, IOException { ConnectionFactory cf = new ConnectionFactory(); cf.setHost("host"); cf.setPort(5672); cf.setUsername("賬號(hào)"); cf.setPassword("密碼"); try(Connection connection = cf.newConnection(); Channel channel = connection.createChannel()){ channel.confirmSelect(); Map<String,String> mes = new HashMap<>(); mes.put("name","1111"); String messageStr = objectMapper.writeValueAsString(mes); channel.basicPublish( "exchange.drinks", "drinks.juzi", null, messageStr.getBytes()); boolean isSendSuccess = channel.waitForConfirms(); if(isSendSuccess){ System.out.print("消息發(fā)送成功"); } } }
這樣做的話每次發(fā)完消息后,都會(huì)確保消息是否發(fā)送成功。如果發(fā)送失敗的話進(jìn)行相應(yīng)的處理。
多條消息的同步確認(rèn)
多條消息的確認(rèn)和單條的差不多,比如我將發(fā)送消息的代碼放到一個(gè)循環(huán)內(nèi)。
public static void main(String[] args) throws InterruptedException, TimeoutException, IOException { ConnectionFactory cf = new ConnectionFactory(); cf.setHost("host"); cf.setPort(5672); cf.setUsername("賬號(hào)"); cf.setPassword("密碼"); try(Connection connection = cf.newConnection(); Channel channel = connection.createChannel()){ channel.confirmSelect(); Map<String,String> mes = new HashMap<>(); mes.put("name","1111"); String messageStr = objectMapper.writeValueAsString(mes); for(int i = 0;i < 100;i++){ channel.basicPublish( "exchange.drinks", "drinks.juzi", null, messageStr()); } boolean isSendSuccess = channel.waitForConfirms(); System.out.println(isSendSuccess); } }
這樣的話當(dāng)一批消息發(fā)送完成后,進(jìn)行統(tǒng)一的消息確認(rèn)是否發(fā)送成功,就成了多條的消息確認(rèn),不過并不推薦使用這種確認(rèn)消息的方式
在多條的消息確認(rèn)中,比如我先是發(fā)送了一批的消息,比如這批消息有100條,這個(gè)時(shí)候如果有其中的一條消息沒有發(fā)送成功,這里返回的也是false,然爾我們并不能知道是具體的哪 一條消息發(fā)送失敗。
異步消息確認(rèn)
異步的消息確認(rèn)是通過一個(gè)監(jiān)聽器來實(shí)現(xiàn)的,當(dāng)消息發(fā)送后,會(huì)接著執(zhí)行下面的邏輯,可能在稍會(huì)的一段時(shí)間,監(jiān)聽器監(jiān)聽到了Broker的返回,再進(jìn)行邏輯的處理。
public static void main(String[] args) throws InterruptedException, TimeoutException, IOException { ConnectionFactory cf = new ConnectionFactory(); cf.setHost("host"); cf.setPort(5672); cf.setUsername("賬號(hào)"); cf.setPassword("密碼"); try(Connection connection = cf.newConnection(); Channel channel = connection.createChannel()){ channel.confirmSelect(); ConfirmListener confirmListener = new ConfirmListener() { @Override public void handleAck(long deliveryTag, boolean multiple) throws IOException { System.out.println("發(fā)送成功:" + deliveryTag + " multiple:" + multiple); } @Override public void handleNack(long deliveryTag, boolean multiple) throws IOException { System.out.println("發(fā)送失?。? + deliveryTag); } }; channel.addConfirmListener(confirmListener); Map<String,String> mes = new HashMap<>(); mes.put("name","11111"); String messageStr = objectMapper.writeValueAsString(mes); for(int i = 0;i < 100;i++){ channel.basicPublish( "exchange.drinks", "drinks.juzi", null, messageStr.getBytes()); } Thread.sleep(Integer.MAX_VALUE); } }
當(dāng)成功的發(fā)送消息的時(shí)候會(huì)回調(diào)監(jiān)聽器中的handleAck
方法,如果沒有發(fā)送成功會(huì)回調(diào)handleNack
方法 在這個(gè)監(jiān)聽器里面有兩個(gè)參數(shù)一個(gè)deliveryTag
和multiple
:
- deliveryTag:表示當(dāng)前的Channel發(fā)送的第幾條消息
- multiple:是否在確認(rèn)多條消息
這個(gè)異步的雖然在聽覺上感覺比較厲害些,這里也不推薦使用,原因和上面的一樣,我們并不能具休的知道是哪一條消息沒有被確認(rèn)發(fā)送。
綜上:這里更加推薦單條消息確認(rèn),具體選擇哪一種還是要用業(yè)務(wù)做出選擇
注:注意一點(diǎn)是當(dāng)一條消息成功的發(fā)送到Broker,但是如果沒有正確的路由到隊(duì)列,那么這時(shí)borker也是會(huì)返回true,因?yàn)锽roker確時(shí)接收到了消息只是RoutingKey不可達(dá),所以這里也會(huì)返回true,并且直接將消息丟棄
消息的返回機(jī)制
這個(gè)消息返回機(jī)制的作用就是在當(dāng)一個(gè)消息成功的發(fā)送,但是并沒有正確路由到隊(duì)列的時(shí)候所回調(diào)的。
這也彌補(bǔ)了上面確認(rèn)消息是否發(fā)送成功但沒有路由到隊(duì)列所返回true的問題 在使用消息返回機(jī)制的時(shí)候在發(fā)送消息時(shí)需要將mandatory
置成true。再添加對應(yīng)的監(jiān)聽器。
public static void main(String[] args) throws InterruptedException, TimeoutException, IOException { ConnectionFactory cf = new ConnectionFactory(); cf.setHost("host"); cf.setPort(5672); cf.setUsername("賬號(hào)"); cf.setPassword("密碼"); try(Connection connection = cf.newConnection(); Channel channel = connection.createChannel()){ channel.addReturnListener(new ReturnCallback() { @Override public void handle(Return returnMessage) { System.out.println("replyCode:" + returnMessage.getReplyCode() + " replyText:" + returnMessage.getReplyText() + " routingKey:" + returnMessage.getRoutingKey() + " exchange:" + returnMessage.getExchange() + " body:" + new String(returnMessage.getBody())); } }); Map<String,String> mes = new HashMap<>(); mes.put("name","11111"); String messageStr = objectMapper.writeValueAsString(mes); channel.basicPublish( "exchange.drinks", "drinks.juzi1", true, null, messageStr.getBytes()); Thread.sleep(Integer.MAX_VALUE); } }
這里的addReturnListener方法有兩個(gè)重載:只不過是handle的參數(shù)不同,一個(gè)是參數(shù)都顯示在了參數(shù)列表內(nèi),一個(gè)是將參數(shù)封裝到了Return對象內(nèi)。當(dāng)handle被回調(diào)的時(shí)候也可以獲取到相應(yīng)的參數(shù)比如:exchange routingkey body。
注:保證消息可靠性投遞的前提是服務(wù)的高可用,服務(wù)不高可用談其它的都是扯
以上就是詳解RabbitMq如何做到消息的可靠性投遞的詳細(xì)內(nèi)容,更多關(guān)于RabbitMq 消息可靠性投遞的資料請關(guān)注腳本之家其它相關(guān)文章!
- RabbitMQ消息確認(rèn)機(jī)制剖析
- 前端與RabbitMQ實(shí)時(shí)消息推送未讀消息小紅點(diǎn)實(shí)現(xiàn)示例
- springboot+rabbitmq實(shí)現(xiàn)智能家居實(shí)例詳解
- kafka?rabbitMQ及rocketMQ隊(duì)列的消息可靠性保證分析
- Golang?rabbitMQ生產(chǎn)者消費(fèi)者實(shí)現(xiàn)示例
- python操作RabbitMq的三種工作模式
- RabbitMQ消息隊(duì)列實(shí)現(xiàn)延遲任務(wù)示例
- RabbitMQ消費(fèi)端ACK NACK及重回隊(duì)列機(jī)制詳解
相關(guān)文章
Arthas排查Kubernetes中應(yīng)用頻繁掛掉重啟異常
這篇文章主要為大家介紹了Arthas排查Kubernetes中應(yīng)用頻繁掛掉重啟的異常分析,有需要的朋友可以借鑒參考下,希望能夠有所幫助祝大家多多進(jìn)步2022-02-02詳解Java如何優(yōu)雅的調(diào)用dubbo同時(shí)不使用其它jar包
這篇文章主要介紹了如何在不使用他人jar包的情況下優(yōu)雅的進(jìn)行dubbo調(diào)用,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)吧2023-02-02Java使用自定義注解實(shí)現(xiàn)為事件源綁定事件監(jiān)聽器操作示例
這篇文章主要介紹了Java使用自定義注解實(shí)現(xiàn)為事件源綁定事件監(jiān)聽器操作,結(jié)合實(shí)例形式分析了java自定義注解、注解處理、事件監(jiān)聽與響應(yīng)等相關(guān)操作技巧,需要的朋友可以參考下2019-10-10Java線程安全的常用類_動(dòng)力節(jié)點(diǎn)Java學(xué)院整理
在集合框架中,有些類是線程安全的,這些都是jdk1.1中的出現(xiàn)的。在jdk1.2之后,就出現(xiàn)許許多多非線程安全的類。 下面是這些線程安全的同步的類2017-06-06