亚洲乱码中文字幕综合,中国熟女仑乱hd,亚洲精品乱拍国产一区二区三区,一本大道卡一卡二卡三乱码全集资源,又粗又黄又硬又爽的免费视频

分布式消息隊(duì)列RocketMQ概念詳解

 更新時(shí)間:2023年05月09日 14:19:17   作者:山河亦問(wèn)安  
RocketMQ?是阿里開(kāi)源的分布式消息中間件,跟其它中間件相比,RocketMQ?的特點(diǎn)是純JAVA實(shí)現(xiàn),是一套提供了消息生產(chǎn),存儲(chǔ),消費(fèi)全過(guò)程API的軟件系統(tǒng),本文詳細(xì)介紹了分布式消息隊(duì)列RocketMQ概念,需要的朋友可以參考下

1.MQ概述

1.1 RocketMQ簡(jiǎn)介

RocketMQ 是阿里開(kāi)源的分布式消息中間件,跟其它中間件相比,RocketMQ 的特點(diǎn)是純JAVA實(shí)現(xiàn),是一套提供了消息生產(chǎn),存儲(chǔ),消費(fèi)全過(guò)程API的軟件系統(tǒng)。

1.2 MQ用途

限流削峰

MQ可以將系統(tǒng)的超量請(qǐng)求暫存其中,以便系統(tǒng)后期可以慢慢進(jìn)行處理,從而避免了請(qǐng)求的丟失或系統(tǒng)被壓垮。

 異步解耦

上游系統(tǒng)對(duì)下游系統(tǒng)的調(diào)用若為同步調(diào)用,則會(huì)大大降低系統(tǒng)的吞吐量與并發(fā)度,且系統(tǒng)耦合度太高、而異步調(diào)用則會(huì)解決這些問(wèn)題。所以兩層之間若要實(shí)現(xiàn)由同步到異步的轉(zhuǎn)化,一般性做法就是,在這兩層間添加一個(gè)MQ層。

 數(shù)據(jù)收集

分布式系統(tǒng)會(huì)產(chǎn)生海量級(jí)數(shù)據(jù)流,如:業(yè)務(wù)日志、監(jiān)控?cái)?shù)據(jù)、用戶行為等。針對(duì)這些數(shù)據(jù)流進(jìn)行實(shí)時(shí)或批量采集匯總,然后對(duì)這些數(shù)據(jù)流進(jìn)行大數(shù)據(jù)分析,這是當(dāng)前互聯(lián)網(wǎng)平臺(tái)的必備技術(shù)。通過(guò)MQ完成此類(lèi)數(shù)據(jù)收集是最好的選擇。

1.3 常見(jiàn)MQ產(chǎn)品

RabbitMQ
RabbitMQ是使用ErLang語(yǔ)言開(kāi)發(fā)的一款MQ產(chǎn)品。其吞吐量較Kafka與RocketMQ要低,且由于其不是Java語(yǔ)言開(kāi)發(fā),所以公司內(nèi)部對(duì)其實(shí)現(xiàn)定制化開(kāi)發(fā)難度較大。
Kafka
Kafka是使用Scala/Java語(yǔ)言開(kāi)發(fā)的一款MQ產(chǎn)品。其最大的特點(diǎn)就是高吞吐量,常用于大數(shù)據(jù)領(lǐng)域的實(shí)時(shí)計(jì)算、日志采集等場(chǎng)景。其沒(méi)有遵循任何常見(jiàn)的MQ協(xié)議,而是使用自研協(xié)議。
RocketMQ
RocketMQ是使用Java語(yǔ)言開(kāi)發(fā)的一款MQ產(chǎn)品。經(jīng)過(guò)數(shù)年阿里雙11的考驗(yàn),性能與穩(wěn)定性非常高。其沒(méi)有遵循任何常見(jiàn)的MQ協(xié)議,而是使用自研協(xié)議。

對(duì)比

2.RocketMQ 基本概念

2.1 消息

消息是指,消息系統(tǒng)所傳輸信息的物理載體,生產(chǎn)和消費(fèi)數(shù)據(jù)的最小單位,每條消息必須屬于一個(gè)主題。單個(gè)消息所占空間不會(huì)很大。

RocketMQ中每個(gè)消息擁有唯一的MessageId,且可以攜帶具有業(yè)務(wù)標(biāo)識(shí)的Key,以方便對(duì)消息的查詢。不過(guò)需要注意的是,MessageId有兩個(gè):在生產(chǎn)者send()消息時(shí)會(huì)自動(dòng)生成一個(gè)MessageId(msgId),當(dāng)消息到達(dá)Broker后,Broker也會(huì)自動(dòng)生成一個(gè)MessageId(offsetMsgId)。msgId、offsetMsgId與key都稱為消息標(biāo)識(shí)。 

