亚洲乱码中文字幕综合,中国熟女仑乱hd,亚洲精品乱拍国产一区二区三区,一本大道卡一卡二卡三乱码全集资源,又粗又黄又硬又爽的免费视频

Java?RabbitMQ的持久化和發(fā)布確認(rèn)詳解

 更新時間:2022年03月08日 11:49:06   作者:江海i  
這篇文章主要為大家詳細(xì)介紹了RabbitMQ的持久化和發(fā)布確認(rèn),文中示例代碼介紹的非常詳細(xì),具有一定的參考價值,感興趣的小伙伴們可以參考一下,希望能夠給你帶來幫助

1. 持久化

當(dāng)RabbitMQ服務(wù)停掉以后消息生產(chǎn)者發(fā)送過的消息不丟失。默認(rèn)情況下RabbitMQ退出或者崩潰時,會忽視掉隊列和消息。為了保證消息不丟失需要將隊列和消息都標(biāo)記為持久化。

1.1 實現(xiàn)持久化

1.隊列持久化:在創(chuàng)建隊列時將channel.queueDeclare();第二個參數(shù)改為true。

2.消息持久化:在使用信道發(fā)送消息時channel.basicPublish();將第三個參數(shù)改為:MessageProperties.PERSISTENT_TEXT_PLAIN表示持久化消息。

/**
 * @Description 持久化MQ
 * @date 2022/3/7 9:14
 */
public class Producer3 {
    private static final String LONG_QUEUE = "long_queue";
    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMQUtils.getChannel();
        // 持久化隊列
        channel.queueDeclare(LONG_QUEUE,true,false,false,null);
        Scanner scanner = new Scanner(System.in);
        int i = 0;
        while (scanner.hasNext()){
            i++;
            String msg = scanner.next() + i;
            // 持久化消息
            channel.basicPublish("",LONG_QUEUE, MessageProperties.PERSISTENT_TEXT_PLAIN,msg.getBytes(StandardCharsets.UTF_8));
            System.out.println("發(fā)送消息:'" + msg + "'成功");
        }
    }
}

但是存儲消息還有存在一個緩存的間隔點,沒有真正的寫入磁盤,持久性保證不夠強(qiáng),但是對于簡單隊列而言也綽綽有余。

1.2 不公平分發(fā)

輪詢分發(fā)的方式在消費者處理效率不同的情況下并不適用。所以真正的公平應(yīng)該是遵循能者多勞的前提。

在消費者處修改channel.basicQos(1);表示開啟不公平分發(fā)

/**
 * @Description 不公平分發(fā)消費者
 * @date 2022/3/7 9:27
 */
public class Consumer2 {
    private static final String LONG_QUEUE = "long_queue";
    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMQUtils.getChannel();
        DeliverCallback deliverCallback = (consumerTag, message) -> {
            // 模擬并發(fā)沉睡三十秒
            try {
                Thread.sleep(30000);
                System.out.println("線程B接收消息:"+ new String(message.getBody(), StandardCharsets.UTF_8));
                channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        };
        // 設(shè)置不公平分發(fā)
        channel.basicQos(1);
        channel.basicConsume(LONG_QUEUE,false,deliverCallback,
                consumerTag -> {
                    System.out.println(consumerTag + "消費者取消消費");
                });
    }
}

1.3 測試不公平分發(fā)

測試目的:是否能實現(xiàn)能者多勞。

測試方法:兩個消費者睡眠不同的事件來模擬處理事件不同,如果處理時間(睡眠時間)短的能夠處理多個消息就代表目的達(dá)成。

先啟動生產(chǎn)者創(chuàng)建隊列,再分別啟動兩個消費者。

生產(chǎn)者按照順序發(fā)四條消息:

在這里插入圖片描述

睡眠時間短的線程A接收到了三條消息

在這里插入圖片描述

而睡眠時間長的線程B只接收到的第二條消息:

在這里插入圖片描述

因為線程B在處理消息時消耗的時間較長,所以就將其他消息分配給了線程A。

實驗成功!

1.4 預(yù)取值

消息的發(fā)送和手動確認(rèn)都是異步完成的,因此就存在一個未確認(rèn)消息的緩沖區(qū),開發(fā)人員希望能夠限制緩沖區(qū)的大小,用來避免緩沖區(qū)里面無限制的未確認(rèn)消息問題。

