RabbitMQ的ACK確認(rèn)機(jī)制保障消費(fèi)端消息的可靠性詳解
1. 概述
如果消費(fèi)端在你消費(fèi)的時候,剛消費(fèi)到,還沒處理,結(jié)果進(jìn)程掛了,比如重啟了,那么就尷尬了,RabbitMQ 認(rèn)為你都消費(fèi)了,這數(shù)據(jù)就丟了。這個時候得用 RabbitMQ 提供的 ack 機(jī)制,簡單來說,就是你必須關(guān)閉 RabbitMQ 的自動ack ,可以通過一個 api 來調(diào)用就行,然后每次你自己代碼里確保處理完的時候,再在程序里 ack 一把。這樣的話,如果你還沒處理完,不就沒有 ack 了?那 RabbitMQ 就認(rèn)為你還沒處理完,這個時候 RabbitMQ 會把這個消費(fèi)分配給別的 consumer 去處理,消息是不會丟的。
生產(chǎn)端消息可靠性保證可以使用RabbitMQ的confirm機(jī)制。
2. ACK機(jī)制與消費(fèi)端消息補(bǔ)償機(jī)制
把channel.basicConsume(...)方法的autoAck參數(shù)改為false
channel.basicAck(long deliveryTag, boolean multiple);方法,消費(fèi)成功簽收
參數(shù)說明:
- deliveryTag:消息標(biāo)識
- multiple:是否批量簽收
basicNack(long deliveryTag, boolean multiple, boolean requeue) ,消息消費(fèi)失敗
參數(shù)說明:
- deliveryTag:消息標(biāo)識
- multiple:是否批量簽收
- requeue:true 消息會重回隊列,false 消息會進(jìn)入到死信隊列
3. 代碼演示
生產(chǎn)端
public static void main(String[] args) throws Exception{ ConnectionFactory connectionFactory=new ConnectionFactory(); connectionFactory.setHost("127.0.0.1"); connectionFactory.setPort(5672); connectionFactory.setUsername("guest"); connectionFactory.setPassword("guest"); //設(shè)置虛擬主機(jī) connectionFactory.setVirtualHost("/"); //創(chuàng)建一個鏈接 Connection connection = connectionFactory.newConnection(); //創(chuàng)建channel Channel channel = connection.createChannel(); String exchangeName="test_ack_exchange"; String routeKey="ack.test"; for (int i=0;i<5;i++){ Map<String, Object> headers = new HashMap<String, Object>(); //演示重回隊列機(jī)制,使用num==0的消息簽收失敗重回隊列 headers.put("num", i); AMQP.BasicProperties properties=new AMQP.BasicProperties().builder() .deliveryMode(2) .contentEncoding("UTF-8") .headers(headers) .build(); String msg="RabbitMQ send message ack test!"+i; channel.basicPublish(exchangeName,routeKey,properties,msg.getBytes()); } }
消息端
public static void main(String[] args) throws Exception{ System.out.println("======消息接收start=========="); ConnectionFactory connectionFactory=new ConnectionFactory(); connectionFactory.setHost("127.0.0.1"); connectionFactory.setPort(5672); connectionFactory.setUsername("guest"); connectionFactory.setPassword("guest"); //設(shè)置虛擬主機(jī) connectionFactory.setVirtualHost("/"); //創(chuàng)建鏈接 Connection connection = connectionFactory.newConnection(); //創(chuàng)建channel Channel channel = connection.createChannel(); String exchangeName="test_ack_exchange"; String exchangeType="topic"; //聲明Exchange channel.exchangeDeclare(exchangeName,exchangeType,true,false,false,null); String queueName="test_ack_queue"; //聲明隊列 channel.queueDeclare(queueName,true,false,false,null); String routeKey="ack.#"; //綁定隊列和交換機(jī) channel.queueBind(queueName,exchangeName,routeKey); /** * autoAck:false 設(shè)置為手工簽收 */ channel.basicConsume(queueName, false, new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("接收到消息::"+new String(body)); try { Thread.sleep(3000); //休眠5秒 } catch (InterruptedException e) { e.printStackTrace(); } //演示重回隊列機(jī)制,使用num==0的消息簽收失敗重回隊列 if((Integer)properties.getHeaders().get("num") == 0) { /** * 參數(shù)說明:1、消息標(biāo)識 2、是否批量簽收 3、是否重回隊列 */ channel.basicNack(envelope.getDeliveryTag(), false, true); } else { channel.basicAck(envelope.getDeliveryTag(), false); } } }); }
運(yùn)行代碼以上后,由于在消費(fèi)端,設(shè)置了第一條消息,簽收失敗重回隊列,在RabbitMQ控制臺中我們可以看到始終有一條消息未簽收確認(rèn)
到此這篇關(guān)于RabbitMQ的ACK確認(rèn)機(jī)制保障消費(fèi)端消息的可靠性詳解的文章就介紹到這了,更多相關(guān)RabbitMQ的ACK確認(rèn)機(jī)制內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Spring Cloud Admin健康檢查 郵件、釘釘群通知的實(shí)現(xiàn)
這篇文章主要介紹了Spring Cloud Admin健康檢查 郵件、釘釘群通知的實(shí)現(xiàn),文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2020-08-08如何使用 IntelliJ IDEA 編寫 Spark 應(yīng)用程序(Sc
本教程展示了如何在IntelliJIDEA中使用Maven編寫和運(yùn)行一個簡單的Spark應(yīng)用程序(例如WordCount程序),本文通過實(shí)例代碼給大家介紹的非常詳細(xì),感興趣的朋友跟隨小編一起看看吧2024-11-11spring cloud如何修復(fù)zuul跨域配置異常的問題
最近的開發(fā)過程中,使用spring集成了spring-cloud-zuul,在配置zuul跨域的時候遇到了問題,下面這篇文章主要給大家介紹了關(guān)于spring cloud如何修復(fù)zuul跨域配置異常的問題,需要的朋友可以參考借鑒,下面來一起看看吧。2017-09-09