關(guān)于RocketMQ使用事務(wù)消息
說(shuō)明
事務(wù)消息:
1、不支持延時(shí)消息和批量消息
2、如果消息沒(méi)有及時(shí)提交,默認(rèn)check 15次,可以通過(guò)Broker的transactionCheckMax參數(shù)配置次數(shù)。如果超時(shí)15次依然沒(méi)有得到明確結(jié)果,將會(huì)打印異常信息,具體的處理策略可以通過(guò)復(fù)寫AbstractTransactionCheckListener類實(shí)現(xiàn)
3、每次check的時(shí)間間隔可以通過(guò)Broker的transactionTimeout配置,也可以在消息中增加CHECK_IMMUNITY_TIME_IN_SECONDS屬性指定
4、事務(wù)狀態(tài):LocalTransactionState.COMMIT_MESSAGE、LocalTransactionState.ROLLBACK_MESSAGE、LocalTransactionState.UNKNOW。
原理
事務(wù)消息是RocketMQ的一大特性,其保證發(fā)送消息和執(zhí)行本地邏輯在同一個(gè)事務(wù)內(nèi)。實(shí)現(xiàn)的思路借鑒了兩階段提交協(xié)議:
第一階段:發(fā)送半事務(wù)消息,消息發(fā)送后,消息是對(duì)消費(fèi)者透明的,也就是該消息還不屬于可消費(fèi)消息,消費(fèi)者無(wú)法消費(fèi)。
第二階段:執(zhí)行本地事務(wù),本地執(zhí)行事務(wù)后提交消息。
(1)、如果事務(wù)執(zhí)行失敗,則回滾消息;
(2)、如果事務(wù)執(zhí)行成功,則提交消息,提交后消費(fèi)者可消費(fèi)到消息;
(3)、如果事務(wù)執(zhí)行成功,但消息提交失敗,RocketMQ還提供了回查機(jī)制:如果一段時(shí)間過(guò)后,沒(méi)有提交/回滾半事務(wù)消息,RocketMQ會(huì)定時(shí)回查一定的次數(shù),獲取本地事務(wù)的狀態(tài)以決定是提交還是回滾消息。
如果回查一定的次數(shù)后依然沒(méi)有獲取到本地事務(wù)的明確狀態(tài),則消息會(huì)被放到死信隊(duì)列,由人工確認(rèn)如何處理。
事務(wù)消息處理流程
1、生產(chǎn)端發(fā)送半事務(wù)消息到服務(wù)端
2、服務(wù)端返回半事務(wù)消息發(fā)送成功響應(yīng)。注意,此時(shí)的消息對(duì)消費(fèi)端是不可見的,不可被消費(fèi)
3、發(fā)送方執(zhí)行本地事務(wù)
4、執(zhí)行完本地事務(wù)后,客戶端同步服務(wù)端提交/回滾消息
5、如果服務(wù)端在一定的時(shí)間內(nèi),等不到4的回應(yīng),則定時(shí)進(jìn)行回查,詢問(wèn)客戶端的本地事務(wù)狀態(tài)。
6、客戶端檢查本地事務(wù)狀態(tài)
7、根據(jù)本地事務(wù)執(zhí)行情況,告知服務(wù)端,服務(wù)端決定是提交消息還是丟棄消息。
生產(chǎn)端
@Test public void sendMessage() throws Exception { //事務(wù)生產(chǎn)者 TransactionMQProducer producer = new TransactionMQProducer("defaultGroup"); producer.setNamesrvAddr(SpringUtil.getBean(RocketMqConfig.class).getNamesrvAddr()); //設(shè)置檢查本地事務(wù)狀態(tài)的線程池 //producer.setExecutorService(null); //本地事務(wù)執(zhí)行監(jiān)聽器 TransactionListener transactionListener = new TransactionListenerImpl(); producer.setTransactionListener(transactionListener); producer.start(); Message message = new Message(RocketMqUtil.TOPIC, "transaction", "transaction-message".getBytes(Charset.forName("UTF-8"))); //發(fā)送事務(wù)消息 producer.sendMessageInTransaction(message, null); } class TransactionListenerImpl implements TransactionListener { @Override public LocalTransactionState executeLocalTransaction(Message msg, Object arg) { //執(zhí)行本地事務(wù)(數(shù)據(jù)庫(kù))操作...... int num = new Random().nextInt(10); if (num < 3) { //本地事務(wù)執(zhí)行成功,提交消息 return LocalTransactionState.COMMIT_MESSAGE; } else if (num < 6) { //本地事務(wù)執(zhí)行失敗,刪除消息 return LocalTransactionState.ROLLBACK_MESSAGE; } //等待本地事務(wù)check,即執(zhí)行checkLocalTransaction()方法 return LocalTransactionState.UNKNOW; } /** * 回查邏輯 * @param msg * @return */ @Override public LocalTransactionState checkLocalTransaction(MessageExt msg) { int num = new Random().nextInt(10); if (num < 3) { //提交消息 return LocalTransactionState.COMMIT_MESSAGE; } else if (num < 6) { //刪除消息 return LocalTransactionState.ROLLBACK_MESSAGE; } return LocalTransactionState.UNKNOW; } }
發(fā)送事務(wù)消息步驟:
1、初始化TransactionMQProducer實(shí)例
2、指定check線程池(回查線程池)
3、為Producer添加自定義事務(wù)監(jiān)聽器。自定義事務(wù)監(jiān)聽器需實(shí)現(xiàn)TransactionListener接口,通過(guò)覆蓋接口的executeLocalTransaction方法執(zhí)行本地事務(wù),返回事務(wù)狀態(tài),客戶端會(huì)根據(jù)本地事務(wù)狀態(tài)通知服務(wù)端,決定是否提交消息;通過(guò)覆蓋接口的checkLocalTransaction方法提供回查機(jī)制,當(dāng)在一定的時(shí)間內(nèi)服務(wù)端獲取不到本地事務(wù)執(zhí)行狀態(tài),將通過(guò)該方法回查事務(wù)狀態(tài),以決定消失是否需要提交。
4、通過(guò)Producer.sendMessageInTransaction發(fā)送事務(wù)消息。
消費(fèi)者正常消費(fèi)邏輯
消費(fèi)端
@Test public void consumeMessage() throws Exception { DefaultMQPushConsumer defaultMQPushConsumer = RocketMqUtil.getDefaultMQPushConsumer(); defaultMQPushConsumer.subscribe(RocketMqUtil.TOPIC, "*"); defaultMQPushConsumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) { log.info("消費(fèi)到消息條數(shù):{}", list.size()); list.stream().map(messageExt -> new String(messageExt.getBody(), Charset.forName("UTF-8"))) .map(String::new).forEach(System.out::println); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); defaultMQPushConsumer.start(); Thread.sleep(5000L); }
消費(fèi)端正常消費(fèi)消息即可。
到此這篇關(guān)于關(guān)于RocketMQ使用事務(wù)消息的文章就介紹到這了,更多相關(guān)RocketMQ事務(wù)消息內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
一文教你掌握J(rèn)ava如何實(shí)現(xiàn)判空
實(shí)際項(xiàng)目中我們會(huì)有很多地方需要判空校驗(yàn),如果不做判空校驗(yàn)則可能產(chǎn)生NullPointerException異常。所以本文小編為大家整理了Java中幾個(gè)常見的判空方法,希望對(duì)大家有所幫助2023-04-04Java鏈表中元素刪除的實(shí)現(xiàn)方法詳解【只刪除一個(gè)元素情況】
這篇文章主要介紹了Java鏈表中元素刪除的實(shí)現(xiàn)方法,結(jié)合實(shí)例形式分析了java只刪除鏈表中一個(gè)元素的相關(guān)操作原理、實(shí)現(xiàn)方法與注意事項(xiàng),需要的朋友可以參考下2020-03-03基于web項(xiàng)目log日志指定輸出文件位置配置方法
下面小編就為大家分享一篇基于web項(xiàng)目log日志指定輸出文件位置配置方法,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧2018-04-04詳解Java 包掃描實(shí)現(xiàn)和應(yīng)用(Jar篇)
這篇文章主要介紹了詳解Java 包掃描實(shí)現(xiàn)和應(yīng)用(Jar篇),本文給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2020-07-07詳解如何實(shí)現(xiàn)SpringBoot的底層注解
今天給大家?guī)?lái)的文章是如何實(shí)現(xiàn)SpringBoot的底層注解,文中有非常詳細(xì)的介紹及代碼示例,對(duì)正在學(xué)習(xí)java的小伙伴很有幫助,需要的朋友可以參考下2021-06-06調(diào)用java.lang.Runtime.exec的正確姿勢(shì)分享
這篇文章主要介紹了調(diào)用java.lang.Runtime.exec的正確姿勢(shì),具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-11-11使用Feign動(dòng)態(tài)設(shè)置header和原理分析
這篇文章主要介紹了使用Feign動(dòng)態(tài)設(shè)置header和原理分析,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2022-03-03Java集合框架之List ArrayList LinkedList使用詳解刨析
早在 Java 2 中之前,Java 就提供了特設(shè)類。比如:Dictionary, Vector, Stack, 和 Properties 這些類用來(lái)存儲(chǔ)和操作對(duì)象組。雖然這些類都非常有用,但是它們?nèi)鄙僖粋€(gè)核心的,統(tǒng)一的主題。由于這個(gè)原因,使用 Vector 類的方式和使用 Properties 類的方式有著很大不同2021-10-10