亚洲乱码中文字幕综合,中国熟女仑乱hd,亚洲精品乱拍国产一区二区三区,一本大道卡一卡二卡三乱码全集资源,又粗又黄又硬又爽的免费视频

Rocketmq事務(wù)消息之半消息詳解

 更新時(shí)間:2023年09月08日 09:26:39   作者:澄風(fēng)  
這篇文章主要介紹了Rocketmq事務(wù)消息之半消息詳解,RocketMQ的事務(wù)消息支持在業(yè)務(wù)邏輯與發(fā)送消息之間提供事務(wù)保證,RocketMQ通過(guò)兩階段的方式提供事務(wù)消息的支持,需要的朋友可以參考下

什么是半消息(事務(wù)消息)

當(dāng)我們?cè)跇I(yè)務(wù)邏輯中發(fā)送消息時(shí),消息與業(yè)務(wù)的事務(wù)之間難以保證一致性,如果業(yè)務(wù)代碼出現(xiàn)異常,如果已發(fā)送的消息無(wú)法回滾,則很會(huì)出現(xiàn)數(shù)據(jù)不一致的情況,RocketMQ的事務(wù)消息支持在業(yè)務(wù)邏輯與發(fā)送消息之間提供事務(wù)保證,RocketMQ通過(guò)兩階段的方式提供事務(wù)消息的支持。

首需要注意的是 事務(wù)消息(半消息) 僅僅只是保證本地事務(wù)和MQ消息發(fā)送形成整體的 原子性 ,而投遞到MQ服務(wù)器后,并無(wú)法保證消費(fèi)者一定能消費(fèi)成功!

  • 事務(wù)消息 :MQ 提供類(lèi)似 X/Open XA 的分布事務(wù)功能,通過(guò) MQ 事務(wù)消息能達(dá)到分布式事務(wù)的最終一致。
  • 半消息 :暫不能投遞的消息,發(fā)送方已經(jīng)將消息成功發(fā)送到了 MQ 服務(wù)端,但是服務(wù)端未收到生產(chǎn)者對(duì)該消息的二次確認(rèn),此時(shí)該消息被標(biāo)記成“暫不能投遞”狀態(tài),處于該種狀態(tài)下的消息即半消息。
  • 半消息回查 :由于網(wǎng)絡(luò)閃斷、生產(chǎn)者應(yīng)用重啟等原因,導(dǎo)致某條事務(wù)消息的二次確認(rèn)丟失,MQ 服務(wù)端通過(guò)掃描發(fā)現(xiàn)某條消息長(zhǎng)期處于“半消息”時(shí),需要主動(dòng)向消息生產(chǎn)者詢(xún)問(wèn)該消息的最終狀態(tài)(Commit 或是 Rollback),該過(guò)程即消息回查。

極端情況:是否任何情況下MQ的事務(wù)性消息都可以保證雙方的最終一致性?答案是否定的。

考慮上面提到的異常情況“情況2:MQ發(fā)送方在執(zhí)行完本地事務(wù)之后commit之前異常退出”。

在這種情況下如果如果MQ發(fā)送方由于運(yùn)維上的失誤長(zhǎng)時(shí)間不重啟MQ發(fā)送方,那么MQ在多次回查不成功之后將會(huì)丟棄該消息。

最終分布式事務(wù)的雙方是不能達(dá)到最終一致性了。當(dāng)然這個(gè)回查的最大值可以通過(guò)修改broker的參數(shù)transactionCheckMax來(lái)調(diào)整。但是過(guò)大的transactionCheckMax參數(shù)將會(huì)導(dǎo)致MQ堆積過(guò)多的半包消息,從而危害MQ的穩(wěn)定性,是個(gè)需要權(quán)衡的參數(shù)。

半消息事務(wù)實(shí)現(xiàn)流程

在這里插入圖片描述

流程:

1.發(fā)送方向 MQ 服務(wù)端發(fā)送事務(wù)消息;

