RabbitMQ,RocketMQ,Kafka?事務(wù)性,消息丟失,消息順序性和消息重復(fù)發(fā)送的處理策略問題
RabbitMQ,RocketMQ,Kafka 事務(wù)性,消息丟失,消息順序性和消息重復(fù)發(fā)送的處理策略 消息隊(duì)列常見問題處理分布式事務(wù)什么是分布式事務(wù)常見的分布式事務(wù)解決方案基于 MQ 實(shí)現(xiàn)的分布式事務(wù)本地消息表-最終一致性MQ事務(wù)-最終一致性RocketMQ中如何處理事務(wù)Kafka中如何處理事務(wù)RabbitMQ中的事務(wù)消息防丟失生產(chǎn)階段防止消息丟失RabbitMQ 中的防丟失措施Kafka 中的防丟失措施RocketMQ 中的防丟失措施存儲(chǔ)階段RabbitMQ 中的防丟失措施Kafka 中的防丟失措施RocketMQ 中的防丟失措施消費(fèi)階段消息重復(fù)發(fā)送消息的順序性參考
消息隊(duì)列常見問題處理
分布式事務(wù)
什么是分布式事務(wù)
我們的服務(wù)器從單機(jī)發(fā)展到擁有多臺(tái)機(jī)器的分布式系統(tǒng),各個(gè)系統(tǒng)之前需要借助于網(wǎng)絡(luò)進(jìn)行通信,原有單機(jī)中相對(duì)可靠的方法調(diào)用以及進(jìn)程間通信方式已經(jīng)沒有辦法使用,同時(shí)網(wǎng)絡(luò)環(huán)境也是不穩(wěn)定的,造成了我們多個(gè)機(jī)器之間的數(shù)據(jù)同步問題,這就是典型的分布式事務(wù)問題。
在分布式事務(wù)中事務(wù)的參與者、支持事務(wù)的服務(wù)器、資源服務(wù)器以及事務(wù)管理器分別位于不同的分布式系統(tǒng)的不同節(jié)點(diǎn)之上。分布式事務(wù)就是要保證不同節(jié)點(diǎn)之間的數(shù)據(jù)一致性。
常見的分布式事務(wù)解決方案
1、2PC(二階段提交)方案 - 強(qiáng)一致性
2、3PC(三階段提交)方案
3、TCC (Try-Confirm-Cancel)事務(wù) - 最終一致性
4、Saga事務(wù) - 最終一致性
5、本地消息表 - 最終一致性
6、MQ事務(wù) - 最終一致性
這里重點(diǎn)關(guān)注下使用消息隊(duì)列實(shí)現(xiàn)分布式的一致性,上面幾種的分布式設(shè)計(jì)方案的具體細(xì)節(jié)可參見文章最后的引用鏈接
基于 MQ 實(shí)現(xiàn)的分布式事務(wù)
本地消息表-最終一致性
消息的生產(chǎn)方,除了維護(hù)自己的業(yè)務(wù)邏輯之外,同時(shí)需要維護(hù)一個(gè)消息表。這個(gè)消息表里面記錄的就是需要同步到別的服務(wù)的信息,當(dāng)然這個(gè)消息表,每個(gè)消息都有一個(gè)狀態(tài)值,來標(biāo)識(shí)這個(gè)消息有沒有被成功處理。
發(fā)送放的業(yè)務(wù)邏輯以及消息表中數(shù)據(jù)的插入將在一個(gè)事務(wù)中完成,這樣避免了業(yè)務(wù)處理成功 + 事務(wù)消息發(fā)送失敗
,或業(yè)務(wù)處理失敗 + 事務(wù)消息發(fā)送成功
,這個(gè)問題。
舉個(gè)栗子:
我們假定目前有兩個(gè)服務(wù),訂單服務(wù),購(gòu)物車服務(wù),用戶在購(gòu)物車中對(duì)幾個(gè)商品進(jìn)行合并下單,之后需要清空購(gòu)物車中剛剛已經(jīng)下單的商品信息。
1、消息的生產(chǎn)方也就是訂單服務(wù),完成了自己的邏輯(對(duì)商品進(jìn)行下單操作)然后把這個(gè)消息通過 mq 發(fā)送到需要進(jìn)行數(shù)據(jù)同步的其他服務(wù)中,也就是我們栗子中的購(gòu)物車服務(wù)。
2、其他服務(wù)(購(gòu)物車服務(wù))會(huì)監(jiān)聽這個(gè)隊(duì)列;
1、如果收到這個(gè)消息,并且數(shù)據(jù)同步執(zhí)行成功了,當(dāng)然這也是一個(gè)本地事務(wù),就通過 mq 回復(fù)消息的生產(chǎn)方(訂單服務(wù))消息已經(jīng)處理了,然后生產(chǎn)方就能標(biāo)識(shí)本次事務(wù)已經(jīng)結(jié)束。如果是一個(gè)業(yè)務(wù)上的錯(cuò)誤,就回復(fù)消息的生產(chǎn)方,需要進(jìn)行數(shù)據(jù)回滾了。
2、很久沒收到這個(gè)消息,這種情況是不會(huì)發(fā)生的,消息的發(fā)送方會(huì)有一個(gè)定時(shí)的任務(wù),會(huì)定時(shí)重試發(fā)送消息表中還沒有處理的消息;
3、消息的生產(chǎn)方(訂單服務(wù))如果收到消息回執(zhí);
1、成功的話就修改本次消息已經(jīng)處理完,也就是本次分布式事務(wù)的同步已經(jīng)完成;
2、如果消息的結(jié)果是執(zhí)行失敗,同時(shí)在本地回滾本次事務(wù),標(biāo)識(shí)消息已經(jīng)處理完成;
3、如果消息丟失,也就是回執(zhí)消息沒有收到,這種情況也不太會(huì)發(fā)生,消息的發(fā)送方(訂單服務(wù))會(huì)有一個(gè)定時(shí)的任務(wù),定時(shí)重試發(fā)送消息表中還沒有處理的消息,下游的服務(wù)需要做冪等,可能會(huì)收到多次重復(fù)的消息,如果一個(gè)回復(fù)消息生產(chǎn)方中的某個(gè)回執(zhí)信息丟失了,后面持續(xù)收到生產(chǎn)方的 mq 消息,然后再次回復(fù)消息的生產(chǎn)方回執(zhí)信息,這樣總能保證發(fā)送者能成功收到回執(zhí),消息的生產(chǎn)方在接收回執(zhí)消息的時(shí)候也要做到冪等性。
這里有兩個(gè)很重要的操作:
1、服務(wù)器處理消息需要是冪等的,消息的生產(chǎn)方和接收方都需要做到冪等性;
2、發(fā)送放需要添加一個(gè)定時(shí)器來遍歷重推未處理的消息,避免消息丟失,造成的事務(wù)執(zhí)行斷裂。
該方案的優(yōu)缺點(diǎn)
優(yōu)點(diǎn):
1、在設(shè)計(jì)層面上實(shí)現(xiàn)了消息數(shù)據(jù)的可靠性,不依賴消息中間件,弱化了對(duì) mq 特性的依賴。
2、簡(jiǎn)單,易于實(shí)現(xiàn)。
缺點(diǎn):
主要是需要和業(yè)務(wù)數(shù)據(jù)綁定到一起,耦合性比較高,使用相同的數(shù)據(jù)庫(kù),會(huì)占用業(yè)務(wù)數(shù)據(jù)庫(kù)的一些資源。
MQ事務(wù)-最終一致性
下面分析下幾種消息隊(duì)列對(duì)事務(wù)的支持
RocketMQ中如何處理事務(wù)
RocketMQ 中的事務(wù),它解決的問題是,確保執(zhí)行本地事務(wù)和發(fā)消息這兩個(gè)操作,要么都成功,要么都失敗。并且,RocketMQ 增加了一個(gè)事務(wù)反查的機(jī)制,來盡量提高事務(wù)執(zhí)行的成功率和數(shù)據(jù)一致性。
主要是兩個(gè)方面,正常的事務(wù)提交和事務(wù)消息補(bǔ)償
正常的事務(wù)提交
1、發(fā)送消息(half消息),這個(gè) half 消息和普通消息的區(qū)別,在事務(wù)提交 之前,對(duì)于消費(fèi)者來說,這個(gè)消息是不可見的。
2、MQ SERVER
寫入信息,并且返回響應(yīng)的結(jié)果;
3、根據(jù)MQ SERVER
響應(yīng)的結(jié)果,決定是否執(zhí)行本地事務(wù),如果MQ SERVER
寫入信息成功執(zhí)行本地事務(wù),否則不執(zhí)行;
4、根據(jù)本地事務(wù)執(zhí)行的狀態(tài),決定是否對(duì)事務(wù)進(jìn)行 Commit 或者 Rollback。MQ SERVER
收到 Commit,之后就會(huì)投遞該消息到下游的訂閱服務(wù),下游的訂閱服務(wù)就能進(jìn)行數(shù)據(jù)同步,如果是 Rollback 則該消息就會(huì)被丟失;
如果MQ SERVER
沒有收到 Commit 或者 Rollback 的消息,這種情況就需要進(jìn)行補(bǔ)償流程了
補(bǔ)償流程
1、MQ SERVER
如果沒有收到來自消息發(fā)送方的 Commit 或者 Rollback 消息,就會(huì)向消息發(fā)送端也就是我們的服務(wù)器發(fā)起一次查詢,查詢當(dāng)前消息的狀態(tài);
2、消息發(fā)送方收到對(duì)應(yīng)的查詢請(qǐng)求,查詢事務(wù)的狀態(tài),然后把狀態(tài)重新推送給MQ SERVER
,MQ SERVER
就能之后后續(xù)的流程了。
相比于本地消息表來處理分布式事務(wù),MQ 事務(wù)是把原本應(yīng)該在本地消息表中處理的邏輯放到了 MQ 中來完成。
Kafka中如何處理事務(wù)
Kafka 中的事務(wù)解決問題,確保在一個(gè)事務(wù)中發(fā)送的多條信息,要么都成功,要么都失敗。也就是保證對(duì)多個(gè)分區(qū)寫入操作的原子性。
通過配合 Kafka 的冪等機(jī)制來實(shí)現(xiàn) Kafka 的 Exactly Once
,滿足了讀取-處理-寫入
這種模式的應(yīng)用程序。當(dāng)然 Kafka 中的事務(wù)主要也是來處理這種模式的。
什么是讀取-處理-寫入
模式呢?
栗如:在流計(jì)算中,用 Kafka 作為數(shù)據(jù)源,并且將計(jì)算結(jié)果保存到 Kafka 這種場(chǎng)景下,數(shù)據(jù)從 Kafka 的某個(gè)主題中消費(fèi),在計(jì)算集群中計(jì)算,再把計(jì)算結(jié)果保存在 Kafka 的其他主題中。這個(gè)過程中,要保證每條消息只被處理一次,這樣才能保證最終結(jié)果的成功。Kafka 事務(wù)的原子性就保證了,讀取和寫入的原子性,兩者要不一起成功,要不就一起失敗回滾。
這里來分析下 Kafka 的事務(wù)是如何實(shí)現(xiàn)的
它的實(shí)現(xiàn)原理和 RocketMQ 的事務(wù)是差不多的,都是基于兩階段提交來實(shí)現(xiàn)的,在實(shí)現(xiàn)上可能更麻煩
先來介紹下事務(wù)協(xié)調(diào)者,為了解決分布式事務(wù)問題,Kafka 引入了事務(wù)協(xié)調(diào)者這個(gè)角色,負(fù)責(zé)在服務(wù)端協(xié)調(diào)整個(gè)事務(wù)。這個(gè)協(xié)調(diào)者并不是一個(gè)獨(dú)立的進(jìn)程,而是 Broker 進(jìn)程的一部分,協(xié)調(diào)者和分區(qū)一樣通過選舉來保證自身的可用性。
Kafka 集群中也有一個(gè)特殊的用于記錄事務(wù)日志的主題,里面記錄的都是事務(wù)的日志。同時(shí)會(huì)有多個(gè)協(xié)調(diào)者的存在,每個(gè)協(xié)調(diào)者負(fù)責(zé)管理和使用事務(wù)日志中的幾個(gè)分區(qū)。這樣能夠并行的執(zhí)行事務(wù),提高性能。
下面看下具體的流程
1、首先在開啟事務(wù)的時(shí)候,生產(chǎn)者會(huì)給協(xié)調(diào)者發(fā)送一個(gè)開啟事務(wù)的請(qǐng)求,協(xié)調(diào)者在事務(wù)日志中記錄下事務(wù)ID;
2、然后生產(chǎn)者開始發(fā)送事務(wù)消息給協(xié)調(diào)者,不過需要先發(fā)送消息告知協(xié)調(diào)者在哪個(gè)主題和分區(qū),之后就正常的發(fā)送事務(wù)消息,這些事務(wù)消息不像 RocketMQ 會(huì)保存在特殊的隊(duì)列中,Kafka 未提交的事務(wù)消息和普通的消息一樣,只是在消費(fèi)的時(shí)候依賴客戶端進(jìn)行過濾。
3、消息發(fā)送完成,生產(chǎn)者根據(jù)自己的執(zhí)行的狀態(tài)對(duì)協(xié)調(diào)者進(jìn)行事務(wù)的提交或者回滾;
事務(wù)的提交
1、協(xié)調(diào)者設(shè)置事務(wù)的狀態(tài)為PrepareCommit,寫入到事務(wù)日志中;
2、協(xié)調(diào)者在每個(gè)分區(qū)中寫入事務(wù)結(jié)束的標(biāo)識(shí),然后客戶端就能把之前過濾的未提交的事務(wù)消息放行給消費(fèi)端進(jìn)行消費(fèi)了;
事務(wù)的回滾
1、協(xié)調(diào)者設(shè)置事務(wù)的狀態(tài)為PrepareAbort,寫入到事務(wù)日志中;
2、協(xié)調(diào)者在每個(gè)分區(qū)中寫入事務(wù)回滾的標(biāo)識(shí),然后之前未提交的事務(wù)消息就能被丟棄了;
這里引用一下【消息隊(duì)列高手課中的圖片】
RabbitMQ中的事務(wù)
RabbitMQ 中事務(wù)解決的問題是確保生產(chǎn)者的消息到達(dá)MQ SERVER
,這和其他 MQ 事務(wù)還是有點(diǎn)差別的,這里也不展開討論了。
消息防丟失
先來分析下一條消息在 MQ 中流轉(zhuǎn)所經(jīng)歷的階段。
生產(chǎn)階段:生產(chǎn)者產(chǎn)生消息,通過網(wǎng)絡(luò)發(fā)送到 Broker 端。
存儲(chǔ)階段:Broker 拿到消息,需要進(jìn)行落盤,如果是集群版的 MQ 還需要同步數(shù)據(jù)到其他節(jié)點(diǎn)。
消費(fèi)階段:消費(fèi)者在 Broker 端拉數(shù)據(jù),通過網(wǎng)絡(luò)傳輸?shù)竭_(dá)消費(fèi)者端。
生產(chǎn)階段防止消息丟失
發(fā)生網(wǎng)絡(luò)丟包、網(wǎng)絡(luò)故障等這些會(huì)導(dǎo)致消息的丟失
RabbitMQ 中的防丟失措施
1、對(duì)于可以感知的錯(cuò)誤,我們捕獲錯(cuò)誤,然后重新投遞;
2、通過 RabbitMQ 中的事務(wù)解決,RabbitMQ 中的事務(wù)解決的就是生產(chǎn)階段消息丟失的問題;
在生產(chǎn)者發(fā)送消息之前,通過channel.txSelect
開啟一個(gè)事務(wù),接著發(fā)送消息, 如果消息投遞 server 失敗,進(jìn)行事務(wù)回滾channel.txRollback
,然后重新發(fā)送, 如果 server 收到消息,就提交事務(wù)channel.txCommit
不過使用事務(wù)性能不好,這是同步操作,一條消息發(fā)送之后會(huì)使發(fā)送端阻塞,以等待RabbitMQ Server
的回應(yīng),之后才能繼續(xù)發(fā)送下一條消息,生產(chǎn)者生產(chǎn)消息的吞吐量和性能都會(huì)大大降低。
3、使用發(fā)送確認(rèn)機(jī)制。
使用確認(rèn)機(jī)制,生產(chǎn)者將信道設(shè)置成 confirm 確認(rèn)模式,一旦信道進(jìn)入 confirm 模式,所有在該信道上面發(fā)布的消息都會(huì)被指派一個(gè)唯一的ID(從1開始),一旦消息被投遞到所有匹配的隊(duì)列之后,RabbitMQ 就會(huì)發(fā)送一個(gè)確認(rèn)(Basic.Ack)給生產(chǎn)者(包含消息的唯一 deliveryTag 和 multiple 參數(shù)),這就使得生產(chǎn)者知曉消息已經(jīng)正確到達(dá)了目的地了。
multiple 為 true 表示的是批量的消息確認(rèn),為 true 的時(shí)候,表示小于等于返回的 deliveryTag 的消息 id 都已經(jīng)確認(rèn)了,為 false 表示的是消息 id 為返回的 deliveryTag 的消息,已經(jīng)確認(rèn)了。
確認(rèn)機(jī)制有三種類型
1、同步確認(rèn)
2、批量確認(rèn)
3、異步確認(rèn)
同步模式的效率很低,因?yàn)槊恳粭l消息度都需要等待確認(rèn)好之后,才能處理下一條;
批量確認(rèn)模式相比同步模式效率是很高,不過有個(gè)致命的缺陷,一旦回復(fù)確認(rèn)失敗,當(dāng)前確認(rèn)批次的消息會(huì)全部重新發(fā)送,導(dǎo)致消息重復(fù)發(fā)送;
異步模式就是個(gè)很好的選擇了,不會(huì)有同步模式的阻塞問題,同時(shí)效率也很高,是個(gè)不錯(cuò)的選擇。
Kafka 中的防丟失措施
Kafaka 中引入了一個(gè) broker。 broker 會(huì)對(duì)生產(chǎn)者和消費(fèi)者進(jìn)行消息的確認(rèn),生產(chǎn)者發(fā)送消息到 broker,如果沒有收到 broker 的確認(rèn)就可以選擇繼續(xù)發(fā)送。
只要 Producer 收到了 Broker 的確認(rèn)響應(yīng),就可以保證消息在生產(chǎn)階段不會(huì)丟失。有些消息隊(duì)列在長(zhǎng)時(shí)間沒收到發(fā)送確認(rèn)響應(yīng)后,會(huì)自動(dòng)重試,如果重試再失敗,就會(huì)以返回值或者異常的方式告知用戶。
只要正確處理 Broker 的確認(rèn)響應(yīng),就可以避免消息的丟失。
RocketMQ 中的防丟失措施使用 SYNC 的發(fā)送消息方式,等待 broker 處理結(jié)果
RocketMQ 提供了3種發(fā)送消息方式,分別是:
同步發(fā)送:Producer 向 broker 發(fā)送消息,阻塞當(dāng)前線程等待 broker 響應(yīng) 發(fā)送結(jié)果。
異步發(fā)送:Producer 首先構(gòu)建一個(gè)向 broker 發(fā)送消息的任務(wù),把該任務(wù)提交給線程池,等執(zhí)行完該任務(wù)時(shí),回調(diào)用戶自定義的回調(diào)函數(shù),執(zhí)行處理結(jié)果。
Oneway發(fā)送:Oneway 方式只負(fù)責(zé)發(fā)送請(qǐng)求,不等待應(yīng)答,Producer 只負(fù)責(zé)把請(qǐng)求發(fā)出去,而不處理響應(yīng)結(jié)果。
使用事務(wù),RocketMQ 中的事務(wù),它解決的問題是,確保執(zhí)行本地事務(wù)和發(fā)消息這兩個(gè)操作,要么都成功,要么都失敗。
存儲(chǔ)階段
在存儲(chǔ)階段正常情況下,只要 Broker 在正常運(yùn)行,就不會(huì)出現(xiàn)丟失消息的問題,但是如果 Broker 出現(xiàn)了故障,比如進(jìn)程死掉了或者服務(wù)器宕機(jī)了,還是可能會(huì)丟失消息的。
RabbitMQ 中的防丟失措施
防止在存儲(chǔ)階段消息額丟失,可以做持久化,防止異常情況(重啟,關(guān)閉,宕機(jī))。。。
RabbitMQ 持久化中有三部分:
交換器的持久化
交換器的持久化,是通過在聲明隊(duì)列時(shí)將 durable 參數(shù)置為 true 實(shí)現(xiàn)的,不設(shè)置持久化的話,交換器的信息將會(huì)丟失。
隊(duì)列持久化
隊(duì)列的持久化,是通過在聲明隊(duì)列時(shí)將 durable 參數(shù)置為 true 實(shí)現(xiàn)的,隊(duì)列的持久化能保證其本身的元數(shù)據(jù)不會(huì)因異常情況而丟失,但是并不能保證內(nèi)部所存儲(chǔ)的消息不會(huì)丟失。
消息的持久化
消息的持久化,在投遞時(shí)指定 delivery_mode=2
(1是非持久化),消息的持久化,需要配合隊(duì)列的持久,只設(shè)置消息的持久化,重啟之后隊(duì)列消失,繼而消息也會(huì)丟失。所以如果只設(shè)置消息持久化而不設(shè)置隊(duì)列的持久化意義不大。
對(duì)于持久化,如果所有的消息都設(shè)置持久化,會(huì)影響寫入的性能,所以可以選擇對(duì)可靠性要求比較高的消息進(jìn)行持久化處理。
不過消息持久化并不能百分之百避免消息的丟失
比如數(shù)據(jù)在落盤的過程中宕機(jī)了,消息還沒及時(shí)同步到內(nèi)存中,這也是會(huì)丟數(shù)據(jù)的,這種問題可以通過引入鏡像隊(duì)列來解決。
鏡像隊(duì)列的作用:引入鏡像隊(duì)列,可已將隊(duì)列鏡像到集群中的其他 Broker 節(jié)點(diǎn)之上,如果集群中的一個(gè)節(jié)點(diǎn)失效了,隊(duì)列能夠自動(dòng)切換到鏡像中的另一個(gè)節(jié)點(diǎn)上來保證服務(wù)的可用性。(更細(xì)節(jié)的這里不展開討論了)
Kafka 中的防丟失措施
操作系統(tǒng)本身有一層緩存,叫做 Page Cache,當(dāng)往磁盤文件寫入的時(shí)候,系統(tǒng)會(huì)先將數(shù)據(jù)流寫入緩存中。
Kafka 收到消息后也會(huì)先存儲(chǔ)在也緩存中(Page Cache)中,之后由操作系統(tǒng)根據(jù)自己的策略進(jìn)行刷盤或者通過 fsync 命令強(qiáng)制刷盤。如果系統(tǒng)掛掉,在 PageCache 中的數(shù)據(jù)就會(huì)丟失。也就是對(duì)應(yīng)的 Broker 中的數(shù)據(jù)就會(huì)丟失了。
處理思路
1、控制競(jìng)選分區(qū) leader 的 Broker。如果一個(gè) Broker 落后原先的 Leader 太多,那么它一旦成為新的 Leader,必然會(huì)造成消息的丟失。
2、控制消息能夠被寫入到多個(gè)副本中才能提交,這樣避免上面的問題1。
RocketMQ 中的防丟失措施
1、將刷盤方式改成同步刷盤;
2、對(duì)于多個(gè)節(jié)點(diǎn)的 Broker,需要將 Broker 集群配置成:至少將消息發(fā)送到 2 個(gè)以上的節(jié)點(diǎn),再給客戶端回復(fù)發(fā)送確認(rèn)響應(yīng)。這樣當(dāng)某個(gè) Broker 宕機(jī)時(shí),其他的 Broker 可以替代宕機(jī)的 Broker,也不會(huì)發(fā)生消息丟失。
消費(fèi)階段
消費(fèi)階段就很簡(jiǎn)單了,如果在網(wǎng)絡(luò)傳輸中丟失,這個(gè)消息之后還會(huì)持續(xù)的推送給消費(fèi)者,在消費(fèi)階段我們只需要控制在業(yè)務(wù)邏輯處理完成之后再去進(jìn)行消費(fèi)確認(rèn)就行了。
總結(jié):對(duì)于消息的丟失,也可以借助于本地消息表的思路,消息產(chǎn)生的時(shí)候進(jìn)行消息的落盤,長(zhǎng)時(shí)間未處理的消息,使用定時(shí)重推到隊(duì)列中。
消息重復(fù)發(fā)送
消息在 MQ 中的傳遞,大致可以歸類為下面三種:
1、At most once: 至多一次。消息在傳遞時(shí),最多會(huì)被送達(dá)一次。是不安全的,可能會(huì)丟數(shù)據(jù)。
2、At least once: 至少一次。消息在傳遞時(shí),至少會(huì)被送達(dá)一次。也就是說,不允許丟消息,但是允許有少量重復(fù)消息出現(xiàn)。
3、Exactly once:恰好一次。消息在傳遞時(shí),只會(huì)被送達(dá)一次,不允許丟失也不允許重復(fù),這個(gè)是最高的等級(jí)。
大部分消息隊(duì)列滿足的都是At least once
,也就是可以允許重復(fù)的消息出現(xiàn)。
我們消費(fèi)者需要滿足冪等性,通常有下面幾種處理方案
1、利用數(shù)據(jù)庫(kù)的唯一性
根據(jù)業(yè)務(wù)情況,選定業(yè)務(wù)中能夠判定唯一的值作為數(shù)據(jù)庫(kù)的唯一鍵,新建一個(gè)流水表,然后執(zhí)行業(yè)務(wù)操作和流水表數(shù)據(jù)的插入放在同一事務(wù)中,如果流水表數(shù)據(jù)已經(jīng)存在,那么就執(zhí)行失敗,借此保證冪等性。也可先查詢流水表的數(shù)據(jù),沒有數(shù)據(jù)然后執(zhí)行業(yè)務(wù),插入流水表數(shù)據(jù)。不過需要注意,數(shù)據(jù)庫(kù)讀寫延遲的情況。
2、數(shù)據(jù)庫(kù)的更新增加前置條件
3、給消息帶上唯一ID
每條消息加上唯一ID,利用方法1中通過增加流水表,借助數(shù)據(jù)庫(kù)的唯一性來處理重復(fù)消息的消費(fèi)。
消息的順序性
保證消息隊(duì)列的順序性,處理思路就是生產(chǎn)者保證入隊(duì)有序,消費(fèi)者保證出隊(duì)之后的消息消費(fèi)的順序性。
生產(chǎn)者
因?yàn)橥粋€(gè)消息隊(duì)列中的消息是有序的,所以生產(chǎn)者可以借助于算法,把需要進(jìn)行有序消費(fèi)的數(shù)據(jù)并且有相同的數(shù)據(jù)特性的數(shù)據(jù)(比如:同一個(gè)訂單的消息)放入到同一個(gè)消息隊(duì)列中,這樣消費(fèi)者去接收數(shù)據(jù)就能順序性的取出數(shù)據(jù)。
使用那種算法呢,哈希就是一個(gè)不過的選擇,如果處理的是訂單業(yè)務(wù),可以對(duì)訂單號(hào)進(jìn)行哈希操作,保證同一個(gè)訂單總能落入到同一個(gè)隊(duì)列中,這樣就在生產(chǎn)端保證了隊(duì)列中的消息是有序的。
當(dāng)然上文講的消息隊(duì)列是個(gè)統(tǒng)稱:
RabbitMQ 就是保證有相同數(shù)據(jù)特性的數(shù)據(jù)落入到同一個(gè) queue 中,同一個(gè) queue 中的數(shù)據(jù)是有序的;
Kafka 中就是保證有相同數(shù)據(jù)特性的數(shù)據(jù)落入到同一個(gè) partition 中,同一個(gè) partition 的消息是有序的;
RocketMQ 中就是保證有相同數(shù)據(jù)特性的數(shù)據(jù)落入同一個(gè) MessageQueue 中,同一個(gè) MessageQueue 中的消息是有序的。
三者消息模型的區(qū)別可參見RabbitMQ,RocketMQ,Kafka 消息模型對(duì)比分析
消費(fèi)者
如果單個(gè)消息者,并且消費(fèi)者只有一個(gè)線程來處理消息,那么順序性是可以得到保證的,因?yàn)殛?duì)列中的消息已經(jīng)是有序的了。
不過吞吐量大了,單個(gè)消費(fèi)者性能跟不上了如何處理
1、我們可以增加隊(duì)列的數(shù)量,一個(gè)隊(duì)列對(duì)應(yīng)一個(gè)消費(fèi)者,隊(duì)列的數(shù)量多了也就意味著消費(fèi)者的數(shù)量增加了;
2、增加單個(gè)消費(fèi)者的線程數(shù)量,因?yàn)樵黾恿司€程的數(shù)量,每個(gè)線程的消費(fèi)速度是不同的,所以這里還需要通過算法,把有相同的數(shù)據(jù)特性的數(shù)據(jù)放入到固定的線程中消費(fèi)。
不過針對(duì)消費(fèi)者的處理,可能要根據(jù)實(shí)際場(chǎng)景來進(jìn)行判斷了,如果沒有一個(gè)可進(jìn)行區(qū)分的特性數(shù)據(jù),那就只能通過單消費(fèi)者,單線程來進(jìn)行消費(fèi)了。同時(shí)生產(chǎn)者也只能推送消息到同一個(gè)消息隊(duì)列中。
參考
【消息隊(duì)列高手課】https://time.geekbang.org/column/intro/100032301
【消息隊(duì)列設(shè)計(jì)精要】https://tech.meituan.com/2016/07/01/mq-design.html
【RabbitMQ實(shí)戰(zhàn)指南】https://book.douban.com/subject/27591386/
【分布式事務(wù)最經(jīng)典的七種解決方案】https://segmentfault.com/a/1190000040321750
【分布式事務(wù)的實(shí)現(xiàn)原理】https://draveness.me/distributed-transaction-principle/
【理解分布式事務(wù)】https://juejin.cn/post/6844903734753886216
【設(shè)計(jì)(design)】https://github.com/apache/rocketmq/blob/master/docs/cn/design.md
【Kafka 是如何實(shí)現(xiàn)事務(wù)的?】https://zhuanlan.zhihu.com/p/163683403
【MQ - RabbitMQ Cluster】https://www.cnblogs.com/Neeo/articles/13915836.html
【如何避免消息丟失?】https://www.lixueduan.com/post/kafka/09-avoid-msg-lost/
【RabbitMQ,RocketMQ,Kafka 事務(wù)性,消息丟失和消息重復(fù)發(fā)送的處理策略】https://boilingfrog.github.io/2021/12/30/消息隊(duì)列中事務(wù)性消息丟失重復(fù)發(fā)送的處理/#rocketmq-中的防丟失措施
到此這篇關(guān)于RabbitMQ,RocketMQ,Kafka 事務(wù)性,消息丟失,消息順序性和消息重復(fù)發(fā)送的處理策略的文章就介紹到這了,更多相關(guān)RabbitMQ,RocketMQ,Kafka事務(wù)內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
java Spring整合Freemarker的詳細(xì)步驟
本文對(duì)Spring整合Freemarker步驟做了詳細(xì)的說明,按步驟操作一定可以整合通過,這里提供給大家做參考2013-11-11解決Dubbo應(yīng)用啟動(dòng)注冊(cè)ZK獲取IP慢的原因之一
這篇文章主要介紹了解決Dubbo應(yīng)用啟動(dòng)注冊(cè)ZK獲取IP慢的原因之一,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2024-04-04Mybatis中BindingException異常的產(chǎn)生原因及解決過程
BindingException異常是MyBatis框架中自定義的異常,顧名思義指的是綁定出現(xiàn)問題,下面這篇文章主要給大家介紹了關(guān)于MyBatis報(bào)錯(cuò)BindingException異常的產(chǎn)生原因及解決過程,需要的朋友可以參考下2023-06-06springmvc path請(qǐng)求映射到bean 方法的流程
這篇文章主要介紹了springmvc path請(qǐng)求映射到bean 方法的流程,本文給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2021-07-07Java五子棋簡(jiǎn)單實(shí)現(xiàn)代碼舉例
Java五子棋游戲是一種經(jīng)典的兩人對(duì)戰(zhàn)棋類游戲,它基于簡(jiǎn)單的規(guī)則,即任何一方的棋子在棋盤上形成連續(xù)的五個(gè),無論是橫、豎還是斜線,都將獲勝,這篇文章主要介紹了Java五子棋實(shí)現(xiàn)的相關(guān)資料,需要的朋友可以參考下2024-10-10struts2.5+框架使用通配符與動(dòng)態(tài)方法常見問題小結(jié)
這篇文章主要介紹了struts2.5+框架使用通配符與動(dòng)態(tài)方法常見問題 ,在文中給大家提到了Struts2.5框架使用通配符指定方法 ,需要的朋友可以參考下2018-09-09Spring Boot整合Elasticsearch實(shí)現(xiàn)全文搜索引擎案例解析
ElasticSearch作為基于Lucene的搜索服務(wù)器,既可以作為一個(gè)獨(dú)立的服務(wù)部署,也可以簽入Web應(yīng)用中。SpringBoot作為Spring家族的全新框架,使得使用SpringBoot開發(fā)Spring應(yīng)用變得非常簡(jiǎn)單,在本案例中我們給大家介紹Spring Boot整合Elasticsearch實(shí)現(xiàn)全文搜索引擎2017-11-11解決springboot中mongodb不啟動(dòng)及Dao不能被掃描到的問題
這篇文章主要介紹了解決springboot中mongodb不啟動(dòng)及Dao不能被掃描到的問題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2024-05-05Spring Boot使用AOP實(shí)現(xiàn)REST接口簡(jiǎn)易靈活的安全認(rèn)證的方法
這篇文章主要介紹了Spring Boot使用AOP實(shí)現(xiàn)REST接口簡(jiǎn)易靈活的安全認(rèn)證的方法,非常具有實(shí)用價(jià)值,需要的朋友可以參考下2018-11-11Spring五大類注解讀取存儲(chǔ)Bean對(duì)象的方法
這篇文章主要介紹了Spring五大類注解讀取存儲(chǔ)Bean對(duì)象,本文通過示例代碼給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2022-09-09