這里的預(yù)期值就值得是上述方法channel.basicQos();里面的參數(shù),如果在當(dāng)前信道上存在等于參數(shù)的消息就不會在安排當(dāng)前信道進(jìn)行消費消息。

1.4.1 代碼測試

測試方法:

1.新建兩個不同的消費者分別給定預(yù)期值5個2。

2.給睡眠時間長的指定為5,時間短的指定為2。

3.假如按照指定的預(yù)期值獲取消息則表示測試成功,但并不是代表一定會按照5和2分配,這個類似于權(quán)重的判別。

代碼根據(jù)上述代碼修改預(yù)期值即可。

2. 發(fā)布確認(rèn)

發(fā)布確認(rèn)就是生產(chǎn)者發(fā)布消息到隊列之后,隊列確認(rèn)進(jìn)行持久化完畢再通知給生產(chǎn)者的過程。這樣才能保證消息不會丟失。

需要注意的是需要開啟隊列持久化才能使用確認(rèn)發(fā)布。
開啟方法:channel.confirmSelect();

2.1 單個確認(rèn)發(fā)布

是一種同步發(fā)布的方式,即發(fā)送完一個消息之后只有確認(rèn)它確認(rèn)發(fā)布后,后續(xù)的消息才會繼續(xù)發(fā)布,在指定的時間內(nèi)沒有確認(rèn)就會拋出異常。缺點就是特別慢。

/**
 * @Description 確認(rèn)發(fā)布——單個確認(rèn)
 * @date 2022/3/7 14:49
 */
public class SoloProducer {
    private static final int MESSAGE_COUNT = 100;
    private static final String QUEUE_NAME = "confirm_solo";
    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMQUtils.getChannel();
        // 產(chǎn)生隊列
        channel.queueDeclare(QUEUE_NAME,true,false,false,null);
        // 開啟確認(rèn)發(fā)布
        channel.confirmSelect();
        // 記錄開始時間
        long beginTime = System.currentTimeMillis();
        for (int i = 0; i < MESSAGE_COUNT; i++) {
            String msg = ""+i;
            channel.basicPublish("",QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,msg.getBytes(StandardCharsets.UTF_8));
            // 單個發(fā)布確認(rèn)
            boolean flag = channel.waitForConfirms();
            if (flag){
                System.out.println("發(fā)送消息:" + i);
            }
        }
        // 記錄結(jié)束時間
        long endTime = System.currentTimeMillis();
        System.out.println("發(fā)送" + MESSAGE_COUNT + "條消息消耗:"+(endTime - beginTime) + "毫秒");   }
}

2.2 批量確認(rèn)發(fā)布

一批一批的確認(rèn)發(fā)布可以提高系統(tǒng)的吞吐量。但是缺點是發(fā)生故障導(dǎo)致發(fā)布出現(xiàn)問題時,需要將整個批處理保存在內(nèi)存中,后面再重新發(fā)布。

/**
 * @Description 確認(rèn)發(fā)布——批量確認(rèn)
 * @date 2022/3/7 14:49
 */
public class BatchProducer {
    private static final int MESSAGE_COUNT = 100;
    private static final String QUEUE_NAME = "confirm_batch";
    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMQUtils.getChannel();
        // 產(chǎn)生隊列
        channel.queueDeclare(QUEUE_NAME,true,false,false,null);
        // 開啟確認(rèn)發(fā)布
        channel.confirmSelect();
        // 設(shè)置一個多少一批確認(rèn)一次。
        int batchSize = MESSAGE_COUNT / 10;
        // 記錄開始時間
        long beginTime = System.currentTimeMillis();
        for (int i = 0; i < MESSAGE_COUNT; i++) {
            String msg = ""+i;
            channel.basicPublish("",QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,msg.getBytes(StandardCharsets.UTF_8));
            // 批量發(fā)布確認(rèn)
            if (i % batchSize == 0){
                if (channel.waitForConfirms()){
                    System.out.println("發(fā)送消息:" + i);
                }
            }
        }
        // 記錄結(jié)束時間
        long endTime = System.currentTimeMillis();
        System.out.println("發(fā)送" + MESSAGE_COUNT + "條消息消耗:"+(endTime - beginTime) + "毫秒");
    }
}

顯然效率要比單個確認(rèn)發(fā)布的高很多。

2.3 異步確認(rèn)發(fā)布