msgId:由producer端生成,其生成規(guī)則為: producerIp + 進(jìn)程pid + MessageClientIDSetter類(lèi)的ClassLoader的hashCode + 當(dāng)前時(shí)間 + AutomicInteger自增計(jì)數(shù)器 
offsetMsgId:由broker端生成,其生成規(guī)則為:brokerIp + 物理分區(qū)的offset(Queue中的偏移量) 
key:由用戶指定的業(yè)務(wù)相關(guān)的唯一標(biāo)識(shí)

2.2 主題

Topic表示一類(lèi)消息的集合,每個(gè)主題包含若干條消息,每條消息只能屬于一個(gè)主題,是RocketMQ進(jìn)行消息訂閱的基本單位。 一個(gè)生產(chǎn)者可以同時(shí)發(fā)送多種Topic的消息;而一個(gè)消費(fèi)者只對(duì)某種特定的Topic感興趣,即只可以訂閱和消費(fèi)一種Topic的消息。 

2.3 標(biāo)簽

標(biāo)簽為消息設(shè)置的標(biāo)簽,用于同一主題下區(qū)分不同類(lèi)型的消息。來(lái)自同一業(yè)務(wù)單元的消息,可以根據(jù)不同業(yè)務(wù)目的在同一主題下設(shè)置不同標(biāo)簽。 標(biāo)簽?zāi)軌蛴行У乇3执a的清晰度和連貫性,并優(yōu)化RocketMQ提供的查詢系統(tǒng)。消費(fèi)者可以根據(jù)Tag實(shí)現(xiàn)對(duì)不同子主題的不同消費(fèi)邏輯,實(shí)現(xiàn)更好的擴(kuò)展性。 Topic是消息的一級(jí)分類(lèi),Tag是消息的二級(jí)分類(lèi)。Topic相當(dāng)于貨物,Tag相當(dāng)于上海山東等地區(qū)。

2.4 隊(duì)列

存儲(chǔ)消息的物理實(shí)體。 一個(gè)Topic中可以包含多個(gè)Queue,每個(gè)Queue中存放的就是該Topic的消息。 一個(gè)Topic的Queue也被稱為一個(gè)Topic中消息的分區(qū)(Partition)。 一個(gè)Topic的Queue中的消息只能被一個(gè)消費(fèi)者組中的一個(gè)消費(fèi)者消費(fèi)。 一個(gè)Queue中的消息不允許同一個(gè)消費(fèi)者組中的多個(gè)消費(fèi)者同時(shí)消費(fèi)。

分片不同于分區(qū)。在RocketMQ中,分片指的是存放相應(yīng)Topic的Broker。每個(gè)分片中會(huì)創(chuàng)建出相應(yīng)數(shù)量的分區(qū),即Queue,每個(gè)Queue的大小都是相同的。

2.5 Producer

消息生產(chǎn)者,負(fù)責(zé)生產(chǎn)消息。Producer通過(guò)MQ的負(fù)載均衡模塊選擇相應(yīng)的Broker集群隊(duì)列進(jìn)行消息投遞,投遞的過(guò)程支持快速失敗并且低延遲。  例如:用戶提交的請(qǐng)求寫(xiě)入到MQ的過(guò)程,就是消息生產(chǎn)的過(guò)程,在這里用戶就是生產(chǎn)者 。

RocketMQ中的消息生產(chǎn)者都是以生產(chǎn)者組(Producer Group)的形式出現(xiàn)的。生產(chǎn)者組是同一類(lèi)生產(chǎn)者的集合,這類(lèi)Producer發(fā)送相同Topic類(lèi)型的消息。一個(gè)生產(chǎn)者組可以同時(shí)發(fā)送多個(gè)主題的消息。如果主題中有多個(gè)隊(duì)列,生產(chǎn)者組只有一個(gè)生產(chǎn)者,生產(chǎn)者會(huì)采取輪詢的方式進(jìn)行發(fā)送消息。

生產(chǎn)者代碼如下:

導(dǎo)入依賴

       <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
            <version>2.0.2</version>
        </dependency>

 生產(chǎn)者代碼

  public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
        DefaultMQProducer order = new DefaultMQProducer("order");
        order.setNamesrvAddr("localhost:9876");
        order.start();
        Message message = new Message("myTopic", "myTag", ("test").getBytes());
        SendResult result = order.send(message);
        System.out.println(result);
        order.shutdown();
    }

2.6 Consumer