2.MQ Server 將消息持久化成功之后,向發(fā)送方 ACK 確認(rèn)消息已經(jīng)發(fā)送成功,此時(shí)消息為半消息。

3.發(fā)送方開(kāi)始執(zhí)行本地事務(wù)邏輯。

4.發(fā)送方根據(jù)本地事務(wù)執(zhí)行結(jié)果向 MQ Server 提交二次確認(rèn)(Commit 或是 Rollback),MQ Server 收到 Commit 狀態(tài)則將半消息標(biāo)記為可投遞,訂閱方最終將收到該消息;MQ Server 收到 Rollback 狀態(tài)則刪除

半消息,訂閱方將不會(huì)接受該消息。

5.在斷網(wǎng)或者是應(yīng)用重啟的特殊情況下,上述步驟4提交的二次確認(rèn)最終未到達(dá) MQ Server,經(jīng)過(guò)固定時(shí)間后 MQ Server 將對(duì)該消息發(fā)起消息回查。

6.發(fā)送方收到消息回查后,需要檢查對(duì)應(yīng)消息的本地事務(wù)執(zhí)行的最終結(jié)果。

7.發(fā)送方根據(jù)檢查得到的本地事務(wù)的最終狀態(tài)再次提交二次確認(rèn),MQ Server 仍按照步驟4對(duì)半消息進(jìn)行操作。

半消息的應(yīng)用場(chǎng)景

注冊(cè)系統(tǒng)注冊(cè)的流程中,用戶(hù)入口在網(wǎng)頁(yè)注冊(cè)系統(tǒng),通知系統(tǒng)在郵件系統(tǒng),兩個(gè)系統(tǒng)之間的數(shù)據(jù)需要保持最終一致。

普通消息處理

如上所述,注冊(cè)系統(tǒng)和郵件通知系統(tǒng)之間通過(guò)消息隊(duì)列進(jìn)行異步處理。注冊(cè)系統(tǒng)將注冊(cè)信息寫(xiě)入注冊(cè)系統(tǒng)之后,發(fā)送一條注冊(cè)成功的消息到消息隊(duì)列RocketMQ版,郵件通知系統(tǒng)訂閱消息隊(duì)列RocketMQ版的注冊(cè)消息,做相應(yīng)的業(yè)務(wù)處理,發(fā)送注冊(cè)成功或者失敗的郵件。

普通消息處理

在這里插入圖片描述

流程說(shuō)明如下:

1.注冊(cè)系統(tǒng)發(fā)起注冊(cè)。

2.注冊(cè)系統(tǒng)向消息隊(duì)列RocketMQ版發(fā)送注冊(cè)消息成功與否的消息。

2.1 消息發(fā)送成功,進(jìn)入3。

2.2 消息發(fā)送失敗,導(dǎo)致郵件通知系統(tǒng)未收到消息隊(duì)列RocketMQ版發(fā)送的注冊(cè)成功與否的消息,而無(wú)法發(fā)送郵件,最終郵件通知系統(tǒng)和注冊(cè)系統(tǒng)之間的狀態(tài)數(shù)據(jù)不一致,流程結(jié)束。

3.郵件通知系統(tǒng)收到消息隊(duì)列RocketMQ版的注冊(cè)成功消息。

4.郵件通知系統(tǒng)發(fā)送注冊(cè)成功郵件給用戶(hù)。

在這樣的情況下,雖然實(shí)現(xiàn)了系統(tǒng)間的解耦,上游系統(tǒng)不需要關(guān)心下游系統(tǒng)的業(yè)務(wù)處理結(jié)果;但是數(shù)據(jù)一致性不好處理,如何保證郵件通知系統(tǒng)狀態(tài)與注冊(cè)系統(tǒng)狀態(tài)的最終一致。

事務(wù)消息處理

此時(shí),需要利用消息隊(duì)列RocketMQ版所提供的事務(wù)消息來(lái)實(shí)現(xiàn)系統(tǒng)間的狀態(tài)數(shù)據(jù)一致性。

在這里插入圖片描述

事務(wù)消息 流程說(shuō)明如下:

