分布式消息隊(duì)列RocketMQ概念詳解
1.MQ概述
1.1 RocketMQ簡(jiǎn)介
RocketMQ 是阿里開(kāi)源的分布式消息中間件,跟其它中間件相比,RocketMQ 的特點(diǎn)是純JAVA實(shí)現(xiàn),是一套提供了消息生產(chǎn),存儲(chǔ),消費(fèi)全過(guò)程API的軟件系統(tǒng)。
1.2 MQ用途
限流削峰
MQ可以將系統(tǒng)的超量請(qǐng)求暫存其中,以便系統(tǒng)后期可以慢慢進(jìn)行處理,從而避免了請(qǐng)求的丟失或系統(tǒng)被壓垮。

異步解耦
上游系統(tǒng)對(duì)下游系統(tǒng)的調(diào)用若為同步調(diào)用,則會(huì)大大降低系統(tǒng)的吞吐量與并發(fā)度,且系統(tǒng)耦合度太高、而異步調(diào)用則會(huì)解決這些問(wèn)題。所以兩層之間若要實(shí)現(xiàn)由同步到異步的轉(zhuǎn)化,一般性做法就是,在這兩層間添加一個(gè)MQ層。

數(shù)據(jù)收集
分布式系統(tǒng)會(huì)產(chǎn)生海量級(jí)數(shù)據(jù)流,如:業(yè)務(wù)日志、監(jiān)控?cái)?shù)據(jù)、用戶行為等。針對(duì)這些數(shù)據(jù)流進(jìn)行實(shí)時(shí)或批量采集匯總,然后對(duì)這些數(shù)據(jù)流進(jìn)行大數(shù)據(jù)分析,這是當(dāng)前互聯(lián)網(wǎng)平臺(tái)的必備技術(shù)。通過(guò)MQ完成此類(lèi)數(shù)據(jù)收集是最好的選擇。
1.3 常見(jiàn)MQ產(chǎn)品
RabbitMQ
RabbitMQ是使用ErLang語(yǔ)言開(kāi)發(fā)的一款MQ產(chǎn)品。其吞吐量較Kafka與RocketMQ要低,且由于其不是Java語(yǔ)言開(kāi)發(fā),所以公司內(nèi)部對(duì)其實(shí)現(xiàn)定制化開(kāi)發(fā)難度較大。
Kafka
Kafka是使用Scala/Java語(yǔ)言開(kāi)發(fā)的一款MQ產(chǎn)品。其最大的特點(diǎn)就是高吞吐量,常用于大數(shù)據(jù)領(lǐng)域的實(shí)時(shí)計(jì)算、日志采集等場(chǎng)景。其沒(méi)有遵循任何常見(jiàn)的MQ協(xié)議,而是使用自研協(xié)議。
RocketMQ
RocketMQ是使用Java語(yǔ)言開(kāi)發(fā)的一款MQ產(chǎn)品。經(jīng)過(guò)數(shù)年阿里雙11的考驗(yàn),性能與穩(wěn)定性非常高。其沒(méi)有遵循任何常見(jiàn)的MQ協(xié)議,而是使用自研協(xié)議。
對(duì)比

