亚洲乱码中文字幕综合,中国熟女仑乱hd,亚洲精品乱拍国产一区二区三区,一本大道卡一卡二卡三乱码全集资源,又粗又黄又硬又爽的免费视频

RabbitMQ工作模式中的發(fā)布確認模式示例詳解

 更新時間:2025年05月14日 11:46:32   作者:新綠MEHO  
發(fā)布確認模式用于確保消息已經被正確地發(fā)送到RabbitMQ服務器,并被成功接收和持久化,本文通過實例代碼給大家介紹RabbitMQ工作模式之發(fā)布確認模式,感興趣的朋友一起看看吧

發(fā)布確認模式

概述

發(fā)布確認模式用于確保消息已經被正確地發(fā)送到RabbitMQ服務器,并被成功接收和持久化。通過使用發(fā)布確認,生產者可以獲得對消息的可靠性保證,避免消息丟失。這一機制基于通道(Channel)級別,通過兩個階段的確認來保證消息的可靠性。

消息丟失問題

作為消息中間件, 都會?臨消息丟失的問題.
消息丟失?概分為三種情況:

1. ?產者問題. 因為應?程序故障, ?絡抖動等各種原因, ?產者沒有成功向broker發(fā)送消息.
2. 消息中間件??問題. ?產者成功發(fā)送給了Broker, 但是Broker沒有把消息保存好, 導致消息丟失.
3. 消費者問題. Broker 發(fā)送消息到消費者, 消費者在消費消息時, 因為沒有處理好, 導致broker將消費失敗的消息從隊列中刪除了。

RabbitMQ也對上述問題給出了相應的解決?案. 問題2可以通過持久化機制. 問題3可以采?消息應答機制.
針對問題1, 可以采?發(fā)布確認(Publisher Confirms)機制實現(xiàn). 

發(fā)布確認的三種模式

RabbitMQ的發(fā)布確認模式主要有三種形式:單條確認、批量確認和異步確認。

單條確認(Single Publisher Confirm)

特點:在發(fā)布一條消息后,等待服務器確認該消息是否成功接收。
優(yōu)點:實現(xiàn)簡單,每條消息的確認狀態(tài)清晰。
缺點:性能開銷較大,特別是在高并發(fā)的場景下,因為每條消息都需要等待服務器的確認。

批量確認(Batch Publisher Confirm)

特點:允許在一次性確認多個消息是否成功被服務器接收。
優(yōu)點:在大量消息的場景中可以提高效率,因為可以減少確認消息的數(shù)量。
缺點:當一批消息中有一條消息發(fā)送失敗時,整個批量確認失敗。此時需要重新發(fā)送整批消息,但不知道是哪條消息發(fā)送失敗,增加了調試和處理的難度。

異步確認(Asynchronous Confirm)

特點:通過回調函數(shù)處理消息的確認和未確認事件,更加靈活。
優(yōu)點:在異步場景中能夠更好地處理消息的狀態(tài),提高了系統(tǒng)的并發(fā)性能和響應速度。
缺點:實現(xiàn)相對復雜,需要處理回調函數(shù)的邏輯和狀態(tài)管理。

實現(xiàn)步驟

1.設置通道為發(fā)布確認模式:在生產者發(fā)送消息之前,需要將通道設置為發(fā)布確認模式。這可以通過調用channel.confirmSelect()方法來實現(xiàn)。
2.發(fā)送消息并等待確認:生產者發(fā)送消息時,每條消息都會分配一個唯一的、遞增的整數(shù)ID(DeliveryTag)。生產者可以通過調用channel.waitForConfirms()方法來等待所有已發(fā)送消息的確認,或者通過其他方式處理確認回調。
3.處理確認回調:為了處理確認回調,需要創(chuàng)建一個ConfirmCallback接口的實現(xiàn)。在實現(xiàn)的handleAck()方法中,可以處理成功接收到確認的消息的邏輯;在handleNack()方法中,可以處理未成功接收到確認的消息的邏輯。

應用場景