消息消費(fèi)者,負(fù)責(zé)消費(fèi)消息。一個(gè)消息消費(fèi)者會(huì)從Broker服務(wù)器中獲取到消息,并對(duì)消息進(jìn)行相關(guān)業(yè)務(wù)處理。  例如:系統(tǒng)從MQ中讀取到請(qǐng)求,并對(duì)請(qǐng)求進(jìn)行處理的過(guò)程就是消息消費(fèi)的過(guò)程,在這里系統(tǒng)就是消費(fèi)者。  

RocketMQ中的消息消費(fèi)者都是以消費(fèi)者組(Consumer Group)的形式出現(xiàn)的。消費(fèi)者組是同一類(lèi)消費(fèi)者的集合,這類(lèi)Consumer消費(fèi)的是同一個(gè)Topic類(lèi)型的消息。 消費(fèi)者組使得在消息消費(fèi)方面,實(shí)現(xiàn)負(fù)載均衡(將一個(gè)Topic中的不同的Queue平均分配給同一個(gè)Consumer Group的不同的Consumer,注意,并不是將消息負(fù)載均衡)和容錯(cuò)(一個(gè)Consmer掛了,該Consumer Group中的其它Consumer可以接著消費(fèi)原Consumer消費(fèi)的Queue)的目標(biāo)變得非常容易。

消費(fèi)者代碼

  public static void main(String[] args) throws MQClientException {
 
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("order");
        consumer.setNamesrvAddr("localhost:9876");
        consumer.subscribe("myTopic","*");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                System.out.println("收到的消息"+list);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
 
    }

負(fù)載均衡策略

queue 個(gè)數(shù)大于 Consumer個(gè)數(shù), 那么 Consumer 會(huì)平均分配 queue,不夠平均,會(huì)根據(jù)clientId排序來(lái)拿取余數(shù)
queue個(gè)數(shù)小于Consumer個(gè)數(shù),那么會(huì)有Consumer閑置,就是浪費(fèi)掉了,其余Consumer平均分配到queue

消費(fèi)者組中Consumer的數(shù)量應(yīng)該小于等于訂閱Topic的Queue數(shù)量。如果超出Queue數(shù)量,則多出的Consumer將不能消費(fèi)消息。

2.7 NameServer

NameServer是一個(gè)Broker與Topic路由的注冊(cè)中心,支持Broker的動(dòng)態(tài)注冊(cè)與發(fā)現(xiàn)。 
主要包括兩個(gè)功能: 
Broker管理:接受Broker集群的注冊(cè)信息并且保存下來(lái)作為路由信息的基本數(shù)據(jù);提供心跳檢測(cè)機(jī)制,檢查Broker是否還存活。

路由信息管理:每個(gè)NameServer中都保存著B(niǎo)roker集群的整個(gè)路由信息和用于客戶端查詢的隊(duì)列信息。Producer和Conumser通過(guò)NameServer可以獲取整個(gè)Broker集群的路由信息,從而進(jìn)行消息的投遞和消費(fèi)。NameServer可以獲取整個(gè)Broker集群的路由信息,從而進(jìn)行消息的投遞和消費(fèi)。 

路由注冊(cè) 

Name Server既然是注冊(cè)中心,那么是如何完成注冊(cè)的呢? NameServer通常也是以集群的方式部署,不過(guò),NameServer是無(wú)狀態(tài)的,即NameServer集群中的各個(gè)節(jié)點(diǎn)間是無(wú)差異的,各節(jié)點(diǎn)間相互不進(jìn)行信息通訊。 那各節(jié)點(diǎn)中的數(shù)據(jù)是如何進(jìn)行數(shù)據(jù)同步的呢?在Broker節(jié)點(diǎn)啟動(dòng)時(shí),輪詢NameServer列表,與每個(gè)NameServer節(jié)點(diǎn)建立長(zhǎng)連接,發(fā)起注冊(cè)請(qǐng)求。在NameServer內(nèi)部維護(hù)著?個(gè)Broker列表,用來(lái)動(dòng)態(tài)存儲(chǔ)Broker的信息。  

Broker節(jié)點(diǎn)為了證明自己是活著的,為了維護(hù)與NameServer間的長(zhǎng)連接,會(huì)將最新的信息以心跳包的方式上報(bào)給NameServer,每30秒發(fā)送一次心跳。心跳包中包含 BrokerId、Broker地址(IP+Port)、Broker名稱、Broker所屬集群名稱等等。NameServer在接收到心跳包后,會(huì)更新心跳時(shí)間戳,記錄這個(gè)Broker的最新存活時(shí)間。 

路由剔除 
由于Broker關(guān)機(jī)、宕機(jī)或網(wǎng)絡(luò)抖動(dòng)等原因,NameServer沒(méi)有收到Broker的心跳,NameServer可能會(huì)將其從Broker列表中剔除。 NameServer中有?個(gè)定時(shí)任務(wù),每隔10秒就會(huì)掃描?次Broker表,查看每一個(gè)Broker的最新心跳時(shí)間戳距離當(dāng)前時(shí)間是否超過(guò)120秒,如果超過(guò),則會(huì)判定Broker失效,然后將其從Broker列表中剔除。 