2.RocketMQ 基本概念
2.1 消息
消息是指,消息系統(tǒng)所傳輸信息的物理載體,生產(chǎn)和消費(fèi)數(shù)據(jù)的最小單位,每條消息必須屬于一個(gè)主題。單個(gè)消息所占空間不會(huì)很大。
RocketMQ中每個(gè)消息擁有唯一的MessageId,且可以攜帶具有業(yè)務(wù)標(biāo)識(shí)的Key,以方便對(duì)消息的查詢。不過(guò)需要注意的是,MessageId有兩個(gè):在生產(chǎn)者send()消息時(shí)會(huì)自動(dòng)生成一個(gè)MessageId(msgId),當(dāng)消息到達(dá)Broker后,Broker也會(huì)自動(dòng)生成一個(gè)MessageId(offsetMsgId)。msgId、offsetMsgId與key都稱為消息標(biāo)識(shí)。
msgId:由producer端生成,其生成規(guī)則為: producerIp + 進(jìn)程pid + MessageClientIDSetter類(lèi)的ClassLoader的hashCode + 當(dāng)前時(shí)間 + AutomicInteger自增計(jì)數(shù)器
offsetMsgId:由broker端生成,其生成規(guī)則為:brokerIp + 物理分區(qū)的offset(Queue中的偏移量)
key:由用戶指定的業(yè)務(wù)相關(guān)的唯一標(biāo)識(shí)
2.2 主題
Topic表示一類(lèi)消息的集合,每個(gè)主題包含若干條消息,每條消息只能屬于一個(gè)主題,是RocketMQ進(jìn)行消息訂閱的基本單位。 一個(gè)生產(chǎn)者可以同時(shí)發(fā)送多種Topic的消息;而一個(gè)消費(fèi)者只對(duì)某種特定的Topic感興趣,即只可以訂閱和消費(fèi)一種Topic的消息。
2.3 標(biāo)簽
標(biāo)簽為消息設(shè)置的標(biāo)簽,用于同一主題下區(qū)分不同類(lèi)型的消息。來(lái)自同一業(yè)務(wù)單元的消息,可以根據(jù)不同業(yè)務(wù)目的在同一主題下設(shè)置不同標(biāo)簽。 標(biāo)簽?zāi)軌蛴行У乇3执a的清晰度和連貫性,并優(yōu)化RocketMQ提供的查詢系統(tǒng)。消費(fèi)者可以根據(jù)Tag實(shí)現(xiàn)對(duì)不同子主題的不同消費(fèi)邏輯,實(shí)現(xiàn)更好的擴(kuò)展性。 Topic是消息的一級(jí)分類(lèi),Tag是消息的二級(jí)分類(lèi)。Topic相當(dāng)于貨物,Tag相當(dāng)于上海山東等地區(qū)。
2.4 隊(duì)列
存儲(chǔ)消息的物理實(shí)體。 一個(gè)Topic中可以包含多個(gè)Queue,每個(gè)Queue中存放的就是該Topic的消息。 一個(gè)Topic的Queue也被稱為一個(gè)Topic中消息的分區(qū)(Partition)。 一個(gè)Topic的Queue中的消息只能被一個(gè)消費(fèi)者組中的一個(gè)消費(fèi)者消費(fèi)。 一個(gè)Queue中的消息不允許同一個(gè)消費(fèi)者組中的多個(gè)消費(fèi)者同時(shí)消費(fèi)。
分片不同于分區(qū)。在RocketMQ中,分片指的是存放相應(yīng)Topic的Broker。每個(gè)分片中會(huì)創(chuàng)建出相應(yīng)數(shù)量的分區(qū),即Queue,每個(gè)Queue的大小都是相同的。