發(fā)布確認模式適用于對數(shù)據(jù)安全性要求較高的場景,如金融交易、訂單處理等。在這些場景中,消息的丟失或重復都可能導致嚴重的業(yè)務問題。通過使用發(fā)布確認模式,可以確保消息被正確地發(fā)送到RabbitMQ服務器,并被成功接收和持久化,從而提高了系統(tǒng)的可靠性和穩(wěn)定性。

代碼案例

引入依賴

<!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.21.0</version>
</dependency>

常量類

public class Constants {
    public static final String HOST = "47.98.109.138";
    public static final int PORT = 5672;
    public static final String USER_NAME = "study";
    public static final String PASSWORD = "study";
    public static final String VIRTUAL_HOST = "aaa";
    //publisher confirms
    public static final String PUBLISHER_CONFIRMS_QUEUE1 = "publisher.confirms.queue1";
    public static final String PUBLISHER_CONFIRMS_QUEUE2 = "publisher.confirms.queue2";
    public static final String PUBLISHER_CONFIRMS_QUEUE3 = "publisher.confirms.queue3";
}

單條確認

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmListener;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import rabbitmq.constant.Constants;
import java.io.IOException;
import java.util.Collections;
import java.util.SortedSet;
import java.util.TreeSet;
public class PublisherConfirms {
    private static final Integer MESSAGE_COUNT = 100;
    static Connection createConnection() throws Exception {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(Constants.HOST);
        connectionFactory.setPort(Constants.PORT); //需要提前開放端口號
        connectionFactory.setUsername(Constants.USER_NAME);//賬號
        connectionFactory.setPassword(Constants.PASSWORD);  //密碼
        connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虛擬主機
        return connectionFactory.newConnection();
    }
    public static void main(String[] args) throws Exception {
        //Strategy #1: Publishing Messages Individually
        //單獨確認
        publishingMessagesIndividually();
    }
    /**
     * 單獨確認
     */
    private static void publishingMessagesIndividually() throws Exception {
        try(Connection connection = createConnection()) {
            //1. 開啟信道
            Channel channel = connection.createChannel();
            //2. 設置信道為confirm模式
            channel.confirmSelect();
            //3. 聲明隊列
            channel.queueDeclare(Constants.PUBLISHER_CONFIRMS_QUEUE1, true, false, false, null);
            //4. 發(fā)送消息, 并等待確認
            long start = System.currentTimeMillis();
            for (int i = 0; i < MESSAGE_COUNT; i++) {
                String msg = "hello publisher confirms"+i;
                channel.basicPublish("",Constants.PUBLISHER_CONFIRMS_QUEUE1, null, msg.getBytes());
                //等待確認
                channel.waitForConfirmsOrDie(5000);
            }
            long end = System.currentTimeMillis();
            System.out.printf("單獨確認策略, 消息條數(shù): %d, 耗時: %d ms \n",MESSAGE_COUNT, end-start);
        }
    }
}

運行代碼

我們可以看到,以發(fā)送消息條數(shù)為100條為例,單條確認模式是非常耗時的。 

