亚洲乱码中文字幕综合,中国熟女仑乱hd,亚洲精品乱拍国产一区二区三区,一本大道卡一卡二卡三乱码全集资源,又粗又黄又硬又爽的免费视频

RabbitMQ進階之消息可靠性詳解

 更新時間:2023年08月10日 10:53:23   作者:程序員阿紅  
這篇文章主要介紹了RabbitMQ進階之消息可靠性詳解,abbitmq消息的投遞過程中,怎么確保消息能不丟失,這是一個很重要的問題,哪怕我們做了Rabbitmq持久化,也不能保證我們的業(yè)務消息不會被丟失,需要的朋友可以參考下

消息的可靠性

Rabbitmq消息的投遞過程中,怎么確保消息能不丟失,這是一個很重要的問題。哪怕我們做了Rabbitmq持久化,也不能保證我們的業(yè)務消息不會被丟失。

我們可以從消息的收發(fā)過程中來分析,消息首先要從生產(chǎn)者producer發(fā)送到broker,再從broker把消息發(fā)送給消費者consumer。

image-20230321151738456

所以我們總的可以從發(fā)送方(生產(chǎn)者)確認和接收方(消費者)確認來保證消息的可靠性。

image-20230321150208466

異常捕獲機制

先執(zhí)行業(yè)務操作,業(yè)務操作成功后執(zhí)行行消息發(fā)送,消息發(fā)送過程通過try catch 方式捕獲異常, 在異常處理理的代碼塊中執(zhí)行回滾業(yè)務操作或者執(zhí)行重發(fā)操作等。

這是一種最大努力確保的方式, 并無法保證100%絕對可靠,因為這里沒有異常并不代表消息就一定投遞成功。

image-20230321152637400

另外,可以通過spring.rabbitmq.template.retry.enabled=true 配置開啟發(fā)送端的重試。

AMQP/RabbitMQ的事務機制

沒有捕獲到異常并不能代表消息就一定投遞成功了。 一直到事務提交后都沒有異常,確實就說明消息是投遞成功了。

但是,這種方式在性能方面的開銷 比較大,一般也不推薦使用。

  • 事務實現(xiàn)
channel.txSelect(): 將當前信道設置成事務模式
channel.txCommit(): 用于提交事務
channel.txRollback(): 用于回滾事務

image-20230321152859934

發(fā)送端確認機制

RabbitMQ后來引入了一種輕量量級的方式,叫發(fā)送方確認(publisher confirm)機制。生產(chǎn)者將信 道設置成confirm(確認)模式,一旦信道進入confirm 模式,所有在該信道上?面發(fā)布的消息都會被指派 一個唯一的ID(從1 開始),一旦消息被投遞到所有匹配的隊列之后(如果消息和隊列是持久化的,那么 確認消息會在消息持久化后發(fā)出),RabbitMQ 就會發(fā)送一個確認(Basic.Ack)給生產(chǎn)者(包含消息的唯一 ID),這樣生產(chǎn)者就知道消息已經(jīng)正確送達了。

image-20230321153131995

RabbitMQ 回傳給生產(chǎn)者的確認消息中的deliveryTag 字段包含了確認消息的序號,另外,通過設置channel.basicAck方法中的multiple參數(shù),表示到這個序號之前的所有消息是否都已經(jīng)得到了處理了。生產(chǎn)者投遞消息后并不需要一直阻塞著,可以繼續(xù)投遞下一條消息并通過回調方式處理理ACK響應。

如果 RabbitMQ 因為自身內部錯誤導致消息丟失等異常情況發(fā)生,就會響應一條nack(Basic.Nack)命令,生產(chǎn)者應用程序同樣可以在回調方法中處理理該 nack 命令。

package confirm;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import util.ConnectionUtil;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class PublisherConfirmsProducer {
    public static void main(String[] args) throws Exception{
        Connection connection = ConnectionUtil.getConnection();
        final Channel channel = connection.createChannel();
        // 向RabbitMQ服務器發(fā)送AMQP命令,將當前通道標記為發(fā)送方確認通道
        final AMQP.Confirm.SelectOk selectOk = channel.confirmSelect();
        channel.queueDeclare("queue.pc", true, false, false, null);
        channel.exchangeDeclare("ex.pc", "direct", true, false, null);
        channel.queueBind("queue.pc", "ex.pc", "key.pc");
        try {
        // 發(fā)送消息
        for (int i = 1 ; i < 10000 ; i++){
            channel.basicPublish("ex.pc", "key.pc", null, "hello world".getBytes());
        }
            // 同步的方式等待RabbitMQ的確認消息
            channel.waitForConfirmsOrDie(5000);
            System.out.println("發(fā)送的消息已經(jīng)得到確認");
        } catch (IOException ex) {
            System.out.println("消息被拒收");
        } catch (IllegalStateException ex) {
            System.out.println("發(fā)送消息的通道不是PublisherConfirms通道");
        } catch (TimeoutException ex) {
            System.out.println("等待消息確認超時");
        }
        channel.close();
        connection.close();
    }
}

