RabbitMQ工作模式中的發(fā)布確認模式示例詳解
發(fā)布確認模式
概述
發(fā)布確認模式用于確保消息已經被正確地發(fā)送到RabbitMQ服務器,并被成功接收和持久化。通過使用發(fā)布確認,生產者可以獲得對消息的可靠性保證,避免消息丟失。這一機制基于通道(Channel)級別,通過兩個階段的確認來保證消息的可靠性。
消息丟失問題
作為消息中間件, 都會?臨消息丟失的問題.
消息丟失?概分為三種情況:
1. ?產者問題. 因為應?程序故障, ?絡抖動等各種原因, ?產者沒有成功向broker發(fā)送消息.
2. 消息中間件??問題. ?產者成功發(fā)送給了Broker, 但是Broker沒有把消息保存好, 導致消息丟失.
3. 消費者問題. Broker 發(fā)送消息到消費者, 消費者在消費消息時, 因為沒有處理好, 導致broker將消費失敗的消息從隊列中刪除了。
RabbitMQ也對上述問題給出了相應的解決?案. 問題2可以通過持久化機制. 問題3可以采?消息應答機制.
針對問題1, 可以采?發(fā)布確認(Publisher Confirms)機制實現(xiàn).
發(fā)布確認的三種模式
RabbitMQ的發(fā)布確認模式主要有三種形式:單條確認、批量確認和異步確認。
單條確認(Single Publisher Confirm)
特點:在發(fā)布一條消息后,等待服務器確認該消息是否成功接收。
優(yōu)點:實現(xiàn)簡單,每條消息的確認狀態(tài)清晰。
缺點:性能開銷較大,特別是在高并發(fā)的場景下,因為每條消息都需要等待服務器的確認。
批量確認(Batch Publisher Confirm)
特點:允許在一次性確認多個消息是否成功被服務器接收。
優(yōu)點:在大量消息的場景中可以提高效率,因為可以減少確認消息的數(shù)量。
缺點:當一批消息中有一條消息發(fā)送失敗時,整個批量確認失敗。此時需要重新發(fā)送整批消息,但不知道是哪條消息發(fā)送失敗,增加了調試和處理的難度。
異步確認(Asynchronous Confirm)
特點:通過回調函數(shù)處理消息的確認和未確認事件,更加靈活。
優(yōu)點:在異步場景中能夠更好地處理消息的狀態(tài),提高了系統(tǒng)的并發(fā)性能和響應速度。
缺點:實現(xiàn)相對復雜,需要處理回調函數(shù)的邏輯和狀態(tài)管理。
實現(xiàn)步驟
1.設置通道為發(fā)布確認模式:在生產者發(fā)送消息之前,需要將通道設置為發(fā)布確認模式。這可以通過調用channel.confirmSelect()方法來實現(xiàn)。
2.發(fā)送消息并等待確認:生產者發(fā)送消息時,每條消息都會分配一個唯一的、遞增的整數(shù)ID(DeliveryTag)。生產者可以通過調用channel.waitForConfirms()方法來等待所有已發(fā)送消息的確認,或者通過其他方式處理確認回調。
3.處理確認回調:為了處理確認回調,需要創(chuàng)建一個ConfirmCallback接口的實現(xiàn)。在實現(xiàn)的handleAck()方法中,可以處理成功接收到確認的消息的邏輯;在handleNack()方法中,可以處理未成功接收到確認的消息的邏輯。
應用場景
發(fā)布確認模式適用于對數(shù)據(jù)安全性要求較高的場景,如金融交易、訂單處理等。在這些場景中,消息的丟失或重復都可能導致嚴重的業(yè)務問題。通過使用發(fā)布確認模式,可以確保消息被正確地發(fā)送到RabbitMQ服務器,并被成功接收和持久化,從而提高了系統(tǒng)的可靠性和穩(wěn)定性。
代碼案例
引入依賴
<!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client --> <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.21.0</version> </dependency>
常量類
public class Constants { public static final String HOST = "47.98.109.138"; public static final int PORT = 5672; public static final String USER_NAME = "study"; public static final String PASSWORD = "study"; public static final String VIRTUAL_HOST = "aaa"; //publisher confirms public static final String PUBLISHER_CONFIRMS_QUEUE1 = "publisher.confirms.queue1"; public static final String PUBLISHER_CONFIRMS_QUEUE2 = "publisher.confirms.queue2"; public static final String PUBLISHER_CONFIRMS_QUEUE3 = "publisher.confirms.queue3"; }
單條確認
import com.rabbitmq.client.Channel; import com.rabbitmq.client.ConfirmListener; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import rabbitmq.constant.Constants; import java.io.IOException; import java.util.Collections; import java.util.SortedSet; import java.util.TreeSet; public class PublisherConfirms { private static final Integer MESSAGE_COUNT = 100; static Connection createConnection() throws Exception { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost(Constants.HOST); connectionFactory.setPort(Constants.PORT); //需要提前開放端口號 connectionFactory.setUsername(Constants.USER_NAME);//賬號 connectionFactory.setPassword(Constants.PASSWORD); //密碼 connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虛擬主機 return connectionFactory.newConnection(); } public static void main(String[] args) throws Exception { //Strategy #1: Publishing Messages Individually //單獨確認 publishingMessagesIndividually(); } /** * 單獨確認 */ private static void publishingMessagesIndividually() throws Exception { try(Connection connection = createConnection()) { //1. 開啟信道 Channel channel = connection.createChannel(); //2. 設置信道為confirm模式 channel.confirmSelect(); //3. 聲明隊列 channel.queueDeclare(Constants.PUBLISHER_CONFIRMS_QUEUE1, true, false, false, null); //4. 發(fā)送消息, 并等待確認 long start = System.currentTimeMillis(); for (int i = 0; i < MESSAGE_COUNT; i++) { String msg = "hello publisher confirms"+i; channel.basicPublish("",Constants.PUBLISHER_CONFIRMS_QUEUE1, null, msg.getBytes()); //等待確認 channel.waitForConfirmsOrDie(5000); } long end = System.currentTimeMillis(); System.out.printf("單獨確認策略, 消息條數(shù): %d, 耗時: %d ms \n",MESSAGE_COUNT, end-start); } } }
運行代碼
我們可以看到,以發(fā)送消息條數(shù)為100條為例,單條確認模式是非常耗時的。
批量確認
import com.rabbitmq.client.Channel; import com.rabbitmq.client.ConfirmListener; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import rabbitmq.constant.Constants; import java.io.IOException; import java.util.Collections; import java.util.SortedSet; import java.util.TreeSet; public class PublisherConfirms { private static final Integer MESSAGE_COUNT = 10000; static Connection createConnection() throws Exception { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost(Constants.HOST); connectionFactory.setPort(Constants.PORT); //需要提前開放端口號 connectionFactory.setUsername(Constants.USER_NAME);//賬號 connectionFactory.setPassword(Constants.PASSWORD); //密碼 connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虛擬主機 return connectionFactory.newConnection(); } public static void main(String[] args) throws Exception { //Strategy #2: Publishing Messages in Batches //批量確認 publishingMessagesInBatches(); } /** * 批量確認 * @throws Exception */ private static void publishingMessagesInBatches() throws Exception{ try(Connection connection = createConnection()) { //1. 開啟信道 Channel channel = connection.createChannel(); //2. 設置信道為confirm模式 channel.confirmSelect(); //3. 聲明隊列 channel.queueDeclare(Constants.PUBLISHER_CONFIRMS_QUEUE2, true, false, false, null); //4. 發(fā)送消息, 并進行確認 long start = System.currentTimeMillis(); int batchSize = 100; int outstandingMessageCount = 0; for (int i = 0; i < MESSAGE_COUNT; i++) { String msg = "hello publisher confirms"+i; channel.basicPublish("",Constants.PUBLISHER_CONFIRMS_QUEUE2, null, msg.getBytes()); outstandingMessageCount++; if (outstandingMessageCount==batchSize){ channel.waitForConfirmsOrDie(5000); outstandingMessageCount = 0; } } if (outstandingMessageCount>0){ channel.waitForConfirmsOrDie(5000); } long end = System.currentTimeMillis(); System.out.printf("批量確認策略, 消息條數(shù): %d, 耗時: %d ms \n",MESSAGE_COUNT, end-start); } } }
運行代碼
我們可以看到,以發(fā)送消息條數(shù)為10000條為例,單條確認模式是比較快的。
異步確認
import com.rabbitmq.client.Channel; import com.rabbitmq.client.ConfirmListener; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import rabbitmq.constant.Constants; import java.io.IOException; import java.util.Collections; import java.util.SortedSet; import java.util.TreeSet; public class PublisherConfirms { private static final Integer MESSAGE_COUNT = 10000; static Connection createConnection() throws Exception { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost(Constants.HOST); connectionFactory.setPort(Constants.PORT); //需要提前開放端口號 connectionFactory.setUsername(Constants.USER_NAME);//賬號 connectionFactory.setPassword(Constants.PASSWORD); //密碼 connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虛擬主機 return connectionFactory.newConnection(); } public static void main(String[] args) throws Exception { //Strategy #3: Handling Publisher Confirms Asynchronously //異步確認 handlingPublisherConfirmsAsynchronously(); } /** * 異步確認 */ private static void handlingPublisherConfirmsAsynchronously() throws Exception{ try (Connection connection = createConnection()){ //1. 開啟信道 Channel channel = connection.createChannel(); //2. 設置信道為confirm模式 channel.confirmSelect(); //3. 聲明隊列 channel.queueDeclare(Constants.PUBLISHER_CONFIRMS_QUEUE3, true, false, false, null); //4. 監(jiān)聽confirm //集合中存儲的是未確認的消息ID long start = System.currentTimeMillis(); SortedSet<Long> confirmSeqNo = Collections.synchronizedSortedSet(new TreeSet<>()); channel.addConfirmListener(new ConfirmListener() { @Override public void handleAck(long deliveryTag, boolean multiple) throws IOException { if (multiple){ confirmSeqNo.headSet(deliveryTag+1).clear(); }else { confirmSeqNo.remove(deliveryTag); } } @Override public void handleNack(long deliveryTag, boolean multiple) throws IOException { if (multiple){ confirmSeqNo.headSet(deliveryTag+1).clear(); }else { confirmSeqNo.remove(deliveryTag); } //業(yè)務需要根據(jù)實際場景進行處理, 比如重發(fā), 此處代碼省略 } }); //5. 發(fā)送消息 for (int i = 0; i < MESSAGE_COUNT; i++) { String msg = "hello publisher confirms"+i; long seqNo = channel.getNextPublishSeqNo(); channel.basicPublish("",Constants.PUBLISHER_CONFIRMS_QUEUE3, null, msg.getBytes()); confirmSeqNo.add(seqNo); } while (!confirmSeqNo.isEmpty()){ Thread.sleep(10); } long end = System.currentTimeMillis(); System.out.printf("異步確認策略, 消息條數(shù): %d, 耗時: %d ms \n",MESSAGE_COUNT, end-start); } } }
運行代碼
我們可以看到,以發(fā)送消息條數(shù)為10000條為例,單條確認模式是非??斓?。
對比批量確認和異步確認模式
我們可以看到,異步確認模式是比批量確認模式快很多的。
到此這篇關于RabbitMQ工作模式之發(fā)布確認模式的文章就介紹到這了,更多相關RabbitMQ發(fā)布確認模式內容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!
相關文章
SpringBoot中?Jackson?日期的時區(qū)和日期格式問題解決
因為最近項目需要國際化,需要能夠支持多種國際化語言,目前需要支持三種(法語、英語、簡體中文),這篇文章主要介紹了SpringBoot中?Jackson?日期的時區(qū)和日期格式問題,需要的朋友可以參考下2022-12-12SpringBoot集成本地緩存性能之王Caffeine示例詳解
這篇文章主要為大家介紹了SpringBoot集成本地緩存性能之王Caffeine的示例詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪2022-07-07Java中零拷貝和深拷貝的原理及實現(xiàn)探究(代碼示例)
深拷貝和零拷貝是兩個在 Java 中廣泛使用的概念,它們分別用于對象復制和數(shù)據(jù)傳輸優(yōu)化,下面將詳細介紹這兩個概念的原理,并給出相應的 Java 代碼示例,感興趣的朋友一起看看吧2023-12-12Java 為什么要避免使用finalizer和Cleaner
這篇文章主要介紹了Java 為什么要避免使用finalizer和Cleaner,幫助大家更好的理解和學習使用Java,感興趣的朋友可以了解下2021-03-03Spring 校驗(validator,JSR-303)簡單實現(xiàn)方式
這篇文章主要介紹了Spring 校驗(validator,JSR-303)簡單實現(xiàn)方式,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2021-10-10