批量確認

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmListener;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import rabbitmq.constant.Constants;
import java.io.IOException;
import java.util.Collections;
import java.util.SortedSet;
import java.util.TreeSet;
public class PublisherConfirms {
    private static final Integer MESSAGE_COUNT = 10000;
    static Connection createConnection() throws Exception {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(Constants.HOST);
        connectionFactory.setPort(Constants.PORT); //需要提前開放端口號
        connectionFactory.setUsername(Constants.USER_NAME);//賬號
        connectionFactory.setPassword(Constants.PASSWORD);  //密碼
        connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虛擬主機
        return connectionFactory.newConnection();
    }
    public static void main(String[] args) throws Exception {
        //Strategy #2: Publishing Messages in Batches
        //批量確認
        publishingMessagesInBatches();
    }
    /**
     * 批量確認
     * @throws Exception
     */
    private static void publishingMessagesInBatches() throws Exception{
        try(Connection connection = createConnection()) {
            //1. 開啟信道
            Channel channel = connection.createChannel();
            //2. 設置信道為confirm模式
            channel.confirmSelect();
            //3. 聲明隊列
            channel.queueDeclare(Constants.PUBLISHER_CONFIRMS_QUEUE2, true, false, false, null);
            //4. 發(fā)送消息, 并進行確認
            long start = System.currentTimeMillis();
            int batchSize = 100;
            int outstandingMessageCount = 0;
            for (int i = 0; i < MESSAGE_COUNT; i++) {
                String msg = "hello publisher confirms"+i;
                channel.basicPublish("",Constants.PUBLISHER_CONFIRMS_QUEUE2, null, msg.getBytes());
                outstandingMessageCount++;
                if (outstandingMessageCount==batchSize){
                    channel.waitForConfirmsOrDie(5000);
                    outstandingMessageCount = 0;
                }
            }
            if (outstandingMessageCount>0){
                channel.waitForConfirmsOrDie(5000);
            }
            long end = System.currentTimeMillis();
            System.out.printf("批量確認策略, 消息條數(shù): %d, 耗時: %d ms \n",MESSAGE_COUNT, end-start);
        }
    }
}

運行代碼

我們可以看到,以發(fā)送消息條數(shù)為10000條為例,單條確認模式是比較快的。 

異步確認

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmListener;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import rabbitmq.constant.Constants;
import java.io.IOException;
import java.util.Collections;
import java.util.SortedSet;
import java.util.TreeSet;
public class PublisherConfirms {
    private static final Integer MESSAGE_COUNT = 10000;
    static Connection createConnection() throws Exception {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(Constants.HOST);
        connectionFactory.setPort(Constants.PORT); //需要提前開放端口號
        connectionFactory.setUsername(Constants.USER_NAME);//賬號
        connectionFactory.setPassword(Constants.PASSWORD);  //密碼
        connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虛擬主機
        return connectionFactory.newConnection();
    }
    public static void main(String[] args) throws Exception {
        //Strategy #3: Handling Publisher Confirms Asynchronously
        //異步確認
        handlingPublisherConfirmsAsynchronously();
    }
    /**
     * 異步確認
     */
    private static void handlingPublisherConfirmsAsynchronously() throws Exception{
        try (Connection connection = createConnection()){
            //1. 開啟信道
            Channel channel = connection.createChannel();
            //2. 設置信道為confirm模式
            channel.confirmSelect();
            //3. 聲明隊列
            channel.queueDeclare(Constants.PUBLISHER_CONFIRMS_QUEUE3, true, false, false, null);
            //4. 監(jiān)聽confirm
            //集合中存儲的是未確認的消息ID
            long start = System.currentTimeMillis();
            SortedSet<Long> confirmSeqNo = Collections.synchronizedSortedSet(new TreeSet<>());
            channel.addConfirmListener(new ConfirmListener() {
                @Override
                public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                    if (multiple){
                        confirmSeqNo.headSet(deliveryTag+1).clear();
                    }else {
                        confirmSeqNo.remove(deliveryTag);
                    }
                }
                @Override
                public void handleNack(long deliveryTag, boolean multiple) throws IOException {
                    if (multiple){
                        confirmSeqNo.headSet(deliveryTag+1).clear();
                    }else {
                        confirmSeqNo.remove(deliveryTag);
                    }
                    //業(yè)務需要根據(jù)實際場景進行處理, 比如重發(fā), 此處代碼省略
                }
            });
            //5. 發(fā)送消息
            for (int i = 0; i < MESSAGE_COUNT; i++) {
                String msg = "hello publisher confirms"+i;
                long seqNo = channel.getNextPublishSeqNo();
                channel.basicPublish("",Constants.PUBLISHER_CONFIRMS_QUEUE3, null, msg.getBytes());
                confirmSeqNo.add(seqNo);
            }
            while (!confirmSeqNo.isEmpty()){
                Thread.sleep(10);
            }
            long end = System.currentTimeMillis();
            System.out.printf("異步確認策略, 消息條數(shù): %d, 耗時: %d ms \n",MESSAGE_COUNT, end-start);
        }
    }
}

