RocketMQ設(shè)計(jì)之主從復(fù)制和讀寫分離
一、主從復(fù)制
RocketMQ為了提高消費(fèi)的高可用性,避免Broker發(fā)生單點(diǎn)故障引起B(yǎng)roker上的消息無法及時(shí)消費(fèi),同時(shí)避免單個(gè)機(jī)器上硬盤壞損出現(xiàn)消費(fèi)數(shù)據(jù)丟失。
RocketMQ采用Broker數(shù)據(jù)主從復(fù)制機(jī)制,當(dāng)消息發(fā)送到Master服務(wù)器后會(huì)將消息同步到Slave服務(wù)器,如果Master服務(wù)器宕機(jī),消息消費(fèi)者還可以繼續(xù)從Slave拉取消息。
消息從Master服務(wù)器復(fù)制到Slave服務(wù)器上,有兩種復(fù)制方式:同步復(fù)制SYNC_MASTER和異步復(fù)制ASYNC_MASTER。
通過配置文件conf/broker.conf文件配置:
# Licensed to the Apache Software Foundation (ASF) under one or more # contributor license agreements. ?See the NOTICE file distributed with # this work for additional information regarding copyright ownership. # The ASF licenses this file to You under the Apache License, Version 2.0 # (the "License"); you may not use this file except in compliance with # the License. ?You may obtain a copy of the License at # # ? ? http://www.apache.org/licenses/LICENSE-2.0 # # ?Unless required by applicable law or agreed to in writing, software # ?distributed under the License is distributed on an "AS IS" BASIS, # ?WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # ?See the License for the specific language governing permissions and # ?limitations under the License. brokerClusterName = DefaultCluster brokerName = broker-a brokerId = 0 deleteWhen = 04 fileReservedTime = 48 brokerRole = ASYNC_MASTER flushDiskType = ASYNC_FLUSH
對(duì)brokerRole參數(shù)進(jìn)行設(shè)置:
同步復(fù)制:Master和Slave都寫成功后才返回客戶端寫成功的狀態(tài)。
- 優(yōu)點(diǎn):Master服務(wù)器出現(xiàn)故障,Slave服務(wù)器上有全部數(shù)據(jù)的備份,很容易恢復(fù)到Master服務(wù)器。
- 缺點(diǎn):由于多了一個(gè)同步等待的步驟,增加數(shù)據(jù)寫入延遲,降低系統(tǒng)吞吐量。
異步復(fù)制:僅Master服務(wù)器寫成功即可返回給客戶端寫成功的狀態(tài)。
- 優(yōu)點(diǎn):沒有同步等待的步驟,低延遲,高吞吐。
- 缺點(diǎn):如果Master服務(wù)器出現(xiàn)故障,有些數(shù)據(jù)可能未寫入Slave服務(wù)器,未同步的數(shù)據(jù)可能丟失
實(shí)際應(yīng)用中,需要結(jié)合業(yè)務(wù)場景,合理設(shè)置刷盤方式和主從復(fù)制方式。不建議使用同步刷盤方式,因?yàn)樗l繁觸發(fā)寫磁盤操作,性能下降很明顯。**通常把Master和Slave設(shè)置為異步刷盤,同步復(fù)制,保證數(shù)據(jù)不丟失。**這樣即使一臺(tái)服務(wù)器出故障,仍然可以保證數(shù)據(jù)不丟失。
二、讀寫分離
讀寫分離機(jī)制是高性能、高可用架構(gòu)中常見的設(shè)計(jì),例如Mysql實(shí)現(xiàn)讀寫分離機(jī)制,Client只能從Master服務(wù)器寫數(shù)據(jù),可以從Master服務(wù)器和Slave服務(wù)器都讀數(shù)據(jù)。
RocketMQ的Consumer在拉取消息時(shí),Broker會(huì)判斷Master服務(wù)器的消息堆積量來決定Consumer是否從Slave服務(wù)器拉取消息消費(fèi)。默認(rèn)一開始從Master服務(wù)器拉群消息,如果Master服務(wù)器的消息堆積超過物理內(nèi)存40%,則會(huì)返回給Consumer的消息結(jié)果并告知Consumer,下次從其他Slave服務(wù)器上拉取消息。
RocketMQ 有屬于自己的一套讀寫分離邏輯,會(huì)判斷主服務(wù)器的消息堆積量來決定消費(fèi)者是否向從服務(wù)器拉取消息消費(fèi)。
Consumer在向 Broker 發(fā)送消息拉取請(qǐng)求時(shí),會(huì)根據(jù)篩選出來的消息隊(duì)列,判定是從Master,還是從Slave拉取消息,默認(rèn)是Master。
Broker 接收到消息消費(fèi)者拉取請(qǐng)求,在獲取本地堆積的消息量后,會(huì)計(jì)算服務(wù)器的消息堆積量是否大于物理內(nèi)存的一定值,如果是,則標(biāo)記下次從 Slave服務(wù)器拉取,計(jì)算 Slave服務(wù)器的 Broker Id,并響應(yīng)給消費(fèi)者。
Consumer在接收到 Broker的響應(yīng)后,會(huì)把消息隊(duì)列與建議下一次拉取節(jié)點(diǎn)的 Broker Id 關(guān)聯(lián)起來,并緩存在內(nèi)存中,以便下次拉取消息時(shí),確定從哪個(gè)節(jié)點(diǎn)發(fā)送請(qǐng)求。
public class GetMessageResult {
? ? private final List<SelectMappedBufferResult> messageMapedList =
? ? ? ? new ArrayList<SelectMappedBufferResult>(100);
? ? private final List<ByteBuffer> messageBufferList = new ArrayList<ByteBuffer>(100);
? ? private GetMessageStatus status;
? ? private long nextBeginOffset;
? ? private long minOffset;
? ? private long maxOffset;
? ? private int bufferTotalSize = 0;
? ? // 標(biāo)識(shí)是否通過Slave拉拉取消息
? ? private boolean suggestPullingFromSlave = false;
? ? private int msgCount4Commercial = 0;
}
// 針對(duì)消息堆積量過大會(huì)切換到Slave進(jìn)行查詢。
// maxOffsetPy 為當(dāng)前最大物理偏移量,maxPhyOffsetPulling 為本次消息拉取最大物理偏移量,他們的差即可表示消息堆積量。
// TOTAL_PHYSICAL_MEMORY_SIZE 表示當(dāng)前系統(tǒng)物理內(nèi)存,accessMessageInMemoryMaxRatio 的默認(rèn)值為 40,
// 以上邏輯即可算出當(dāng)前消息堆積量是否大于物理內(nèi)存的 40%,如果大于則將 suggestPullingFromSlave 設(shè)置為 true。
long diff = maxOffsetPy - maxPhyOffsetPulling;
long memory = (long) (StoreUtil.TOTAL_PHYSICAL_MEMORY_SIZE
? ? * (this.messageStoreConfig.getAccessMessageInMemoryMaxRatio() / 100.0));
getResult.setSuggestPullingFromSlave(diff > memory);- 決定消費(fèi)者是否向從服務(wù)器拉取消息消費(fèi)的值存在
GetMessageResult類中。 suggestPullingFromSlave的默認(rèn)值為 false,即默認(rèn)消費(fèi)者不會(huì)消費(fèi)從服務(wù)器,但它會(huì)在消費(fèi)者發(fā)送消息拉取請(qǐng)求時(shí),動(dòng)態(tài)改變?cè)撝?,Broker 接收、處理消費(fèi)者拉取消息請(qǐng)求。- 針對(duì)本MessageQueue消息堆積量過大會(huì)切換到Slave進(jìn)行查詢,maxOffsetPy 為當(dāng)前最大物理偏移量,
maxPhyOffsetPulling為本次消息拉取最大物理偏移量,他們的差即可表示消息堆積量,當(dāng)前消息堆積量是否大于物理內(nèi)存的 40%就會(huì)切換到Slave進(jìn)行查詢。
public class PullMessageResponseHeader implements CommandCustomHeader {
? ? // suggestWhichBrokerId標(biāo)識(shí)從哪個(gè)broker進(jìn)行查詢
? ? private Long suggestWhichBrokerId;
? ? private Long nextBeginOffset;
? ? private Long minOffset;
? ? private Long maxOffset;
}
public class PullMessageProcessor implements NettyRequestProcessor {
? ? private RemotingCommand processRequest(final Channel channel, RemotingCommand request, boolean brokerAllowSuspend)
? ? ? ? throws RemotingCommandException {
? ? ? ? RemotingCommand response = RemotingCommand.createResponseCommand(PullMessageResponseHeader.class);
? ? ? ? final PullMessageResponseHeader responseHeader = (PullMessageResponseHeader) response.readCustomHeader();
? ? ? ? final PullMessageRequestHeader requestHeader =
? ? ? ? ? ? (PullMessageRequestHeader) request.decodeCommandCustomHeader(PullMessageRequestHeader.class);
? ? ? ? response.setOpaque(request.getOpaque());
? ? ? ? final GetMessageResult getMessageResult =
? ? ? ? ? ? this.brokerController.getMessageStore().getMessage(requestHeader.getConsumerGroup(), requestHeader.getTopic(),
? ? ? ? ? ? ? ? requestHeader.getQueueId(), requestHeader.getQueueOffset(), requestHeader.getMaxMsgNums(), messageFilter);
? ? ? ? if (getMessageResult != null) {
? ? ? ? ? ? response.setRemark(getMessageResult.getStatus().name());
? ? ? ? ? ? responseHeader.setNextBeginOffset(getMessageResult.getNextBeginOffset());
? ? ? ? ? ? responseHeader.setMinOffset(getMessageResult.getMinOffset());
? ? ? ? ? ? responseHeader.setMaxOffset(getMessageResult.getMaxOffset());
? ? ? ? ? ? // 建議從slave消費(fèi)消息
? ? ? ? ? ? if (getMessageResult.isSuggestPullingFromSlave()) {
? ? ? ? ? ? ? ? // 從slave查詢
? ? ? ? ? ? ? ? responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getWhichBrokerWhenConsumeSlowly());
? ? ? ? ? ? } else {
? ? ? ? ? ? ? ? // 從master查詢
? ? ? ? ? ? ? ? responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);
? ? ? ? ? ? }
? ? ? ? ? ? switch (this.brokerController.getMessageStoreConfig().getBrokerRole()) {
? ? ? ? ? ? ? ? case ASYNC_MASTER:
? ? ? ? ? ? ? ? case SYNC_MASTER:
? ? ? ? ? ? ? ? ? ? break;
? ? ? ? ? ? ? ? case SLAVE:
? ? ? ? ? ? ? ? ? ? // 針對(duì)SLAVE需要判斷是否可讀,不可讀的情況下讀MASTER
? ? ? ? ? ? ? ? ? ? if (!this.brokerController.getBrokerConfig().isSlaveReadEnable()) {
? ? ? ? ? ? ? ? ? ? ? ? response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);
? ? ? ? ? ? ? ? ? ? ? ? responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);
? ? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? ? ? break;
? ? ? ? ? ? }
? ? ? ? ? ? if (this.brokerController.getBrokerConfig().isSlaveReadEnable()) {
? ? ? ? ? ? ? ? // consume too slow ,redirect to another machine
? ? ? ? ? ? ? ? if (getMessageResult.isSuggestPullingFromSlave()) {
? ? ? ? ? ? ? ? ? ? responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getWhichBrokerWhenConsumeSlowly());
? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? // consume ok
? ? ? ? ? ? ? ? else {
? ? ? ? ? ? ? ? ? ? responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getBrokerId());
? ? ? ? ? ? ? ? }
? ? ? ? ? ? } else {
? ? ? ? ? ? ? ? responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);
? ? ? ? ? ? }
? ? ? ? }
? ? ? ? return response;
? ? }
}PullMessageResponseHeader的suggestWhichBrokerId標(biāo)識(shí)某個(gè)MessageQueue的消息從具體的brokerId進(jìn)行查詢。
針對(duì)Slave不可讀的情況會(huì)設(shè)置為從MASTER_ID進(jìn)行查詢。
public class PullAPIWrapper {
? ? private final InternalLogger log = ClientLogger.getLog();
? ? private final MQClientInstance mQClientFactory;
? ? private final String consumerGroup;
? ? private final boolean unitMode;
? ? private ConcurrentMap<MessageQueue, AtomicLong/* brokerId */> pullFromWhichNodeTable =
? ? ? ? new ConcurrentHashMap<MessageQueue, AtomicLong>(32);
? ? private volatile boolean connectBrokerByUser = false;
? ? private volatile long defaultBrokerId = MixAll.MASTER_ID;
? ? private Random random = new Random(System.currentTimeMillis());
? ? private ArrayList<FilterMessageHook> filterMessageHookList = new ArrayList<FilterMessageHook>();
? ? public PullResult processPullResult(final MessageQueue mq, final PullResult pullResult,
? ? ? ? final SubscriptionData subscriptionData) {
? ? ? ? PullResultExt pullResultExt = (PullResultExt) pullResult;
? ? ? ? // 處理MessageQueue對(duì)應(yīng)拉取的brokerId
? ? ? ? this.updatePullFromWhichNode(mq, pullResultExt.getSuggestWhichBrokerId());
? ? ? ? // 省略相關(guān)代碼
? ? ? ? pullResultExt.setMessageBinary(null);
? ? ? ? return pullResult;
? ? }
? ? public void updatePullFromWhichNode(final MessageQueue mq, final long brokerId) {
? ? ? ? // 保存在pullFromWhichNodeTable對(duì)象中
? ? ? ? AtomicLong suggest = this.pullFromWhichNodeTable.get(mq);
? ? ? ? if (null == suggest) {
? ? ? ? ? ? this.pullFromWhichNodeTable.put(mq, new AtomicLong(brokerId));
? ? ? ? } else {
? ? ? ? ? ? suggest.set(brokerId);
? ? ? ? }
? ? }
}Consumer收到拉取響應(yīng)回來的數(shù)據(jù)后,會(huì)將下次建議拉取的 brokerId緩存起來。
public class PullAPIWrapper {
? ? private final InternalLogger log = ClientLogger.getLog();
? ? private final MQClientInstance mQClientFactory;
? ? private final String consumerGroup;
? ? private final boolean unitMode;
? ? private ConcurrentMap<MessageQueue, AtomicLong/* brokerId */> pullFromWhichNodeTable =
? ? ? ? new ConcurrentHashMap<MessageQueue, AtomicLong>(32);
? ? private volatile boolean connectBrokerByUser = false;
? ? private volatile long defaultBrokerId = MixAll.MASTER_ID;
? ? private Random random = new Random(System.currentTimeMillis());
? ? private ArrayList<FilterMessageHook> filterMessageHookList = new ArrayList<FilterMessageHook>();
? ? public PullResult pullKernelImpl(
? ? ? ? final MessageQueue mq,
? ? ? ? final String subExpression,
? ? ? ? final String expressionType,
? ? ? ? final long subVersion,
? ? ? ? final long offset,
? ? ? ? final int maxNums,
? ? ? ? final int sysFlag,
? ? ? ? final long commitOffset,
? ? ? ? final long brokerSuspendMaxTimeMillis,
? ? ? ? final long timeoutMillis,
? ? ? ? final CommunicationMode communicationMode,
? ? ? ? final PullCallback pullCallback
? ? ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
? ? ? ? // 查找MessageQueue應(yīng)該從brokerName的哪個(gè)節(jié)點(diǎn)查詢
? ? ? ? FindBrokerResult findBrokerResult =
? ? ? ? ? ? this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(),
? ? ? ? ? ? ? ? this.recalculatePullFromWhichNode(mq), false);
? ? ? ? if (null == findBrokerResult) {
? ? ? ? ? ? this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
? ? ? ? ? ? findBrokerResult =
? ? ? ? ? ? ? ? this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(),
? ? ? ? ? ? ? ? ? ? this.recalculatePullFromWhichNode(mq), false);
? ? ? ? }
? ? ? ? if (findBrokerResult != null) {
? ? ? ? ? ? {
? ? ? ? ? ? ? ? // check version
? ? ? ? ? ? ? ? if (!ExpressionType.isTagType(expressionType)
? ? ? ? ? ? ? ? ? ? && findBrokerResult.getBrokerVersion() < MQVersion.Version.V4_1_0_SNAPSHOT.ordinal()) {
? ? ? ? ? ? ? ? ? ? throw new MQClientException("The broker[" + mq.getBrokerName() + ", "
? ? ? ? ? ? ? ? ? ? ? ? + findBrokerResult.getBrokerVersion() + "] does not upgrade to support for filter message by " + expressionType, null);
? ? ? ? ? ? ? ? }
? ? ? ? ? ? }
? ? ? ? ? ? int sysFlagInner = sysFlag;
? ? ? ? ? ? if (findBrokerResult.isSlave()) {
? ? ? ? ? ? ? ? sysFlagInner = PullSysFlag.clearCommitOffsetFlag(sysFlagInner);
? ? ? ? ? ? }
? ? ? ? ? ? PullMessageRequestHeader requestHeader = new PullMessageRequestHeader();
? ? ? ? ? ? requestHeader.setConsumerGroup(this.consumerGroup);
? ? ? ? ? ? requestHeader.setTopic(mq.getTopic());
? ? ? ? ? ? requestHeader.setQueueId(mq.getQueueId());
? ? ? ? ? ? requestHeader.setQueueOffset(offset);
? ? ? ? ? ? requestHeader.setMaxMsgNums(maxNums);
? ? ? ? ? ? requestHeader.setSysFlag(sysFlagInner);
? ? ? ? ? ? requestHeader.setCommitOffset(commitOffset);
? ? ? ? ? ? requestHeader.setSuspendTimeoutMillis(brokerSuspendMaxTimeMillis);
? ? ? ? ? ? requestHeader.setSubscription(subExpression);
? ? ? ? ? ? requestHeader.setSubVersion(subVersion);
? ? ? ? ? ? requestHeader.setExpressionType(expressionType);
? ? ? ? ? ? String brokerAddr = findBrokerResult.getBrokerAddr();
? ? ? ? ? ? if (PullSysFlag.hasClassFilterFlag(sysFlagInner)) {
? ? ? ? ? ? ? ? brokerAddr = computPullFromWhichFilterServer(mq.getTopic(), brokerAddr);
? ? ? ? ? ? }
? ? ? ? ? ? PullResult pullResult = this.mQClientFactory.getMQClientAPIImpl().pullMessage(
? ? ? ? ? ? ? ? brokerAddr,
? ? ? ? ? ? ? ? requestHeader,
? ? ? ? ? ? ? ? timeoutMillis,
? ? ? ? ? ? ? ? communicationMode,
? ? ? ? ? ? ? ? pullCallback);
? ? ? ? ? ? return pullResult;
? ? ? ? }
? ? ? ? throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
? ? }
? ? public long recalculatePullFromWhichNode(final MessageQueue mq) {
? ? ? ? if (this.isConnectBrokerByUser()) {
? ? ? ? ? ? return this.defaultBrokerId;
? ? ? ? }
? ? ? ? AtomicLong suggest = this.pullFromWhichNodeTable.get(mq);
? ? ? ? if (suggest != null) {
? ? ? ? ? ? return suggest.get();
? ? ? ? }
? ? ? ? return MixAll.MASTER_ID;
? ? }
}Consumer拉取消息的時(shí)候會(huì)從 pullFromWhichNodeTable 中取出拉取 brokerId確定去具體的broker進(jìn)行查詢。
到此這篇關(guān)于RocketMQ設(shè)計(jì)之主從復(fù)制和讀寫分離的文章就介紹到這了,更多相關(guān)RocketMQ從復(fù)制和讀寫分離內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Mybatis批量插入數(shù)據(jù)的兩種方式總結(jié)與對(duì)比
批量插入功能是我們?nèi)粘9ぷ髦斜容^常見的業(yè)務(wù)功能之一,下面這篇文章主要給大家介紹了關(guān)于Mybatis批量插入數(shù)據(jù)的兩種方式總結(jié)與對(duì)比的相關(guān)資料,文中通過實(shí)例代碼介紹的非常詳細(xì),需要的朋友可以參考下2023-01-01
Java基礎(chǔ)之重載(Overload)與重寫(Override)詳解
這篇文章主要介紹了Java基礎(chǔ)之重載(Overload)與重寫(Override)詳解,文中有非常詳細(xì)的代碼示例,對(duì)正在學(xué)習(xí)java基礎(chǔ)的小伙伴們有非常好的幫助,需要的朋友可以參考下2021-04-04
Java/Android 實(shí)現(xiàn)簡單的HTTP服務(wù)器
這篇文章主要介紹了Java/Android 如何實(shí)現(xiàn)簡單的HTTP服務(wù)器,幫助大家更好的進(jìn)行功能測試,感興趣的朋友可以了解下2020-10-10
mybatis返回的map結(jié)果如何設(shè)置有序
這篇文章主要介紹了mybatis返回的map結(jié)果如何設(shè)置有序,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2022-01-01
MyBatis通過JDBC數(shù)據(jù)驅(qū)動(dòng)生成的執(zhí)行語句問題
這篇文章主要介紹了MyBatis通過JDBC數(shù)據(jù)驅(qū)動(dòng)生成的執(zhí)行語句問題的相關(guān)資料,非常不錯(cuò),具有參考借鑒價(jià)值,需要的朋友可以參考下2016-08-08

