RocketMQ NameServer保障數(shù)據(jù)一致性實現(xiàn)方法講解
路由注冊角度
對于ZooKeeper這樣的強(qiáng)一致性組件,使用主從分離的架構(gòu),數(shù)據(jù)只寫到主節(jié)點(diǎn),主從之間的數(shù)據(jù)同步通過內(nèi)部機(jī)制來進(jìn)行數(shù)據(jù)復(fù)制。
對于RocketMQ來說,NameServer節(jié)點(diǎn)之間是互相不進(jìn)行通信的,這樣也就無法進(jìn)行數(shù)據(jù)復(fù)制。RocketMQ采用的機(jī)制是:在Broker節(jié)點(diǎn)啟動的時候,輪詢所有的NameServer節(jié)點(diǎn),并與每個NameServer節(jié)點(diǎn)建立長連接,發(fā)送注冊請求。
相應(yīng)的,NameServer節(jié)點(diǎn)內(nèi)部也會維護(hù)一個Broker列表,用來動態(tài)存儲Broker的信息,做服務(wù)發(fā)現(xiàn)。
與此同時,Broker使用心跳機(jī)制來向所有NameServer節(jié)點(diǎn)證明自己是存活的,即定期發(fā)送心跳包;收到心跳包之后,NameServer節(jié)點(diǎn)會更新這個Broker的最新存活時間。