1.注冊(cè)系統(tǒng)向消息隊(duì)列RocketMQ版發(fā)送半事務(wù)消息。

1.1 半事務(wù)消息發(fā)送成功,進(jìn)入2。

1.2 半事務(wù)消息發(fā)送失敗,注冊(cè)系統(tǒng)不進(jìn)行注冊(cè),流程結(jié)束。

說(shuō)明 最終注冊(cè)系統(tǒng)與郵件通知系統(tǒng)數(shù)據(jù)一致。

2.注冊(cè)系統(tǒng)開(kāi)始注冊(cè)。

2.1 注冊(cè)成功,進(jìn)入3.1。

2.2 注冊(cè)失敗,進(jìn)入3.2。

3.注冊(cè)系統(tǒng)向消息隊(duì)列RocketMQ版發(fā)送半消息狀態(tài)。

3.1 提交半事務(wù)消息,產(chǎn)生注冊(cè)成功消息,進(jìn)入4。

3.2 回滾半事務(wù)消息,未產(chǎn)生注冊(cè)成功消息,流程結(jié)束。

說(shuō)明 最終注冊(cè)系統(tǒng)與郵件通知系統(tǒng)數(shù)據(jù)一致。

4.郵件通知系統(tǒng)接收消息隊(duì)列RocketMQ版的注冊(cè)成功消息。

5.郵件通知系統(tǒng)發(fā)送注冊(cè)成功郵件。

說(shuō)明 最終注冊(cè)系統(tǒng)與郵件通知系統(tǒng)數(shù)據(jù)一致。

關(guān)于分布式事務(wù)消息的更多詳細(xì)內(nèi)容,請(qǐng)參見(jiàn)事務(wù)消息。

這一段是摘抄子阿里云的rocketmq文檔介紹,大概的意思就是我發(fā)消息的動(dòng)作要和一個(gè)本地事務(wù)進(jìn)行綁定,我如果發(fā)消息失敗那么你本地事務(wù)也不應(yīng)該執(zhí)行,我本地事務(wù)執(zhí)行失敗,那么消息也不應(yīng)該發(fā)。要保證上下游系統(tǒng)的數(shù)據(jù)是最終一致的,保證消息和本地事務(wù)一定是原子性的。

實(shí)踐

在這里插入圖片描述

1.模擬一個(gè)接口發(fā)送事務(wù)消息

@GetMapping("/sendMessage")
public String sendMessage(String cron) {
    //TransactionSendResult transactionSendResult = rocketMQTemplate.sendMessageInTransaction
    //        (String.format("%s:%s", "FINORDER_MQ_TOPIC_KEY", "FINORDER_PAY_TAG_KEY"), MessageBuilder.withPayload("{\"name\": \"ok\"}").build(), "");
    Message<String> mqMessage = MessageBuilder
            .withPayload("OK 啦!")
            .setHeader("key", "ALLENS") // ①
            .build();
    TransactionSendResult transactionSendResult = rocketMQTemplate.sendMessageInTransaction("FINORDER_MQ_TOPIC_KEY:FINORDER_PAY_TAG_KEY" /* ② */, mqMessage , "");
    return "ok";
}

① 設(shè)置消息頭,這事務(wù)監(jiān)聽(tīng)里邊可以通過(guò)這個(gè)header來(lái)區(qū)分是哪一個(gè)事務(wù),假如單個(gè)微服務(wù)有多個(gè)事務(wù)消息就可以用這個(gè)來(lái)區(qū)分。

② TOPIC + groupid 用":"來(lái)分割

創(chuàng)建事務(wù)監(jiān)聽(tīng)器

