RocketMQ?Push?消費(fèi)模型示例詳解
Push 模式是指由 Server 端來控制消息的推送,即當(dāng)有消息到 Server 之后,會(huì)將消息主動(dòng)投遞給 client(Consumer 端)。
使用 DefaultMQPushConsumer 消費(fèi)消息
下面是使用 DefaultMQPushConsumer 消費(fèi)消息的官方示例代碼:
// 初始化consumer,并設(shè)置consumer group name DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("MyGroup"); // 設(shè)置NameServer地址 consumer.setNamesrvAddr("localhost:9876"); //訂閱一個(gè)或多個(gè)topic,并指定tag過濾條件,這里指定*表示接收所有tag的消息 consumer.subscribe("TopicTest", "*"); //注冊回調(diào)接口來處理從Broker中收到的消息 consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs); // 返回消息消費(fèi)狀態(tài),ConsumeConcurrentlyStatus.CONSUME_SUCCESS 為消費(fèi)成功 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); // 啟動(dòng)Consumer consumer.start();
這里看到主要是通過 consumer 注冊回調(diào)接口來處理從 Broker 中收到的消息。這種監(jiān)聽回調(diào)的機(jī)制很容易想到是一種觀察者模式或者事件機(jī)制;對于這種 C-S 模型的架構(gòu)來說,如果要做到 Server 在有新消息時(shí)立即推送給 Client,那么 Client 和 Server 之間應(yīng)該是有連接存在的,Client 端開放端口來 watch Server 的推送。這里好論證,即可以查看當(dāng)前 Client 端所在進(jìn)程開啟了什么端口即可,通過如下指令查看:
- 1、先通過 jps 查看 Consumer Client 的進(jìn)程號
? rocketmq-4.9.4 git:(06f2208a3) jps 10722 Jps 4676 rocketmq-dashboard-1.0.1-SNAPSHOT.jar 1766 4121 BrokerStartup 4009 NamesrvStartup 9419 PushConsumer 9692 RemoteMavenServer36
可以看到 PushConsumer 的進(jìn)程號是 9419
- 2、通過 lsof 命令查看進(jìn)程端口占用
? rocketmq-4.9.4 git:(06f2208a3) lsof -nP -p 9419| grep LISTEN ?
這里沒有看到 PushConsumer 有開啟端口。同樣,這里可以看看 Broker 的進(jìn)程端口占用
? rocketmq-4.9.4 git:(06f2208a3) lsof -nP -p 4121| grep LISTEN java 4121 glmapper 137u IPv6 0xca1142b0f200067d 0t0 TCP *:10912 (LISTEN) java 4121 glmapper 141u IPv6 0xca1142b0f1fc8cfd 0t0 TCP *:10911 (LISTEN) java 4121 glmapper 142u IPv6 0xca1142b0f1fc935d 0t0 TCP *:10909 (LISTEN)
所以得到一個(gè)初步的結(jié)論是,在 Push 模式下,Consumer Client 并沒有啟動(dòng)端口來接收 Server 的消息推送。 那么 RocketMQ 是怎么實(shí)現(xiàn)的?
基于長輪詢機(jī)制的偽 push 實(shí)現(xiàn)
真正的 Push 方式,是 Server 端接收到消息后,主動(dòng)把消息推送給 Client 端,這種情況一般需要 Client 和 Server 之間建立長連接。通過前面的分析,Client 既然沒有開啟端口用于接收 Server 的信息推送,那么只有一種可能就是 Client 自己去拉了消息,但是這種主動(dòng)拉消息的方式是對于用戶無感的,從使用上體驗(yàn)上來看,做到了和 push 一樣的效果;這種機(jī)制就是“長輪詢”。
為啥不用長連接方式,讓 Server 主動(dòng) Push 呢?其實(shí)很好理解,對于一個(gè)提供隊(duì)列服務(wù)的 Server 來說,用 Push方式主動(dòng)推送有兩個(gè)問題:
- 1、會(huì)增加 Server 端的工作量,進(jìn)而影響 Server 的性能
- 2、Client 的處理能力存在差異,Client 的狀態(tài)不受 Server 控制,如果 Client 不能及時(shí)處理 Server 推送過來的消息,會(huì)造成各種潛在問題
客戶端側(cè)發(fā)起的長輪詢請求
下圖是初始化相關(guān)資源的過程,DefaultMQPushConsumer 是面向用戶使用的 API client 類,內(nèi)部處理實(shí)際上是委托給 DefaultMQPushConsumerImpl 來處理的。DefaultMQPushConsumerImpl#start 時(shí),會(huì)初始化 MQClientInstance ,MQClientInstance 初始化過程中又會(huì)初始化一堆資源,比如請求-響應(yīng)的通道,開啟各種各樣的調(diào)度任務(wù)(定期拉去 NameServerAddress、定期更新 Topic 路由信息、定期清理 Offline狀態(tài)的 Broker、定期發(fā)送心跳給 Broker、定期持久化所有 Consumer Offset等等),開啟 pullMessageService,開啟 rebalance Service 等等。大致的調(diào)用鏈如下圖
下面這個(gè)代碼片段是 pullMessageService 的 run 方法(pullMessageService 是 Runnable 子類)
@Override public void run() { log.info(this.getServiceName() + " service started"); while (!this.isStopped()) { try { // 從 pullRequestQueue 中取 pullRequest PullRequest pullRequest = this.pullRequestQueue.take(); this.pullMessage(pullRequest); } catch (InterruptedException ignored) { } catch (Exception e) { log.error("Pull Message Service Run Method exception", e); } } log.info(this.getServiceName() + " service end"); }
通過代碼,可以直觀的看起,pullMessageService 會(huì)一直從 pullRequestQueue 中取 pullRequest,然后執(zhí)行 pullMessage 請求。實(shí)際上 MessageQueue 是和 pullRequest 一一對應(yīng)的 ,pullRequest 全部存儲到該 Consumer 的 pullRequestQueue 隊(duì)列里面;消費(fèi)者會(huì)不停的從 PullRequest 的隊(duì)列里取 request 然后向broker 請求消息。
這里還有一個(gè)問題是隊(duì)列取出之后什么時(shí)候放回去的?在 pullMessage 的回調(diào)方法中,如果正常得到了 broker 的響應(yīng),那么會(huì)把 PullRequest放回隊(duì)列,相關(guān)代碼可以從 org.apache.rocketmq.client.consumer.PullCallback
onSuccess 方法中得到答案。
服務(wù)端阻塞請求
服務(wù)端處理 pullRequest 請求的是 PullMessageProcessor,當(dāng)沒有消息時(shí),則通過 PullRequestHoldService 將當(dāng)前請求先 hold 住。
case ResponseCode.PULL_NOT_FOUND: if (brokerAllowSuspend && hasSuspendFlag) { long pollingTimeMills = suspendTimeoutMillisLong; // 如果是 LongPolling,則 hold 住 if (!this.brokerController.getBrokerConfig().isLongPollingEnable()) { pollingTimeMills = this.brokerController.getBrokerConfig().getShortPollingTimeMills(); } String topic = requestHeader.getTopic(); long offset = requestHeader.getQueueOffset(); int queueId = requestHeader.getQueueId(); PullRequest pullRequest = new PullRequest(request, channel, pollingTimeMills, this.brokerController.getMessageStore().now(), offset, subscriptionData, messageFilter); this.brokerController.getPullRequestHoldService().suspendPullRequest(topic, queueId, pullRequest); response = null; break; }
PullRequestHoldService 中會(huì)將所有的 PullRequest 緩存到 pullRequestTable。PullRequestHoldService 也是一個(gè) task,默認(rèn)每次 hold 5s 然后再去檢查是否有新的消息過來,如果有新的消息到來,則喚醒對應(yīng)的線程來將消息返回給客戶端。
// 已省略無關(guān)代碼 public void run() { // loop while (!this.isStopped()) { // default hold 5s if (this.brokerController.getBrokerConfig().isLongPollingEnable()) { this.waitForRunning(5 * 1000); } else { this.waitForRunning(this.brokerController.getBrokerConfig().getShortPollingTimeMills()); } long beginLockTimestamp = this.systemClock.now(); // 檢查是否有新的消息到達(dá) this.checkHoldRequest(); long costTime = this.systemClock.now() - beginLockTimestamp; if (costTime > 5 * 1000) { log.info("[NOTIFYME] check hold request cost {} ms.", costTime); } } }
客戶端回調(diào)處理
我們在編寫 consumer 代碼時(shí),基于 push 模式是通過如下方式來監(jiān)聽消息的
//注冊回調(diào)接口來處理從Broker中收到的消息 consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs); // 返回消息消費(fèi)狀態(tài),ConsumeConcurrentlyStatus.CONSUME_SUCCESS 為消費(fèi)成功 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } });
通過前面的分析,對于如何通過“長輪詢”實(shí)現(xiàn)偽“push” 有了大概得了解;客戶端通過一個(gè)定時(shí)任務(wù)不斷向 Broker 發(fā)請求,Broker 在沒有消息時(shí)先 hold 住一小段時(shí)間,當(dāng)有新的消息時(shí)會(huì)立即將消息返回給 consumer;本節(jié)就主要探討 consumer 在收到消息之后的處理邏輯,以及是怎么觸發(fā) MessageListener 回調(diào)執(zhí)行的。
客戶端發(fā)起請求的底層邏輯
以異步調(diào)用為例,代碼在
org.apache.rocketmq.client.impl.MQClientAPIImpl#pullMessageAsync
中,截取部分代碼如下:
this.remotingClient.invokeAsync(addr, request, timeoutMillis, new InvokeCallback() { @Override public void operationComplete(ResponseFuture responseFuture) { RemotingCommand response = responseFuture.getResponseCommand(); if (response != null) { try { PullResult pullResult = MQClientAPIImpl.this.processPullResponse(response, addr); assert pullResult != null; // 成功回調(diào) pullCallback.onSuccess(pullResult); } catch (Exception e) { // 異?;卣{(diào) pullCallback.onException(e); } } else { if (!responseFuture.isSendRequestOK()) { // 異常回調(diào) pullCallback.onException(new MQClientException("send request failed to " + addr + ". Request: " + request, responseFuture.getCause())); } else if (responseFuture.isTimeout()) { // 異?;卣{(diào) pullCallback.onException(new MQClientException("wait response from " + addr + " timeout :" + responseFuture.getTimeoutMillis() + "ms" + ". Request: " + request, responseFuture.getCause())); } else { // 異?;卣{(diào) pullCallback.onException(new MQClientException("unknown reason. addr: " + addr + ", timeoutMillis: " + timeoutMillis + ". Request: " + request, responseFuture.getCause())); } } } });
PullCallback 回調(diào)
PullCallback 回調(diào)邏輯在 org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#pullMessage
方法中,以正常返回消息為例:
// 已省略無關(guān)代碼 public void onSuccess(PullResult pullResult) { // 將接收到的消息 交給 consumeMessageService 處理 DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest( pullResult.getMsgFoundList(), processQueue, pullRequest.getMessageQueue(), dispatchToConsume); // 將 pullRequest 放回 pullRequestQueue DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest); }
ConsumeRequest 是一個(gè) Runnable,submitConsumeRequest 就是將返回結(jié)果丟在一個(gè)單獨(dú)的線程池中去處理返回結(jié)果的。ConsumeRequest 的 run 方法中,會(huì)拿到 messageListener,然后執(zhí)行 consumeMessage 方法。
總結(jié)
到此,關(guān)于 RocketMQ push 消費(fèi)模型基本就探討完了。從實(shí)現(xiàn)機(jī)制上來看,push 本質(zhì)上并不是在建立雙向通道的前提下,由 Server 主動(dòng)推送給 Client 的,而是由 Client 端觸發(fā) pullRequest 請求,以長輪詢的方式“偽裝”的結(jié)果。從代碼上來,RocketMQ 代碼中使用了非常多的異步機(jī)制,如 pullRequestQueue 來解耦發(fā)送請求和等待結(jié)果,各種定時(shí)任務(wù)等等。
整體看,PushConsumer 采用了 長輪詢+超時(shí)時(shí)間+Pull的模式, 這種方式帶來的好處總結(jié)如下 :
- 1、減少 Broker 的壓力,避免由于不同 Consumer 消費(fèi)能力導(dǎo)致 Broker 出現(xiàn)問題
- 2、確保了 Consumer 不會(huì)負(fù)載過高,Consumer 在校驗(yàn)自己的緩存消息沒有超過閾值才會(huì)去從 Broker 拉取消息,Broker 不會(huì)主動(dòng)推過來
- 3、兼顧了消息的即時(shí)性,Broker 在沒有消息的時(shí)候會(huì)先 hold 一小段時(shí)間,有消息會(huì)立即喚起線程將消息返回給 Consumer
- 4、Broker 端無效請求的次數(shù)大大降低,Broker 在沒有消息時(shí)會(huì)掛起 PullRequest,而 Consumer 在未接收到Response 且未超時(shí)時(shí),也不會(huì)重新發(fā)起 PullRequest
以上就是RocketMQ Push 消費(fèi)模型示例詳解的詳細(xì)內(nèi)容,更多關(guān)于RocketMQ Push 消費(fèi)模型的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
Java String字符串內(nèi)容實(shí)現(xiàn)添加雙引號
這篇文章主要介紹了Java String字符串內(nèi)容實(shí)現(xiàn)添加雙引號,具有很好的參考價(jià)值,希望對大家有所幫助。一起跟隨小編過來看看吧2020-09-09application作用域?qū)崿F(xiàn)用戶登錄擠掉之前登錄用戶代碼
這篇文章主要介紹了application作用域?qū)崿F(xiàn)用戶登錄擠掉之前登錄用戶代碼,具有一定參考價(jià)值,需要的朋友可以了解下。2017-11-11Java序列化常見實(shí)現(xiàn)方法代碼實(shí)例
這篇文章主要介紹了Java序列化常見實(shí)現(xiàn)方法代碼實(shí)例,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2020-11-11java實(shí)現(xiàn)短地址服務(wù)的方法(附代碼)
大多數(shù)情況下URL太長,字符多,不便于發(fā)布復(fù)制和存儲,本文就介紹了通過java實(shí)現(xiàn)短地址服務(wù),減少了許多使用太長URL帶來的不便,需要的朋友可以參考下2015-07-07詳解在SpringBoot應(yīng)用中獲取應(yīng)用上下文方法
本篇文章主要介紹了詳解在SpringBoot應(yīng)用中獲取應(yīng)用上下文方法,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下。2017-04-04Java之不通過構(gòu)造函數(shù)創(chuàng)建一個(gè)對象問題
這篇文章主要介紹了Java之不通過構(gòu)造函數(shù)創(chuàng)建一個(gè)對象問題,具有很好的參考價(jià)值,希望對大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2024-03-03