" />

亚洲乱码中文字幕综合,中国熟女仑乱hd,亚洲精品乱拍国产一区二区三区,一本大道卡一卡二卡三乱码全集资源,又粗又黄又硬又爽的免费视频

Java RabbitMQ的工作隊(duì)列與消息應(yīng)答詳解

 更新時(shí)間:2022年03月08日 14:47:32   作者:江海i  
這篇文章主要為大家詳細(xì)介紹了Python實(shí)現(xiàn)學(xué)生成績(jī)管理系統(tǒng),文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下,希望能夠給你帶來(lái)幫助

Work Queues

工作隊(duì)列(任務(wù)隊(duì)列)主要思想是避免立即執(zhí)行資源密集型任務(wù),而不得不等待它完成,相反我們安排任務(wù)在之后執(zhí)行。我們把任務(wù)封裝為消息并將其發(fā)送到隊(duì)列。在后臺(tái)運(yùn)行的工作進(jìn)程將彈出任務(wù)并最終執(zhí)行作業(yè)。當(dāng)有多個(gè)工作線程時(shí),這些工作線程將一起處理這些任務(wù)。

其實(shí)就是生產(chǎn)者發(fā)送大量的消息,發(fā)送到隊(duì)列之后,由多個(gè)消費(fèi)者(工作線程)來(lái)處理消息,并且每個(gè)消息只能被處理一次。

在這里插入圖片描述

1. 輪詢分發(fā)消息

多個(gè)工作線程按照次序每來(lái)一個(gè)消息執(zhí)行一次。

1.1 抽取工具類

直接通過(guò)信息獲取信道

/**
 * @Description RabbitMQ工具類
 * @date 2022/3/5 10:02
 */
public class RabbitMQUtils {
    public static Channel getChannel() throws Exception{
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("1");
        factory.setUsername("guest");
        factory.setPassword("guest");
        Connection connection = factory.newConnection();
        return connection.createChannel();
    }
}

1.2 編寫(xiě)兩個(gè)工作線程

Work2和Work1代碼沒(méi)有區(qū)別,只需要對(duì)它做出區(qū)分即可。

public class Worker1 {
    // 指定隊(duì)列名稱
    private static final String QUEUE_NAME = "work_queue";

    public static void main(String[] args) throws Exception {

        // 獲取信道
        Channel channel = RabbitMQUtils.getChannel();

        // 聲明:接收消息回調(diào)
        DeliverCallback deliverCallback = (consumerTag, message) -> {
            System.out.println("工作線程01:"+ new String(message.getBody()));
        };

        // 聲明:取消消費(fèi)回調(diào)
        CancelCallback cancelCallback = consumerTag -> {
            System.out.println("工作線程01取消接收:"+consumerTag);
        };

        System.out.println("工作線程01啟動(dòng)完成......");

        channel.basicConsume(QUEUE_NAME,false,deliverCallback,cancelCallback);
    }
}

1.3 編寫(xiě)生產(chǎn)者

public class Producer {

    private static final String QUEUE_NAME = "work_queue";

    public static void main(String[] args) throws Exception {

        Channel channel = RabbitMQUtils.getChannel();


        // 產(chǎn)生隊(duì)列
        channel.queueDeclare(QUEUE_NAME,false,false,true,null);

        // 消息體
        Scanner scanner = new Scanner(System.in);
        int i = 1;
        while (scanner.hasNext()){
            String msg = scanner.next();
            msg = msg + i;
            channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());
            System.out.println("發(fā)送成功:" + msg);
        }

        System.out.println("----------==========發(fā)送完畢==========----------");
    }

}

1.4 運(yùn)行測(cè)試

先啟動(dòng)兩個(gè)工作線程,再啟動(dòng)生產(chǎn)者。

出現(xiàn)404異常請(qǐng)參考下方1.6

生產(chǎn)者發(fā)送情況:

在這里插入圖片描述

輪詢狀態(tài)下兩個(gè)工作隊(duì)列接收狀態(tài):

在這里插入圖片描述

在這里插入圖片描述

1.5 異常情況

在先啟動(dòng)兩個(gè)消費(fèi)者線程時(shí),會(huì)提示404找不到隊(duì)列。

Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no queue 'work_queue' in vhost '/', class-id=60, method-id=20)

發(fā)生這個(gè)情況的原因很顯然是因?yàn)橄葐?dòng)了消費(fèi)者,但是在RabbitMQ中沒(méi)有創(chuàng)建相對(duì)應(yīng)的隊(duì)列名稱,解決方法可以:

1.先啟動(dòng)生產(chǎn)者創(chuàng)建隊(duì)列(也可以在RabbitMQ中創(chuàng)建隊(duì)列);