@Service
@RocketMQTransactionListener
@Slf4j
public class TestTransactionListenerImpl implements RocketMQLocalTransactionListener {
	/**
	 * 每次推送消息會(huì)執(zhí)行executeLocalTransaction方法,首先會(huì)發(fā)送半消息,到這里的時(shí)候是執(zhí)行具體本地業(yè)務(wù),
     * 執(zhí)行成功后手動(dòng)返回RocketMQLocalTransactionState.COMMIT狀態(tài),
     * 這里是保證本地事務(wù)執(zhí)行成功,如果本地事務(wù)執(zhí)行失敗則可以返回ROLLBACK進(jìn)行消息回滾。 此時(shí)消息只是被保存到broker,并沒(méi)有發(fā)送到topic中,broker會(huì)根據(jù)本地返回的狀態(tài)來(lái)決定消息的處理方式。
     */
    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        System.out.println("接收到消息:" + msg);
        System.out.println("Header:" + msg.getHeaders().get("key"));
        return RocketMQLocalTransactionState.COMMIT;
    }
    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
        return RocketMQLocalTransactionState.COMMIT;
    }
}

該監(jiān)聽(tīng)器的實(shí)現(xiàn)有兩個(gè)方法一個(gè)是本地事務(wù)的執(zhí)行方法executeLocalTransaction,一個(gè)是本地事務(wù)回查方法checkLocalTransaction。 兩個(gè)方法的返回值類(lèi)型為RocketMQLocalTransactionState,該枚舉有三種:

// COMMIT:即生產(chǎn)者通知Rocket該消息可以消費(fèi)
RocketMQLocalTransactionState.COMMIT;
// ROLLBACK:即生產(chǎn)者通知Rocket將該消息刪除
RocketMQLocalTransactionState.ROLLBACK;
// UNKNOWN:即生產(chǎn)者通知Rocket繼續(xù)查詢(xún)?cè)撓⒌臓顟B(tài)
RocketMQLocalTransactionState.UNKNOWN;

對(duì)于長(zhǎng)時(shí)間沒(méi)有 Commit/Rollback 的事務(wù)消息( pending 狀態(tài)的消息),從服務(wù)端發(fā)起一次 回查Producer 收到回查消息,檢查回查消息對(duì)應(yīng)的 本地事務(wù)狀態(tài)根據(jù)本地事務(wù)狀態(tài),重新 Commit 或者 Rollback。

以上代碼中,如果sex是偶數(shù),executeLocalTransaction會(huì)拋出異常,本地事務(wù)會(huì)回滾,半消息狀態(tài)是UNKNOWN,此時(shí)就會(huì)啟動(dòng)消息的回查機(jī)制,mq會(huì)在一定的時(shí)間調(diào)用checkLocalTransaction方法查詢(xún)執(zhí)行狀態(tài),根據(jù)執(zhí)行狀態(tài)來(lái)決定是繼續(xù)回查、刪除消息、發(fā)送消息。

executeLocalTransaction也可以自己捕獲異常,手動(dòng)回滾事務(wù),返回RocketMQLocalTransactionState.ROLLBACK,這樣能減少消息回查。

等消息正常提交,半消息消息會(huì)移動(dòng)到發(fā)送指定的TOPIC隊(duì)里中,這個(gè)時(shí)候訂閱者就可以正常獲取消息了。

@Service
@RocketMQMessageListener(consumerGroup = "FINORDER_PAY_TAG_KEY",topic = "FINORDER_MQ_TOPIC_KEY")
@Slf4j
public class MQConsumerService implements RocketMQListener<String> {
    @Override
    public void onMessage(String message) {
        System.out.println("consumer:" + message);
    }
}

總結(jié)

1、首需要注意的是 事務(wù)消息(半消息) 僅僅只是保證本地事務(wù)和MQ消息發(fā)送形成整體的 原子性 ,而投遞到MQ服務(wù)器后,并無(wú)法保證消費(fèi)者一定能消費(fèi)成功!

2、發(fā)消息的動(dòng)作要和一個(gè)本地事務(wù)進(jìn)行綁定,我如果發(fā)消息失敗那么你本地事務(wù)也不應(yīng)該執(zhí)行,我本地事務(wù)執(zhí)行失敗,那么消息也不應(yīng)該發(fā)。要保證上下游系統(tǒng)的數(shù)據(jù)是最終一致的,保證消息和本地事務(wù)一定是原子性的。

