RocketMQ事務(wù)消息使用與原理詳解
一、背景&概述
最近在找工作,面試過(guò)程中被多次問(wèn)到事務(wù)消息的實(shí)現(xiàn)原理,另外在分布式事務(wù)解決方案中,事務(wù)消息也是一個(gè)不錯(cuò)的解決方案,本篇文章將圍繞RocketMQ的事務(wù)消息實(shí)現(xiàn)展開(kāi)描述。
二、應(yīng)用場(chǎng)景
所謂事務(wù)消息,其實(shí)是為了解決上下游寫(xiě)一致性,以及強(qiáng)依賴(lài)解耦,也即是完成當(dāng)前操作的同時(shí)給下游發(fā)送指令,并且保證上下游要么同時(shí)成功或者同時(shí)失敗,并且考慮上游的性能和RT問(wèn)題做出的強(qiáng)調(diào)用解耦妥協(xié)。常見(jiàn)的應(yīng)用場(chǎng)景有:
1.訂單履約指令下發(fā)
用戶(hù)下單成功后,給履約系統(tǒng)發(fā)送指令進(jìn)行履約操作,下單失敗不發(fā)送指令,采購(gòu)缺貨或者其他履約異常,反向觸發(fā)訂單取消或者其他兜底操作。
2.用戶(hù)轉(zhuǎn)賬
用戶(hù)發(fā)起轉(zhuǎn)賬后,交易狀態(tài)短暫掛起,發(fā)送指令給銀行,如果發(fā)起失敗則不發(fā)送指令,發(fā)送成功后等待結(jié)果更新交易狀態(tài)。
3.訂單支付
支付發(fā)起后,當(dāng)筆訂單處于中間狀態(tài),給支付網(wǎng)關(guān)發(fā)起指令,如果發(fā)起失敗則不發(fā)送指令,發(fā)送成功后等待支付網(wǎng)關(guān)反饋更新支付狀態(tài)。
三、使用方式
1.事務(wù)消息監(jiān)聽(tīng)器
@Component @Slf4j public class OrderTransactionalListener implements TransactionListener { @Override public LocalTransactionState executeLocalTransaction(Message message, Object o) { log.info("開(kāi)始執(zhí)行本地事務(wù)...."); LocalTransactionState state; try{ String body = new String(message.getBody()); OrderDTO order = JSONObject.parseObject(body, OrderDTO.class); orderService.createOrder(order,message.getTransactionId()); state = LocalTransactionState.COMMIT_MESSAGE; log.info("本地事務(wù)已提交。{}",message.getTransactionId()); }catch (Exception e){ log.error("執(zhí)行本地事務(wù)失敗。{}",e); state = LocalTransactionState.ROLLBACK_MESSAGE; } return state; } @Override public LocalTransactionState checkLocalTransaction(MessageExt messageExt) { log.info("開(kāi)始回查本地事務(wù)狀態(tài)。{}",messageExt.getTransactionId()); LocalTransactionState state; String transactionId = messageExt.getTransactionId(); if (transactionLogService.get(transactionId)>0){ state = LocalTransactionState.COMMIT_MESSAGE; }else { state = LocalTransactionState.UNKNOW; } log.info("結(jié)束本地事務(wù)狀態(tài)查詢(xún):{}",state); return state; } }
2.編寫(xiě)事務(wù)消息生產(chǎn)者
@Component @Slf4j public class TransactionalMsgProducer implements InitializingBean, DisposableBean { private String GROUP = "order_transactional"; private TransactionMQProducer msgProducer; //用于執(zhí)行本地事務(wù)和事務(wù)狀態(tài)回查的監(jiān)聽(tīng)器 @Autowired private OrderTransactionalListener orderTransactionListener; //執(zhí)行任務(wù)的線(xiàn)程池 private ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(50)); private void start(){ try { this.msgProducer.start(); } catch (MQClientException e) { log.error("msg producer starter occur error;",e); } } private void shutdown() { if(null != msgProducer) { try { msgProducer.shutdown(); } catch (Exception e) { log.error("producer shutdown occur error;",e); } } } public TransactionSendResult send(String data, String topic) throws MQClientException { Message message = new Message(topic,data.getBytes()); return this.msgProducer.sendMessageInTransaction(message, null); } @Override public void afterPropertiesSet() throws Exception { msgProducer = new TransactionMQProducer(GROUP); msgProducer.setNamesrvAddr("namesrvHost:ip"); msgProducer.setSendMsgTimeout(Integer.MAX_VALUE); msgProducer.setExecutorService(executor); msgProducer.setTransactionListener(orderTransactionListener); this.start(); } @Override public void destroy() throws Exception { this.shutdown(); } }
3.業(yè)務(wù)實(shí)現(xiàn)
@Service @Slf4j public class OrderService { @Autowired private OrderMapper orderMapper; @Autowired private TransactionLogMapper transactionLogMapper; @Autowired private TransactionalMsgProducer producer; //執(zhí)行本地事務(wù)時(shí)調(diào)用,將訂單數(shù)據(jù)和事務(wù)日志寫(xiě)入本地?cái)?shù)據(jù)庫(kù) @Transactional @Override public void createOrder(OrderDTO orderDTO,String transactionId){ //1.創(chuàng)建訂單 Order order = new Order(); BeanUtils.copyProperties(orderDTO,order); orderMapper.createOrder(order); //2.寫(xiě)入事務(wù)日志 TransactionLog log = new TransactionLog(); log.setId(transactionId); log.setBusiness("order"); log.setForeignKey(String.valueOf(order.getId())); transactionLogMapper.insert(log); log.info("create order success,order={}",orderDTO); } //前端調(diào)用,只用于向RocketMQ發(fā)送事務(wù)消息 @Override public void createOrder(OrderDTO order) throws MQClientException { order.setId(snowflake.nextId()); order.setOrderNo(snowflake.nextIdStr()); producer.send(JSON.toJSONString(order),"order"); } }
4.入口調(diào)用
@RestController @Slf4j public class OrderController { @Autowired private OrderService orderService; @PostMapping("/create_order") public void createOrder(@RequestBody OrderDTO order) { log.info("receive order data,order={}",order.getCommodityCode()); orderService.createOrder(order); } }
這樣我們就實(shí)現(xiàn)了rocketmq事務(wù)消息的使用。
四、原理介紹
1.概念模型
- 半消息(half message):半消息是一種特殊的消息類(lèi)型,該狀態(tài)的消息暫時(shí)不能被Consumer消費(fèi)(消費(fèi)端不可見(jiàn))。當(dāng)一條事務(wù)消息被成功投遞到Broker上,但是Broker并沒(méi)有接收到Producer發(fā)出的二次確認(rèn)時(shí),該事務(wù)消息就處于"暫時(shí)不可被消費(fèi)"狀態(tài),該狀態(tài)的事務(wù)消息被稱(chēng)為半消息。
- 消息狀態(tài)回查(Message status check):由于網(wǎng)絡(luò)抖動(dòng)閃斷、Producer重啟等原因,可能導(dǎo)致Producer向Broker發(fā)送的二次確認(rèn)消息沒(méi)有成功送達(dá)。如果Broker檢測(cè)到某條事務(wù)消息長(zhǎng)時(shí)間處于半消息狀態(tài),則會(huì)主動(dòng)向Producer端發(fā)起回查操作,查詢(xún)?cè)撌聞?wù)消息在Producer端的事務(wù)狀態(tài)(Commit 或 Rollback)??梢钥闯?,Message Status Check主要用來(lái)解決分布式事務(wù)中的超時(shí)問(wèn)題。
2.執(zhí)行流程
1):Producer向Broker端發(fā)送Half Message;
2):Broker ACK,Half Message發(fā)送成功;
3):Producer執(zhí)行本地事務(wù);
4):本地事務(wù)完畢,根據(jù)事務(wù)的狀態(tài),Producer向Broker發(fā)送二次確認(rèn)消息,確認(rèn)該Half Message的Commit或者Rollback狀態(tài)。Broker收到二次確認(rèn)消息后,對(duì)于Commit狀態(tài),則直接發(fā)送到Consumer端執(zhí)行消費(fèi)邏輯,而對(duì)于Rollback則直接標(biāo)記為失敗,一段時(shí)間后清除,并不會(huì)發(fā)給Consumer。正常情況下,到此分布式事務(wù)已經(jīng)完成,剩下要處理的就是超時(shí)問(wèn)題,即一段時(shí)間后Broker仍沒(méi)有收到Producer的二次確認(rèn)消息;
5):針對(duì)超時(shí)狀態(tài),Broker主動(dòng)向Producer發(fā)起消息回查;
6):Producer處理回查消息,返回對(duì)應(yīng)的本地事務(wù)的執(zhí)行結(jié)果;
7):Broker針對(duì)回查消息的結(jié)果,執(zhí)行Commit或Rollback操作,同4。
3.事務(wù)消息設(shè)計(jì)
在RocketMQ事務(wù)消息的主要流程中,一階段的消息如何對(duì)用戶(hù)不可見(jiàn)。其中事務(wù)消息相對(duì)普通消息最大的特點(diǎn)就是一階段發(fā)送的消息對(duì)用戶(hù)是不可見(jiàn)的。那么如何做到寫(xiě)入消息但是對(duì)用戶(hù)不可見(jiàn)呢?RocketMQ事務(wù)消息的做法是:如果消息是half消息,將備份原消息的主題與消息消費(fèi)隊(duì)列,然后改變主題為RMQ_SYS_TRANS_HALF_TOPIC。由于消費(fèi)組未訂閱該主題,故消費(fèi)端無(wú)法消費(fèi)half類(lèi)型的消息,然后RocketMQ會(huì)開(kāi)啟一個(gè)定時(shí)任務(wù),從Topic為RMQ_SYS_TRANS_HALF_TOPIC中拉取消息進(jìn)行消費(fèi),根據(jù)生產(chǎn)者組獲取一個(gè)服務(wù)提供者發(fā)送回查事務(wù)狀態(tài)請(qǐng)求,根據(jù)事務(wù)狀態(tài)來(lái)決定是提交或回滾消息。
在RocketMQ中,消息在服務(wù)端的存儲(chǔ)結(jié)構(gòu)如下,每條消息都會(huì)有對(duì)應(yīng)的索引信息,Consumer通過(guò)ConsumeQueue這個(gè)二級(jí)索引來(lái)讀取消息實(shí)體內(nèi)容,其流程如下:
RocketMQ的具體實(shí)現(xiàn)策略是:寫(xiě)入的如果事務(wù)消息,對(duì)消息的Topic和Queue等屬性進(jìn)行替換,同時(shí)將原來(lái)的Topic和Queue信息存儲(chǔ)到消息的屬性中,正因?yàn)橄⒅黝}被替換,故消息并不會(huì)轉(zhuǎn)發(fā)到該原主題的消息消費(fèi)隊(duì)列,消費(fèi)者無(wú)法感知消息的存在,不會(huì)消費(fèi)。
在完成一階段寫(xiě)入一條對(duì)用戶(hù)不可見(jiàn)的消息后,二階段如果是Commit操作,則需要讓消息對(duì)用戶(hù)可見(jiàn);如果是Rollback則需要撤銷(xiāo)一階段的消息。先說(shuō)Rollback的情況。對(duì)于Rollback,本身一階段的消息對(duì)用戶(hù)是不可見(jiàn)的,其實(shí)不需要真正撤銷(xiāo)消息(實(shí)際上RocketMQ也無(wú)法去真正的刪除一條消息,因?yàn)槭琼樞驅(qū)懳募模?。但是區(qū)別于這條消息沒(méi)有確定狀態(tài)(Pending狀態(tài),事務(wù)懸而未決),需要一個(gè)操作來(lái)標(biāo)識(shí)這條消息的最終狀態(tài)。RocketMQ事務(wù)消息方案中引入了Op消息的概念,用Op消息標(biāo)識(shí)事務(wù)消息已經(jīng)確定的狀態(tài)(Commit或者Rollback)。如果一條事務(wù)消息沒(méi)有對(duì)應(yīng)的Op消息,說(shuō)明這個(gè)事務(wù)的狀態(tài)還無(wú)法確定(可能是二階段失敗了)。引入Op消息后,事務(wù)消息無(wú)論是Commit或者Rollback都會(huì)記錄一個(gè)Op操作。Commit相對(duì)于Rollback只是在寫(xiě)入Op消息前創(chuàng)建Half消息的索引。
一階段的Half消息由于是寫(xiě)到一個(gè)特殊的Topic,所以二階段構(gòu)建索引時(shí)需要讀取出Half消息,并將Topic和Queue替換成真正的目標(biāo)的Topic和Queue,之后通過(guò)一次普通消息的寫(xiě)入操作來(lái)生成一條對(duì)用戶(hù)可見(jiàn)的消息。所以RocketMQ事務(wù)消息二階段其實(shí)是利用了一階段存儲(chǔ)的消息的內(nèi)容,在二階段時(shí)恢復(fù)出一條完整的普通消息。
如果在RocketMQ事務(wù)消息的二階段過(guò)程中失敗了,例如在做Commit操作時(shí),出現(xiàn)網(wǎng)絡(luò)問(wèn)題導(dǎo)致Commit失敗,那么需要通過(guò)一定的策略使這條消息最終被Commit。RocketMQ采用了一種補(bǔ)償機(jī)制,稱(chēng)為“回查”。Broker端對(duì)未確定狀態(tài)的消息發(fā)起回查,將消息發(fā)送到對(duì)應(yīng)的Producer端(同一個(gè)Group的Producer),由Producer根據(jù)消息來(lái)檢查本地事務(wù)的狀態(tài),進(jìn)而執(zhí)行Commit或者Rollback。Broker端通過(guò)對(duì)比Half消息和Op消息進(jìn)行事務(wù)消息的回查并且推進(jìn)CheckPoint(記錄那些事務(wù)消息的狀態(tài)是確定的)。
需要注意的是,rocketmq并不會(huì)無(wú)休止的的信息事務(wù)狀態(tài)回查,默認(rèn)回查15次,如果15次回查還是無(wú)法得知事務(wù)狀態(tài),rocketmq默認(rèn)回滾該消息。
五、源碼分析
1.客服端發(fā)送事務(wù)消息
RocketMQ事務(wù)消息由TransactionMQProducer實(shí)現(xiàn),繼承DefaultMQProducer實(shí)現(xiàn)了發(fā)送事務(wù)消息的能力。
發(fā)送事務(wù)消息會(huì)調(diào)用TransactionMQProducer的sendMessageInTransaction方法:
public TransactionSendResult sendMessageInTransaction(final Message msg, final Object arg) throws MQClientException { if (null == this.transactionListener) { throw new MQClientException("TransactionListener is null", null); } msg.setTopic(NamespaceUtil.wrapNamespace(this.getNamespace(), msg.getTopic())); return this.defaultMQProducerImpl.sendMessageInTransaction(msg, null, arg); }
檢查有沒(méi)有配置事務(wù)監(jiān)聽(tīng)器,監(jiān)聽(tīng)器提供了兩個(gè)方法:
- executeLocalTransaction:執(zhí)行本地事務(wù)
- checkLocalTransaction:回查本地事務(wù)
然后調(diào)用DefaultMQProducerImpl執(zhí)行發(fā)送:
public TransactionSendResult sendMessageInTransaction(final Message msg, final LocalTransactionExecuter localTransactionExecuter, final Object arg) throws MQClientException { //...省略 SendResult sendResult = null; //msg設(shè)置參數(shù)TRAN_MSG,表示為事務(wù)消息 MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true"); MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup()); try { //發(fā)送消息 sendResult = this.send(msg); } catch (Exception e) { throw new MQClientException("send message Exception", e); } LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW; Throwable localException = null; switch (sendResult.getSendStatus()) { case SEND_OK: { try { if (sendResult.getTransactionId() != null) { msg.putUserProperty("__transactionId__", sendResult.getTransactionId()); } String transactionId = msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX); if (null != transactionId && !"".equals(transactionId)) { msg.setTransactionId(transactionId); } //通過(guò)LocalTransactionExecutor執(zhí)行,已經(jīng)廢棄 if (null != localTransactionExecuter) { localTransactionState = localTransactionExecuter.executeLocalTransactionBranch(msg, arg); } else if (transactionListener != null) { //消息發(fā)送成功,執(zhí)行本地事務(wù) localTransactionState = transactionListener.executeLocalTransaction(msg, arg); } if (null == localTransactionState) { localTransactionState = LocalTransactionState.UNKNOW; } } catch (Throwable e) { localException = e; } } break; case FLUSH_DISK_TIMEOUT: case FLUSH_SLAVE_TIMEOUT: case SLAVE_NOT_AVAILABLE: localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE; break; default: break; } try { //執(zhí)行endTransaction方法,如果半消息發(fā)送失敗或本地事務(wù)執(zhí)行失敗告訴服務(wù)端是刪除半消息,半消息發(fā)送成功且本地事務(wù)執(zhí)行成功則告訴服務(wù)端生效半消息 this.endTransaction(sendResult, localTransactionState, localException); } catch (Exception e) { log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", e); } //省略... return transactionSendResult; }
該方法做了以下幾件事情:
- 給消息打上事務(wù)屬性,用于broker區(qū)分普通消息和事務(wù)消息
- 發(fā)送半消息(half message)
- 發(fā)送成功則由transactionListener執(zhí)行本地事務(wù)
- 執(zhí)行endTransaction方法,通知broker 執(zhí)行 commit/rollback
發(fā)送消息會(huì)正常調(diào)用DefaultMQProducerImpl的發(fā)送消息邏輯,執(zhí)行本地事務(wù)通過(guò)transactionListener調(diào)用本地的事務(wù)邏輯,我們看一下結(jié)束事務(wù)endTransaction方法實(shí)現(xiàn):
public void endTransaction( final SendResult sendResult, final LocalTransactionState localTransactionState, final Throwable localException) throws RemotingException, MQBrokerException, InterruptedException, UnknownHostException { final MessageId id; if (sendResult.getOffsetMsgId() != null) { id = MessageDecoder.decodeMessageId(sendResult.getOffsetMsgId()); } else { id = MessageDecoder.decodeMessageId(sendResult.getMsgId()); } String transactionId = sendResult.getTransactionId(); final String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(sendResult.getMessageQueue().getBrokerName()); EndTransactionRequestHeader requestHeader = new EndTransactionRequestHeader(); requestHeader.setTransactionId(transactionId); requestHeader.setCommitLogOffset(id.getOffset()); switch (localTransactionState) { case COMMIT_MESSAGE: requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE); break; case ROLLBACK_MESSAGE: requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE); break; case UNKNOW: requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE); break; default: break; } requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup()); requestHeader.setTranStateTableOffset(sendResult.getQueueOffset()); requestHeader.setMsgId(sendResult.getMsgId()); String remark = localException != null ? ("executeLocalTransactionBranch exception: " + localException.toString()) : null; this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, requestHeader, remark, this.defaultMQProducer.getSendMsgTimeout()); }
本地事務(wù)執(zhí)行后,則調(diào)用this.endTransaction()方法,根據(jù)本地事務(wù)執(zhí)行狀態(tài),去提交事務(wù)或者回滾事務(wù)。
如果半消息發(fā)送失敗或本地事務(wù)執(zhí)行失敗告訴服務(wù)端是刪除半消息,半消息發(fā)送成功且本地事務(wù)執(zhí)行成功則告訴服務(wù)端生效半消息。
2.Broker處理事務(wù)消息
RocketMQ服務(wù)端有個(gè)NettyRequestProcessor接口,類(lèi)似于spring的BeanPostProcessor,broker啟動(dòng)的時(shí)候會(huì)把對(duì)應(yīng)的實(shí)現(xiàn)注冊(cè)到NettyRemotingServer的本地緩存processorTable中,在收到producer發(fā)送的消息會(huì)調(diào)用NettyServerHandler的channelRead0方法,然后會(huì)調(diào)用對(duì)應(yīng)的NettyRequestProcessor實(shí)現(xiàn)處理接收到的消息請(qǐng)求??匆幌耂endMessageProcessor實(shí)現(xiàn):
public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { SendMessageContext traceContext; switch (request.getCode()) { case RequestCode.CONSUMER_SEND_MSG_BACK: return this.consumerSendMsgBack(ctx, request); default: SendMessageRequestHeader requestHeader = parseRequestHeader(request); if (requestHeader == null) { return null; } TopicQueueMappingContext mappingContext = this.brokerController.getTopicQueueMappingManager().buildTopicQueueMappingContext(requestHeader, true); RemotingCommand rewriteResult = this.brokerController.getTopicQueueMappingManager().rewriteRequestForStaticTopic(requestHeader, mappingContext); if (rewriteResult != null) { return rewriteResult; } traceContext = buildMsgContext(ctx, requestHeader); String owner = request.getExtFields().get(BrokerStatsManager.COMMERCIAL_OWNER); traceContext.setCommercialOwner(owner); try { this.executeSendMessageHookBefore(ctx, request, traceContext); } catch (AbortProcessException e) { final RemotingCommand errorResponse = RemotingCommand.createResponseCommand(e.getResponseCode(), e.getErrorMessage()); errorResponse.setOpaque(request.getOpaque()); return errorResponse; } RemotingCommand response; if (requestHeader.isBatch()) { response = this.sendBatchMessage(ctx, request, traceContext, requestHeader, mappingContext, (ctx1, response1) -> executeSendMessageHookAfter(response1, ctx1)); } else { response = this.sendMessage(ctx, request, traceContext, requestHeader, mappingContext, (ctx12, response12) -> executeSendMessageHookAfter(response12, ctx12)); } return response; } }
會(huì)調(diào)用到SendMessageProcessor.sendMessage(),判斷消息類(lèi)型,進(jìn)行半消息存儲(chǔ):
public RemotingCommand sendMessage(final ChannelHandlerContext ctx, final RemotingCommand request, final SendMessageContext sendMessageContext, final SendMessageRequestHeader requestHeader, final TopicQueueMappingContext mappingContext, final SendMessageCallback sendMessageCallback) throws RemotingCommandException { //...省略 String traFlag = oriProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED); boolean sendTransactionPrepareMessage = false; if (Boolean.parseBoolean(traFlag) && !(msgInner.getReconsumeTimes() > 0 && msgInner.getDelayTimeLevel() > 0)) { //For client under version 4.6.1 if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) { response.setCode(ResponseCode.NO_PERMISSION); response.setRemark( "the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1() + "] sending transaction message is forbidden"); return response; } sendTransactionPrepareMessage = true; } long beginTimeMillis = this.brokerController.getMessageStore().now(); if (brokerController.getBrokerConfig().isAsyncSendEnable()) { //...異步發(fā)送 return null; } else { PutMessageResult putMessageResult = null; if (sendTransactionPrepareMessage) { //存儲(chǔ)事務(wù)消息 putMessageResult = this.brokerController.getTransactionalMessageService().prepareMessage(msgInner); } else { //存儲(chǔ)普通消息 putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner); } return response; } }
繼續(xù)看事務(wù)半消息存儲(chǔ)實(shí)現(xiàn)prepareMessage:
public PutMessageResult prepareMessage(MessageExtBrokerInner messageInner) { return transactionalMessageBridge.putHalfMessage(messageInner); } private MessageExtBrokerInner parseHalfMessageInner(MessageExtBrokerInner msgInner) { MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_TOPIC, msgInner.getTopic()); MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msgInner.getQueueId())); msgInner.setSysFlag( MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), MessageSysFlag.TRANSACTION_NOT_TYPE)); msgInner.setTopic(TransactionalMessageUtil.buildHalfTopic()); msgInner.setQueueId(0); msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties())); return msgInner; }
備份消息的原主題名稱(chēng)與原隊(duì)列ID,然后取消事務(wù)消息的消息標(biāo)簽,重新設(shè)置消息的主題為:RMQ_SYS_TRANS_HALF_TOPIC,隊(duì)列ID固定為0。與其他普通消息區(qū)分開(kāi),然后完成消息持久化。
到這里,broker就初步處理完了 Producer 發(fā)送的事務(wù)半消息。
當(dāng)客戶(hù)端TransactionMQProducer執(zhí)行endTransaction動(dòng)作時(shí),觸發(fā)broker事務(wù)消息的二階段提交,broker會(huì)執(zhí)行EndTransactionProcessor的processRequest方法:
public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { //...省略 OperationResult result = new OperationResult(); if (MessageSysFlag.TRANSACTION_COMMIT_TYPE == requestHeader.getCommitOrRollback()) { result = this.brokerController.getTransactionalMessageService().commitMessage(requestHeader); if (result.getResponseCode() == ResponseCode.SUCCESS) { RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader); if (res.getCode() == ResponseCode.SUCCESS) { MessageExtBrokerInner msgInner = endMessageTransaction(result.getPrepareMessage()); msgInner.setSysFlag(MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), requestHeader.getCommitOrRollback())); msgInner.setQueueOffset(requestHeader.getTranStateTableOffset()); msgInner.setPreparedTransactionOffset(requestHeader.getCommitLogOffset()); msgInner.setStoreTimestamp(result.getPrepareMessage().getStoreTimestamp()); MessageAccessor.clearProperty(msgInner, MessageConst.PROPERTY_TRANSACTION_PREPARED); RemotingCommand sendResult = sendFinalMessage(msgInner); if (sendResult.getCode() == ResponseCode.SUCCESS) { this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage()); } return sendResult; } return res; } } else if (MessageSysFlag.TRANSACTION_ROLLBACK_TYPE == requestHeader.getCommitOrRollback()) { result = this.brokerController.getTransactionalMessageService().rollbackMessage(requestHeader); if (result.getResponseCode() == ResponseCode.SUCCESS) { RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader); if (res.getCode() == ResponseCode.SUCCESS) { this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage()); } return res; } } response.setCode(result.getResponseCode()); response.setRemark(result.getResponseRemark()); return response; }
邏輯很清晰,其核心實(shí)現(xiàn)如下:
- 根據(jù)commitlogOffset找到消息
- 如果是提交動(dòng)作,就恢復(fù)原消息的主題與隊(duì)列,再次存入commitlog文件進(jìn)而轉(zhuǎn)到消息消費(fèi)隊(duì)列,供消費(fèi)者消費(fèi),然后將原預(yù)處理消息存入一個(gè)新的主題RMQ_SYS_TRANS_OP_HALF_TOPIC,代表該消息已被處理
- 回滾消息,則直接將原預(yù)處理消息存入一個(gè)新的主題RMQ_SYS_TRANS_OP_HALF_TOPIC,代表該消息已被處理
還有一種情況,如果本地事務(wù)執(zhí)行結(jié)果是UNKNOW或者由于網(wǎng)絡(luò)問(wèn)題沒(méi)有提交,那么存儲(chǔ)的broker的事務(wù)消息處于漂浮狀態(tài),無(wú)法主動(dòng)轉(zhuǎn)換成可消費(fèi)或者刪除狀態(tài),那么就需要broker有一種兜底機(jī)制來(lái)處理這種場(chǎng)景,當(dāng)然RocketMQ提供了一種補(bǔ)償機(jī)制,定時(shí)回查此類(lèi)消息,由TransactionalMessageCheckService實(shí)現(xiàn):
@Override public void run() { log.info("Start transaction check service thread!"); while (!this.isStopped()) { long checkInterval = brokerController.getBrokerConfig().getTransactionCheckInterval(); this.waitForRunning(checkInterval); } log.info("End transaction check service thread!"); } @Override protected void onWaitEnd() { long timeout = brokerController.getBrokerConfig().getTransactionTimeOut(); int checkMax = brokerController.getBrokerConfig().getTransactionCheckMax(); long begin = System.currentTimeMillis(); log.info("Begin to check prepare message, begin time:{}", begin); this.brokerController.getTransactionalMessageService().check(timeout, checkMax, this.brokerController.getTransactionalMessageCheckListener()); log.info("End to check prepare message, consumed time:{}", System.currentTimeMillis() - begin); }
整體流程如下圖:
六、總結(jié)與思考
異常情況覆蓋
- 客戶(hù)端producer發(fā)送半消息失敗
可能由于網(wǎng)絡(luò)或者mq故障,導(dǎo)致 Producer 發(fā)送半消息(prepare)失敗??蛻?hù)端服務(wù)可以執(zhí)行回滾操作,比如“訂單關(guān)閉”等。
- 半消息發(fā)送成功,本地事務(wù)執(zhí)行失敗
如果producer發(fā)送的半消息成功了,但是執(zhí)行本地事務(wù)失敗了,如更新訂單狀態(tài)為“已完成”。這種情況下,執(zhí)行本地事務(wù)失敗后,會(huì)返回rollback給 MQ,MQ會(huì)刪除之前發(fā)送的半消息。不會(huì)下發(fā)指令給下游依賴(lài)。
- 半消息投遞成功,沒(méi)收到MQ返回的ack
如果客戶(hù)端發(fā)送半消息成功后,沒(méi)有收到MQ返回的響應(yīng)??赡苁且?yàn)榫W(wǎng)絡(luò)問(wèn)題,或者其他未知異常,客戶(hù)端以為發(fā)送MQ半消息失敗,執(zhí)行了逆向回滾流程。這個(gè)時(shí)候其實(shí)mq已經(jīng)保存半消息成功了,那這個(gè)消息怎么處理?
這個(gè)時(shí)候broker的補(bǔ)償邏輯上場(chǎng),消息回查定時(shí)任務(wù)TransactionalMessageCheckService會(huì)每隔1分鐘掃描一次半消息隊(duì)列,判斷是否需要消息回查,然后回查訂單系統(tǒng)的本地事務(wù),這時(shí)MQ就會(huì)發(fā)現(xiàn)訂單已經(jīng)變成“已關(guān)閉”,此時(shí)就要發(fā)送rollback請(qǐng)求給mq,刪除之前的半消息。
- commit/rollback失敗
這個(gè)也是通過(guò)定時(shí)任務(wù)TransactionalMessageCheckService來(lái)做補(bǔ)償,它發(fā)現(xiàn)這個(gè)消息超過(guò)一定時(shí)間還沒(méi)有進(jìn)行二階段處理,就會(huì)回查本地事務(wù)。
缺點(diǎn)和替代方案
事務(wù)消息很好了解決了分布式事務(wù)場(chǎng)景的業(yè)務(wù)解耦,但是也存在一些問(wèn)題,比如引入新的組件依賴(lài),并且事務(wù)消息是強(qiáng)依賴(lài),那么還有沒(méi)有其他比較可行的替代方案,ebay提出的本地消息表是一種解決方案,消息生產(chǎn)方新增消息表,并記錄消息發(fā)送狀態(tài)。消息表和業(yè)務(wù)數(shù)據(jù)要在一個(gè)事務(wù)里提交,也就是說(shuō)他們要在一個(gè)數(shù)據(jù)庫(kù)里面。然后消息會(huì)經(jīng)過(guò)MQ發(fā)送到消息的消費(fèi)方。如果消息發(fā)送失敗,會(huì)進(jìn)行重試發(fā)送。消息消費(fèi)方,需要處理這個(gè)消息,并完成自己的業(yè)務(wù)邏輯。此時(shí)如果本地事務(wù)處理成功,表明已經(jīng)處理成功了,如果處理失敗,那么就會(huì)重試執(zhí)行。如果是業(yè)務(wù)上面的失敗,可以給生產(chǎn)方發(fā)送一個(gè)業(yè)務(wù)補(bǔ)償消息,通知生產(chǎn)方進(jìn)行回滾等操作。
本地消息表的優(yōu)點(diǎn)是避免了分布式事務(wù),實(shí)現(xiàn)了最終一致性,缺點(diǎn)也明顯,消息表會(huì)耦合到業(yè)務(wù)系統(tǒng)中,如果沒(méi)有封裝好的解決方案,會(huì)有很多支撐邏輯要處理。
以上就是RocketMQ事務(wù)消息使用與原理詳解的詳細(xì)內(nèi)容,更多關(guān)于RocketMQ 事務(wù)消息的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
java?byte數(shù)組轉(zhuǎn)String的幾種常用方法
在Java中數(shù)組是一種非常常見(jiàn)的數(shù)據(jù)結(jié)構(gòu),它可以用來(lái)存儲(chǔ)多個(gè)相同類(lèi)型的數(shù)據(jù),有時(shí)候,我們需要將數(shù)組轉(zhuǎn)換為字符串,以便于輸出或者傳遞給其他方法,這篇文章主要給大家介紹了關(guān)于java?byte數(shù)組轉(zhuǎn)String的幾種常用方法,需要的朋友可以參考下2024-09-09基于springboot i18n國(guó)際化后臺(tái)多種語(yǔ)言設(shè)置的方式
這篇文章主要介紹了基于springboot i18n國(guó)際化后臺(tái)多種語(yǔ)言設(shè)置的方式,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-06-06IntelliJ?IDEA?2022.1.1創(chuàng)建java項(xiàng)目的詳細(xì)方法步驟
最近安裝了IntelliJ IDEA 2022.1.1,發(fā)現(xiàn)新版本的窗口還有些變化的,所以下面這篇文章主要給大家介紹了關(guān)于IntelliJ?IDEA?2022.1.1創(chuàng)建java項(xiàng)目的詳細(xì)方法步驟,文中通過(guò)圖文介紹的非常詳細(xì),需要的朋友可以參考下2022-07-07@RequestBody 部分屬性沒(méi)有轉(zhuǎn)化成功的處理
這篇文章主要介紹了@RequestBody 部分屬性沒(méi)有轉(zhuǎn)化成功的處理方案,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-10-10SpringBoot啟動(dòng)自動(dòng)終止也不報(bào)錯(cuò)的原因及解決
這篇文章主要介紹了SpringBoot啟動(dòng)自動(dòng)終止也不報(bào)錯(cuò)的原因及解決方案,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-09-09IDEA maven依賴(lài)錯(cuò)誤中包下面紅色波浪線(xiàn)
這篇文章主要介紹了IDEA maven依賴(lài)錯(cuò)誤中包下面紅色波浪線(xiàn),文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2020-08-08java 創(chuàng)建線(xiàn)程的方法總結(jié)
這篇文章主要介紹了java 創(chuàng)建線(xiàn)程的方法總結(jié)的相關(guān)資料,需要的朋友可以參考下2017-03-03