2.再啟動(dòng)消費(fèi)者就不會(huì)產(chǎn)生這個(gè)錯(cuò)誤;

3.再在生產(chǎn)者中使用Scanner類去發(fā)送消息測(cè)試。

2. 消息應(yīng)答

消費(fèi)者在接收到消息并且處理該消息之后,告訴RabbitMQ它已經(jīng)處理了,RabbitMQ可以刪除消息。其目的就是為了保護(hù)消息在被處理之前不會(huì)消失。

2.1 自動(dòng)應(yīng)答

這種方式發(fā)送后就被認(rèn)定為已經(jīng)傳送成功,所以在消息接收到之前消費(fèi)者的連接或者channel關(guān)閉,那么這個(gè)消息就會(huì)丟失。其特點(diǎn)是消費(fèi)者可以傳遞過(guò)載的消息,對(duì)傳遞的消息沒(méi)有限制,但如果因內(nèi)存耗盡消費(fèi)者線程被系統(tǒng)殺死,就會(huì)使得多條消息丟失。所以這個(gè)模式需要在數(shù)據(jù)安全性和吞吐量之間選擇,適合使用在消費(fèi)者可以高效并以某種速率能夠處理這些消息的情況下使用。

所以自動(dòng)應(yīng)答的方式局限性很高。

2.2 手動(dòng)應(yīng)答

優(yōu)點(diǎn):可以批量應(yīng)答和減少網(wǎng)絡(luò)擁擠。

1.channel.basicAck(long deliveryTag, boolean multiple);:肯應(yīng)應(yīng)答,處理完消息之后提醒RabbitMQ可以刪除當(dāng)前隊(duì)列,deliveryTag:當(dāng)前隊(duì)列中選中的消息;multiple:是否批量應(yīng)答。

2.channel.basicNack(long deliveryTag, boolean multiple, boolean requeue):否定應(yīng)答,

3.channel.basicReject(long deliveryTag, boolean requeue):否定并且拒絕應(yīng)答。

2.3 消息自動(dòng)重新入隊(duì)

如果消費(fèi)者因?yàn)橐恍┰蚴チ藢?duì)RabbitMQ的連接,導(dǎo)致沒(méi)有發(fā)送ACK確認(rèn),RabbitMQ就會(huì)對(duì)該消息進(jìn)行重新排隊(duì),并且分發(fā)給可以處理該消息的消費(fèi)者,所以即使某個(gè)消費(fèi)者死亡,也可以保證消息不會(huì)丟失。

2.4 手動(dòng)應(yīng)答測(cè)試

測(cè)試目的:在手動(dòng)應(yīng)答狀態(tài)下不會(huì)發(fā)生消息丟失的情況。

測(cè)試方法:

1.創(chuàng)建兩個(gè)消費(fèi)者;

2.使用工具類使線程睡眠一定時(shí)間;

3.在睡眠時(shí)關(guān)閉線程,看能否自動(dòng)重新入隊(duì)。

2.4.1 生產(chǎn)者代碼

/**
 * @Description 手動(dòng)應(yīng)答生產(chǎn)者
 * @date 2022/3/5 19:03
 */
public class Producer1 {

    // 指定隊(duì)列名
    private static final String TASK_QUEUE_RES = "queue_res";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMQUtils.getChannel();
        channel.queueDeclare(TASK_QUEUE_RES,false,false,false,null);
        Scanner scanner = new Scanner(System.in);
        int i = 0;
        while (scanner.hasNext()){
            i++;
            String msg = scanner.next() + i;
            channel.basicPublish("",TASK_QUEUE_RES,null,msg.getBytes(StandardCharsets.UTF_8));
            System.out.println("發(fā)送消息:'" + msg + "'成功");
        }
    }
}

2.4.2 消費(fèi)者代碼

/**
 * @Description 手動(dòng)應(yīng)答消費(fèi)者1
 * @date 2022/3/5 19:17
 */
public class Worker1 {

    private static final String TASK_QUEUE_RES = "queue_res";

    public static void main(String[] args)  throws Exception{
        Channel channel = RabbitMQUtils.getChannel();
        System.out.println("線程A等待接收......");

        DeliverCallback deliverCallback = (consumerTag, message) -> {
            // 模擬并發(fā)沉睡一秒
            try {
                Thread.sleep(1000);
                System.out.println("線程A接收消息:"+ new String(message.getBody(), StandardCharsets.UTF_8));
                /**
                 * basicAck:
                 *          1. 消息標(biāo)記
                 *          2. 是否批量
                 */
                channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        };
        channel.basicConsume(TASK_QUEUE_RES,false,deliverCallback,
                consumerTag -> {
                    System.out.println(consumerTag + "消費(fèi)者取消消費(fèi)");
                });

    }
}

Worker2類和1區(qū)別不大,將名稱改成B再將睡眠事件改成30即可。

2.4.3 測(cè)試

測(cè)試方法:

1.先啟動(dòng)生產(chǎn)者創(chuàng)建隊(duì)列;

2.啟動(dòng)兩個(gè)消費(fèi)者接收消息;

3.因?yàn)槭禽喸兎绞?,所以A線程接收之后肯定是B線程接收,在睡眠時(shí)關(guān)閉B線程,如果A線程接收到說(shuō)明測(cè)試成功。