注意: NameServer節(jié)點(diǎn)在處理心跳包時,存在多個請求同時處理同一張表的情況,為了保證并發(fā)安全性,RocketMQ引入了讀寫鎖(ReadWriteLock),保證了多個Producer并發(fā)讀取路由信息不受影響,但同一時刻只能處理一個Broker發(fā)來的心跳包,這也符合讀多寫少的經(jīng)典場景。
路由剔除
正常情況下:
如果Broker下線,則會與NameServer斷開長連接,底層基于Netty的通道關(guān)閉監(jiān)聽器會監(jiān)聽到連接斷開事件,然后將這個Broker信息剔除。
異常情況下:
NameServer有一個周期為10s的定時任務(wù),定期掃描Broker表,如果超過120s沒有收到某個Broker的心跳包,則會判定其失效并移除。
對于日常運(yùn)維的需求,RocketMQ提供了優(yōu)雅剔除路由信息的方式,即可以先禁止Broker的寫權(quán)限,這樣發(fā)送到這個Broker的請求都會收到一個NO_PERMISSION的響應(yīng),客戶端自動重試其他的Broker。
路由發(fā)現(xiàn)
生產(chǎn)者視角:
一般是在發(fā)送第一條消息時,才會根據(jù)Topic從NameServer獲取路由信息
消費(fèi)者視角:
訂閱的Topic一般是固定的,所以在啟動時就會拉取
針對路由信息可能變化的場景,RocketMQ提供了定時拉取Topic最新路由信息的機(jī)制,以應(yīng)對Broker集群發(fā)生變化的場景。
DefaultMQProducer和DefaultMQConsumer有一個pollNameServerInterval的配置項,用于指定從NameServer獲取路由信息的周期,其底層依賴MQClientInstance類,MQClientInstance類中的updateTopicRouteInfoFromNameServer方法,可以根據(jù)指定的時間間隔,周期性地從NameServer里拉取路由信息。在拉取時,會將當(dāng)前啟動的Producer和Consumer需要用到的Topic列表放到一個集合里,逐個進(jìn)行更新,源碼如下:
/**
* 更新單個Topic路由信息
*/
public boolean updateTopicRouteInfoFromNameServer(final String topic) {
return updateTopicRouteInfoFromNameServer(topic, false, null);
}
/**
* 更新單個Topic路由信息
*/
public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault,
DefaultMQProducer defaultMQProducer) {
try {
if (this.lockNamesrv.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
try {
TopicRouteData topicRouteData;
if (isDefault && defaultMQProducer != null) {
// 使用默認(rèn)TopicKey獲取TopicRouteData
topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(defaultMQProducer.getCreateTopicKey(), 1000 * 3);
if (topicRouteData != null) {
for (QueueData data : topicRouteData.getQueueDatas()) {
int queueNums = Math.min(defaultMQProducer.getDefaultTopicQueueNums(), data.getReadQueueNums());
data.setReadQueueNums(queueNums);
data.setWriteQueueNums(queueNums);
}
}
} else {
topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, 1000 * 3);
}
if (topicRouteData != null) {
TopicRouteData old = this.topicRouteTable.get(topic);
boolean changed = topicRouteDataIsChange(old, topicRouteData);
if (!changed) {
changed = this.isNeedUpdateTopicRouteInfo(topic);
} else {
log.info("the topic[{}] route info changed, old[{}] ,new[{}]", topic, old, topicRouteData);
}
if (changed) {
// 克隆出一個實例cloneTopicRouteData : topicRouteData會被設(shè)置到下面的publishInfo/subscribeInfo
TopicRouteData cloneTopicRouteData = topicRouteData.cloneTopicRouteData();
// 更新Broker地址相關(guān)信息,當(dāng)某個Broker心跳超時后,會被從brokerAddrTable中移除
for (BrokerData bd : topicRouteData.getBrokerDatas()) {
this.brokerAddrTable.put(bd.getBrokerName(), bd.getBrokerAddrs());
}
// Update Pub info
{
TopicPublishInfo publishInfo = topicRouteData2TopicPublishInfo(topic, topicRouteData);
publishInfo.setHaveTopicRouterInfo(true);
Iterator<Entry<String, MQProducerInner>> it = this.producerTable.entrySet().iterator();
while (it.hasNext()) {
Entry<String, MQProducerInner> entry = it.next();
MQProducerInner impl = entry.getValue();
if (impl != null) {
impl.updateTopicPublishInfo(topic, publishInfo);
}
}
}
// Update sub info
{
Set<MessageQueue> subscribeInfo = topicRouteData2TopicSubscribeInfo(topic, topicRouteData);
Iterator<Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator();
while (it.hasNext()) {
Entry<String, MQConsumerInner> entry = it.next();
MQConsumerInner impl = entry.getValue();
if (impl != null) {
impl.updateTopicSubscribeInfo(topic, subscribeInfo);
}
}
}
log.info("topicRouteTable.put. Topic = {}, TopicRouteData[{}]", topic, cloneTopicRouteData);
this.topicRouteTable.put(topic, cloneTopicRouteData);
return true;
}
} else {
log.warn("updateTopicRouteInfoFromNameServer, getTopicRouteInfoFromNameServer return null, Topic: {}. [{}]", topic, this.clientId);
}
} catch (MQClientException e) {
if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
log.warn("updateTopicRouteInfoFromNameServer Exception", e);
}
} catch (RemotingException e) {
log.error("updateTopicRouteInfoFromNameServer Exception", e);
throw new IllegalStateException(e);
} finally {
this.lockNamesrv.unlock();
}
} else {
log.warn("updateTopicRouteInfoFromNameServer tryLock timeout {}ms. [{}]", LOCK_TIMEOUT_MILLIS, this.clientId);
}
} catch (InterruptedException e) {
log.warn("updateTopicRouteInfoFromNameServer Exception", e);
}
return false;
}
當(dāng)Broker宕機(jī)時,還可以通過客戶端的重試機(jī)制來解決,避免因為定時更新路由信息不及時導(dǎo)致的服務(wù)宕機(jī)~~
到此這篇關(guān)于RocketMQ NameServer保障數(shù)據(jù)一致性實現(xiàn)方法講解的文章就介紹到這了,更多相關(guān)RocketMQ NameServer內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
分布式難題ElasticSearch解決大數(shù)據(jù)量檢索面試
這篇文章主要為大家介紹了分布式面試難題,ElasticSearch解決大數(shù)據(jù)量檢索的問題分析回答,讓面試官無話可說,幫助大家實現(xiàn)面試開薪自由2022-03-03
Java Socket編程實例(三)- TCP服務(wù)端線程池
這篇文章主要講解Java Socket編程中TCP服務(wù)端線程池的實例,希望能給大家做一個參考。2016-06-06
SpringBoot(cloud)自動裝配bean找不到類型的問題
這篇文章主要介紹了SpringBoot(cloud)自動裝配bean找不到類型的問題,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2022-02-02
springboot整合swagger3報Unable to infer base&nbs
這篇文章主要介紹了springboot整合swagger3報Unable to infer base url錯誤問題,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教2024-05-05
java連接mysql數(shù)據(jù)庫及測試是否連接成功的方法
這篇文章主要介紹了java連接mysql數(shù)據(jù)庫及測試是否連接成功的方法,結(jié)合完整實例形式分析了java基于jdbc連接mysql數(shù)據(jù)庫并返回連接狀態(tài)的具體步驟與相關(guān)操作技巧,需要的朋友可以參考下2017-09-09

