淺談spring-boot-rabbitmq動(dòng)態(tài)管理的方法
使用spring boot + rabbitmq的時(shí)候,在開(kāi)發(fā)過(guò)程中,可能會(huì)想要臨時(shí)停用/啟用監(jiān)聽(tīng),或修改監(jiān)聽(tīng)消費(fèi)者數(shù)量。如果每次修改都重啟比較浪費(fèi)時(shí)間,所以研究了一下不停機(jī)就啟用停用監(jiān)聽(tīng)或修改一些配置
一. 關(guān)于rabbitmq監(jiān)聽(tīng)的配置
- 配置屬性類:RabbitProperties,包含rabbitmq的認(rèn)證、監(jiān)聽(tīng)、發(fā)送者以及其他的一些配置
- 自動(dòng)配置類:RabbitAutoConfiguration,主要配置rabbitmq的連接工廠和發(fā)送者等,不包含監(jiān)聽(tīng)的配置
- rabbitmq監(jiān)聽(tīng)的配置是RabbitAnnotationDrivenConfiguration,是通過(guò)RabbitAutoConfiguration引入的
@Configuration @ConditionalOnClass({ RabbitTemplate.class, Channel.class }) @EnableConfigurationProperties(RabbitProperties.class) @Import(RabbitAnnotationDrivenConfiguration.class) public class RabbitAutoConfiguration { ... }
RabbitAnnotationDrivenConfiguration中主要就是監(jiān)聽(tīng)工廠的配置、監(jiān)聽(tīng)工廠,但是這里也只是創(chuàng)建bean,并沒(méi)有真正的初始化
通過(guò)配置里的bean類名,分析一下,rabbitmq的監(jiān)聽(tīng)肯定是由監(jiān)聽(tīng)工廠創(chuàng)建的,所以找到監(jiān)聽(tīng)工廠SimpleRabbitListenerContainerFactory
@Bean @ConditionalOnMissingBean(name = "rabbitListenerContainerFactory") public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory( SimpleRabbitListenerContainerFactoryConfigurer configurer, ConnectionFactory connectionFactory) { SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); configurer.configure(factory, connectionFactory); return factory; }
既然自動(dòng)配置里面沒(méi)有初始化監(jiān)聽(tīng),那就應(yīng)該是在其他地方調(diào)用的,進(jìn)入監(jiān)聽(tīng)工廠類中,發(fā)現(xiàn)有initializeContainer(SimpleMessageListenerContainer instance)方法,猜測(cè)初始化肯定與這個(gè)方法有關(guān),所以查看有哪些地方調(diào)用,于是找到RabbitListenerEndpointRegistry.createListenerContainer(RabbitListenerEndpoint endpoint,RabbitListenerContainerFactory<?> factory)方法中有創(chuàng)建監(jiān)聽(tīng)容器和初始化的代碼
/** * Create and start a new {@link MessageListenerContainer} using the specified factory. * @param endpoint the endpoint to create a {@link MessageListenerContainer}. * @param factory the {@link RabbitListenerContainerFactory} to use. * @return the {@link MessageListenerContainer}. */ protected MessageListenerContainer createListenerContainer(RabbitListenerEndpoint endpoint, RabbitListenerContainerFactory<?> factory) { MessageListenerContainer listenerContainer = factory.createListenerContainer(endpoint); if (listenerContainer instanceof InitializingBean) { try { ((InitializingBean) listenerContainer).afterPropertiesSet(); } catch (Exception ex) { throw new BeanInitializationException("Failed to initialize message listener container", ex); } } int containerPhase = listenerContainer.getPhase(); if (containerPhase < Integer.MAX_VALUE) { // a custom phase value if (this.phase < Integer.MAX_VALUE && this.phase != containerPhase) { throw new IllegalStateException("Encountered phase mismatch between container factory definitions: " + this.phase + " vs " + containerPhase); } this.phase = listenerContainer.getPhase(); } return listenerContainer; }
繼續(xù)找調(diào)用這個(gè)方法的地方,找到RabbitListenerEndpointRegistrar.afterPropertiesSet()方法之后,發(fā)現(xiàn)調(diào)用的地方很多了
看看afterPropertiesSet方法,是InitializingBean接口中的,猜測(cè)應(yīng)該是spring容器創(chuàng)建bean之后都會(huì)調(diào)用的bean初始化的方法,所以查找找到RabbitListenerEndpointRegistrar是在哪里創(chuàng)建的實(shí)例。原來(lái)是在RabbitListenerAnnotationBeanPostProcessor中的私有屬性,而RabbitListenerAnnotationBeanPostProcessor是在RabbitBootstrapConfiguration這個(gè)自動(dòng)配置里面初始化的,所以這就找到rabbitmq初始化監(jiān)聽(tīng)的源頭了
二. 動(dòng)態(tài)管理rabbitmq監(jiān)聽(tīng)
回到最初的問(wèn)題,想要?jiǎng)討B(tài)的啟用停用mq的監(jiān)聽(tīng),所以先看看初始化配置的類,既然有初始化,那可能會(huì)有相關(guān)的管理,于是在RabbitListenerEndpointRegistry中找到了start()和stop()方法,里面有對(duì)監(jiān)聽(tīng)容器進(jìn)行操作,主要源碼如下
/** * @return the managed {@link MessageListenerContainer} instance(s). */ public Collection<MessageListenerContainer> getListenerContainers() { return Collections.unmodifiableCollection(this.listenerContainers.values()); } @Override public void start() { for (MessageListenerContainer listenerContainer : getListenerContainers()) { startIfNecessary(listenerContainer); } } /** * Start the specified {@link MessageListenerContainer} if it should be started * on startup or when start is called explicitly after startup. * @see MessageListenerContainer#isAutoStartup() */ private void startIfNecessary(MessageListenerContainer listenerContainer) { if (this.contextRefreshed || listenerContainer.isAutoStartup()) { listenerContainer.start(); } } @Override public void stop() { for (MessageListenerContainer listenerContainer : getListenerContainers()) { listenerContainer.stop(); } }
寫(xiě)個(gè)controller,注入RabbitListenerEndpointRegistry,使用start()和stop()對(duì)監(jiān)聽(tīng)進(jìn)行啟用停用的操作,并且RabbitListenerEndpointRegistry實(shí)例還可以獲取監(jiān)聽(tīng)容器,對(duì)監(jiān)聽(tīng)的一些參數(shù)也能進(jìn)行修改,比如消費(fèi)者數(shù)量。代碼如下:
import java.util.Set; import javax.annotation.Resource; import org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistry; import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import com.itopener.framework.ResultMap; /** * Created by fuwei.deng on 2017年7月24日. */ @RestController @RequestMapping("rabbitmq/listener") public class RabbitMQController { @Resource private RabbitListenerEndpointRegistry rabbitListenerEndpointRegistry; @RequestMapping("stop") public ResultMap stop(){ rabbitListenerEndpointRegistry.stop(); return ResultMap.buildSuccess(); } @RequestMapping("start") public ResultMap start(){ rabbitListenerEndpointRegistry.start(); return ResultMap.buildSuccess(); } @RequestMapping("setup") public ResultMap setup(int consumer, int maxConsumer){ Set<String> containerIds = rabbitListenerEndpointRegistry.getListenerContainerIds(); SimpleMessageListenerContainer container = null; for(String id : containerIds){ container = (SimpleMessageListenerContainer) rabbitListenerEndpointRegistry.getListenerContainer(id); if(container != null){ container.setConcurrentConsumers(consumer); container.setMaxConcurrentConsumers(maxConsumer); } } return ResultMap.buildSuccess(); } }
以上就是本文的全部?jī)?nèi)容,希望對(duì)大家的學(xué)習(xí)有所幫助,也希望大家多多支持腳本之家。
相關(guān)文章
springboot中RestTemplate發(fā)送HTTP請(qǐng)求的實(shí)現(xiàn)示例
RestTemplate是一個(gè) spring-web 提供的執(zhí)行HTTP請(qǐng)求的同步阻塞式工具類,本文就來(lái)介紹一下RestTemplate發(fā)送HTTP請(qǐng)求,具有一定的參考價(jià)值,感興趣的可以了解一下2024-03-03詳解Java并發(fā)編程之內(nèi)置鎖(synchronized)
這篇文章主要介紹了Java并發(fā)編程之內(nèi)置鎖(synchronized)的相關(guān)知識(shí),本文給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2021-03-03Spring定時(shí)任務(wù)實(shí)現(xiàn)與配置(一)
這篇文章主要為大家詳細(xì)介紹了Spring定時(shí)任務(wù)的實(shí)現(xiàn)與配置第一篇,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2017-06-06java應(yīng)用程序如何自定義log4j配置文件的位置
這篇文章主要介紹了java應(yīng)用程序如何自定義log4j配置文件的位置,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-12-12Apache POI將PPT轉(zhuǎn)換成圖片實(shí)例代碼
這篇文章主要介紹了Apache POI將PPT轉(zhuǎn)換成圖片實(shí)例代碼,具有一定借鑒價(jià)值,需要的朋友可以參考下2018-01-01Spring?Boot實(shí)現(xiàn)JWT?token自動(dòng)續(xù)期的實(shí)現(xiàn)
本文主要介紹了Spring?Boot實(shí)現(xiàn)JWT?token自動(dòng)續(xù)期,文中通過(guò)示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2021-12-12SpringBoot ApplicationContextAware拓展接口使用詳解
當(dāng)一個(gè)類實(shí)現(xiàn)了這個(gè)接口(ApplicationContextAware)之后,這個(gè)類就可以方便獲得ApplicationContext中的所有bean。換句話說(shuō),就是這個(gè)類可以直接獲取spring配置文件中,所有有引用到的bean對(duì)象2023-04-04