發(fā)送消息:

在這里插入圖片描述

線程A接收:

在這里插入圖片描述

再發(fā)送消息:

在這里插入圖片描述

關(guān)閉線程B線程A接收到消息:

在這里插入圖片描述

測(cè)試成功!

總結(jié)

本篇文章就到這里了,希望能夠給你帶來(lái)幫助,也希望您能夠多多關(guān)注腳本之家的更多內(nèi)容!   

相關(guān)文章

  • Apache Commons Math3探索之快速傅立葉變換代碼示例

    Apache Commons Math3探索之快速傅立葉變換代碼示例

    這篇文章主要介紹了Apache Commons Math3探索之快速傅立葉變換代碼示例,具有一定參考價(jià)值,需要的朋友可以了解下。
    2017-10-10
  • Java并發(fā)編程Semaphore計(jì)數(shù)信號(hào)量詳解

    Java并發(fā)編程Semaphore計(jì)數(shù)信號(hào)量詳解

    這篇文章主要介紹了Java并發(fā)編程Semaphore計(jì)數(shù)信號(hào)量詳解,具有一定參考價(jià)值,需要的朋友可以了解下。
    2017-10-10
  • 基于Java?GUI?事件處理方式

    基于Java?GUI?事件處理方式

    這篇文章主要介紹了基于Java?GUI?事件處理方式,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2021-12-12
  • 如何避免在Java?中使用雙括號(hào)初始化

    如何避免在Java?中使用雙括號(hào)初始化

    這篇文章主要介紹了如何避免在Java中使用雙括號(hào)初始化,本文通過(guò)實(shí)例代碼給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下
    2023-07-07
  • 解決SpringMVC攔截器path路徑的坑

    解決SpringMVC攔截器path路徑的坑

    這篇文章主要介紹了解決SpringMVC攔截器path路徑的坑,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2021-10-10
  • java異常處理攔截器詳情

    java異常處理攔截器詳情

    這篇文章主要介紹了java異常處理攔截器,使用異常處理攔截器,可以不用寫(xiě)那么多try…catch…,下面就來(lái)學(xué)習(xí)關(guān)于java異常處理攔截器的詳情內(nèi)容吧,需要的朋友可以參考一下
    2021-10-10
  • Java中Map的computeIfAbsent方法詳解

    Java中Map的computeIfAbsent方法詳解

    這篇文章主要介紹了Java的Map中computeIfAbsent方法詳解,在jdk1.8中Map接口新增了一個(gè)computeIfAbsent方法,這是Map接口中的默認(rèn)實(shí)現(xiàn)該方法是首先判斷緩存Map中是否存在指定的key的值,如果不存在,會(huì)調(diào)用mappingFunction(key)計(jì)算key的value,需要的朋友可以參考下
    2023-11-11
  • Java超全面梳理內(nèi)部類的使用

    Java超全面梳理內(nèi)部類的使用

    說(shuō)起內(nèi)部類這個(gè)詞,想必很多人都不陌生,但是又會(huì)覺(jué)得不熟悉。原因是平時(shí)編寫(xiě)代碼時(shí)可能用到的場(chǎng)景不多,用得最多的是在有事件監(jiān)聽(tīng)的情況下,并且即使用到也很少去總結(jié)內(nèi)部類的用法。今天我們就來(lái)一探究竟
    2022-04-04
  • Java?C++分別實(shí)現(xiàn)滑動(dòng)窗口的最大值

    Java?C++分別實(shí)現(xiàn)滑動(dòng)窗口的最大值

    這篇文章主要介紹了分別通過(guò)Java和C++實(shí)現(xiàn)滑動(dòng)窗口最大值,即給定一個(gè)數(shù)組?nums?和滑動(dòng)窗口的大小?k,請(qǐng)找出所有滑動(dòng)窗口里的最大值。感興趣的可以了解一下
    2021-12-12
  • java設(shè)計(jì)模式之中介者模式

    java設(shè)計(jì)模式之中介者模式

    這篇文章主要為大家詳細(xì)介紹了java設(shè)計(jì)模式之中介者模式,文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下
    2016-08-08

最新評(píng)論