RocketMQ生產(chǎn)者一個應(yīng)用不能發(fā)送多個NameServer消息解決
前言
目前有兩套RocketMQ集群,集群A包含topic名稱為cluster_A_topic,集群B包含topic名稱為cluster_B_topic,在應(yīng)用服務(wù)OrderApp上通過RocketMQ Client創(chuàng)建兩個DefaultMQProducer實(shí)例發(fā)送消息給集群A和集群B
架構(gòu)圖如下:

根據(jù)上述架構(gòu)圖,我們給出的示例代碼如下:
// 創(chuàng)建第一個DefaultMQProducer
DefaultMQProducer producer1 = new DefaultMQProducer("producer_group_1");
// 設(shè)置nameServer地址
producer1.setNamesrvAddr("192.168.2.230:9876");
try {
producer1.start();
// 發(fā)送消息
SendResult result1 = producer1.send(new Message("cluster_A_topic", "ping".getBytes(StandardCharsets.UTF_8)));
switch (result1.getSendStatus()) {
case SEND_OK:
System.out.println("cluster_A_topic 發(fā)送成功!");
break;
case FLUSH_DISK_TIMEOUT:
System.out.println("cluster_A_topic 持久化失?。?);
break;
case FLUSH_SLAVE_TIMEOUT:
System.out.println("cluster_A_topic 同步slave失?。?);
break;
case SLAVE_NOT_AVAILABLE:
System.out.println("cluster_A_topic 副本不可用!");
}
} catch (Exception e) {
e.printStackTrace();
}
// 創(chuàng)建第二個DefaultMQProducer
DefaultMQProducer producer2 = new DefaultMQProducer("producer_group_2");
// 設(shè)置nameServer地址
producer2.setNamesrvAddr("192.168.2.231:9876");
try {
producer2.start();
// 發(fā)送消息
SendResult result2 = producer2.send(new Message("cluster_B_topic", "ping".getBytes(StandardCharsets.UTF_8)));
switch (result2.getSendStatus()) {
case SEND_OK:
System.out.println("cluster_B_topic 發(fā)送成功!");
break;
case FLUSH_DISK_TIMEOUT:
System.out.println("cluster_B_topic 持久化失?。?);
break;
case FLUSH_SLAVE_TIMEOUT:
System.out.println("cluster_B_topic 同步slave失??!");
break;
case SLAVE_NOT_AVAILABLE:
System.out.println("cluster_B_topic 副本不可用!");
}
return "ok";
} catch (Exception e) {
e.printStackTrace();
} finally {
producer1.shutdown();
producer2.shutdown();
}
結(jié)果竟然報(bào)錯了,報(bào)錯內(nèi)容時cluster_B_topic不存在:

經(jīng)過不斷的測試,發(fā)現(xiàn)只有放在最前面啟動的DefaultMQProducer會生效,后面啟動的DefaultMQProducer發(fā)送消息就報(bào)錯說對應(yīng)的topic不存在,而且報(bào)錯的broker竟然是前面啟動的DefaultMQProducer對應(yīng)的broker。這就不科學(xué)了,難道RocketMQ不允許在一個應(yīng)用上創(chuàng)建多個生產(chǎn)者?
問題定位
首先說明一下,當(dāng)前使用的RocketMQ Client版本是4.8.0。為了確定是哪兒出了問題,不得不對源碼來一波探索[哭泣臉??]。
我們都知道生產(chǎn)者是發(fā)送消息給Broker的,獲取Broker信息是通過連接NameServer獲取的。既然報(bào)錯的Broker和目標(biāo)Broker竟然不對應(yīng),肯定是后面啟動的生產(chǎn)者獲取的Broker不對。有了最基本的判斷,我們先從DefaultMQProducer#start()入手,最終我們定位到這樣一段代碼DefaultMQProducerImpl#start(final boolean startFactory):
public void start(final boolean startFactory) throws MQClientException {
switch (this.serviceState) {
case CREATE_JUST:
this.serviceState = ServiceState.START_FAILED;
this.checkConfig();
// 如果生產(chǎn)者group名稱不是`CLIENT_INNER_PRODUCER`,那么修改InstanceName值
if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
this.defaultMQProducer.changeInstanceNameToPID();
}
// 創(chuàng)建MQClientInstance實(shí)例
this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQProducer, rpcHook);
// 注冊生產(chǎn)者實(shí)例到MQClientInstance中
boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
if (!registerOK) {
this.serviceState = ServiceState.CREATE_JUST;
throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup()
+ "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
null);
}
// 添加TBW102對應(yīng)的topic信息,broker設(shè)置autoCreateTopicEnable = true才起作用
this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());
if (startFactory) {
// 啟動剛剛創(chuàng)建的MQClientInstance實(shí)例
mQClientFactory.start();
}
log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(),
this.defaultMQProducer.isSendMessageWithVIPChannel());
// 修改服務(wù)狀態(tài)為RUNNING
this.serviceState = ServiceState.RUNNING;
break;
case RUNNING:
case START_FAILED:
case SHUTDOWN_ALREADY:
throw new MQClientException("The producer service state not OK, maybe started once, "
+ this.serviceState
+ FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
null);
default:
break;
}
上面的代碼主要是創(chuàng)建了MQClientInstance實(shí)例,并且通過start()方法啟動。
通過針對這兩段代碼的debug,我們發(fā)現(xiàn)創(chuàng)建的兩個DefaultMQProducer對象是共用了一個MQClientInstance實(shí)例,并且所有針對NameServer和Broker的遠(yuǎn)程操作全部是通過MQClientInstance實(shí)例來做的。比如發(fā)送消息的時候需要找到對應(yīng)的Broker下的消息隊(duì)列:
private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
if (null == topicPublishInfo || !topicPublishInfo.ok()) {
this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
// 從NameServer更新topic路由
this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
topicPublishInfo = this.topicPublishInfoTable.get(topic);
}
if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
return topicPublishInfo;
} else {
this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
topicPublishInfo = this.topicPublishInfoTable.get(topic);
return topicPublishInfo;
}
}
最終我們發(fā)現(xiàn)兩個DefaultMQProducer對象都是去同一個NameServer下獲取對應(yīng)的topic信息,這下問題就定位到了:因?yàn)槭褂昧送粋€MQClientInstance實(shí)例導(dǎo)致不同的DefaultMQProducer去訪問了同一個NameServer,同一個集群需要同時接收兩個topic的消息,也就出現(xiàn)了前面的報(bào)錯說topic不存在的情況。
如何解決
我們來看看MQClientInstance實(shí)例是如何保證唯一性的:
public MQClientInstance getOrCreateMQClientInstance(final ClientConfig clientConfig, RPCHook rpcHook) {
// 生成clientID
String clientId = clientConfig.buildMQClientId();
// 從緩存中獲取MQClientInstance
MQClientInstance instance = this.factoryTable.get(clientId);
if (null == instance) {
// 沒有緩存的話就創(chuàng)建一個MQClientInstance
instance =
new MQClientInstance(clientConfig.cloneClientConfig(),
this.factoryIndexGenerator.getAndIncrement(), clientId, rpcHook);
// 新創(chuàng)建出來的再放進(jìn)緩存
MQClientInstance prev = this.factoryTable.putIfAbsent(clientId, instance);
if (prev != null) {
instance = prev;
log.warn("Returned Previous MQClientInstance for clientId:[{}]", clientId);
} else {
log.info("Created new MQClientInstance for clientId:[{}]", clientId);
}
}
// 返回MQClientInstance實(shí)例
return instance;
}
我們之所以拿到的MQClientInstance實(shí)例是同一個,是因?yàn)樵谕粋€服務(wù)下創(chuàng)建的clientId相同:
public String buildMQClientId() {
StringBuilder sb = new StringBuilder();
sb.append(this.getClientIP());
sb.append("@");
sb.append(this.getInstanceName());
if (!UtilAll.isBlank(this.unitName)) {
sb.append("@");
sb.append(this.unitName);
}
return sb.toString();
}
兩個clientId都是192.168.18.173@14933,為了防止clientId相同,我們可以在創(chuàng)建DefaultMQProducer實(shí)例是加上unitName值,保證兩個unitName值不同來避免共享同一個MQClientInstance。
DefaultMQProducer producer1 = new DefaultMQProducer("producer_group_1");
producer1.setNamesrvAddr("192.168.2.230:9876");
producer1.setUnitName("producer1");
producer1.start();
DefaultMQProducer producer2 = new DefaultMQProducer("producer_group_1");
producer2.setNamesrvAddr("192.168.2.231:9876");
producer2.setUnitName("producer2");
producer2.start();
通過上述代碼修改后,兩個消息都發(fā)送成功了。
另一個辦法就是升級RocketMQ Client到4.9.0,我們來看一下RocketMQ Client 4.9.0是怎么解決這個問題的:
public void changeInstanceNameToPID() {
if (this.instanceName.equals("DEFAULT")) {
this.instanceName = UtilAll.getPid() + "#" + System.nanoTime();
}
}
RocketMQ Client 4.9.0在后面補(bǔ)充了一個納秒值,之前的代碼是這樣的:
public void changeInstanceNameToPID() {
if (this.instanceName.equals("DEFAULT")) {
this.instanceName = String.valueOf(UtilAll.getPid());
}
}
也就是說,在新的版本中,一個應(yīng)用服務(wù)內(nèi)創(chuàng)建多個DefaultMQProducer就會有多個MQClientInstance實(shí)例對應(yīng),不會再出現(xiàn)我們前面的報(bào)錯。
以上就是RocketMQ生產(chǎn)者一個應(yīng)用不能發(fā)送多個NameServer消息解決的詳細(xì)內(nèi)容,更多關(guān)于RocketMQ發(fā)送NameServer的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
淺談JAVA 線程狀態(tài)中可能存在的一些誤區(qū)
這篇文章主要介紹了淺談JAVA 線程狀態(tài)中可能存在的一些誤區(qū),文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2021-04-04
java實(shí)現(xiàn)獲取安卓設(shè)備里已安裝的軟件包
本文給大家介紹的是如何獲取設(shè)備中已經(jīng)安裝的應(yīng)用軟件包的代碼,其核心方法原理很簡單,我們通過Android中提供的PackageManager類,來獲取手機(jī)中安裝的應(yīng)用程序信息2015-10-10
spring mvc DispatcherServlet之前端控制器架構(gòu)詳解
這篇文章主要為大家詳細(xì)介紹了spring mvc DispatcherServlet之前端控制器架構(gòu),具有一定的參考價值,感興趣的小伙伴們可以參考一下2018-04-04
Java程序單實(shí)例運(yùn)行的簡單實(shí)現(xiàn)
這篇文章主要介紹了Java程序單實(shí)例運(yùn)行的簡單實(shí)現(xiàn)方式,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2021-08-08
Java Math類的三個方法ceil,floor,round用法
這篇文章主要介紹了Java Math類的三個方法ceil,floor,round用法,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2021-07-07
Mybatis order by 動態(tài)傳參出現(xiàn)的問題及解決方法
今天,我正在愉快地CRUD,突然發(fā)現(xiàn)出現(xiàn)一個Bug,我們來看看是怎么回事吧!接下來通過本文給大家介紹Mybatis order by 動態(tài)傳參出現(xiàn)的一個小bug,需要的朋友可以參考下2021-07-07