2.5 Producer
消息生產(chǎn)者,負(fù)責(zé)生產(chǎn)消息。Producer通過(guò)MQ的負(fù)載均衡模塊選擇相應(yīng)的Broker集群隊(duì)列進(jìn)行消息投遞,投遞的過(guò)程支持快速失敗并且低延遲。 例如:用戶提交的請(qǐng)求寫(xiě)入到MQ的過(guò)程,就是消息生產(chǎn)的過(guò)程,在這里用戶就是生產(chǎn)者 。
RocketMQ中的消息生產(chǎn)者都是以生產(chǎn)者組(Producer Group)的形式出現(xiàn)的。生產(chǎn)者組是同一類(lèi)生產(chǎn)者的集合,這類(lèi)Producer發(fā)送相同Topic類(lèi)型的消息。一個(gè)生產(chǎn)者組可以同時(shí)發(fā)送多個(gè)主題的消息。如果主題中有多個(gè)隊(duì)列,生產(chǎn)者組只有一個(gè)生產(chǎn)者,生產(chǎn)者會(huì)采取輪詢的方式進(jìn)行發(fā)送消息。
生產(chǎn)者代碼如下:
導(dǎo)入依賴
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.0.2</version>
</dependency>生產(chǎn)者代碼
public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
DefaultMQProducer order = new DefaultMQProducer("order");
order.setNamesrvAddr("localhost:9876");
order.start();
Message message = new Message("myTopic", "myTag", ("test").getBytes());
SendResult result = order.send(message);
System.out.println(result);
order.shutdown();
}2.6 Consumer
消息消費(fèi)者,負(fù)責(zé)消費(fèi)消息。一個(gè)消息消費(fèi)者會(huì)從Broker服務(wù)器中獲取到消息,并對(duì)消息進(jìn)行相關(guān)業(yè)務(wù)處理。 例如:系統(tǒng)從MQ中讀取到請(qǐng)求,并對(duì)請(qǐng)求進(jìn)行處理的過(guò)程就是消息消費(fèi)的過(guò)程,在這里系統(tǒng)就是消費(fèi)者。
RocketMQ中的消息消費(fèi)者都是以消費(fèi)者組(Consumer Group)的形式出現(xiàn)的。消費(fèi)者組是同一類(lèi)消費(fèi)者的集合,這類(lèi)Consumer消費(fèi)的是同一個(gè)Topic類(lèi)型的消息。 消費(fèi)者組使得在消息消費(fèi)方面,實(shí)現(xiàn)負(fù)載均衡(將一個(gè)Topic中的不同的Queue平均分配給同一個(gè)Consumer Group的不同的Consumer,注意,并不是將消息負(fù)載均衡)和容錯(cuò)(一個(gè)Consmer掛了,該Consumer Group中的其它Consumer可以接著消費(fèi)原Consumer消費(fèi)的Queue)的目標(biāo)變得非常容易。
消費(fèi)者代碼
public static void main(String[] args) throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("order");
consumer.setNamesrvAddr("localhost:9876");
consumer.subscribe("myTopic","*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
System.out.println("收到的消息"+list);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
}負(fù)載均衡策略
queue 個(gè)數(shù)大于 Consumer個(gè)數(shù), 那么 Consumer 會(huì)平均分配 queue,不夠平均,會(huì)根據(jù)clientId排序來(lái)拿取余數(shù)
queue個(gè)數(shù)小于Consumer個(gè)數(shù),那么會(huì)有Consumer閑置,就是浪費(fèi)掉了,其余Consumer平均分配到queue
消費(fèi)者組中Consumer的數(shù)量應(yīng)該小于等于訂閱Topic的Queue數(shù)量。如果超出Queue數(shù)量,則多出的Consumer將不能消費(fèi)消息。
2.7 NameServer
NameServer是一個(gè)Broker與Topic路由的注冊(cè)中心,支持Broker的動(dòng)態(tài)注冊(cè)與發(fā)現(xiàn)。
主要包括兩個(gè)功能:
Broker管理:接受Broker集群的注冊(cè)信息并且保存下來(lái)作為路由信息的基本數(shù)據(jù);提供心跳檢測(cè)機(jī)制,檢查Broker是否還存活。
路由信息管理:每個(gè)NameServer中都保存著B(niǎo)roker集群的整個(gè)路由信息和用于客戶端查詢的隊(duì)列信息。Producer和Conumser通過(guò)NameServer可以獲取整個(gè)Broker集群的路由信息,從而進(jìn)行消息的投遞和消費(fèi)。NameServer可以獲取整個(gè)Broker集群的路由信息,從而進(jìn)行消息的投遞和消費(fèi)。
路由注冊(cè)
Name Server既然是注冊(cè)中心,那么是如何完成注冊(cè)的呢? NameServer通常也是以集群的方式部署,不過(guò),NameServer是無(wú)狀態(tài)的,即NameServer集群中的各個(gè)節(jié)點(diǎn)間是無(wú)差異的,各節(jié)點(diǎn)間相互不進(jìn)行信息通訊。 那各節(jié)點(diǎn)中的數(shù)據(jù)是如何進(jìn)行數(shù)據(jù)同步的呢?在Broker節(jié)點(diǎn)啟動(dòng)時(shí),輪詢NameServer列表,與每個(gè)NameServer節(jié)點(diǎn)建立長(zhǎng)連接,發(fā)起注冊(cè)請(qǐng)求。在NameServer內(nèi)部維護(hù)著?個(gè)Broker列表,用來(lái)動(dòng)態(tài)存儲(chǔ)Broker的信息。
Broker節(jié)點(diǎn)為了證明自己是活著的,為了維護(hù)與NameServer間的長(zhǎng)連接,會(huì)將最新的信息以心跳包的方式上報(bào)給NameServer,每30秒發(fā)送一次心跳。心跳包中包含 BrokerId、Broker地址(IP+Port)、Broker名稱、Broker所屬集群名稱等等。NameServer在接收到心跳包后,會(huì)更新心跳時(shí)間戳,記錄這個(gè)Broker的最新存活時(shí)間。
路由剔除
由于Broker關(guān)機(jī)、宕機(jī)或網(wǎng)絡(luò)抖動(dòng)等原因,NameServer沒(méi)有收到Broker的心跳,NameServer可能會(huì)將其從Broker列表中剔除。 NameServer中有?個(gè)定時(shí)任務(wù),每隔10秒就會(huì)掃描?次Broker表,查看每一個(gè)Broker的最新心跳時(shí)間戳距離當(dāng)前時(shí)間是否超過(guò)120秒,如果超過(guò),則會(huì)判定Broker失效,然后將其從Broker列表中剔除。
路由發(fā)現(xiàn)
RocketMQ的路由發(fā)現(xiàn)采用的是Pull模型。當(dāng)Topic路由信息出現(xiàn)變化時(shí),NameServer不會(huì)主動(dòng)推送給客戶端,而是客戶端定時(shí)拉取Topic最新的路由。 默認(rèn)客戶端每30秒會(huì)拉取一次最新的路由。
2.8 Broker
Broker充當(dāng)著消息中轉(zhuǎn)角色,負(fù)責(zé)存儲(chǔ)消息、轉(zhuǎn)發(fā)消息。
Broker在RocketMQ系統(tǒng)中負(fù)責(zé)接收并存儲(chǔ)從生產(chǎn)者發(fā)送來(lái)的消息,同時(shí)為消費(fèi)者的拉取請(qǐng)求作準(zhǔn)備。Broker同時(shí)也存儲(chǔ)著消息相關(guān)的元數(shù)據(jù),包括消費(fèi)者組消費(fèi)進(jìn)度偏移offset、主題、隊(duì)列等。
模塊如下圖:

