SpringBoot集成消息隊(duì)列的項(xiàng)目實(shí)踐
背景
最近在對(duì)公司開(kāi)發(fā)框架進(jìn)行優(yōu)化,框架內(nèi)涉及到多處入庫(kù)的日志記錄,例如登錄日志/操作日志/訪問(wèn)日志/業(yè)務(wù)執(zhí)行日志,集成在業(yè)務(wù)代碼中耦合度較高且占用業(yè)務(wù)操作執(zhí)行時(shí)間,所以準(zhǔn)備集成相關(guān)消息隊(duì)列進(jìn)行代碼解耦
方案規(guī)劃
現(xiàn)有的成熟消息隊(duì)列組件非常多,例如RabbitMQ,ActiveMQ,Kafka等,考慮到業(yè)務(wù)并發(fā)量不高且框架已經(jīng)應(yīng)用于多個(gè)項(xiàng)目平穩(wěn)運(yùn)行,準(zhǔn)備提供基于Redis的消息隊(duì)列和集成ActiveMQ兩種方案,Redis消息隊(duì)列的好處是無(wú)需額外安裝部署存量項(xiàng)目可平穩(wěn)過(guò)度但消息無(wú)法持久化可能丟失,ActiveMQ解決方案成熟可以保證消息持久化但是需要實(shí)施人員額外掌握操作部署
統(tǒng)一設(shè)計(jì)
增加自定義配置指定消息隊(duì)列方式
system: #消息隊(duì)列方式 redis/activemq messageChannel: redis
定義消息傳輸統(tǒng)一模型
public class MessageModel { private Class<? extends IMessageReceiver> handleClazz; private String bodyContent; private Class bodyClass; private HashMap extraParam; public MessageModel(){ extraParam = new HashMap(); } public Class<? extends IMessageReceiver> getHandleClazz() { return handleClazz; } public void setHandleClazz(Class<? extends IMessageReceiver> handleClazz) { this.handleClazz = handleClazz; } public HashMap getExtraParam() { return extraParam; } public void setExtraParam(HashMap extraParam) { this.extraParam = extraParam; } public String getBodyContent() { return bodyContent; } public void setBodyContent(String bodyContent) { this.bodyContent = bodyContent; } public Class getBodyClass() { return bodyClass; } public void setBodyClass(Class bodyClass) { this.bodyClass = bodyClass; } }
定義標(biāo)準(zhǔn)消息處理接口
public interface IMessageReceiver { void handleMessage(Object bodyObject, HashMap extraParam); }
定義統(tǒng)一對(duì)外發(fā)送消息工具類
@Component public class MessageUtil { @Autowired private SystemConfig systemConfig; @Autowired private RedisUtil redisUtil; @Autowired private JmsMessagingTemplate jmsMessagingTemplate; public void sendMessage(Object messageBody, Class<? extends IMessageReceiver> handleClass, HashMap<String,Object> extraParam) { MessageModel messageModel = new MessageModel(); messageModel.setHandleClazz(handleClass); messageModel.setBodyClass(messageBody.getClass()); messageModel.setBodyContent(JSON.toJSONString(messageBody)); if (extraParam != null) { for (String key:extraParam.keySet()) { messageModel.getExtraParam().put(key,extraParam.get(key)); } } if(systemConfig.getMessageChannel().equals("redis")){ redisUtil.sendMessage("message", JSON.toJSON(messageModel)); }else{ jmsMessagingTemplate.convertAndSend("message",JSON.toJSONString(messageModel)); } } }
集成Redis消息隊(duì)列
pom配置
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> <version>2.0.1.RELEASE</version> </dependency>
連接配置
spring: redis: host: localhost port: 6379 password:
操作工具類
@Autowired private RedisTemplate redisTemplate; public void sendMessage(String channel, Object message) { redisTemplate.convertAndSend(channel, message); }
消息處理
@Component @ConditionalOnProperty(name = "system.messageChannel", havingValue = "redis", matchIfMissing = true) public class RedisMessageReceiver { public void receiveMessage(String message) { MessageModel messageModel = JSON.parseObject(message, MessageModel.class); IMessageReceiver receiver = SpringBootBeanUtil.getBean(messageModel.getHandleClazz()); receiver.handleMessage(JSON.parseObject(messageModel.getBodyContent(), messageModel.getBodyClass()), messageModel.getExtraParam()); } }
配置注冊(cè)
@Configuration public class MessageCenter { @Bean @ConditionalOnProperty(name = "system.messageChannel", havingValue = "redis", matchIfMissing = true) RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory, MessageListenerAdapter listenerAdapter) { RedisMessageListenerContainer container = new RedisMessageListenerContainer(); container.setConnectionFactory(connectionFactory); // 可以添加多個(gè) messageListener,配置不同的交換機(jī) container.addMessageListener(listenerAdapter, new PatternTopic("message")); return container; } /** * 消息監(jiān)聽(tīng)器適配器,綁定消息處理器,利用反射技術(shù)調(diào)用消息處理器的業(yè)務(wù)方法 * * @param receiver * @return */ @Bean @ConditionalOnProperty(name = "system.messageChannel", havingValue = "redis", matchIfMissing = true) MessageListenerAdapter listenerAdapter(RedisMessageReceiver receiver) { return new MessageListenerAdapter(receiver, "receiveMessage"); } }
集成ActiveMQ消息隊(duì)列
pom配置
<dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-pool</artifactId> <version>5.15.0</version> </dependency>
注意:jdk1.8對(duì)應(yīng)版本5.15.0
連接配置
spring: activemq: broker-url: tcp://127.0.0.1:61616 #MQ服務(wù)器地址 user: admin password: admin pool: enabled: true
消息處理
@Component @ConditionalOnProperty(name = "system.messageChannel", havingValue = "activemq", matchIfMissing = false) public class ActiveMQMessageReceiver { @JmsListener(destination = "message", containerFactory = "customQueueListener") public void handleMessage(String message) { MessageModel messageModel = JSON.parseObject(message, MessageModel.class); IMessageReceiver receiver = SpringBootBeanUtil.getBean(messageModel.getHandleClazz()); receiver.handleMessage(JSON.parseObject(messageModel.getBodyContent(), messageModel.getBodyClass()), messageModel.getExtraParam()); } }
配置注冊(cè)
@Configuration @EnableJms public class MessageCenter { @Bean(name = "customQueueListener") @ConditionalOnProperty(name = "system.messageChannel", havingValue = "activemq", matchIfMissing = false) public JmsListenerContainerFactory<?> customQueueListener(ConnectionFactory connectionFactory) { DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory(); factory.setPubSubDomain(false); factory.setConnectionFactory(connectionFactory); //重連間隔時(shí)間 factory.setRecoveryInterval(1000L); factory.setSessionAcknowledgeMode(Session.AUTO_ACKNOWLEDGE); //連接數(shù) factory.setConcurrency("5-10"); //指定任務(wù)線程池 factory.setTaskExecutor(new ThreadPoolExecutor(5, 10, 1, TimeUnit.MINUTES, new LinkedBlockingQueue<>(100), new ThreadPoolExecutor.CallerRunsPolicy())); return factory; } }
使用示例
消息處理
@Service public class RequestLogMessageReceiver implements IMessageReceiver{ @Autowired private F_RequestLogService requestLogService; @Override public void handleMessage(Object bodyObject, HashMap extraParam) { F_RequestLogDO requestLogDO = (F_RequestLogDO)bodyObject; requestLogService.insert(requestLogDO); } }
發(fā)送消息
@AutoWired private MessageUtil messageUtil; messageUtil.sendMessage(requestLogDO,RequestLogMessageReceiver.class,null);
到此這篇關(guān)于SpringBoot集成消息隊(duì)列的項(xiàng)目實(shí)踐的文章就介紹到這了,更多相關(guān)SpringBoot集成消息隊(duì)列內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
- SpringBoot整合RabbitMQ消息隊(duì)列的完整步驟
- 詳解SpringBoot集成消息隊(duì)列的案例應(yīng)用
- Springboot?整合?RabbitMQ?消息隊(duì)列?詳情
- springboot整合消息隊(duì)列RabbitMQ
- SpringBoot整合消息隊(duì)列RabbitMQ
- SpringBoot基于RabbitMQ實(shí)現(xiàn)消息延時(shí)隊(duì)列的方案
- SpringBoot基于RabbitMQ實(shí)現(xiàn)消息延遲隊(duì)列方案及使用場(chǎng)景
- Springboot RabbitMQ 消息隊(duì)列使用示例詳解
- SpringBoot集成Redisson實(shí)現(xiàn)消息隊(duì)列的示例代碼
相關(guān)文章
java安全?ysoserial?CommonsCollections1示例解析
這篇文章主要介紹了java安全?ysoserial?CommonsCollections1示例解析,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2022-10-10Java實(shí)現(xiàn)兩人五子棋游戲(二) 畫(huà)出棋盤(pán)
這篇文章主要為大家詳細(xì)介紹了Java實(shí)現(xiàn)兩人五子棋游戲,畫(huà)出五子棋的棋盤(pán),文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2018-03-03shiro與spring集成基礎(chǔ)Hello案例詳解
這篇文章主要介紹了shiro與spring集成基礎(chǔ)Hello案例詳解,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2019-11-11SpringBoot獲取Request對(duì)象的常見(jiàn)方法
HttpServletRequest 簡(jiǎn)稱 Request,它是一個(gè) Servlet API 提供的對(duì)象,用于獲取客戶端發(fā)起的 HTTP 請(qǐng)求信息,那么在SpringBoot中,獲取 Request對(duì)象的方法有哪些呢,本文小編將給大家講講SpringBoot獲取Request對(duì)象的常見(jiàn)方法2023-08-08Java實(shí)現(xiàn)發(fā)送短信驗(yàn)證碼功能
這篇文章主要為大家詳細(xì)介紹了Java實(shí)現(xiàn)發(fā)送短信驗(yàn)證碼功能,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2017-11-11