RabbitMQ中Confirm消息確認(rèn)機(jī)制保障生產(chǎn)端消息的可靠性詳解
1. 概述
生產(chǎn)者將數(shù)據(jù)發(fā)送到 RabbitMQ 的時(shí)候,可能數(shù)據(jù)就在半路給搞丟了,因?yàn)榫W(wǎng)絡(luò)問題啥的,都 有可能。此時(shí)可以開啟 confirm 模式,在生產(chǎn)者那里設(shè)置開啟 confirm 模式之后,你每次寫的消息都會(huì)分配一個(gè)唯一的 id,然后如果寫入了 RabbitMQ 中,RabbitMQ 會(huì)給你回傳一個(gè) ack 消息,告訴你說這個(gè)消息 ok 了。如果RabbitMQ 沒能處理這個(gè)消息,會(huì)回調(diào)你的一個(gè) nack 接口,告訴你這個(gè)消息接收失敗,你可以重試。而且你可以結(jié)合這個(gè)機(jī)制自己在內(nèi)存里維護(hù)每個(gè)消息 id 的狀態(tài),如果超過一定時(shí)間還 沒接收到這個(gè)消息的回調(diào),那么你可以重發(fā)。

在實(shí)際項(xiàng)目中,可以利用這一機(jī)制保障消息的可靠性投遞,如果消息未發(fā)送成功,可以在監(jiān)聽事件中記錄日志、重新發(fā)送消息等操作。
2.原生API中開啟Confirm消息確認(rèn)機(jī)制
- 在生產(chǎn)者的channel上開啟確認(rèn)機(jī)制: channel.confirmSelect();
- 在channel上添加Confirm監(jiān)聽事件: channel.addConfirmListener(new ConfirmListener() ...
2.1 代碼演示
生產(chǎn)者代碼
監(jiān)聽事件的兩個(gè)方法:handleAck() 消息投遞成功后回調(diào),handleNack 消息未成功投遞回調(diào)
public static void main(String[] args) throws Exception{
ConnectionFactory connectionFactory=new ConnectionFactory();
connectionFactory.setHost("127.0.0.1");
connectionFactory.setPort(5672);
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
//設(shè)置虛擬主機(jī)
connectionFactory.setVirtualHost("/");
//創(chuàng)建一個(gè)鏈接
Connection connection = connectionFactory.newConnection();
//創(chuàng)建channel
Channel channel = connection.createChannel();
//消息的確認(rèn)模式
channel.confirmSelect();
String exchangeName="test_confirm_exchange";
String routeKey="confirm.test";
String msg="RabbitMQ send message confirm test!";
for (int i=0;i<5;i++){
channel.basicPublish(exchangeName,routeKey,null,msg.getBytes());
}
//確定監(jiān)聽事件
channel.addConfirmListener(new ConfirmListener() {
/**
* 消息成功發(fā)送
* @param deliveryTag 消息唯一標(biāo)簽
* @param multiple 是否批量
* @throws IOException
*/
@Override
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
System.out.println("**********Ack*********");
}
/**
* 消息沒有成功發(fā)送
* @param deliveryTag
* @param multiple
* @throws IOException
*/
@Override
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
System.out.println("**********No Ack*********");
}
});
}消費(fèi)者端代碼
public static void main(String[] args) throws Exception{
System.out.println("======消息接收start==========");
ConnectionFactory connectionFactory=new ConnectionFactory();
connectionFactory.setHost("127.0.0.1");
connectionFactory.setPort(5672);
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
//設(shè)置虛擬主機(jī)
connectionFactory.setVirtualHost("/");
//創(chuàng)建鏈接
Connection connection = connectionFactory.newConnection();
//創(chuàng)建channel
Channel channel = connection.createChannel();
String exchangeName="test_confirm_exchange";
String exchangeType="topic";
//聲明Exchange
channel.exchangeDeclare(exchangeName,exchangeType,true,false,false,null);
String queueName="test_confirm_queue";
//聲明隊(duì)列
channel.queueDeclare(queueName,true,false,false,null);
String routeKey="confirm.#";
//綁定隊(duì)列和交換機(jī)
channel.queueBind(queueName,exchangeName,routeKey);
channel.basicConsume(queueName, true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("接收到消息::"+new String(body));
}
});
}到此這篇關(guān)于RabbitMQ中Confirm消息確認(rèn)機(jī)制保障生產(chǎn)端消息的可靠性詳解的文章就介紹到這了,更多相關(guān)Confirm消息確認(rèn)機(jī)制內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
圖文教程教你IDEA中的Spring環(huán)境搭建+簡單入門
這篇文章主要介紹了圖文教程教你IDEA中的Spring環(huán)境搭建+簡單入門,Spring的環(huán)境搭建使用Maven更加方便,需要的朋友可以參考下2023-03-03
Java高并發(fā)BlockingQueue重要的實(shí)現(xiàn)類詳解
這篇文章主要給大家介紹了關(guān)于Java高并發(fā)BlockingQueue重要的實(shí)現(xiàn)類的相關(guān)資料,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2021-01-01
Java實(shí)現(xiàn)較大二進(jìn)制文件的讀、寫方法
本篇文章主要介紹了Java實(shí)現(xiàn)較大二進(jìn)制文件的讀、寫方法,小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過來看看吧2017-02-02
Java搭建簡單Netty開發(fā)環(huán)境入門教程
這篇文章主要介紹了Java搭建簡單Netty開發(fā)環(huán)境入門教程,有詳細(xì)的代碼展示和maven依賴,能夠幫助你快速上手Netty開發(fā)框架,需要的朋友可以參考下2021-06-06
基于MyBatis的parameterType傳入?yún)?shù)類型
這篇文章主要介紹了基于MyBatis的parameterType傳入?yún)?shù)類型,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-09-09
java.imageIo給圖片添加水印的實(shí)現(xiàn)代碼
最近項(xiàng)目在做一個(gè)商城項(xiàng)目, 項(xiàng)目上的圖片要添加水?、?添加圖片水印;②:添加文字水印;一下提供下個(gè)方法,希望大家可以用得著2013-07-07
淺析Spring IOC bean為什么默認(rèn)是單例
單例的意思就是說在 Spring IoC 容器中只會(huì)存在一個(gè) bean 的實(shí)例,無論一次調(diào)用還是多次調(diào)用,始終指向的都是同一個(gè) bean 對(duì)象,本文小編將和大家一起分析Spring IOC bean為什么默認(rèn)是單例,需要的朋友可以參考下2023-12-12

