springboot+redis自定義注解實現(xiàn)發(fā)布訂閱的實現(xiàn)代碼
前言
最近開發(fā)了一個內(nèi)部消息組件,邏輯大體是通過定義注解 @MessageHub,在啟動時掃描全部bean中有使用了該注解的方法后臺創(chuàng)建一個常駐線程代理消費數(shù)據(jù),當(dāng)線程消費到數(shù)據(jù)就回寫到對應(yīng)加了注解的方法里。
@Slf4j @Service public class RedisConsumerDemo { @MessageHub(topic = "${uptown.topic}", type = "REDIS_PUBSUB") public void consumer(Object message) { log.info("pubsub info {} ", message); } }
實現(xiàn)redis的隊列、stream方式實現(xiàn)都很簡單,唯獨發(fā)布訂閱方式,網(wǎng)上的demo全都是一個固定套路,通過redis容器注入監(jiān)聽器,而且回寫非常死板。那么如何將這塊的邏輯統(tǒng)一呢。之前總結(jié)過消息組件的代碼設(shè)計,這里貼一下鏈接。
常規(guī)寫法
常規(guī)實現(xiàn)reids的發(fā)布訂閱模式寫法一共三步
創(chuàng)建消息監(jiān)聽器
@Bean public MessageListenerAdapter smsExpirationListener(TestSubscriber messageListener) { return new MessageListenerAdapter(messageListener, "onMessage"); }
創(chuàng)建訂閱器
@Component public class TestSubscriber implements MessageListener { @Override public void onMessage(Message message, byte[] pattern) { log.info("get data :{}", msg); } }
向redis容器中添加消息監(jiān)聽器
@Configuration public class RedisConfig { ? ? @Bean ? ? public RedisMessageListenerContainer container( ? ? ? ? RedisConnectionFactory redisConnectionFactory, ? ? ? ? MessageListenerAdapter smsExpirationListener) { ? ? ? ? RedisMessageListenerContainer container = new RedisMessageListenerContainer(); ? ? ? ? container.setConnectionFactory(redisConnectionFactory); ? ? ? ? container.addMessageListener(smsExpirationListener, new PatternTopic("test")); ? ? ? ? return container; ? ? } }
這樣定義非常簡單明了,但是有個問題是太代碼僵硬了,創(chuàng)建監(jiān)聽者很不靈活,只能指定內(nèi)部的onMessage方法,那么怎么才能融入到我們的內(nèi)部消息流轉(zhuǎn)中間件里呢。
自定義注解實現(xiàn)
我們內(nèi)部組件抽象了兩個方法,生產(chǎn)和消費,但這兩個方法邏輯截然不同,生產(chǎn)方法是暴露給serverice層接口調(diào)用,調(diào)用方在調(diào)用生產(chǎn)方法后能直接知道生產(chǎn)了幾條數(shù)據(jù)和成功與否。而消費方法是配合Spring生命周期函數(shù)服務(wù)啟動時建立常駐消費線程的。
/** ?* 生產(chǎn)消息 ?*/ Integer producer(MessageForm messageForm); /** ?* 消費消息 ?*/ void consumer(ConsumerAdapterForm adapterForm);
生產(chǎn)消息當(dāng)然很容易實現(xiàn),只需要調(diào)用已經(jīng)封裝好的convertAndSend方法。
stringRedisTemplate.convertAndSend(messageForm.getTopic(), messageForm.getMessage());
消費方法就有說法了,動態(tài)生成監(jiān)聽者的場景下使用redis容器用代碼挨個注冊已經(jīng)滿足不了了,但仔細(xì)過一遍源代碼就會發(fā)現(xiàn),監(jiān)聽類的構(gòu)造方法的入?yún)⒅挥袃蓚€,第一個需要回調(diào)的代理類,第二個消費到數(shù)據(jù)后回調(diào)的方法。
/** * Create a new {@link MessageListenerAdapter} for the given delegate. * * @param delegate the delegate object * @param defaultListenerMethod method to call when a message comes * @see #getListenerMethodName */ public MessageListenerAdapter(Object delegate, String defaultListenerMethod) { this(delegate); setDefaultListenerMethod(defaultListenerMethod); }
那么好了好了,方案有了,本質(zhì)上就是把RedisMessageListenerContainer注入進(jìn)來之后,掃描項目里所有加了 @MessageHub 的bean,包裝成監(jiān)聽類加載到容器里就完事了。怎么掃描的代碼就不再贅述了,實現(xiàn)Spring的生命周期函數(shù)BeanPostProcessor#postProcessAfterInitialization,在這里用AnnotationUtils判斷是否標(biāo)注了注解。
MessageHub annotation = AnnotationUtils.findAnnotation(method, MessageHub.class); if (annotation == null) { continue; }
標(biāo)注了后判斷如果是發(fā)布訂閱,進(jìn)入發(fā)布訂閱的實現(xiàn)類。
@Scope(proxyMode = ScopedProxyMode.TARGET_CLASS) @Service("redisPubSubProcessor") public class RedisPubSubProcessor extends MessageHubServiceImpl { ? ? @Resource ? ? RedisMessageListenerContainer redisPubSubContainer; ? ? @Override ? ? public void produce(ProducerAdapterForm producerAdapterForm) { ? ? ? ? stringRedisTemplate.convertAndSend(producerAdapterForm.getTopic(), producerAdapterForm.getMessage()); ? ? } ? ? @Override ? ? public void consume(ConsumerAdapterForm messageForm) { ? ? ? ? MessageListenerAdapter adapter = new MessageListenerAdapter(messageForm.getBean(), messageForm.getInvokeMethod().getName()); ? ? ? ? adapter.afterPropertiesSet(); ? ? ? ? redisPubSubContainer.addMessageListener(adapter, new PatternTopic(messageForm.getTopic())); ? ? } ? ? @Bean ? ? public RedisMessageListenerContainer redisPubSubContainer(RedisConnectionFactory connectionFactory) { ? ? ? ? RedisMessageListenerContainer container = new RedisMessageListenerContainer(); ? ? ? ? container.setConnectionFactory(connectionFactory); ? ? ? ? return container; ? ? } }
首先先將RedisMessageListenerContainer注入到Spring容器里,produce方法只需要調(diào)用下現(xiàn)程的api。consume方法由于上一步我們獲取了bean和對應(yīng)的method,直接用MessageListenerAdapter的構(gòu)造器創(chuàng)建出監(jiān)聽器來,這里有個坑,需要手動調(diào)用adapter.afterPropertiesSet()設(shè)置一些必要的屬性,這個在常規(guī)寫法里框架幫忙做了。如果不調(diào)用的話會出一些空指針之類的bug。
隨后把監(jiān)聽器add到容器就實現(xiàn)了方法代理,背后的線程監(jiān)聽到數(shù)據(jù)會回調(diào)到標(biāo)注了 @MessageHub 的方法里
到此這篇關(guān)于springboot+redis自定義注解實現(xiàn)發(fā)布訂閱的實現(xiàn)代碼的文章就介紹到這了,更多相關(guān)springboot redis發(fā)布訂閱內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Java中String、StringBuffer和StringBuilder的區(qū)別
這篇文章主要介紹了Java中String、StringBuffer和StringBuilder的區(qū)別,StringBuilder與StringBuffer都繼承自AbstractStringBuilder類,在AbstractStringBuilder中也是使用字符數(shù)組保存字符串char[]value但是沒有final關(guān)鍵字修飾,所以這兩個可變,需要的朋友可以參考下2024-01-01Java實現(xiàn)根據(jù)前端所要格式返回樹形3級層級數(shù)據(jù)
這篇文章主要為大家詳細(xì)介紹了Java如何實現(xiàn)根據(jù)前端所要格式返回樹形3級層級數(shù)據(jù),文中的示例代碼講解詳細(xì),有需要的小伙伴可以了解下2024-02-02Java實現(xiàn)權(quán)重隨機(jī)算法詳解
平時,經(jīng)常會遇到權(quán)重隨機(jī)算法,從不同權(quán)重的N個元素中隨機(jī)選擇一個,并使得總體選擇結(jié)果是按照權(quán)重分布的。本文就詳細(xì)來介紹如何實現(xiàn),感興趣的可以了解一下2021-07-07Spring實戰(zhàn)之Bean定義中的SpEL表達(dá)式語言支持操作示例
這篇文章主要介紹了Spring實戰(zhàn)之Bean定義中的SpEL表達(dá)式語言支持操作,結(jié)合實例形式分析了Bean定義中的SpEL表達(dá)式語言操作步驟與實現(xiàn)技巧,需要的朋友可以參考下2019-12-12java Hibernate save()與persist()區(qū)別
本文章來給各位同學(xué)介紹一下Hibernate save()與persist()區(qū)別,希望此文章能對各位同學(xué)對于Hibernate save()與persist()有所理解2016-01-01java通過DelayQueue實現(xiàn)延時任務(wù)
本文主要介紹了java通過DelayQueue實現(xiàn)延時任務(wù),文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2023-07-07