RocketMQ生產(chǎn)者調(diào)用start發(fā)送消息原理示例
RocketMQ發(fā)送消息
我們在使用RocketMQ發(fā)送消息時,一般都會使用DefaultMQProducer
,類型的代碼如下:
DefaultMQProducer producer = new DefaultMQProducer("producer_group"); producer.setNamesrvAddr("42.192.50.8:9876"); try { producer.start(); producer.send(new Message("topic", "ping".getBytes(StandardCharsets.UTF_8))); } catch (Exception e) { e.printStackTrace(); } finally { producer.shutdown(); }
上述代碼中,在消息發(fā)送之前調(diào)用了start()
方法,如果不調(diào)用start()
方法,直接發(fā)送消息,那么會出現(xiàn)以下報錯:
報錯消息里面很明顯地告知我們,目前這個DefaultMQProducer
狀態(tài)沒有準備好,還不能發(fā)送消息。為了一探究竟,我們得去看看start()
里面究竟做了什么操作呢?
start()里面究竟做了什么操作
我們根據(jù)源碼一路走下來,可以追蹤到DefaultMQProducerImpl.start(final boolean startFactory)
這個方法:
public void start(final boolean startFactory) throws MQClientException { switch (this.serviceState) { case CREATE_JUST: this.serviceState = ServiceState.START_FAILED; this.checkConfig(); if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) { this.defaultMQProducer.changeInstanceNameToPID(); } // 創(chuàng)建MQClientInstance this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQProducer, rpcHook); // 注冊Producer到MQClientInstance中 boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this); if (!registerOK) { this.serviceState = ServiceState.CREATE_JUST; throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup() + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL), null); } this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo()); // 啟動MQClientInstance實例 if (startFactory) { mQClientFactory.start(); } log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(), this.defaultMQProducer.isSendMessageWithVIPChannel()); this.serviceState = ServiceState.RUNNING; break; case RUNNING: case START_FAILED: case SHUTDOWN_ALREADY: throw new MQClientException("The producer service state not OK, maybe started once, " + this.serviceState + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK), null); default: break; }
上述代碼主要做了以下幾點:
1.創(chuàng)建MQClientInstance實例;
2.注冊Producer到MQClientInstance實例中;
3.啟動MQClientInstance實例;
MQClientInstance
實例并不是每次都會創(chuàng)建的,它創(chuàng)建出來也會緩存的MQClientManager
中,不過根據(jù)源碼來看的話,每次創(chuàng)建Producer
都會對應(yīng)創(chuàng)建一個新的MQClientInstance
實例,所以一般情況下不建議一個應(yīng)用服務(wù)中重復(fù)創(chuàng)建Producer
;
最終start()
方法的關(guān)鍵實現(xiàn)邏輯還是需要進入MQClientInstance.start()
中:
public void start() throws MQClientException { synchronized (this) { switch (this.serviceState) { case CREATE_JUST: this.serviceState = ServiceState.START_FAILED; // 如果namesrv地址為null,那么就需要自己找namesrv地址 if (null == this.clientConfig.getNamesrvAddr()) { this.mQClientAPIImpl.fetchNameServerAddr(); } // 開啟一個請求響應(yīng)渠道,沒猜錯的話,應(yīng)該是netty實現(xiàn)的 this.mQClientAPIImpl.start(); // 開啟定時任務(wù) this.startScheduledTask(); // 開啟拉消息服務(wù) this.pullMessageService.start(); // 開啟負載均衡服務(wù) this.rebalanceService.start(); // 再開啟一個默認生產(chǎn)者,這個生產(chǎn)者不需要啟動MQClientInstance實例 this.defaultMQProducer.getDefaultMQProducerImpl().start(false); log.info("the client factory [{}] start OK", this.clientId); this.serviceState = ServiceState.RUNNING; break; case START_FAILED: throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null); default: break; } } }
看樣子,這才是start()方法真正要做的事情:
1.找namesrv地址,應(yīng)該是后面需要使用namesrv地址查詢對應(yīng)的broker
;
2.開啟Netty客戶端的初始化,包括與namesrv建立信道;另外開啟兩個定時任務(wù),一個清除列表中過期的請求,第二個就是篩選可用的namesrv服務(wù);
3.開啟一些定時任務(wù);包括如果沒有設(shè)置namesrv地址的話,會從指定站點拉namesrv地址;清除下線broker并發(fā)送心跳給所有的broker等工作;
4.因為當(dāng)前是生產(chǎn)者,所以pullMessageService很快就結(jié)束;
5.生產(chǎn)者不需要做負載均衡,所以rebalanceService很快也結(jié)束;
6.給默認創(chuàng)建的生產(chǎn)者執(zhí)行一下start()方法,其實啥也沒做;
上述大多數(shù)任務(wù)都是給消費者使用的,作為生產(chǎn)者,唯一起作用的就是前三步,查找namesrv地址、第二步與namesrv建立通信以及第三步對broker的一些定時清理工作;不過沒有發(fā)生消息之前,是不會從遠程獲取任何數(shù)據(jù)的。所以綜上所述,start()方法里面只做了以下兩件事情:
1.與namesrv建立通信渠道,它甚至都沒有從namesrv獲取任何數(shù)據(jù);
2.啟動一些定時任務(wù),包括清理下線的broker;
小結(jié)
雖然在生產(chǎn)者中,start()方法里面真正做的事情比較少,但是卻是非常有必要的。發(fā)送消息之前,我們沒有使用start()方法,導(dǎo)致消息發(fā)送失敗,是因為生產(chǎn)者與namesrv之間的通信渠道沒有建立。
以上就是RocketMQ生產(chǎn)者調(diào)用start發(fā)送消息原理示例的詳細內(nèi)容,更多關(guān)于RocketMQ調(diào)用start發(fā)送消息的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
@RefreshScope在Quartz 觸發(fā)器類導(dǎo)致異常問題解決分析
這篇文章主要為大家介紹了@RefreshScope在Quartz 觸發(fā)器類導(dǎo)致異常問題解決分析,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪2023-02-02Java使用自定義注解+反射實現(xiàn)字典轉(zhuǎn)換代碼實例
這篇文章主要介紹了Java使用自定義注解+反射實現(xiàn)字典轉(zhuǎn)換代碼實例,注解是一種能被添加到j(luò)ava代碼中的元數(shù)據(jù),類、方法、變量、參數(shù)和包都可以用注解來修飾,注解對于它所修飾的代碼并沒有直接的影響,需要的朋友可以參考下2023-09-09Java簡化復(fù)雜系統(tǒng)調(diào)用的門面設(shè)計模式
Java門面模式是一種結(jié)構(gòu)性設(shè)計模式,它為復(fù)雜系統(tǒng)提供了一個簡單的接口,使得系統(tǒng)的客戶端能夠更加方便地使用系統(tǒng)功能。門面模式通過封裝復(fù)雜的子系統(tǒng),隱藏系統(tǒng)的實現(xiàn)細節(jié),提高了系統(tǒng)的易用性和靈活性2023-04-04SpringBoot啟動security后如何關(guān)閉彈出的/login頁面
這篇文章主要介紹了SpringBoot啟動security后如何關(guān)閉彈出的login頁面問題,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教2023-12-12