RocketMQ發(fā)送事務(wù)消息詳解
概念介紹
- 事務(wù)消息:提供類似XA或Open XA的分布式事務(wù)功能,通過事務(wù)消息能達到分布式事務(wù)的最終一致。
- 半事務(wù)消息:暫不能投遞的消息,生產(chǎn)者已經(jīng)成功地將消息發(fā)送到了RocketMQ服務(wù)端,但是RocketMQ服務(wù)端未收到生產(chǎn)者對該消息的二次確認(rèn),此時該消息被標(biāo)記成“暫不能投遞”狀態(tài),處于該種狀態(tài)下的消息即半事務(wù)消息。
- 消息回查:由于網(wǎng)絡(luò)閃斷、生產(chǎn)者應(yīng)用重啟等原因,導(dǎo)致某條事務(wù)消息的二次確認(rèn)丟失,RocketMQ服務(wù)端通過掃描發(fā)現(xiàn)某條消息長期處于“半事務(wù)消息”時,需要主動向消息生產(chǎn)者詢問該消息的最終狀態(tài)(Commit或是Rollback),該詢問過程即消息回查。
分布式事務(wù)消息的優(yōu)勢
RocketMQ分布式事務(wù)消息不僅可以實現(xiàn)應(yīng)用之間的解耦,又能保證數(shù)據(jù)的最終一致性。
同時,傳統(tǒng)的大事務(wù)可以被拆分為小事務(wù),不僅能提升效率,還不會因為某一個關(guān)聯(lián)應(yīng)用的不可用導(dǎo)致整體回滾,從而最大限度保證核心系統(tǒng)的可用性。
在極端情況下,如果關(guān)聯(lián)的某一個應(yīng)用始終無法處理成功,也只需對當(dāng)前應(yīng)用進行補償或數(shù)據(jù)訂正處理,而無需對整體業(yè)務(wù)進行回滾。
典型場景
在淘寶購物車下單時,涉及到購物車系統(tǒng)和交易系統(tǒng),這兩個系統(tǒng)之間的數(shù)據(jù)最終一致性可以通過分布式事務(wù)消息的異步處理實現(xiàn)。
在這種場景下,交易系統(tǒng)是最為核心的系統(tǒng),需要最大限度地保證下單成功。
而購物車系統(tǒng)只需要訂閱交易訂單消息,做相應(yīng)的業(yè)務(wù)處理,即可保證最終的數(shù)據(jù)一致性。
交互流程
事務(wù)消息交互流程如下圖所示。
事務(wù)消息發(fā)送步驟如下:
- 生產(chǎn)者將半事務(wù)消息發(fā)送至RocketMQ服務(wù)端。
- RocketMQ服務(wù)端將消息持久化成功之后,向生產(chǎn)者返回Ack確認(rèn)消息已經(jīng)發(fā)送成功,此時消息為半事務(wù)消息。
- 生產(chǎn)者開始執(zhí)行本地事務(wù)邏輯。
- 生產(chǎn)者根據(jù)本地事務(wù)執(zhí)行結(jié)果向服務(wù)端提交二次確認(rèn)結(jié)果(Commit或是Rollback),服務(wù)端收到確認(rèn)結(jié)果后處理邏輯如下:
- 二次確認(rèn)結(jié)果為Commit:服務(wù)端將半事務(wù)消息標(biāo)記為可投遞,并投遞給消費者。
- 二次確認(rèn)結(jié)果為Rollback:服務(wù)端將回滾事務(wù),不會將半事務(wù)消息投遞給消費者。
- 在斷網(wǎng)或者是生產(chǎn)者應(yīng)用重啟的特殊情況下,若服務(wù)端未收到發(fā)送者提交的二次確認(rèn)結(jié)果,或服務(wù)端收到的二次確認(rèn)結(jié)果為Unknown未知狀態(tài),經(jīng)過固定時間后,服務(wù)端將對消息生產(chǎn)者即生產(chǎn)者集群中任一生產(chǎn)者實例發(fā)起消息回查。
事務(wù)消息回查步驟如下:
- 生產(chǎn)者收到消息回查后,需要檢查對應(yīng)消息的本地事務(wù)執(zhí)行的最終結(jié)果。
- 生產(chǎn)者根據(jù)檢查得到的本地事務(wù)的最終狀態(tài)再次提交二次確認(rèn),服務(wù)端仍按照步驟4對半事務(wù)消息進行處理。
示例代碼
事務(wù)消息生產(chǎn)者
public enum LocalTransactionState { COMMIT_MESSAGE, ROLLBACK_MESSAGE, UNKNOW, }
事務(wù)消息發(fā)送完成本地事務(wù)后,可在execute方法中返回以下三種狀態(tài):
- COMMIT_MESSAGE:提交事務(wù),允許消費者消費該消息。
- ROLLBACK_MESSAGE:回滾事務(wù),消息將被丟棄不允許消費。
- UNKNOW:暫時無法判斷狀態(tài),等待固定時間以后消息隊列RocketMQ版服務(wù)端根據(jù)回查規(guī)則向生產(chǎn)者進行消息回查。
創(chuàng)建事務(wù)消息的Producer時必須指定TransactionListener的實現(xiàn)類,處理異常情況下事務(wù)消息的回查。
回查規(guī)則:本地事務(wù)執(zhí)行完成后,若服務(wù)端收到的本地事務(wù)返回狀態(tài)為TransactionStatus.Unknow,或生產(chǎn)者應(yīng)用退出導(dǎo)致本地事務(wù)未提交任何狀態(tài)。則服務(wù)端會向消息生產(chǎn)者發(fā)起事務(wù)回查,第一次回查后仍未獲取到事務(wù)狀態(tài),則之后每隔一段時間會再次回查。
回查間隔時間:系統(tǒng)默認(rèn)每隔30秒發(fā)起一次定時任務(wù),對未提交的半事務(wù)消息進行回查,共持續(xù)12小時。
package com.morris.rocketmq.transaction; import org.apache.rocketmq.client.producer.LocalTransactionState; import org.apache.rocketmq.client.producer.TransactionListener; import org.apache.rocketmq.client.producer.TransactionMQProducer; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.MessageExt; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; import static com.morris.rocketmq.util.Contant.NAME_SERVER_ADDRESS; /** * 事務(wù)消息生產(chǎn)者 */ public class TransactionProducer { public static void main(String[] args) throws Exception { TransactionMQProducer producer = new TransactionMQProducer("transaction-producer-demo"); producer.setNamesrvAddr(NAME_SERVER_ADDRESS); ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<>(2000), r -> { Thread thread = new Thread(r); thread.setName("client-transaction-msg-check-thread"); return thread; }); producer.setExecutorService(executorService); // 指定事務(wù)會查的實現(xiàn)類 producer.setTransactionListener(new TransactionListener() { private final AtomicInteger transactionIndex = new AtomicInteger(0); private final ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>(); @Override public LocalTransactionState executeLocalTransaction(Message msg, Object arg) { int value = transactionIndex.getAndIncrement(); System.out.println(Thread.currentThread().getName()+ "-executeLocalTransaction:" + new String(msg.getBody()) + ",value=" + value); int status = value % 3; localTrans.put(msg.getTransactionId(), status); return LocalTransactionState.UNKNOW; } @Override public LocalTransactionState checkLocalTransaction(MessageExt msg) { System.out.println(Thread.currentThread().getName()+ "-checkLocalTransaction:" + new String(msg.getBody())); Integer status = localTrans.get(msg.getTransactionId()); if (null != status) { switch (status) { case 0: return LocalTransactionState.COMMIT_MESSAGE; case 1: return LocalTransactionState.UNKNOW; case 2: return LocalTransactionState.ROLLBACK_MESSAGE; } } return LocalTransactionState.COMMIT_MESSAGE; } }); producer.start(); for(int i = 0; i < 10; i++) { Message message = new Message("TransactionTopic", ("transactionDemo" + i).getBytes()); // 發(fā)送事務(wù)消息 producer.sendMessageInTransaction(message, i); System.out.println(message); } } }
第一次消息回查最快時間:該參數(shù)支持自定義設(shè)置。若指定消息未達到設(shè)置的最快回查時間前,系統(tǒng)默認(rèn)每隔30秒一次的回查任務(wù)不會檢查該消息。
設(shè)置方式如下:
Message message = new Message(); message.putUserProperties(MessageConst.PROPERTY_CHECK_IMMUNITY_TIME_IN_SECONDS, "60");
說明:因為系統(tǒng)默認(rèn)的回查間隔,第一次消息回查的實際時間會向后有0秒~30秒的浮動。
例如:指定消息的第一次消息最快回查時間設(shè)置為60秒,系統(tǒng)在第58秒時達到定時的回查時間,但設(shè)置的60秒未到,所以該消息不在本次回查范圍內(nèi)。等待間隔30秒后,下一次的系統(tǒng)回查時間在第88秒,該消息才符合條件進行第一次回查,距設(shè)置的最快回查時間延后了28秒。
事務(wù)消息消費者
事務(wù)消息的Group ID不能與其他類型消息的Group ID共用。與其他類型的消息不同,事務(wù)消息有回查機制,回查時服務(wù)端會根據(jù)Group ID去查詢生產(chǎn)者客戶端。
package com.morris.rocketmq.transaction; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; import static com.morris.rocketmq.util.Contant.NAME_SERVER_ADDRESS; /** * 事務(wù)消息消費者 */ public class TranscationConsumer { public static void main(String[] args) throws Exception { // 實例化消息生產(chǎn)者,指定組名 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("transaction-consumer-group"); // 指定Namesrv地址信息. consumer.setNamesrvAddr(NAME_SERVER_ADDRESS); // 訂閱Topic consumer.subscribe("TransactionTopic", "*"); //負(fù)載均衡模式消費 consumer.setMessageModel(MessageModel.CLUSTERING); // 注冊回調(diào)函數(shù),處理消息 consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> { try { System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs); } catch (Exception e) { e.printStackTrace(); return ConsumeConcurrentlyStatus.RECONSUME_LATER; } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; }); //啟動消息者 consumer.start(); System.out.printf("Consumer Started.%n"); } }
使用說明
- 事務(wù)消息不支持延時消息和批量消息。
- 事務(wù)回查的間隔時間:BrokerConfig.transactionCheckInterval,通過Broker的配置文件設(shè)置好。
- 為了避免單個消息被檢查太多次而導(dǎo)致半隊列消息累積,我們默認(rèn)將單個消息的檢查次數(shù)限制為15次,但是用戶可以通過Broker配置文件的transactionCheckMax參數(shù)來修改此限制。如果已經(jīng)檢查某條消息超過N次的話(N=transactionCheckMax)則Broker將丟棄此消息,并在默認(rèn)情況下同時打印錯誤日志。用戶可以通過重寫AbstractTransactionCheckListener類來修改這個行為。
- 事務(wù)消息將在Broker配置文件中的參數(shù)transactionMsgTimeout這樣的特定時間長度之后被檢查。當(dāng)發(fā)送事務(wù)消息時,用戶還可以通過設(shè)置用戶屬性 CHECK_IMMUNITY_TIME_IN_SECONDS來改變這個限制,該參數(shù)優(yōu)先于transactionMsgTimeout參數(shù)。
- 事務(wù)性消息可能不止一次被檢查或消費。
- 事務(wù)性消息中用到了生產(chǎn)者群組,這種就是一種高可用機制,用來確保事務(wù)消息的可靠性。
- 提交給用戶的目標(biāo)主題消息可能會失敗,目前這依日志的記錄而定。它的高可用性通過RocketMQ本身的高可用性機制來保證,如果希望確保事務(wù)消息不丟失、并且事務(wù)完整性得到保證,建議使用同步的雙重寫入機制。
- 事務(wù)消息的生產(chǎn)者ID不能與其他類型消息的生產(chǎn)者ID共享。與其他類型的消息不同,事務(wù)消息允許反向查詢、MQ服務(wù)器能通過它們的生產(chǎn)者ID查詢到消費者。
到此這篇關(guān)于RocketMQ發(fā)送事務(wù)消息詳解的文章就介紹到這了,更多相關(guān)RocketMQ事務(wù)消息內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Java獲取HttpServletRequest的三種方法詳解
這篇文章主要介紹了Java獲取HttpServletRequest的三種方法詳解,是一個接口,全限定名稱為Jakarta.Serclet.http.HttpServletRequest2023-11-11
HttpServletRequest接口是Servlet規(guī)范的一員,需要的朋友可以參考下SpringBoot使用AOP與注解實現(xiàn)請求參數(shù)自動填充流程詳解
面向切面編程(aspect-oriented programming,AOP)主要實現(xiàn)的目的是針對業(yè)務(wù)處理過程中的切面進行提取,諸如日志、事務(wù)管理和安全這樣的系統(tǒng)服務(wù),從而使得業(yè)務(wù)邏輯各部分之間的耦合度降低,提高程序的可重用性,同時提高了開發(fā)的效率2023-02-02Java選擇結(jié)構(gòu)與循環(huán)結(jié)構(gòu)的使用詳解
循環(huán)結(jié)構(gòu)是指在程序中需要反復(fù)執(zhí)行某個功能而設(shè)置的一種程序結(jié)構(gòu)。它由循環(huán)體中的條件,判斷繼續(xù)執(zhí)行某個功能還是退出循環(huán),選擇結(jié)構(gòu)用于判斷給定的條件,根據(jù)判斷的結(jié)果判斷某些條件,根據(jù)判斷的結(jié)果來控制程序的流程2022-03-03Java ExecutorServic線程池異步實現(xiàn)流程
這篇文章主要介紹了Java ExecutorServic線程池異步實現(xiàn)流程,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友可以參考下2020-12-12Feign如何解決服務(wù)之間調(diào)用傳遞token
這篇文章主要介紹了Feign如何解決服務(wù)之間調(diào)用傳遞token,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2022-03-03