RocketMQ特性Broker存儲(chǔ)事務(wù)消息實(shí)現(xiàn)
引言
在Broker中,事務(wù)消息的初始化是通過(guò)BrokerController.initialTransaction()
方法執(zhí)行的。
private void initialTransaction() { this.transactionalMessageService = ServiceProvider.loadClass(ServiceProvider.TRANSACTION_SERVICE_ID, TransactionalMessageService.class); if (null == this.transactionalMessageService) { this.transactionalMessageService = new TransactionalMessageServiceImpl(new TransactionalMessageBridge(this, this.getMessageStore())); LOG.warn("Load default transaction message hook service: {}", TransactionalMessageServiceImpl.class.getSimpleName()); } this.transactionalMessageCheckListener = ServiceProvider.loadClass(ServiceProvider.TRANSACTION_LISTENER_ID, AbstractTransactionalMessageCheckListener.class); if (null == this.transactionalMessageCheckListener) { this.transactionalMessageCheckListener = new DefaultTransactionalMessageCheckListener(); LOG.warn("Load default discard message hook service: {}", DefaultTransactionalMessageCheckListener.class.getSimpleName()); } this.transactionalMessageCheckListener.setBrokerController(this); this.transactionalMessageCheckService = new TransactionalMessageCheckService(this); }
這里有三個(gè)核心的初始化變量
TransactionalMessageService
事務(wù)消息主要處理服務(wù)。默認(rèn)實(shí)現(xiàn)類是TransactionalMessageServiceImpl
也可以自己定義事務(wù)消息處理實(shí)現(xiàn)類,通過(guò)ServiceProvider.loadClass()
方法進(jìn)行加載。
TransactionalMessageService
類定義如下。內(nèi)部屬性已加注釋標(biāo)明。
public interface TransactionalMessageService { //用于保存Half事務(wù)消息 PutMessageResult prepareMessage(MessageExtBrokerInner messageInner); CompletableFuture<PutMessageResult> asyncPrepareMessage(MessageExtBrokerInner messageInner); //刪除事務(wù)消息 boolean deletePrepareMessage(MessageExt messageExt); //提交事務(wù)消息 OperationResult commitMessage(EndTransactionRequestHeader requestHeader); //回滾事務(wù)消息 OperationResult rollbackMessage(EndTransactionRequestHeader requestHeader); void check(long transactionTimeout, int transactionCheckMax, AbstractTransactionalMessageCheckListener listener); //打開(kāi)事務(wù)消息 boolean open(); //關(guān)閉事務(wù)消息 void close(); }
transactionalMessageCheckListener
事務(wù)消息回查監(jiān)聽(tīng)器
transactionalMessageCheckService
事務(wù)消息回查服務(wù),啟動(dòng)一個(gè)線程定時(shí)檢查超時(shí)的Half消息是否需要回查。
處理事務(wù)消息
當(dāng)初始化完成之后,Broker就可以處理事務(wù)消息了。
Broker存儲(chǔ)事務(wù)消息的是org.apache.rocketmq.broker.processor.SendMessageProcessor
,這和普通消息其實(shí)是一樣的。
但是有兩點(diǎn)針對(duì)事務(wù)消息的特殊處理:
第一處:
在org.apache.rocketmq.broker.processor.SendMessageProcessor#sendMessage
中:
//獲取擴(kuò)展字段的值,若是該值為true則為事務(wù)消息 String traFlag = oriProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED); boolean sendTransactionPrepareMessage = false; if (Boolean.parseBoolean(traFlag) && !(msgInner.getReconsumeTimes() > 0 && msgInner.getDelayTimeLevel() > 0)) { //判斷當(dāng)前Broker配置是否支持事務(wù)消息 if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) { response.setCode(ResponseCode.NO_PERMISSION); response.setRemark( "the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1() + "] sending transaction message is forbidden"); return response; } sendTransactionPrepareMessage = true; }
if (sendTransactionPrepareMessage) { //保存Half信息 putMessageResult = this.brokerController.getTransactionalMessageService().prepareMessage(msgInner); } else { putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner); }
第二處:
存儲(chǔ)事務(wù)消息前的預(yù)處理,對(duì)應(yīng)方法是
org.apache.rocketmq.broker.transaction.queue.TransactionalMessageBridge#parseHalfMessageInner
private MessageExtBrokerInner parseHalfMessageInner(MessageExtBrokerInner msgInner) { //將原消息的topic保存在擴(kuò)展字段中 MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_TOPIC, msgInner.getTopic()); //將原消息的QueueId保存在擴(kuò)展字段中 MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msgInner.getQueueId())); //將原消息的SysFlag保存在擴(kuò)展字段中 msgInner.setSysFlag( MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), MessageSysFlag.TRANSACTION_NOT_TYPE)); //修改topic的值為RMQ_SYS_TRANS_HALF_TOPIC msgInner.setTopic(TransactionalMessageUtil.buildHalfTopic()); //修改Queueid為0 msgInner.setQueueId(0); msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties())); return msgInner; }
完成上述步驟之后,調(diào)用DefaultMessageStole.putMessage()
方法將其保存到CommitLog
中。
CommitLog存儲(chǔ)成功之后,通過(guò)org.apache.rocketmq.store.CommitLog.DefaultAppendMessageCallback#doAppend()
方法對(duì)其進(jìn)行處理。
final int tranType = MessageSysFlag.getTransactionValue(msgInner.getSysFlag()); switch (tranType) { // Prepared and Rollback message is not consumed, will not enter the consume queue case MessageSysFlag.TRANSACTION_PREPARED_TYPE: case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE: queueOffset = 0L; break; case MessageSysFlag.TRANSACTION_NOT_TYPE: case MessageSysFlag.TRANSACTION_COMMIT_TYPE: default: break; }
這里的邏輯是這樣的,當(dāng)讀到的消息類型為事務(wù)消息時(shí),設(shè)置當(dāng)前消息的位點(diǎn)值為0,而不是設(shè)置真實(shí)的位點(diǎn)。這樣該位點(diǎn)就不會(huì)建立ConsumeQueue索引,也不會(huì)被消費(fèi)。
以上就是RocketMQ特性Broker存儲(chǔ)事務(wù)消息實(shí)現(xiàn)的詳細(xì)內(nèi)容,更多關(guān)于RocketMQ Broker存儲(chǔ)事務(wù)消息的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
SpringBoot3中數(shù)據(jù)庫(kù)集成實(shí)踐詳解
項(xiàng)目工程中,集成數(shù)據(jù)庫(kù)實(shí)現(xiàn)對(duì)數(shù)據(jù)的增曬改查管理,是最基礎(chǔ)的能力,所以下面小編就來(lái)和大家講講SpringBoot3如何實(shí)現(xiàn)數(shù)據(jù)庫(kù)集成,需要的可以參考下2023-08-08maven <repositories>標(biāo)簽和<pluginRepositories>標(biāo)簽的使用
這篇文章主要介紹了maven <repositories>標(biāo)簽和<pluginRepositories>標(biāo)簽的使用,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2020-07-07基于mybatis注解動(dòng)態(tài)sql中foreach工具的方法
這篇文章主要介紹了mybatis注解動(dòng)態(tài)sql中foreach工具方法,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-11-11Java開(kāi)發(fā)實(shí)例之圖書(shū)管理系統(tǒng)的實(shí)現(xiàn)
圖書(shū)管理的功能大體包括:增加書(shū)籍、借閱書(shū)籍、刪除書(shū)籍、查看書(shū)籍列表、退出系統(tǒng)、查找書(shū)籍、返還書(shū)籍這些,本文主要給大家介紹該系統(tǒng)的數(shù)據(jù)庫(kù)語(yǔ)句,對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2021-10-10淺談緩沖字符流 BufferedReader BufferedWriter用法
這篇文章主要介紹了緩沖字符流 BufferedReader BufferedWriter的用法,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-07-07Layui前后臺(tái)交互數(shù)據(jù)獲取java實(shí)例
下面小編就為大家分享一篇Layui前后臺(tái)交互數(shù)據(jù)獲取java實(shí)例,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧2018-01-01Spring?Boot?整合?Thymeleaf?實(shí)例分享
這篇文章主要分享了Spring?Boot整合Thymeleaf,Thymeleaf是新一代的Java模板引擎,類似于Velocity、FreeMarker等傳統(tǒng)引擎,關(guān)于其更多相關(guān)內(nèi)容,需要的小伙伴可以參考一下2022-05-05