waitForConfirm方法有個重載的,可以自定義timeout超時時間,超時后會拋TimeoutException。類似的有幾個waitForConfirmsOrDie方法,Broker端在返回nack(Basic.Nack)之后該方法會拋出java.io.IOException。

需要根據(jù)異常類型來做區(qū)別處理理, TimeoutException超時是屬于第三狀態(tài)(無法確定成功還是失敗),而返回Basic.Nack拋出IOException這種是明確的失敗。上面的代碼主要只是演示confirm機制,實際上還是同步阻塞模式的,性能并不不是太好。

實際上,我們也可以通過“批處理理”的方式來改善整體的性能(即批量量發(fā)送消息后僅調用一次 waitForConfirms方法)。正常情況下這種批量處理的方式效率會高很多,但是如果發(fā)生了超時或者nack(失?。┖竽蔷托枰苛恐匕l(fā)消息或者通知上游業(yè)務批量回滾(因為我們只知道這個批次中有消息沒投遞成功,而并不知道具體是那條消息投遞失敗了,所以很難針對性處理),如此看來,批量重發(fā)消息肯定會造成部分消息重復。

另外,我們可以通過異步回調的方式來處理Broker的響應。addConfirmListener 方法可以添加ConfirmListener 這個回調接口,這個 ConfirmListener 接口包含兩個方法:handleAck 和handleNack,分別用來處理 RabbitMQ 回傳的 Basic.Ack 和 Basic.Nack。

package confirm;
/**
 * 創(chuàng)建者: 魏紅
 * 創(chuàng)建時間: 2023-02-28
 * 描述:
 */
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import util.ConnectionUtil;
public class PublisherConfirmsProducer2 {
    public static void main(String[] args) throws Exception {
        //獲取連接
        Connection connection = ConnectionUtil.getConnection();
        final Channel channel = connection.createChannel();
        // 向RabbitMQ服務器發(fā)送AMQP命令,將當前通道標記為發(fā)送方確認通道
        final AMQP.Confirm.SelectOk selectOk = channel.confirmSelect();
        channel.queueDeclare("queue.pc", true, false, false, null);
        channel.exchangeDeclare("ex.pc", "direct", true, false, null);
        channel.queueBind("queue.pc", "ex.pc", "key.pc");
        String message = "hello-";
        // 批處理的大小
        int batchSize = 10;
        // 用于對需要等待確認消息的計數(shù)
        int outstrandingConfirms = 0;
        for (int i = 0; i < 10000; i++) {
            channel.basicPublish("ex.pc", "key.pc", null, (message + i).getBytes());
            outstrandingConfirms++;
            if (outstrandingConfirms == batchSize) {
                // 此時已經(jīng)有一個批次的消息需要同步等待broker的確認消息
                // 同步等待
                channel.waitForConfirmsOrDie(5000);
                System.out.println("消息已經(jīng)被確認了");
                outstrandingConfirms = 0;
            }
        }
        if (outstrandingConfirms > 0) {
            channel.waitForConfirmsOrDie(5000);
            System.out.println("剩余消息已經(jīng)被確認了");
        }
        channel.close();
        connection.close();
    }
}

還可以使用異步方法:

package confirm;
/**
 * 創(chuàng)建者: 魏紅
 * 創(chuàng)建時間: 2023-02-28
 * 描述:
 */
import com.rabbitmq.client.*;
import util.ConnectionUtil;
import javax.management.loading.MLet;
import java.io.IOException;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
public class PublisherConfirmsProducer3 {
    public static void main(String[] args) throws Exception {
        Connection connection = ConnectionUtil.getConnection();
        final Channel channel = connection.createChannel();
        // 向RabbitMQ服務器發(fā)送AMQP命令,將當前通道標記為發(fā)送方確認通道
        final AMQP.Confirm.SelectOk selectOk = channel.confirmSelect();
        channel.queueDeclare("queue.pc", true, false, false, null);
        channel.exchangeDeclare("ex.pc", "direct", true, false, null);
        channel.queueBind("queue.pc", "ex.pc", "key.pc");
//        ConfirmCallback clearOutstandingConfirms = new ConfirmCallback() {
//            @Override
//            public void handle(long deliveryTag, boolean multiple) throws IOException {
//                if (multiple) {
//                    System.out.println("編號小于等于 " + deliveryTag + " 的消息都已經(jīng)被確認了");
//                } else {
//                    System.out.println("編號為:" + deliveryTag + " 的消息被確認");
//                }
//            }
//        };
        ConcurrentNavigableMap<Long, String> outstandingConfirms = new ConcurrentSkipListMap<>();
        ConfirmCallback clearOutstandingConfirms = (deliveryTag, multiple) -> {
            if (multiple) {
                System.out.println("編號小于等于 " + deliveryTag + " 的消息都已經(jīng)被確認了");
                final ConcurrentNavigableMap<Long, String> headMap
                        = outstandingConfirms.headMap(deliveryTag, true);
                // 清空outstandingConfirms中已經(jīng)被確認的消息信息
                headMap.clear();
            } else {
                // 移除已經(jīng)被確認的消息
                outstandingConfirms.remove(deliveryTag);
                System.out.println("編號為:" + deliveryTag + " 的消息被確認");
            }
        };
        ConfirmCallback confirmCallback = (deliveryTag, multiple) -> {
            if (multiple) {
                // 將沒有確認的消息記錄到一個集合中
                // 此處省略實現(xiàn)
                System.out.println("消息編號小于等于:" + deliveryTag + " 的消息 不確認");
            } else {
                System.out.println("編號為:" + deliveryTag + " 的消息不確認");
            }
        };
        // 設置channel的監(jiān)聽器,處理確認的消息和不確認的消息
        channel.addConfirmListener(clearOutstandingConfirms, confirmCallback);
        String message = "hello-";
        for (int i = 0; i < 500000; i++) {
            // 獲取下一條即將發(fā)送的消息的消息ID
            final long nextPublishSeqNo = channel.getNextPublishSeqNo();
            channel.basicPublish("ex.pc", "key.pc", null, (message + i).getBytes());
            System.out.println("編號為:" + nextPublishSeqNo + " 的消息已經(jīng)發(fā)送成功,尚未確認");
            outstandingConfirms.put(nextPublishSeqNo, (message + i));
        }
        // 等待消息被確認
        Thread.sleep(10000);
        channel.close();
        connection.close();
    }
}

持久化存儲機制

持久化是提高RabbitMQ可靠性的基礎,否則當RabbitMQ遇到異常時(如:重啟、斷電、停機等)數(shù)據(jù)將會丟失。主要從以下幾個方面來保障消息的持久性:

  • Exchange的持久化。通過定義時設置durable 參數(shù)為ture來保證Exchange相關的元數(shù)據(jù)不不丟失。
  • Queue的持久化。也是通過定義時設置durable 參數(shù)為ture來保證Queue相關的元數(shù)據(jù)不不丟失。
  • 消息的持久化。通過將消息的投遞模式 (BasicProperties 中的 deliveryMode 屬性)設置為 2即可實現(xiàn)消息的持久化,保證消息自身不丟失。

image-20230321153556625

接收端確認機制

如何保證消息被消費者成功消費?

前面我們講了生產(chǎn)者發(fā)送確認機制和消息的持久化存儲機制,然而這依然無法完全保證整個過程的 可靠性,因為如果消息被消費過程中業(yè)務處理失敗了但是消息卻已經(jīng)出列了(被標記為已消費了),我 們又沒有任何重試,那結果跟消息丟失沒什么分別。

RabbitMQ在消費端會有Ack機制,即消費端消費消息后需要發(fā)送Ack確認報文給Broker端,告知自 己是否已消費完成,否則可能會一直重發(fā)消息直到消息過期(AUTO模式)。這也是我們之前一直在講的“最終一致性”、“可恢復性” 的基礎。

一般而言,我們有如下處理手段:

  • 采用NONE模式,消費的過程中自行捕獲異常,引發(fā)異常后直接記錄日志并落到異?;謴捅恚偻ㄟ^后臺定時任務掃描異?;謴捅韲L試做重試動作。如果業(yè)務不自行處理則有丟失數(shù)據(jù)的風險
  • 采用AUTO(自動Ack)模式,不主動捕獲異常,當消費過程中出現(xiàn)異常時會將消息放回Queue中,然后消息會被重新分配到其他消費者節(jié)點(如果沒有則還是選擇當前節(jié)點)重新被消費,默認會一直重發(fā)消息并直到消費完成返回Ack或者一直到過期
  • 采用MANUAL(手動Ack)模式,消費者自行控制流程并手動調用channel相關的方法返回Ack
package workmode;
import com.rabbitmq.client.*;
import util.ConnectionUtil;
import java.io.IOException;
/**
* NONE模式,則只要收到消息后就立即確認(消息出列,標記已消費),有丟失數(shù)據(jù)的風險
* AUTO模式,看情況確認,如果此時消費者拋出異常則消息會返回到隊列中
* MANUAL模式,需要顯式的調用當前channel的basicAck方法
 */
public class Recer2 {
    public static void main(String[] args) throws  Exception {
        // 1.獲得連接
        Connection connection = ConnectionUtil.getConnection();
        // 2.獲得通道(信道)
        final Channel channel = connection.createChannel();
        channel.queueDeclare("work_queue",false,false,false,null);
        // 3.從信道中獲得消息
        DefaultConsumer consumer = new DefaultConsumer(channel){
            @Override //交付處理(收件人信息,包裹上的快遞標簽,協(xié)議的配置,消息)
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String s = new String(body);
//                System.out.println("【顧客2】吃掉 " + s+" ! 總共吃【"+i+++"】串!");
                System.out.println("【消費者2】得到 " + s);
                // 模擬網(wǎng)絡延遲
                try{
                    Thread.sleep(400);
                }catch (Exception e){
                }
                // 手動確認(收件人信息,是否同時確認多個消息)
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        };
        // 4.監(jiān)聽隊列 false:手動消息確認
        channel.basicConsume("work_queue", false,consumer);
    }
}

本小節(jié)的內容總結起來就如圖所示,本質上就是“請求/應答”確認模式

image-20230321154156414

到此這篇關于RabbitMQ進階之消息可靠性詳解的文章就介紹到這了,更多相關RabbitMQ消息可靠性內容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!

相關文章

  • Kotlin教程之基本數(shù)據(jù)類型

    Kotlin教程之基本數(shù)據(jù)類型

    這篇文章主要介紹了Kotlin教程之基本數(shù)據(jù)類型的學習的相關資料,需要的朋友可以參考下
    2017-05-05
  • java 數(shù)據(jù)類型有哪些取值范圍多少

    java 數(shù)據(jù)類型有哪些取值范圍多少

    這篇文章主要介紹了java 數(shù)據(jù)類型有哪些取值范圍多少的相關資料,網(wǎng)上關于java 數(shù)據(jù)類型的資料有很多,不夠全面,這里就整理下,需要的朋友可以參考下
    2017-01-01
  • Java Set接口及常用實現(xiàn)類總結

    Java Set接口及常用實現(xiàn)類總結

    Collection的另一個子接口就是Set,他并沒有我們List常用,并且自身也沒有一些額外的方法,全是繼承自Collection中的,因此我們還是簡單總結一下,包括他的常用實現(xiàn)類HashSet、LinkedHashSet、TreeSet的總結
    2023-01-01
  • Spring:如何使用枚舉參數(shù)

    Spring:如何使用枚舉參數(shù)

    這篇文章主要介紹了springboot枚舉類型傳遞的步驟,幫助大家更好的理解和學習使用springboot,感興趣的朋友可以了解下,希望能給你帶來幫助
    2021-08-08
  • 一次排查@CacheEvict注解失效的經(jīng)歷及解決

    一次排查@CacheEvict注解失效的經(jīng)歷及解決

    這篇文章主要介紹了一次排查@CacheEvict注解失效的經(jīng)歷及解決方案,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2021-12-12
  • Maven多模塊及version修改的實現(xiàn)方法

    Maven多模塊及version修改的實現(xiàn)方法

    這篇文章主要介紹了Maven多模塊及version修改的實現(xiàn)方法,小編覺得挺不錯的,現(xiàn)在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧
    2019-06-06
  • Spring Boot 2 實戰(zhàn):自定義啟動運行邏輯實例詳解

    Spring Boot 2 實戰(zhàn):自定義啟動運行邏輯實例詳解

    這篇文章主要介紹了Spring Boot 2 實戰(zhàn):自定義啟動運行邏輯,結合實例形式詳細分析了Spring Boot 2自定義啟動運行邏輯詳細操作技巧與注意事項,需要的朋友可以參考下
    2020-05-05
  • 輕松掌握java責任鏈模式

    輕松掌握java責任鏈模式

    這篇文章主要幫助大家輕松掌握java責任鏈模式,具有一定的參考價值,感興趣的小伙伴們可以參考一下
    2016-09-09
  • java多線程之wait(),notify(),notifyAll()的詳解分析

    java多線程之wait(),notify(),notifyAll()的詳解分析

    本篇文章是對java多線程 wait(),notify(),notifyAll()進行了詳細的分析介紹,需要的朋友參考下
    2013-06-06
  • JAVA集成Freemarker生成靜態(tài)html過程解析

    JAVA集成Freemarker生成靜態(tài)html過程解析

    這篇文章主要介紹了JAVA集成Freemarker生成靜態(tài)html過程解析,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友可以參考下
    2020-06-06

最新評論