亚洲乱码中文字幕综合,中国熟女仑乱hd,亚洲精品乱拍国产一区二区三区,一本大道卡一卡二卡三乱码全集资源,又粗又黄又硬又爽的免费视频

RabbitMQ的ACK確認(rèn)機(jī)制保障消費(fèi)端消息的可靠性詳解

 更新時間:2023年12月12日 10:12:31   作者:warybee  
這篇文章主要介紹了RabbitMQ的ACK確認(rèn)機(jī)制保障消費(fèi)端消息的可靠性詳解,簡單來說,就是你必須關(guān)閉 RabbitMQ 的自動ack ,可以通過一個 api 來調(diào)用就行,然后每次你自己代碼里確保處理完的時候,再在程序里 ack 一把,需要的朋友可以參考下

1. 概述

如果消費(fèi)端在你消費(fèi)的時候,剛消費(fèi)到,還沒處理,結(jié)果進(jìn)程掛了,比如重啟了,那么就尷尬了,RabbitMQ 認(rèn)為你都消費(fèi)了,這數(shù)據(jù)就丟了。這個時候得用 RabbitMQ 提供的 ack 機(jī)制,簡單來說,就是你必須關(guān)閉 RabbitMQ 的自動ack ,可以通過一個 api 來調(diào)用就行,然后每次你自己代碼里確保處理完的時候,再在程序里 ack 一把。這樣的話,如果你還沒處理完,不就沒有 ack 了?那 RabbitMQ 就認(rèn)為你還沒處理完,這個時候 RabbitMQ 會把這個消費(fèi)分配給別的 consumer 去處理,消息是不會丟的。

生產(chǎn)端消息可靠性保證可以使用RabbitMQ的confirm機(jī)制。

2. ACK機(jī)制與消費(fèi)端消息補(bǔ)償機(jī)制

把channel.basicConsume(...)方法的autoAck參數(shù)改為false

channel.basicAck(long deliveryTag, boolean multiple);方法,消費(fèi)成功簽收

參數(shù)說明:

  • deliveryTag:消息標(biāo)識
  • multiple:是否批量簽收

basicNack(long deliveryTag, boolean multiple, boolean requeue) ,消息消費(fèi)失敗

參數(shù)說明:

  • deliveryTag:消息標(biāo)識
  • multiple:是否批量簽收
  • requeue:true 消息會重回隊列,false 消息會進(jìn)入到死信隊列

3. 代碼演示

生產(chǎn)端

 public static void main(String[] args) throws Exception{
        ConnectionFactory connectionFactory=new ConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        //設(shè)置虛擬主機(jī)
        connectionFactory.setVirtualHost("/");
        //創(chuàng)建一個鏈接
        Connection connection = connectionFactory.newConnection();
        //創(chuàng)建channel
        Channel channel = connection.createChannel();
        String exchangeName="test_ack_exchange";
        String routeKey="ack.test";
        for (int i=0;i<5;i++){
            Map<String, Object> headers = new HashMap<String, Object>();
            //演示重回隊列機(jī)制,使用num==0的消息簽收失敗重回隊列
            headers.put("num", i);
            AMQP.BasicProperties properties=new AMQP.BasicProperties().builder()
                    .deliveryMode(2)
                    .contentEncoding("UTF-8")
                    .headers(headers)
                    .build();
            String msg="RabbitMQ send message ack test!"+i;
            channel.basicPublish(exchangeName,routeKey,properties,msg.getBytes());
        }
    }

消息端

public static void main(String[] args) throws  Exception{
        System.out.println("======消息接收start==========");
        ConnectionFactory connectionFactory=new ConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        //設(shè)置虛擬主機(jī)
        connectionFactory.setVirtualHost("/");
        //創(chuàng)建鏈接
        Connection connection = connectionFactory.newConnection();
        //創(chuàng)建channel
        Channel channel = connection.createChannel();
        String exchangeName="test_ack_exchange";
        String exchangeType="topic";
        //聲明Exchange
        channel.exchangeDeclare(exchangeName,exchangeType,true,false,false,null);
        String queueName="test_ack_queue";
        //聲明隊列
        channel.queueDeclare(queueName,true,false,false,null);
        String routeKey="ack.#";
        //綁定隊列和交換機(jī)
        channel.queueBind(queueName,exchangeName,routeKey);
        /**
         * autoAck:false  設(shè)置為手工簽收
         */
        channel.basicConsume(queueName, false, new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println("接收到消息::"+new String(body));
                    try {
                        Thread.sleep(3000); //休眠5秒
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    //演示重回隊列機(jī)制,使用num==0的消息簽收失敗重回隊列
                    if((Integer)properties.getHeaders().get("num") == 0) {
                        /**
                         * 參數(shù)說明:1、消息標(biāo)識  2、是否批量簽收  3、是否重回隊列
                         */
                        channel.basicNack(envelope.getDeliveryTag(), false, true);
                    } else {
                        channel.basicAck(envelope.getDeliveryTag(), false);
                    }
                }
            });
    }

