RocketMq同組消費(fèi)者如何自動(dòng)設(shè)置InstanceName
一、背景
同組多于1個(gè)消費(fèi)者,如果沒單獨(dú)設(shè)置instanceName,默認(rèn)為DEFAULT。
啟動(dòng)時(shí)會(huì)報(bào)如下錯(cuò)誤:
org.apache.rocketmq.client.exception.MQClientException: The consumer group[group_03] has been created before, specify another name please.
二、處理方法
創(chuàng)建MqBeanPost,利用后置處理器獲取到想要設(shè)置的bean,把instanceName設(shè)置成隨機(jī)數(shù)。
@Component public class MqBeanPost implements BeanPostProcessor { @Autowired MqJudgePacsConfig mqJudgePacsConfig; @Override public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException { if(bean instanceof DefaultRocketMQListenerContainer){ DefaultRocketMQListenerContainer container = (DefaultRocketMQListenerContainer) bean; String topic = container.getTopic(); if(topic.equals(mqJudgePacsConfig.getTopic())){ DefaultMQPushConsumer consumer = container.getConsumer(); consumer.setInstanceName(UUID.fastUUID().toString()); } } return bean; } }
三、源碼分析
一、springboot整合rocketmq啟動(dòng)流程:
(1)SpringBootApplication
(2)@EnableAutoConfiguration
(3)AutoConfigurationImportSelector實(shí)現(xiàn)了ImportSelector 接口,所以執(zhí)行selectImports方法
- ->getAutoConfigurationEntry
- ->getCandidateConfigurations
- ->SpringFactoriesLoader.loadFactoryNames
- ->loadSpringFactories此方法會(huì)讀取所有META-INF/spring.factories文件,轉(zhuǎn)成Map<String, List>,最后getOrDefault(factoryTypeName, Collections.emptyList())獲取key 為org.springframework.boot.autoconfigure.EnableAutoConfiguration的值為需要加載到容器類的全類名的集合。
(4)rocketmq和springboot整合jar中spring.factories位置。
(5)RocketMQAutoConfiguration中@import注入ListenerContainerConfiguration。
ListenerContainerConfiguration 實(shí)現(xiàn)了SmartInitializingSingleton類所以當(dāng)spring容器創(chuàng)建ListenerContainerConfiguration是會(huì)進(jìn)入afterSingletonsInstantiated方法。
(6)此方法中,獲取帶有RocketMQMessageListener注解類的集合,遍歷執(zhí)行registerContainer。
public void afterSingletonsInstantiated() { Map<String, Object> beans = this.applicationContext.getBeansWithAnnotation(RocketMQMessageListener.class) .entrySet().stream().filter(entry -> !ScopedProxyUtils.isScopedTarget(entry.getKey())) .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); beans.forEach(this::registerContainer); }
(7)重點(diǎn)分析一下畫紅的代碼。createRocketMQListenerContainer方法是獲取注解中的屬性,創(chuàng)建出DefaultRocketMQListenerContainer對(duì)象。最后注冊(cè)到容器。
(8)DefaultRocketMQListenerContainer container = genericApplicationContext.getBean(containerBeanName,DefaultRocketMQListenerContainer.class);
spring容器創(chuàng)建DefaultRocketMQListenerContainer對(duì)象。
創(chuàng)建對(duì)象的流程不在贅述(可以翻看我以前博客創(chuàng)建對(duì)象流程)。
主要分析后置處理器。使用后置處理器來(lái)處理instanceName。
(9)invokeInitMethods方法中,執(zhí)行afterPropertiesSet初始化方法。
invokeInitMethods方法執(zhí)行前會(huì)調(diào)用applyBeanPostProcessorsBeforeInitialization,方法執(zhí)行后會(huì)調(diào)用applyBeanPostProcessorsAfterInitialization。
(10)DefaultRocketMQListenerContainer 實(shí)現(xiàn)了InitializingBean所以在執(zhí)行初始化方法時(shí),調(diào)用afterPropertiesSet,然后繼續(xù)調(diào)用initRocketMQPushConsumer
(11)initRocketMQPushConsumer 方法會(huì)創(chuàng)建DefaultMQPushConsumer對(duì)象,默認(rèn)的instanceName就是在此創(chuàng)建。
所以如果想給DefaultMQPushConsumer設(shè)置instanceName,就可以在applyBeanPostProcessorsAfterInitialization中設(shè)置。
為何不能在applyBeanPostProcessorsBeforeInitialization執(zhí)行的時(shí)候,因?yàn)镈efaultMQPushConsumer還未創(chuàng)建。
(12)拓展:initRocketMQPushConsumer 中畫紅的地方。
如果消費(fèi)端實(shí)現(xiàn)了RocketMQPushConsumerLifecycleListener或RocketMQPushConsumerLifecycleListener類的話,可以重寫prepareStart方法。
在prepareStart方法中設(shè)置instanceName。
但是這種方法如果有多個(gè)消費(fèi)端的話,要寫多次。
四、總結(jié)
此方法的切入點(diǎn)是DefaultRocketMQListenerContainer類創(chuàng)建過程中,使用后置處理器設(shè)置instanceName。
以上為個(gè)人經(jīng)驗(yàn),希望能給大家一個(gè)參考,也希望大家多多支持腳本之家。
相關(guān)文章
零基礎(chǔ)學(xué)Java:Java開發(fā)工具 Eclipse 安裝過程創(chuàng)建第一個(gè)Java項(xiàng)目及Eclipse的一些基礎(chǔ)使用技巧
這篇文章主要介紹了零基礎(chǔ)學(xué)Java:Java開發(fā)工具 Eclipse 安裝過程創(chuàng)建第一個(gè)Java項(xiàng)目及Eclipse的一些基礎(chǔ)使用技巧,本文通過圖文并茂的形式給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2020-09-09Java 生成任意長(zhǎng)度的驗(yàn)證碼過程解析
這篇文章主要介紹了Java 生成任意長(zhǎng)度的驗(yàn)證碼過程解析,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2019-10-10Java mysql詳細(xì)講解雙數(shù)據(jù)源配置使用
在開發(fā)過程中我們常常會(huì)用到兩個(gè)數(shù)據(jù)庫(kù),一個(gè)數(shù)據(jù)用來(lái)實(shí)現(xiàn)一些常規(guī)的增刪改查,另外一個(gè)數(shù)據(jù)庫(kù)用來(lái)實(shí)時(shí)存數(shù)據(jù)。進(jìn)行數(shù)據(jù)的統(tǒng)計(jì)分析。可以讀寫分離??梢愿玫膬?yōu)化和提高效率;或者兩個(gè)數(shù)據(jù)存在業(yè)務(wù)分離的時(shí)候也需要多個(gè)數(shù)據(jù)源來(lái)實(shí)現(xiàn)2022-06-06SpringBoot中的@EnableConfigurationProperties注解原理及用法
在SpringBoot中,@EnableConfigurationProperties注解是一個(gè)非常有用的注解,它可以用于啟用對(duì)特定配置類的支持,在本文中,我們將深入探討@EnableConfigurationProperties注解,包括它的原理和如何使用,需要的朋友可以參考下2023-06-06Java中Dijkstra算法求解最短路徑的實(shí)現(xiàn)
Dijkstra算法是一種解決最短路徑問題的常用算法,本文主要介紹了Java中Dijkstra算法求解最短路徑的實(shí)現(xiàn),具有一定的參考價(jià)值,感興趣的可以了解一下2023-09-09SpringBoot中的異常處理與參數(shù)校驗(yàn)的方法實(shí)現(xiàn)
這篇文章主要介紹了SpringBoot中的異常處理與參數(shù)校驗(yàn)的方法實(shí)現(xiàn),文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2020-04-04