在編程上比上述兩個要復(fù)雜,但是性價比很高,無論是可靠性還行效率的都好很多,利用回調(diào)函數(shù)來達(dá)到消息可靠性傳遞的。

/**
 * @Description 確認(rèn)發(fā)布——異步確認(rèn)
 * @date 2022/3/7 14:49
 */
public class AsyncProducer {
    private static final int MESSAGE_COUNT = 100;
    private static final String QUEUE_NAME = "confirm_async";
    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMQUtils.getChannel();
        // 產(chǎn)生隊列
        channel.queueDeclare(QUEUE_NAME,true,false,false,null);
        // 開啟確認(rèn)發(fā)布
        channel.confirmSelect();
        // 記錄開始時間
        long beginTime = System.currentTimeMillis();
        // 確認(rèn)成功回調(diào)
        ConfirmCallback ackCallback = (deliveryTab,multiple) ->{
            System.out.println("確認(rèn)成功消息:" + deliveryTab);
        };
        // 確認(rèn)失敗回調(diào)
        ConfirmCallback nackCallback = (deliveryTab,multiple) ->{
            System.out.println("未確認(rèn)的消息:" + deliveryTab);
        };
        // 消息監(jiān)聽器
        /**
         * addConfirmListener:
         *                  1. 確認(rèn)成功的消息;
         *                  2. 確認(rèn)失敗的消息。
         */
        channel.addConfirmListener(ackCallback,nackCallback);
        for (int i = 0; i < MESSAGE_COUNT; i++) {
            String msg = "" + i;
            channel.basicPublish("",QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,msg.getBytes(StandardCharsets.UTF_8));
        }

        // 記錄結(jié)束時間
        long endTime = System.currentTimeMillis();
        System.out.println("發(fā)送" + MESSAGE_COUNT + "條消息消耗:"+(endTime - beginTime) + "毫秒");
    }
}

2.4 處理未確認(rèn)的消息

最好的處理方式把未確認(rèn)的消息放到一個基于內(nèi)存的能被發(fā)布線程訪問的隊列。

例如:ConcurrentLinkedQueue可以在確認(rèn)隊列confirm callbacks與發(fā)布線程之間進(jìn)行消息的傳遞。

處理方式:

1.記錄要發(fā)送的全部消息;

2.在發(fā)布成功確認(rèn)處刪除;

3.打印未確認(rèn)的消息。

使用一個哈希表存儲消息,它的優(yōu)點:

可以將需要和消息進(jìn)行關(guān)聯(lián);輕松批量刪除條目;支持高并發(fā)。

ConcurrentSkipListMap<Long,String > map = new ConcurrentSkipListMap<>();
/**
 * @Description 異步發(fā)布確認(rèn),處理未發(fā)布成功的消息
 * @date 2022/3/7 18:09
 */
public class AsyncProducerRemember {
    private static final int MESSAGE_COUNT = 100;
    private static final String QUEUE_NAME = "confirm_async_remember";
    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMQUtils.getChannel();
        // 產(chǎn)生隊列
        channel.queueDeclare(QUEUE_NAME,true,false,false,null);
        // 開啟確認(rèn)發(fā)布
        channel.confirmSelect();
        // 線程安全有序的一個hash表,適用與高并發(fā)
        ConcurrentSkipListMap< Long, String > map = new ConcurrentSkipListMap<>();
        // 記錄開始時間
        long beginTime = System.currentTimeMillis();
        // 確認(rèn)成功回調(diào)
        ConfirmCallback ackCallback = (deliveryTab, multiple) ->{
            //2. 在發(fā)布成功確認(rèn)處刪除;
            // 批量刪除
            if (multiple){
                ConcurrentNavigableMap<Long, String> confirmMap = map.headMap(deliveryTab);
                confirmMap.clear();
            }else {
                // 單獨刪除
                map.remove(deliveryTab);
            }
            System.out.println("確認(rèn)成功消息:" + deliveryTab);
        };
        // 確認(rèn)失敗回調(diào)
        ConfirmCallback nackCallback = (deliveryTab,multiple) ->{
            // 3. 打印未確認(rèn)的消息。
            System.out.println("未確認(rèn)的消息:" + map.get(deliveryTab) + ",標(biāo)記:" + deliveryTab);
        };
        // 消息監(jiān)聽器
        /**
         * addConfirmListener:
         *                  1. 確認(rèn)成功的消息;
         *                  2. 確認(rèn)失敗的消息。
         */
        channel.addConfirmListener(ackCallback,nackCallback);
        for (int i = 0; i < MESSAGE_COUNT; i++) {
            String msg = "" + i;
            channel.basicPublish("",QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,msg.getBytes(StandardCharsets.UTF_8));
            // 1. 記錄要發(fā)送的全部消息;
            map.put(channel.getNextPublishSeqNo(),msg);
        }