運行代碼

我們可以看到,以發(fā)送消息條數(shù)為10000條為例,單條確認模式是非??斓?。 

對比批量確認和異步確認模式

我們可以看到,異步確認模式是比批量確認模式快很多的。

到此這篇關于RabbitMQ工作模式之發(fā)布確認模式的文章就介紹到這了,更多相關RabbitMQ發(fā)布確認模式內容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!

相關文章

  • SpringBoot中?Jackson?日期的時區(qū)和日期格式問題解決

    SpringBoot中?Jackson?日期的時區(qū)和日期格式問題解決

    因為最近項目需要國際化,需要能夠支持多種國際化語言,目前需要支持三種(法語、英語、簡體中文),這篇文章主要介紹了SpringBoot中?Jackson?日期的時區(qū)和日期格式問題,需要的朋友可以參考下
    2022-12-12
  • 淺談java線程join方法使用方法

    淺談java線程join方法使用方法

    這篇文章主要介紹了淺談java線程join方法使用方法,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧
    2019-09-09
  • Java?FTP協(xié)議實現(xiàn)文件下載功能

    Java?FTP協(xié)議實現(xiàn)文件下載功能

    FTP(File?Transfer?Protocol)就是文件傳輸協(xié)議。通過FTP客戶端從遠程FTP服務器上拷貝文件到本地計算機稱為下載,將本地計算機上的文件復制到遠程FTP服務器上稱為上傳,上傳和下載是FTP最常用的兩個功能
    2022-11-11
  • Java Synchronized的偏向鎖詳細分析

    Java Synchronized的偏向鎖詳細分析

    synchronized作為Java程序員最常用同步工具,很多人卻對它的用法和實現(xiàn)原理一知半解,以至于還有不少人認為synchronized是重量級鎖,性能較差,盡量少用。但不可否認的是synchronized依然是并發(fā)首選工具,本文就來詳細講講
    2023-04-04
  • SpringBoot集成本地緩存性能之王Caffeine示例詳解

    SpringBoot集成本地緩存性能之王Caffeine示例詳解

    這篇文章主要為大家介紹了SpringBoot集成本地緩存性能之王Caffeine的示例詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪
    2022-07-07
  • Java中零拷貝和深拷貝的原理及實現(xiàn)探究(代碼示例)

    Java中零拷貝和深拷貝的原理及實現(xiàn)探究(代碼示例)

    深拷貝和零拷貝是兩個在 Java 中廣泛使用的概念,它們分別用于對象復制和數(shù)據(jù)傳輸優(yōu)化,下面將詳細介紹這兩個概念的原理,并給出相應的 Java 代碼示例,感興趣的朋友一起看看吧
    2023-12-12
  • 淺談二分法查找和原始算法查找的效率對比

    淺談二分法查找和原始算法查找的效率對比

    這篇文章主要介紹了淺談二分法查找和原始算法查找的效率對比,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧
    2020-08-08
  • Java 為什么要避免使用finalizer和Cleaner

    Java 為什么要避免使用finalizer和Cleaner

    這篇文章主要介紹了Java 為什么要避免使用finalizer和Cleaner,幫助大家更好的理解和學習使用Java,感興趣的朋友可以了解下
    2021-03-03
  • myeclipse無法部署項目問題及解決方法

    myeclipse無法部署項目問題及解決方法

    最近小編遇到到棘手的問題,myeclipse無法部署項目,點擊這個部署按鈕沒有反應。怎么解決呢,下面小編給大家代理的myeclipse無法部署項目問題及解決方法 ,感興趣的朋友一起看看吧
    2018-10-10
  • Spring 校驗(validator,JSR-303)簡單實現(xiàn)方式

    Spring 校驗(validator,JSR-303)簡單實現(xiàn)方式

    這篇文章主要介紹了Spring 校驗(validator,JSR-303)簡單實現(xiàn)方式,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2021-10-10

最新評論