RocketMQ事務(wù)消息機(jī)制詳解
RocketMQ事務(wù)消息
RocketMQ提供了事務(wù)消息,通過事務(wù)消息就能達(dá)到分布式事務(wù)的最終一致,從而實現(xiàn)了可靠消息服務(wù)。
一、事務(wù)消息的實現(xiàn)步驟

事務(wù)消息發(fā)送步驟:
1. 發(fā)送方將半事務(wù)消息發(fā)送至RocketMQ服務(wù)端。
2. RocketMQ服務(wù)端將消息持久化之后,向發(fā)送方返回Ack確認(rèn)消息已經(jīng)發(fā)送成功。由于消息為半事務(wù)消息,在未收到生產(chǎn)者對該消息的二次確認(rèn)前,此消息被標(biāo)記成“暫不能投遞”狀態(tài)。
3. 發(fā)送方開始執(zhí)行本地事務(wù)邏輯。
4. 發(fā)送方根據(jù)本地事務(wù)執(zhí)行結(jié)果向服務(wù)端提交二次確認(rèn)(Commit 或是 Rollback),服務(wù)端收到Commit 狀態(tài)則將半事務(wù)消息標(biāo)記為可投遞,訂閱方最終將收到該消息;服務(wù)端收到 Rollback 狀態(tài)則刪除半事務(wù)消息,訂閱方將不會接受該消息。
事務(wù)消息回查步驟:
1. 在斷網(wǎng)或者是應(yīng)用重啟的特殊情況下,上述步驟4提交的二次確認(rèn)最終未到達(dá)服務(wù)端,經(jīng)過固定時間后服務(wù)端將對該消息發(fā)起消息回查。
2. 發(fā)送方收到消息回查后,需要檢查對應(yīng)消息的本地事務(wù)執(zhí)行的最終結(jié)果。 3. 發(fā)送方根據(jù)檢查得到的本地事務(wù)的最終狀態(tài)再次提交二次確認(rèn),服務(wù)端仍按照步驟4對半事務(wù)消息進(jìn)行操作。
二、程序?qū)崿F(xiàn)
事務(wù)消息處理類需要繼承RocketMQLocalTransactionListener類。該類的executeLocalTransaction方法負(fù)責(zé)在接到RocketMQ服務(wù)端的Ack確認(rèn)消息后執(zhí)行本地方法,也就是事務(wù)消息發(fā)送步驟中的步驟3。該類的checkLocalTransaction方法負(fù)責(zé),在斷網(wǎng)或者是應(yīng)用重啟的特殊情況下,執(zhí)行RocketMQ服務(wù)端的消息回查,也就是事務(wù)消息回查步驟中的步驟2。
此外,要使該類生效,還需要加@RocketMQTransactionListener注解。這里有個要特別注意的地方。在2.1.0版本前,這個注解有一個屬性txProducerGroup,可以用多個@RocketMQTransactionListener來監(jiān)聽不同的txProducerGroup來發(fā)送不同類型的事務(wù)消息到topic。但是現(xiàn)在在一個項目中,如果你在一個project中寫了多個@RocketMQTransactionListener,項目將不能啟動,啟動會報錯。產(chǎn)生這個問題的原因據(jù)說是,當(dāng)使用RocketMQTemplate并發(fā)的執(zhí)行事務(wù)時,非常容易出現(xiàn)"illegal state"的異常,原因是一個TransactionProducer在執(zhí)行事務(wù)時不能被共享。所以,必須使用同一個TransactionMQProducer來發(fā)送所有類型的事務(wù)消息。當(dāng)然同理也就必須使用一個偵聽器處理所有的消息了。
既然必須使用同一個TransactionMQProducer,對于比較大的應(yīng)用,業(yè)務(wù)場景很多,就會造成混亂。這里我給出一個方案拋磚引玉。TransactionMQProducer在發(fā)送消息時,是可以傳遞參數(shù)對象和指定消息頭的??梢园岩獔?zhí)行的本地方法的bean名和方法名放進(jìn)去。
//發(fā)送半事務(wù)消息 TransactionSendResult result = rocketMQTemplate.sendMessageInTransaction( topicAndTag, MessageBuilder.withPayload(msg) .setHeader(Constants.TX_ID_HEADER_NAME, msg.getTxId()) .setHeader(Constants.CHECK_BEAN_ID_HEADER_NAME, def.getCheckBeanId()) .setHeader(Constants.BIZ_ID_HEADER_NAME, msg.getBizId()) .build(), def );
其中def就是參數(shù)對象,可以自定義對象,這里是我自定義的TransactionMsgDefinationDto類,可以把想傳遞的信息放進(jìn)去,最重要的是要執(zhí)行的本地方法的bean名和方法名和方法執(zhí)行參數(shù):executeBeanId(bean名)、executeBeanMethod(方法名)、executeBeanParams(方法執(zhí)行參數(shù))。該對象可以傳給RocketMQLocalTransactionListener的executeLocalTransaction方法,然后通過反射執(zhí)行。
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
try {
//保存消息記錄
String body = new String((byte[]) msg.getPayload(), StandardCharsets.UTF_8);
JSONObject jsonBody = JSONObject.parseObject(body);
BaseMsgDto dto = JSONObject.toJavaObject(jsonBody, BaseMsgDto.class);//(BaseMsgDto)msg.getPayload();
TransactionMsgDefinationDto def = (TransactionMsgDefinationDto)arg;
ProducerLog producerLog = BeanCopyUtils.copyProperties(def, ProducerLog::new);
String[] tags = def.getMsgTags();
if(tags !=null && tags.length > 0) {
StringBuilder tag = new StringBuilder();
for(int i = 0; i<tags.length; i++) {
tag.append(tags[0]);
if(i != tags.length-1) {
tag.append("||");
}
}
producerLog.setMsgTag(tag.toString());
}
producerLog.setBizId(dto.getBizId());
producerLog.setTxId(dto.getTxId());
producerLog.setBizType(dto.getBizType());
producerLog.setGroupName(dto.getProducerGroup());
producerLog.setMsgBody(body);
producerLogService.save(producerLog);
//執(zhí)行事務(wù)方法
SpringUtil.invokeBeanMethod(def.getExecuteBeanId(), def.getExecuteBeanMethod(), def.getExecuteBeanParams());
return RocketMQLocalTransactionState.COMMIT;
} catch (Exception e) {
logger.error("發(fā)生錯誤:", e);
return RocketMQLocalTransactionState.UNKNOWN;
}
}放在消息頭header中的數(shù)據(jù)可以傳遞給RocketMQLocalTransactionListener的checkLocalTransaction方法,然后同樣通過反射執(zhí)行。
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
try {
String txId = (String)msg.getHeaders().get(Constants.TX_ID_HEADER_NAME);
String checkBeanId = (String)msg.getHeaders().get(Constants.CHECK_BEAN_ID_HEADER_NAME);
Long bizId = Long.parseLong((String)msg.getHeaders().get(Constants.BIZ_ID_HEADER_NAME));
//執(zhí)行檢查方法
Boolean ret = (Boolean)SpringUtil.invokeBeanMethod(checkBeanId, "check", new Object[]{bizId, txId});
if(ret.booleanValue())
return RocketMQLocalTransactionState.COMMIT;
else
return RocketMQLocalTransactionState.ROLLBACK;
} catch (Exception e) {
logger.error("發(fā)生錯誤:", e);
return RocketMQLocalTransactionState.UNKNOWN;
}
}到此這篇關(guān)于RocketMQ事務(wù)消息機(jī)制詳解的文章就介紹到這了,更多相關(guān)RocketMQ事務(wù)消息內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Java httpcomponents發(fā)送get post請求代碼實例
這篇文章主要介紹了Java httpcomponents發(fā)送get post請求代碼實例,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友可以參考下2020-09-09
IDEA工程運(yùn)行時總是報xx程序包不存在實際上包已導(dǎo)入(問題分析及解決方案)
這篇文章主要介紹了IDEA工程運(yùn)行時,總是報xx程序包不存在,實際上包已導(dǎo)入,本文給大家分享問題分析及解決方案,通過實例代碼給大家介紹的非常詳細(xì),需要的朋友可以參考下2020-08-08
Java?SE使用for?each循環(huán)遍歷數(shù)組的方法代碼
在Java?SE開發(fā)中,數(shù)組是最常見的數(shù)據(jù)結(jié)構(gòu)之一,Java提供了多種遍歷數(shù)組的方式,其中for循環(huán)是最常用的方式之一,本文將介紹如何使用for?each循環(huán)遍歷數(shù)組,接下來,我們將通過一個簡單的代碼示例來展示如何使用for?each循環(huán)遍歷數(shù)組,需要的朋友可以參考下2023-11-11