運(yùn)行代碼以上后,由于在消費(fèi)端,設(shè)置了第一條消息,簽收失敗重回隊列,在RabbitMQ控制臺中我們可以看到始終有一條消息未簽收確認(rèn)

在這里插入圖片描述

到此這篇關(guān)于RabbitMQ的ACK確認(rèn)機(jī)制保障消費(fèi)端消息的可靠性詳解的文章就介紹到這了,更多相關(guān)RabbitMQ的ACK確認(rèn)機(jī)制內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • 深入了解Java核心類庫--Arrays類

    深入了解Java核心類庫--Arrays類

    這篇文章主要為大家詳細(xì)介紹了java Arrays類定義與使用的相關(guān)資料,具有一定的參考價值,感興趣的小伙伴們可以參考一下,希望能給你帶來幫助
    2021-07-07
  • Java多線程之FutureTask的介紹及使用

    Java多線程之FutureTask的介紹及使用

    之前介紹了線程池相關(guān)的對象,Runable Callable與Future,下面介紹FutureTask的作用,它的特性是怎樣的呢,需要的朋友可以參考下
    2021-06-06
  • Spring Cloud Admin健康檢查 郵件、釘釘群通知的實(shí)現(xiàn)

    Spring Cloud Admin健康檢查 郵件、釘釘群通知的實(shí)現(xiàn)

    這篇文章主要介紹了Spring Cloud Admin健康檢查 郵件、釘釘群通知的實(shí)現(xiàn),文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2020-08-08
  • 如何使用 IntelliJ IDEA 編寫 Spark 應(yīng)用程序(Scala + Maven)

    如何使用 IntelliJ IDEA 編寫 Spark 應(yīng)用程序(Sc

    本教程展示了如何在IntelliJIDEA中使用Maven編寫和運(yùn)行一個簡單的Spark應(yīng)用程序(例如WordCount程序),本文通過實(shí)例代碼給大家介紹的非常詳細(xì),感興趣的朋友跟隨小編一起看看吧
    2024-11-11
  • spring cloud如何修復(fù)zuul跨域配置異常的問題

    spring cloud如何修復(fù)zuul跨域配置異常的問題

    最近的開發(fā)過程中,使用spring集成了spring-cloud-zuul,在配置zuul跨域的時候遇到了問題,下面這篇文章主要給大家介紹了關(guān)于spring cloud如何修復(fù)zuul跨域配置異常的問題,需要的朋友可以參考借鑒,下面來一起看看吧。
    2017-09-09
  • SpringBoot通過注解注入Bean的幾種方式解析

    SpringBoot通過注解注入Bean的幾種方式解析

    這篇文章主要為大家介紹了SpringBoot注入Bean的幾種方式示例詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步早日升職加薪
    2022-03-03
  • Java調(diào)用Shell命令的方法

    Java調(diào)用Shell命令的方法

    這篇文章主要介紹了Java調(diào)用Shell命令的方法,實(shí)例分析了java調(diào)用shell命令的相關(guān)技巧,具有一定參考借鑒價值,需要的朋友可以參考下
    2015-07-07
  • java讀取用戶登入退出日志信息上傳服務(wù)端

    java讀取用戶登入退出日志信息上傳服務(wù)端

    這篇文章主要介紹了java讀取用戶登入退出日志信息上傳服務(wù)端的相關(guān)資料,需要的朋友可以參考下
    2016-05-05
  • Java IO流深入理解

    Java IO流深入理解

    這篇文章主要介紹了java IO流的深入理解,下面和小編來一起學(xué)習(xí)一下吧,希望能給你帶來幫助,也希望您能夠多多關(guān)注腳本之家的更多內(nèi)容
    2021-07-07
  • Activiti如何啟動流程并使流程前進(jìn)

    Activiti如何啟動流程并使流程前進(jìn)

    這篇文章主要介紹了Activiti如何啟動流程并使流程前進(jìn),文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友可以參考下
    2020-03-03

最新評論