3、 半消息如果提交成功最終是要入隊(duì)列的,可以正常的收到消息,這個(gè)時(shí)候可以認(rèn)為上游系統(tǒng)的依賴(lài)條件肯定是已經(jīng)執(zhí)行成功了的。

到此這篇關(guān)于Rocketmq事務(wù)消息之半消息詳解的文章就介紹到這了,更多相關(guān)Rocketmq半消息內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • 一文詳解Java中的原子操作

    一文詳解Java中的原子操作

    在Java中,原子操作尤為重要,尤其是在多線程環(huán)境中,想象一下,如果小黑在操作一個(gè)共享變量時(shí),這個(gè)操作被其他線程打斷,那會(huì)發(fā)生什么?可能會(huì)導(dǎo)致數(shù)據(jù)不一致,或者更糟糕的情況,本文將給大家詳細(xì)介紹一下Java中的原子操作
    2024-01-01
  • RocketMQ特性Broker存儲(chǔ)事務(wù)消息實(shí)現(xiàn)

    RocketMQ特性Broker存儲(chǔ)事務(wù)消息實(shí)現(xiàn)

    這篇文章主要為大家介紹了RocketMQ特性Broker存儲(chǔ)事務(wù)消息實(shí)現(xiàn)詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪
    2022-08-08
  • Spring?populateBean屬性賦值和自動(dòng)注入

    Spring?populateBean屬性賦值和自動(dòng)注入

    這篇文章主要為大家介紹了Spring?populateBean屬性賦值和自動(dòng)注入示例詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪
    2023-03-03
  • 深入理解springboot中配置文件application.properties

    深入理解springboot中配置文件application.properties

    本文主要介紹了springboot中配置文件application.properties,文中通過(guò)示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下
    2021-10-10
  • 詳談Java幾種線程池類(lèi)型介紹及使用方法

    詳談Java幾種線程池類(lèi)型介紹及使用方法

    下面小編就為大家?guī)?lái)一篇詳談Java幾種線程池類(lèi)型介紹及使用方法。小編覺(jué)得挺不錯(cuò)的,現(xiàn)在就分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧
    2017-03-03
  • spring boot配置ssl(多cer格式)超詳細(xì)教程

    spring boot配置ssl(多cer格式)超詳細(xì)教程

    這篇文章主要介紹了spring boot配置ssl(多cer格式)超詳細(xì)教程,本文通過(guò)圖文并茂的形式給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友參考下吧
    2023-11-11
  • Java8 Optional的詳細(xì)使用教程

    Java8 Optional的詳細(xì)使用教程

    這篇文章主要給大家介紹了關(guān)于Java8 Optional的詳細(xì)使用教程,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧
    2021-02-02
  • SpringBoot使用PageHelper插件實(shí)現(xiàn)Mybatis分頁(yè)效果

    SpringBoot使用PageHelper插件實(shí)現(xiàn)Mybatis分頁(yè)效果

    這篇文章主要介紹了SpringBoot使用PageHelper插件實(shí)現(xiàn)Mybatis分頁(yè)效果,本文通過(guò)實(shí)例代碼給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作有一定的參考借鑒價(jià)值,需要的朋友可以參考下
    2024-02-02
  • SpringBoot啟動(dòng)參數(shù)的實(shí)現(xiàn)

    SpringBoot啟動(dòng)參數(shù)的實(shí)現(xiàn)

    SpringBoot通過(guò)jar文件方式啟動(dòng),配置可以通過(guò)啟動(dòng)參數(shù)進(jìn)行覆蓋,本文就來(lái)介紹一下SpringBoot啟動(dòng)參數(shù)的實(shí)現(xiàn),感興趣的可以了解一下
    2025-01-01
  • Java設(shè)計(jì)模式之迭代器模式

    Java設(shè)計(jì)模式之迭代器模式

    這篇文章介紹了Java設(shè)計(jì)模式之迭代器模式,文中通過(guò)示例代碼介紹的非常詳細(xì)。對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下
    2022-10-10

最新評(píng)論