路由發(fā)現(xiàn) 
RocketMQ的路由發(fā)現(xiàn)采用的是Pull模型。當(dāng)Topic路由信息出現(xiàn)變化時(shí),NameServer不會(huì)主動(dòng)推送給客戶端,而是客戶端定時(shí)拉取Topic最新的路由。 默認(rèn)客戶端每30秒會(huì)拉取一次最新的路由。

2.8 Broker

Broker充當(dāng)著消息中轉(zhuǎn)角色,負(fù)責(zé)存儲(chǔ)消息、轉(zhuǎn)發(fā)消息。
Broker在RocketMQ系統(tǒng)中負(fù)責(zé)接收并存儲(chǔ)從生產(chǎn)者發(fā)送來(lái)的消息,同時(shí)為消費(fèi)者的拉取請(qǐng)求作準(zhǔn)備。Broker同時(shí)也存儲(chǔ)著消息相關(guān)的元數(shù)據(jù),包括消費(fèi)者組消費(fèi)進(jìn)度偏移offset、主題、隊(duì)列等。

模塊如下圖:

Remoting Module:整個(gè)Broker的實(shí)體,負(fù)責(zé)處理來(lái)自clients端的請(qǐng)求。而這個(gè)Broker實(shí)體則由以下模塊構(gòu)成。

Client Manager:客戶端管理器。負(fù)責(zé)接收、解析客戶端(Producer/Consumer)請(qǐng)求,管理客戶端。例如,維護(hù)Consumer的Topic訂閱信息

Store Service:存儲(chǔ)服務(wù)。提供方便簡(jiǎn)單的API接口,處理消息存儲(chǔ)到物理硬盤(pán)和消息查詢功能。

HA Service:高可用服務(wù),提供Master Broker 和 Slave Broker之間的數(shù)據(jù)同步功能。

Index Service:索引服務(wù)。根據(jù)特定的Message key,對(duì)投遞到Broker的消息進(jìn)行索引服務(wù),同時(shí)也提供根據(jù)Message Key對(duì)消息進(jìn)行快速查詢的功能。

2.9 RocketMQ 工作流程

工作流程如下圖:

1)啟動(dòng)NameServer,NameServer啟動(dòng)后開(kāi)始監(jiān)聽(tīng)端口,等待Broker、Producer、Consumer連接。

2)啟動(dòng)Broker時(shí),Broker會(huì)與所有的NameServer建立并保持長(zhǎng)連接,然后每50秒向NameServer定時(shí)發(fā)送心跳包。

3)發(fā)送消息前,可以先創(chuàng)建Topic,創(chuàng)建Topic時(shí)需要指定該Topic要存儲(chǔ)在哪些Broker上,當(dāng)然,在創(chuàng)建Topic時(shí)也會(huì)將Topic與Broker的關(guān)系寫(xiě)入到NameServer中。不過(guò),這步是可選的,也可以在發(fā)送消息時(shí)自動(dòng)創(chuàng)建Topic。

4) Producer發(fā)送消息,啟動(dòng)時(shí)先跟NameServer集群中的其中一臺(tái)建立長(zhǎng)連接,并從NameServer中獲取路由信息,即當(dāng)前發(fā)送的Topic消息的Queue與Broker的地址(IP+Port)的映射關(guān)系。然后根據(jù)算法策略從隊(duì)選擇一個(gè)Queue,與隊(duì)列所在的Broker建立長(zhǎng)連接從而向Broker發(fā)消息。當(dāng)然,在獲取到路由信息后,Producer會(huì)首先將路由信息緩存到本地,再每30秒從NameServer更新一次路由信息。

5)Consumer跟Producer類(lèi)似,跟其中一臺(tái)NameServer建立長(zhǎng)連接,獲取其所訂閱Topic的路由信息,然后根據(jù)算法策略從路由信息中獲取到其所要消費(fèi)的Queue,然后直接跟Broker建立長(zhǎng)連接,開(kāi)始消費(fèi)其中的消息。Consumer在獲取到路由信息后,同樣也會(huì)每30秒從NameServer更新一次路由信息。不過(guò)不同于Producer的是,Consumer還會(huì)向Broker發(fā)送心跳,以確保Broker的存活狀態(tài)。

以上就是分布式消息隊(duì)列RocketMQ概念詳解的詳細(xì)內(nèi)容,更多關(guān)于RocketMQ概念的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!

相關(guān)文章

最新評(píng)論