RocketMQ事務消息圖文示例講解
RocketMQ 也允許我們像mysql 一樣發(fā)送具有事務特征的消息
MQ 的事務流程(本地代碼正常執(zhí)行)

MQ 的消息補償過程(當本地代碼執(zhí)行失敗時)

MQ 消息的三種狀態(tài)
- 提交狀態(tài):允許進入隊列,此消息與非事務消息無區(qū)別
- 回滾狀態(tài):不允許進入隊列,此消息等同于未發(fā)送過
- 中間狀態(tài):完成了 half 消息的發(fā)送,未對 MQ 進行二次狀態(tài)確認(未知狀態(tài))
注意:事務消息僅與生產(chǎn)者有關(guān),與消費者無關(guān)
生產(chǎn)者代碼(提交狀態(tài)、回滾狀態(tài)):
public class Producer {
public static void main(String[] args) throws Exception{
//事務消息使用的生產(chǎn)者是TransactionMQProducer
TransactionMQProducer producer = new TransactionMQProducer("group1");
producer.setNamesrvAddr("192.168.23.127:9876");
//添加本地事務對應的監(jiān)聽
producer.setTransactionListener(new TransactionListener() {
//正常事務過程
@Override
public LocalTransactionState executeLocalTransaction(Message message, Object o) {
// 此處寫本地事務處理業(yè)務
// 如果成功,消息改為提交,如果失敗改為 回滾,如果是多線程處理狀態(tài)未知,就提交為未知等待事務補償過程
//事務提交狀態(tài)
return LocalTransactionState.COMMIT_MESSAGE;// 類似于msql 的 commit
//return LocalTransactionState.ROLLBACK_MESSAGE;回滾狀態(tài)
}
//事務補償過程
@Override
public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
return null;
}
});
producer.start();
Message msg = new Message("topic8",("事務消息:hello rocketmq ").getBytes("UTF-8"));
SendResult result = producer.sendMessageInTransaction(msg,null);
System.out.println("返回結(jié)果:"+result);
producer.shutdown();
}
}
生產(chǎn)者(中間狀態(tài)):
public class Producer {
public static void main(String[] args) throws Exception{
//事務消息使用的生產(chǎn)者是TransactionMQProducer
TransactionMQProducer producer = new TransactionMQProducer("group1");
producer.setNamesrvAddr("192.168.23.127:9876");
//添加本地事務對應的監(jiān)聽
producer.setTransactionListener(new TransactionListener() {
//正常事務過程
@Override
public LocalTransactionState executeLocalTransaction(Message message, Object o) {
return LocalTransactionState.UNKNOW;
}
//事務補償過程
@Override
public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
System.out.println("事務補償過程執(zhí)行");
return LocalTransactionState.COMMIT_MESSAGE;
}
});
producer.start();
Message msg = new Message("topic8",("事務消息:hello rocketmq ").getBytes("UTF-8"));
SendResult result = producer.sendMessageInTransaction(msg,null);
System.out.println("返回結(jié)果:"+result);
//事務補償過程必須保障服務器在運行過程中,否則將無法進行正常的事務補償
//producer.shutdown();
}
}
到此這篇關(guān)于RocketMQ事務消息圖文示例講解的文章就介紹到這了,更多相關(guān)RocketMQ事務消息內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
SpringBoot中@ConfigurationProperties實現(xiàn)配置自動綁定的方法
本文主要介紹了SpringBoot中@ConfigurationProperties實現(xiàn)配置自動綁定的方法,文中通過示例代碼介紹的非常詳細,具有一定的參考價值,感興趣的小伙伴們可以參考一下2022-02-02
在SpringBoot中使用MongoDB完成數(shù)據(jù)存儲
本文主要介紹了在SpringBoot中如惡化使用MongoDB完成數(shù)據(jù)存儲,接下來這篇我們將圍繞MongoDB進行,MongoDB是一個開源的,面向文檔的NoSQL數(shù)據(jù)庫管理系統(tǒng),使用類似JSON的BSON(二進制JSON)格式來存儲數(shù)據(jù),具有靈活的數(shù)據(jù)模型和強大的查詢功能,需要的朋友可以參考下2023-11-11
Springboot如何根據(jù)docx填充生成word文件并導出pdf
這篇文章主要介紹了Springboot如何根據(jù)docx填充生成word文件并導出pdf問題,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教2024-08-08
Jenkins初級應用之Invoke?Phing?targets插件配置
這篇文章主要為大家介紹了Jenkins初級應用之Invoke?Phing?targets的插件配置,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步早日升職加薪<BR>2022-04-04