        // 記錄結(jié)束時間
        long endTime = System.currentTimeMillis();
        System.out.println("發(fā)送" + MESSAGE_COUNT + "條消息消耗:"+(endTime - beginTime) + "毫秒");
    }
}

總結(jié)

顯然來說,異步處理除了在編碼處有些麻煩,在處理時間效率和可用性上都是比單處理和批處理好很多。

本篇文章就到這里了,希望能夠給你帶來幫助,也希望您能夠多多關(guān)注腳本之家的更多內(nèi)容!    

相關(guān)文章

  • SpringBoot AOP使用筆記

    SpringBoot AOP使用筆記

    今天小編就為大家分享一篇關(guān)于SpringBoot AOP使用筆記,小編覺得內(nèi)容挺不錯的,現(xiàn)在分享給大家,具有很好的參考價值,需要的朋友一起跟隨小編來看看吧
    2019-01-01
  • 教你用Springboot實現(xiàn)攔截器獲取header內(nèi)容

    教你用Springboot實現(xiàn)攔截器獲取header內(nèi)容

    項目中遇到一個需求,對接上游系統(tǒng)是涉及到需要增加請求頭,請求頭的信息是動態(tài)獲取的,需要動態(tài)從下游拿到之后轉(zhuǎn)給上游,文中非常詳細(xì)的介紹了該需求的實現(xiàn),需要的朋友可以參考下
    2021-05-05
  • Java實現(xiàn)多項式乘法代碼實例

    Java實現(xiàn)多項式乘法代碼實例

    今天小編就為大家分享一篇關(guān)于Java實現(xiàn)多項式乘法代碼實例,小編覺得內(nèi)容挺不錯的,現(xiàn)在分享給大家,具有很好的參考價值,需要的朋友一起跟隨小編來看看吧
    2018-10-10
  • Java的靜態(tài)方法Arrays.asList()使用指南

    Java的靜態(tài)方法Arrays.asList()使用指南

    Arrays.asList() 是一個 Java 的靜態(tài)方法,它可以把一個數(shù)組或者多個參數(shù)轉(zhuǎn)換成一個 List 集合,這個方法可以作為數(shù)組和集合之間的橋梁,方便我們使用集合的一些方法和特性,本文將介紹 Arrays.asList() 的語法、應(yīng)用場景、坑點和總結(jié)
    2023-09-09
  • Java?MapStruct優(yōu)雅地實現(xiàn)對象轉(zhuǎn)換

    Java?MapStruct優(yōu)雅地實現(xiàn)對象轉(zhuǎn)換

    MapSturct?是一個生成類型安全,高性能且無依賴的?JavaBean?映射代碼的注解處理器,用它可以輕松實現(xiàn)對象轉(zhuǎn)換,下面就來和大家聊聊具體操作吧
    2023-06-06
  • java property配置文件管理工具框架過程詳解

    java property配置文件管理工具框架過程詳解

    這篇文章主要介紹了java property配置文件管理工具框架過程詳解,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友可以參考下
    2019-11-11
  • 淺析java快速排序算法

    淺析java快速排序算法

    這篇文章主要介紹了淺析java快速排序算法,需要的朋友可以參考下
    2015-02-02
  • Springboot實現(xiàn)多文件上傳代碼解析

    Springboot實現(xiàn)多文件上傳代碼解析

    這篇文章主要介紹了Springboot實現(xiàn)多文件上傳代碼解析,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友可以參考下
    2020-04-04
  • Java Map遍歷2種實現(xiàn)方法代碼實例

    Java Map遍歷2種實現(xiàn)方法代碼實例

    這篇文章主要介紹了Java Map遍歷2種實現(xiàn)方法代碼實例,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友可以參考下
    2020-10-10
  • 手把手寫Spring框架

    手把手寫Spring框架

    Spring是于2003 年興起的一個輕量級的Java 開發(fā)框架,由Rod Johnson創(chuàng)建。簡單來說,Spring是一個分層的JavaSE/EE full-stack(一站式) 輕量級開源框架
    2021-08-08

最新評論