RocketMQ設計之故障規(guī)避機制
NameServer
為了簡化和客戶端通信,發(fā)現(xiàn)Broker故障時并不會立即通知客戶端。故障規(guī)避機制就是用來解決當Broker出現(xiàn)故障,Producer
不能及時感知而導致消息發(fā)送失敗的問題。默認不開啟,如果開啟,消息發(fā)送失敗的時候會將失敗的Broker暫時排除在隊列選擇列表外
MQFaultStrategy類的:
public class MQFaultStrategy { ? ? private final static InternalLogger log = ClientLogger.getLog(); ? ? private final LatencyFaultTolerance<String> latencyFaultTolerance = new LatencyFaultToleranceImpl(); ? ? private boolean sendLatencyFaultEnable = false; ? ? private long[] latencyMax = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L}; ? ? private long[] notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L}; ? ? public long[] getNotAvailableDuration() { ? ? ? ? return notAvailableDuration; ? ? } ? ? public void setNotAvailableDuration(final long[] notAvailableDuration) { ? ? ? ? this.notAvailableDuration = notAvailableDuration; ? ? } ? ? public long[] getLatencyMax() { ? ? ? ? return latencyMax; ? ? } ? ? public void setLatencyMax(final long[] latencyMax) { ? ? ? ? this.latencyMax = latencyMax; ? ? } ? ? public boolean isSendLatencyFaultEnable() { ? ? ? ? return sendLatencyFaultEnable; ? ? } ? ? public void setSendLatencyFaultEnable(final boolean sendLatencyFaultEnable) { ? ? ? ? this.sendLatencyFaultEnable = sendLatencyFaultEnable; ? ? } ? ? public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) { ? ? ? ? //是否開啟故障延遲機制 ? ? ? ? if (this.sendLatencyFaultEnable) { ? ? ? ? ? ? try { ? ? ? ? ? ? ? ? int index = tpInfo.getSendWhichQueue().getAndIncrement(); ? ? ? ? ? ? ? ? for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) { ? ? ? ? ? ? ? ? ? ? int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size(); ? ? ? ? ? ? ? ? ? ? if (pos < 0) ? ? ? ? ? ? ? ? ? ? ? ? pos = 0; ? ? ? ? ? ? ? ? ? ? MessageQueue mq = tpInfo.getMessageQueueList().get(pos); ? ? ? ? ? ? ? ? ? ? //判斷Queue是否可用 ? ? ? ? ? ? ? ? ? ? if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) { ? ? ? ? ? ? ? ? ? ? ? ? if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName)) ? ? ? ? ? ? ? ? ? ? ? ? ? ? return mq; ? ? ? ? ? ? ? ? ? ? } ? ? ? ? ? ? ? ? } ? ? ? ? ? ? ? ? final String notBestBroker = latencyFaultTolerance.pickOneAtLeast(); ? ? ? ? ? ? ? ? int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker); ? ? ? ? ? ? ? ? if (writeQueueNums > 0) { ? ? ? ? ? ? ? ? ? ? final MessageQueue mq = tpInfo.selectOneMessageQueue(); ? ? ? ? ? ? ? ? ? ? if (notBestBroker != null) { ? ? ? ? ? ? ? ? ? ? ? ? mq.setBrokerName(notBestBroker); ? ? ? ? ? ? ? ? ? ? ? ? mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums); ? ? ? ? ? ? ? ? ? ? } ? ? ? ? ? ? ? ? ? ? return mq; ? ? ? ? ? ? ? ? } else { ? ? ? ? ? ? ? ? ? ? latencyFaultTolerance.remove(notBestBroker); ? ? ? ? ? ? ? ? } ? ? ? ? ? ? } catch (Exception e) { ? ? ? ? ? ? ? ? log.error("Error occurred when selecting message queue", e); ? ? ? ? ? ? } ? ? ? ? ? ? return tpInfo.selectOneMessageQueue(); ? ? ? ? } ? ? ? ? //默認輪詢 ? ? ? ? return tpInfo.selectOneMessageQueue(lastBrokerName); ? ? } ? ? public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) { ? ? ? ? if (this.sendLatencyFaultEnable) { ? ? ? ? ? ? long duration = computeNotAvailableDuration(isolation ? 30000 : currentLatency); ? ? ? ? ? ? this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration); ? ? ? ? } ? ? } ? ? private long computeNotAvailableDuration(final long currentLatency) { ? ? ? ? for (int i = latencyMax.length - 1; i >= 0; i--) { ? ? ? ? ? ? if (currentLatency >= latencyMax[i]) ? ? ? ? ? ? ? ? return this.notAvailableDuration[i]; ? ? ? ? } ? ? ? ? return 0; ? ? } }
在選擇查找路由時,選擇消息隊列的關鍵步驟:
- 先按輪詢算法選擇一個消息隊列
- 從故障列表判斷該消息隊列是否可用
LatencyFaultToleranceImpl中判斷是否可用:
@Override public boolean isAvailable(final String name) { ? ? final FaultItem faultItem = this.faultItemTable.get(name); ? ? if (faultItem != null) { ? ? ? ? return faultItem.isAvailable(); ? ? } ? ? return true; } public boolean isAvailable() { ? ? ? ? ? ? return (System.currentTimeMillis() - startTimestamp) >= 0; ? ? ? ? }
- 判斷是否在故障列表中,不在故障列表中代表可用。
- 在故障列表中判斷當前時間是否大于等于故障規(guī)避的開始時間
startTimestamp
在消息發(fā)送結束后和發(fā)送出現(xiàn)異常時調用updateFaultItem()
方法來更新故障列表,computeNotAvailableDuration()
根據響應時間來計算故障周期時長,響應時間越長故障周期越長。網絡異常、Broker異常、客戶端異常都是固定響應時長30s,它們故障周期時長為10分鐘。消息發(fā)送成功或線程中斷異常響應時間在100毫秒以內,故障周期時長為0。
LatencyFaultToleranceImpl類的updateFaultItem方法:
@Override public void updateFaultItem(final String name, final long currentLatency, final long notAvailableDuration) { ? ? FaultItem old = this.faultItemTable.get(name); ? ? if (null == old) { ? ? ? ? final FaultItem faultItem = new FaultItem(name); ? ? ? ? faultItem.setCurrentLatency(currentLatency); ? ? ? ? faultItem.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration); ? ? ? ? //加入故障列表 ? ? ? ? old = this.faultItemTable.putIfAbsent(name, faultItem); ? ? ? ? if (old != null) { ? ? ? ? ? ? old.setCurrentLatency(currentLatency); ? ? ? ? ? ? old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration); ? ? ? ? } ? ? } else { ? ? ? ? old.setCurrentLatency(currentLatency); ? ? ? ? old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration); ? ? } }
FaultItem
存儲Broker名稱、響應時長、故障規(guī)避開始時間,最重要的是故障規(guī)避開始時間,用來判斷Queue是否可用
到此這篇關于RocketMQ設計之故障規(guī)避機制的文章就介紹到這了,更多相關RocketMQ故障規(guī)避機制內容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!
相關文章
淺談idea live template高級知識_進階(給方法,類,js方法添加注釋)
下面小編就為大家?guī)硪黄獪\談idea live template高級知識_進階(給方法,類,js方法添加注釋)。小編覺得挺不錯的,現(xiàn)在就分享給大家,也給大家做個參考。一起跟隨小編過來看看吧2017-06-06Spring Boot教程之利用ActiveMQ實現(xiàn)延遲消息
這篇文章主要給大家介紹了關于Spring Boot教程之利用ActiveMQ實現(xiàn)延遲消息的相關資料,文中通過示例代碼介紹的非常詳細,對大家學習或者使用Spring Boot具有一定的參考學習價值,需要的朋友們下面來一起學習學習吧2019-11-11activemq整合springboot使用方法(個人微信小程序用)
這篇文章主要介紹了activemq整合springboot使用(個人微信小程序用),文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧2019-03-03SpringBoot項目改為SpringCloud項目使用nacos作為注冊中心的方法
本文主要介紹了SpringBoot項目改為SpringCloud項目使用nacos作為注冊中心,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧2022-04-04