Remoting Module:整個(gè)Broker的實(shí)體,負(fù)責(zé)處理來(lái)自clients端的請(qǐng)求。而這個(gè)Broker實(shí)體則由以下模塊構(gòu)成。
Client Manager:客戶端管理器。負(fù)責(zé)接收、解析客戶端(Producer/Consumer)請(qǐng)求,管理客戶端。例如,維護(hù)Consumer的Topic訂閱信息
Store Service:存儲(chǔ)服務(wù)。提供方便簡(jiǎn)單的API接口,處理消息存儲(chǔ)到物理硬盤(pán)和消息查詢功能。
HA Service:高可用服務(wù),提供Master Broker 和 Slave Broker之間的數(shù)據(jù)同步功能。
Index Service:索引服務(wù)。根據(jù)特定的Message key,對(duì)投遞到Broker的消息進(jìn)行索引服務(wù),同時(shí)也提供根據(jù)Message Key對(duì)消息進(jìn)行快速查詢的功能。
2.9 RocketMQ 工作流程
工作流程如下圖:

1)啟動(dòng)NameServer,NameServer啟動(dòng)后開(kāi)始監(jiān)聽(tīng)端口,等待Broker、Producer、Consumer連接。
2)啟動(dòng)Broker時(shí),Broker會(huì)與所有的NameServer建立并保持長(zhǎng)連接,然后每50秒向NameServer定時(shí)發(fā)送心跳包。
3)發(fā)送消息前,可以先創(chuàng)建Topic,創(chuàng)建Topic時(shí)需要指定該Topic要存儲(chǔ)在哪些Broker上,當(dāng)然,在創(chuàng)建Topic時(shí)也會(huì)將Topic與Broker的關(guān)系寫(xiě)入到NameServer中。不過(guò),這步是可選的,也可以在發(fā)送消息時(shí)自動(dòng)創(chuàng)建Topic。
4) Producer發(fā)送消息,啟動(dòng)時(shí)先跟NameServer集群中的其中一臺(tái)建立長(zhǎng)連接,并從NameServer中獲取路由信息,即當(dāng)前發(fā)送的Topic消息的Queue與Broker的地址(IP+Port)的映射關(guān)系。然后根據(jù)算法策略從隊(duì)選擇一個(gè)Queue,與隊(duì)列所在的Broker建立長(zhǎng)連接從而向Broker發(fā)消息。當(dāng)然,在獲取到路由信息后,Producer會(huì)首先將路由信息緩存到本地,再每30秒從NameServer更新一次路由信息。
5)Consumer跟Producer類(lèi)似,跟其中一臺(tái)NameServer建立長(zhǎng)連接,獲取其所訂閱Topic的路由信息,然后根據(jù)算法策略從路由信息中獲取到其所要消費(fèi)的Queue,然后直接跟Broker建立長(zhǎng)連接,開(kāi)始消費(fèi)其中的消息。Consumer在獲取到路由信息后,同樣也會(huì)每30秒從NameServer更新一次路由信息。不過(guò)不同于Producer的是,Consumer還會(huì)向Broker發(fā)送心跳,以確保Broker的存活狀態(tài)。
以上就是分布式消息隊(duì)列RocketMQ概念詳解的詳細(xì)內(nèi)容,更多關(guān)于RocketMQ概念的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
MyBatis Properties及別名定義實(shí)例詳解
這篇文章主要介紹了MyBatis Properties及別名定義實(shí)例詳解,需要的朋友可以參考下2017-08-08
搭建maven私有倉(cāng)庫(kù)的方法實(shí)現(xiàn)
Maven是一個(gè)流行的Java項(xiàng)目管理工具,它可以幫助我們管理項(xiàng)目的構(gòu)建、報(bào)告和文檔,本文主要介紹了搭建maven私有倉(cāng)庫(kù)的方法實(shí)現(xiàn),感興趣的可以了解一下2023-05-05
SpringBoot 項(xiàng)目使用hutool 工具進(jìn)行 http 接口調(diào)用的處理方
在實(shí)際的開(kāi)發(fā)過(guò)程中一個(gè)互聯(lián)網(wǎng)的項(xiàng)目來(lái)說(shuō) ,有可能會(huì)涉及到調(diào)用外部接口的實(shí)際業(yè)務(wù)場(chǎng)景,下面通過(guò)本文給大家介紹SpringBoot 項(xiàng)目 使用hutool 工具進(jìn)行 http 接口調(diào)用的處理方法,需要的朋友可以參考下2022-06-06
使用springmvc參數(shù)接收boolean類(lèi)型參數(shù)的問(wèn)題
這篇文章主要介紹了使用springmvc參數(shù)接收boolean類(lèi)型參數(shù)的問(wèn)題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2022-01-01
Java實(shí)現(xiàn)圖片轉(zhuǎn)換PDF文件的示例代碼
這篇文章主要介紹了Java實(shí)現(xiàn)圖片轉(zhuǎn)換PDF文件的示例代碼,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2020-09-09
從java反編譯及字節(jié)碼角度探索分析String拼接字符串效率
這篇文章主要介紹了從java反編譯及字節(jié)碼角度探索分析String拼接字符串效率,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2023-12-12
詳解Java對(duì)象創(chuàng)建的過(guò)程及內(nèi)存布局
今天給大家?guī)?lái)的文章是Java對(duì)象創(chuàng)建的過(guò)程及內(nèi)存布局,文中有非常詳細(xì)的圖文示例及介紹,需要的朋友可以參考下2021-06-06

