RabbitMQ消費(fèi)端ACK NACK及重回隊(duì)列機(jī)制詳解
ACK和NACK
當(dāng)我們使用RabbitMQ時(shí)用于網(wǎng)絡(luò)異常,業(yè)務(wù)處理異?;蛘邩I(yè)務(wù)錯(cuò)誤導(dǎo)致消息無(wú)法立即消費(fèi)時(shí),在這種情況下,傳輸中的信息將無(wú)法正常投遞——它們需要被重新投遞。Acknowledgements機(jī)制讓服務(wù)器和客戶端知道何時(shí)需要重新投遞。
當(dāng)設(shè)置autoACK=false 時(shí),就可以使用手工ACK。 其實(shí)手工方式包括了手工ACK、手工NACK。
- 手工 ACK 時(shí),會(huì)發(fā)送給Broker一個(gè)應(yīng)答,代表消息處理成功,Broker就可回送響應(yīng)給Pro;
- NACK 則表示消息處理失敗,如果設(shè)置了重回隊(duì)列,Broker端就會(huì)將沒有成功處理的消息重新發(fā)送。
使用方式
1、basicNack
Consumer消費(fèi)時(shí),若由于業(yè)務(wù)異常,可手工 NACK 記錄日志,然后進(jìn)行補(bǔ)償: basic.nack方法為不確認(rèn)deliveryTag對(duì)應(yīng)的消息,第二個(gè)參數(shù)是否應(yīng)用于多消息,第三個(gè)參數(shù)是否requeue,如果requeue 參數(shù)設(shè)置為true ,則RabbitMQ 會(huì)重新將這條消息存入隊(duì)列,以便可以發(fā)送給下一個(gè)訂閱的消費(fèi)者, 如果requeue 參數(shù)設(shè)置為false ,則RabbitMQ立即會(huì)把消息從隊(duì)列中移除,而不會(huì)把它發(fā)送給新的消費(fèi)者。與basic.reject區(qū)別就是同時(shí)支持多個(gè)消息,可以nack該消費(fèi)者先前接收未ack的所有消息。nack后的消息也會(huì)被自己消費(fèi)到。
multiple 參數(shù)設(shè)置為false 則表示拒絕編號(hào)為deliveryTag的這一條消息,這時(shí)候basicNack 和basicReject 方法一樣;multiple 參數(shù)設(shè)置為true 則表示拒絕deliveryTag 編號(hào)之前所有未被當(dāng)前消費(fèi)者確認(rèn)的消息。
- 參數(shù)1: 消息
- 參數(shù)2: 是否應(yīng)用于多消息
- 參數(shù)3: 是否重新放回隊(duì)列,否則丟棄或者進(jìn)入死信隊(duì)列
void basicNack(long deliveryTag, boolean multiple, boolean requeue)
2、basicAck
如果由于服務(wù)器宕機(jī)等嚴(yán)重問題,就需要手工 ACK 保障Con消費(fèi)成功:deliveryTag
: 每次接收消息+1,可以做此消息處理通道的名字。
void basicAck(long deliveryTag, boolean multiple)
3、basicReject
basic.Reject拒絕deliveryTag對(duì)應(yīng)的消息,第二個(gè)參數(shù)是否requeue,true則重新入隊(duì)列,否則丟棄或者進(jìn)入死信隊(duì)列。該方法reject后,該消費(fèi)者還是會(huì)消費(fèi)到該條被reject的消息。reject一次只能拒絕一條消息,如果想要批量拒絕消息,則可以使用Basic.Nack 這個(gè)命令。
- 參數(shù)1: 消息
- 參數(shù)2: 是否重新放回隊(duì)列,否則丟棄或者進(jìn)入死信隊(duì)列
void basicReject(long deliveryTag, boolean requeue);
4、basicRecover
basic.recover是否恢復(fù)消息到隊(duì)列,參數(shù)是是否requeue,true則重新入隊(duì)列,并且盡可能的將之前recover的消息投遞給其他消費(fèi)者消費(fèi),而不是自己再次消費(fèi)。false則消息會(huì)重新被投遞給自己。
channel.basicRecover(true);
消費(fèi)端的重回隊(duì)列
重回隊(duì)列針對(duì)沒有處理成功的消息,將消息重新投遞給Broker。
重回隊(duì)列會(huì)把消費(fèi)失敗的消息重新添加到隊(duì)列尾端,供Consumer重新消費(fèi)。
一般在實(shí)際應(yīng)用中,都會(huì)關(guān)閉重回隊(duì)列,即設(shè)置為false。
在確認(rèn)ack后再把消息通過basicPublish到隊(duì)列尾部,防止正常消息堆積。
channel.basicPublish(message.getMessageProperties().getReceivedExchange(),message.getMessageProperties().getReceivedRoutingKey(), MessageProperties.PERSISTENT_TEXT_PLAIN,JSON.toJSONBytes(new Object()));
實(shí)際運(yùn)用
@RabbitListener(queues = "${rabbitmq.one-queue}", containerFactory = "rabbitListenerContainerFactory") @RabbitHandler public void doRmOnduty(Message message, Channel channel) { try { String body = new String(message.getBody()); //log.info("消息處理:{}",body); } catch (Exception e) { //消費(fèi)異常,設(shè)置重回隊(duì)列 channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true); } finally { //最終確認(rèn)消息消費(fèi)成功 channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } }
ACK機(jī)制可以保證Con拉取到了消息,若處理失敗了,則隊(duì)列中還有這個(gè)消息,仍然可以給Con處理。ack機(jī)制是 Con 告訴 Broker 當(dāng)前消息是否成功消費(fèi),至于 Broker 如何處理 NACK,取決于 Con 是否設(shè)置了 requeue:如果 requeue=false, 則NACK 后 Broker 還是會(huì)刪除消息的。
但一般處理消息失敗都是因?yàn)榇a邏輯出bug,即使隊(duì)列中后來(lái)仍然保留該消息,然后再給Con消費(fèi),依舊報(bào)錯(cuò)。 當(dāng)然,若一臺(tái)機(jī)器宕機(jī),消息還有,還可以給另外機(jī)器消費(fèi),這種情景下 ACK 很有用。
如果不使用 ACK 機(jī)制,直接把出錯(cuò)消息存庫(kù),便于日后查bug或重新執(zhí)行。
以上就是RabbitMQ消費(fèi)端ACK NACK及重回隊(duì)列機(jī)制詳解的詳細(xì)內(nèi)容,更多關(guān)于RabbitMQ消費(fèi)端ACK NACK的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
JAVA 文件監(jiān)控 WatchService的示例方法
本篇文章主要介紹了JAVA 文件監(jiān)控 WatchService的示例方法,小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過來(lái)看看吧2017-10-10Java 策略模式與模板方法模式相關(guān)總結(jié)
這篇文章主要介紹了Java 策略模式與模板方法模式相關(guān)總結(jié),幫助大家更好的理解和使用Java,感興趣的朋友可以了解下2021-01-01LoggingEventAsyncDisruptorAppender類執(zhí)行流程源碼解讀
這篇文章主要介紹了LoggingEventAsyncDisruptorAppender類執(zhí)行流程源碼解讀,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2023-12-12一文看懂springboot實(shí)現(xiàn)短信服務(wù)功能
項(xiàng)目中的短信服務(wù)基本上上都會(huì)用到,簡(jiǎn)單的注冊(cè)驗(yàn)證碼,消息通知等等都會(huì)用到。這篇文章主要介紹了springboot 實(shí)現(xiàn)短信服務(wù)功能,需要的朋友可以參考下2019-10-10Java并發(fā)編程中的volatile關(guān)鍵字詳解
這篇文章主要介紹了Java并發(fā)編程中的volatile關(guān)鍵字詳解,volatile?用于保證我們某個(gè)變量的可見性,使其一直存放在主存中,不被移動(dòng)到某個(gè)線程的私有工作內(nèi)存中,需要的朋友可以參考下2023-08-08解決@Test注解在Maven工程的Test.class類中無(wú)法使用的問題
這篇文章主要介紹了解決@Test注解在Maven工程的Test.class類中無(wú)法使用的問題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2022-03-03使用java8 API遍歷過濾文件目錄及子目錄和隱藏文件示例詳解
這篇文章主要介紹了使用java8API遍歷過濾文件目錄及子目錄及隱藏文件示例詳解,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2020-07-07