RocketMQ生產(chǎn)者一個(gè)應(yīng)用不能發(fā)送多個(gè)NameServer消息解決
前言
目前有兩套R(shí)ocketMQ集群,集群A包含topic
名稱為cluster_A_topic
,集群B包含topic
名稱為cluster_B_topic
,在應(yīng)用服務(wù)OrderApp
上通過(guò)RocketMQ Client
創(chuàng)建兩個(gè)DefaultMQProducer
實(shí)例發(fā)送消息給集群A和集群B
架構(gòu)圖如下:
根據(jù)上述架構(gòu)圖,我們給出的示例代碼如下:
// 創(chuàng)建第一個(gè)DefaultMQProducer DefaultMQProducer producer1 = new DefaultMQProducer("producer_group_1"); // 設(shè)置nameServer地址 producer1.setNamesrvAddr("192.168.2.230:9876"); try { producer1.start(); // 發(fā)送消息 SendResult result1 = producer1.send(new Message("cluster_A_topic", "ping".getBytes(StandardCharsets.UTF_8))); switch (result1.getSendStatus()) { case SEND_OK: System.out.println("cluster_A_topic 發(fā)送成功!"); break; case FLUSH_DISK_TIMEOUT: System.out.println("cluster_A_topic 持久化失?。?); break; case FLUSH_SLAVE_TIMEOUT: System.out.println("cluster_A_topic 同步slave失?。?); break; case SLAVE_NOT_AVAILABLE: System.out.println("cluster_A_topic 副本不可用!"); } } catch (Exception e) { e.printStackTrace(); } // 創(chuàng)建第二個(gè)DefaultMQProducer DefaultMQProducer producer2 = new DefaultMQProducer("producer_group_2"); // 設(shè)置nameServer地址 producer2.setNamesrvAddr("192.168.2.231:9876"); try { producer2.start(); // 發(fā)送消息 SendResult result2 = producer2.send(new Message("cluster_B_topic", "ping".getBytes(StandardCharsets.UTF_8))); switch (result2.getSendStatus()) { case SEND_OK: System.out.println("cluster_B_topic 發(fā)送成功!"); break; case FLUSH_DISK_TIMEOUT: System.out.println("cluster_B_topic 持久化失?。?); break; case FLUSH_SLAVE_TIMEOUT: System.out.println("cluster_B_topic 同步slave失??!"); break; case SLAVE_NOT_AVAILABLE: System.out.println("cluster_B_topic 副本不可用!"); } return "ok"; } catch (Exception e) { e.printStackTrace(); } finally { producer1.shutdown(); producer2.shutdown(); }
結(jié)果竟然報(bào)錯(cuò)了,報(bào)錯(cuò)內(nèi)容時(shí)cluster_B_topic
不存在:
經(jīng)過(guò)不斷的測(cè)試,發(fā)現(xiàn)只有放在最前面啟動(dòng)的DefaultMQProducer
會(huì)生效,后面啟動(dòng)的DefaultMQProducer
發(fā)送消息就報(bào)錯(cuò)說(shuō)對(duì)應(yīng)的topic
不存在,而且報(bào)錯(cuò)的broker
竟然是前面啟動(dòng)的DefaultMQProducer
對(duì)應(yīng)的broker
。這就不科學(xué)了,難道RocketMQ不允許在一個(gè)應(yīng)用上創(chuàng)建多個(gè)生產(chǎn)者?
問(wèn)題定位
首先說(shuō)明一下,當(dāng)前使用的RocketMQ Client
版本是4.8.0
。為了確定是哪兒出了問(wèn)題,不得不對(duì)源碼來(lái)一波探索[哭泣臉??]。
我們都知道生產(chǎn)者是發(fā)送消息給Broker
的,獲取Broker
信息是通過(guò)連接NameServer
獲取的。既然報(bào)錯(cuò)的Broker
和目標(biāo)Broker
竟然不對(duì)應(yīng),肯定是后面啟動(dòng)的生產(chǎn)者獲取的Broker
不對(duì)。有了最基本的判斷,我們先從DefaultMQProducer#start()
入手,最終我們定位到這樣一段代碼DefaultMQProducerImpl#start(final boolean startFactory)
:
public void start(final boolean startFactory) throws MQClientException { switch (this.serviceState) { case CREATE_JUST: this.serviceState = ServiceState.START_FAILED; this.checkConfig(); // 如果生產(chǎn)者group名稱不是`CLIENT_INNER_PRODUCER`,那么修改InstanceName值 if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) { this.defaultMQProducer.changeInstanceNameToPID(); } // 創(chuàng)建MQClientInstance實(shí)例 this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQProducer, rpcHook); // 注冊(cè)生產(chǎn)者實(shí)例到MQClientInstance中 boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this); if (!registerOK) { this.serviceState = ServiceState.CREATE_JUST; throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup() + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL), null); } // 添加TBW102對(duì)應(yīng)的topic信息,broker設(shè)置autoCreateTopicEnable = true才起作用 this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo()); if (startFactory) { // 啟動(dòng)剛剛創(chuàng)建的MQClientInstance實(shí)例 mQClientFactory.start(); } log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(), this.defaultMQProducer.isSendMessageWithVIPChannel()); // 修改服務(wù)狀態(tài)為RUNNING this.serviceState = ServiceState.RUNNING; break; case RUNNING: case START_FAILED: case SHUTDOWN_ALREADY: throw new MQClientException("The producer service state not OK, maybe started once, " + this.serviceState + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK), null); default: break; }
上面的代碼主要是創(chuàng)建了MQClientInstance
實(shí)例,并且通過(guò)start()
方法啟動(dòng)。
通過(guò)針對(duì)這兩段代碼的debug,我們發(fā)現(xiàn)創(chuàng)建的兩個(gè)DefaultMQProducer
對(duì)象是共用了一個(gè)MQClientInstance
實(shí)例,并且所有針對(duì)NameServer
和Broker
的遠(yuǎn)程操作全部是通過(guò)MQClientInstance
實(shí)例來(lái)做的。比如發(fā)送消息的時(shí)候需要找到對(duì)應(yīng)的Broker
下的消息隊(duì)列:
private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) { TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic); if (null == topicPublishInfo || !topicPublishInfo.ok()) { this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo()); // 從NameServer更新topic路由 this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic); topicPublishInfo = this.topicPublishInfoTable.get(topic); } if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) { return topicPublishInfo; } else { this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer); topicPublishInfo = this.topicPublishInfoTable.get(topic); return topicPublishInfo; } }
最終我們發(fā)現(xiàn)兩個(gè)DefaultMQProducer
對(duì)象都是去同一個(gè)NameServer
下獲取對(duì)應(yīng)的topic
信息,這下問(wèn)題就定位到了:因?yàn)槭褂昧送粋€(gè)MQClientInstance
實(shí)例導(dǎo)致不同的DefaultMQProducer
去訪問(wèn)了同一個(gè)NameServer
,同一個(gè)集群需要同時(shí)接收兩個(gè)topic
的消息,也就出現(xiàn)了前面的報(bào)錯(cuò)說(shuō)topic
不存在的情況。
如何解決
我們來(lái)看看MQClientInstance
實(shí)例是如何保證唯一性的:
public MQClientInstance getOrCreateMQClientInstance(final ClientConfig clientConfig, RPCHook rpcHook) { // 生成clientID String clientId = clientConfig.buildMQClientId(); // 從緩存中獲取MQClientInstance MQClientInstance instance = this.factoryTable.get(clientId); if (null == instance) { // 沒(méi)有緩存的話就創(chuàng)建一個(gè)MQClientInstance instance = new MQClientInstance(clientConfig.cloneClientConfig(), this.factoryIndexGenerator.getAndIncrement(), clientId, rpcHook); // 新創(chuàng)建出來(lái)的再放進(jìn)緩存 MQClientInstance prev = this.factoryTable.putIfAbsent(clientId, instance); if (prev != null) { instance = prev; log.warn("Returned Previous MQClientInstance for clientId:[{}]", clientId); } else { log.info("Created new MQClientInstance for clientId:[{}]", clientId); } } // 返回MQClientInstance實(shí)例 return instance; }
我們之所以拿到的MQClientInstance
實(shí)例是同一個(gè),是因?yàn)樵谕粋€(gè)服務(wù)下創(chuàng)建的clientId
相同:
public String buildMQClientId() { StringBuilder sb = new StringBuilder(); sb.append(this.getClientIP()); sb.append("@"); sb.append(this.getInstanceName()); if (!UtilAll.isBlank(this.unitName)) { sb.append("@"); sb.append(this.unitName); } return sb.toString(); }
兩個(gè)clientId
都是192.168.18.173@14933
,為了防止clientId
相同,我們可以在創(chuàng)建DefaultMQProducer
實(shí)例是加上unitName
值,保證兩個(gè)unitName
值不同來(lái)避免共享同一個(gè)MQClientInstance
。
DefaultMQProducer producer1 = new DefaultMQProducer("producer_group_1"); producer1.setNamesrvAddr("192.168.2.230:9876"); producer1.setUnitName("producer1"); producer1.start(); DefaultMQProducer producer2 = new DefaultMQProducer("producer_group_1"); producer2.setNamesrvAddr("192.168.2.231:9876"); producer2.setUnitName("producer2"); producer2.start();
通過(guò)上述代碼修改后,兩個(gè)消息都發(fā)送成功了。
另一個(gè)辦法就是升級(jí)RocketMQ Client
到4.9.0
,我們來(lái)看一下RocketMQ Client 4.9.0
是怎么解決這個(gè)問(wèn)題的:
public void changeInstanceNameToPID() { if (this.instanceName.equals("DEFAULT")) { this.instanceName = UtilAll.getPid() + "#" + System.nanoTime(); } }
RocketMQ Client 4.9.0
在后面補(bǔ)充了一個(gè)納秒值,之前的代碼是這樣的:
public void changeInstanceNameToPID() { if (this.instanceName.equals("DEFAULT")) { this.instanceName = String.valueOf(UtilAll.getPid()); } }
也就是說(shuō),在新的版本中,一個(gè)應(yīng)用服務(wù)內(nèi)創(chuàng)建多個(gè)DefaultMQProducer
就會(huì)有多個(gè)MQClientInstance
實(shí)例對(duì)應(yīng),不會(huì)再出現(xiàn)我們前面的報(bào)錯(cuò)。
以上就是RocketMQ生產(chǎn)者一個(gè)應(yīng)用不能發(fā)送多個(gè)NameServer消息解決的詳細(xì)內(nèi)容,更多關(guān)于RocketMQ發(fā)送NameServer的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
淺談JAVA 線程狀態(tài)中可能存在的一些誤區(qū)
這篇文章主要介紹了淺談JAVA 線程狀態(tài)中可能存在的一些誤區(qū),文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2021-04-04java實(shí)現(xiàn)獲取安卓設(shè)備里已安裝的軟件包
本文給大家介紹的是如何獲取設(shè)備中已經(jīng)安裝的應(yīng)用軟件包的代碼,其核心方法原理很簡(jiǎn)單,我們通過(guò)Android中提供的PackageManager類,來(lái)獲取手機(jī)中安裝的應(yīng)用程序信息2015-10-10spring mvc DispatcherServlet之前端控制器架構(gòu)詳解
這篇文章主要為大家詳細(xì)介紹了spring mvc DispatcherServlet之前端控制器架構(gòu),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2018-04-04Java程序單實(shí)例運(yùn)行的簡(jiǎn)單實(shí)現(xiàn)
這篇文章主要介紹了Java程序單實(shí)例運(yùn)行的簡(jiǎn)單實(shí)現(xiàn)方式,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-08-08Java Math類的三個(gè)方法ceil,floor,round用法
這篇文章主要介紹了Java Math類的三個(gè)方法ceil,floor,round用法,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-07-07Mybatis order by 動(dòng)態(tài)傳參出現(xiàn)的問(wèn)題及解決方法
今天,我正在愉快地CRUD,突然發(fā)現(xiàn)出現(xiàn)一個(gè)Bug,我們來(lái)看看是怎么回事吧!接下來(lái)通過(guò)本文給大家介紹Mybatis order by 動(dòng)態(tài)傳參出現(xiàn)的一個(gè)小bug,需要的朋友可以參考下2021-07-07