SpringBoot3集成RocketMq場景分析
RocketMQ因其架構簡單、業(yè)務功能豐富、具備極強可擴展性等特點被廣泛應用,比如金融業(yè)務、互聯(lián)網、大數據、物聯(lián)網等領域的業(yè)務場景;
標簽:RocketMq5.Dashboard;
一、簡介
RocketMQ因其架構簡單、業(yè)務功能豐富、具備極強可擴展性等特點被廣泛應用,比如金融業(yè)務、互聯(lián)網、大數據、物聯(lián)網等領域的業(yè)務場景;
二、環(huán)境部署
1、編譯打包
1、下載5.0版本源碼包 rocketmq-all-5.0.0-source-release.zip 2、解壓后進入目錄,編譯打包 mvn -Prelease-all -DskipTests -Dspotbugs.skip=true clean install -U
2、修改配置
在distribution/target/rocketmq-5.0.0/rocketmq-5.0.0/bin/runserver.sh
distribution/target/rocketmq-5.0.0/rocketmq-5.0.0/bin/runbroker.sh
3、服務啟動
1、該目錄下 distribution/target/rocketmq-5.0.0/rocketmq-5.0.0/bin/ 2、啟動NameServer sh mqnamesrv 輸出日志 The Name Server boot success. serializeType=JSON 3、啟動Broker+Proxy sh mqbroker -n localhost:9876 --enable-proxy 輸出日志 rocketmq-proxy startup successfully 4、關閉服務 sh mqshutdown namesrv Send shutdown request to mqnamesrv(18636) OK sh mqshutdown broker Send shutdown request to mqbroker with proxy enable OK(18647)
4、控制臺安裝
1、下載master源碼包 rocketmq-dashboard-master 2、解壓后進入目錄,編譯打包 mvn clean package -Dmaven.test.skip=true 3、啟動服務 java -jar target/rocketmq-dashboard-1.0.1-SNAPSHOT.jar 4、輸出日志 INFO main - Tomcat started on port(s): 8080 (http) with context path '' 5、訪問服務:localhost:8080
三、工程搭建
1、工程結構
2、依賴管理
在 rocketmq-starter
組件中,實際上依賴的是 rocketmq-client
組件的 5.0
版本,由于兩個新版框架間的兼容問題,需要添加相關配置解決該問題;
<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <version>${rocketmq-starter.version}</version> </dependency>
3、配置文件
配置RocketMq服務地址,消息生產者和消費者;
rocketmq: name-server: 127.0.0.1:9876 # 生產者 producer: group: boot_group_1 # 消息發(fā)送超時時間 send-message-timeout: 3000 # 消息最大長度4M max-message-size: 4096 # 消息發(fā)送失敗重試次數 retry-times-when-send-failed: 3 # 異步消息發(fā)送失敗重試次數 retry-times-when-send-async-failed: 2 # 消費者 consumer: group: boot_group_1 # 每次提取的最大消息數 pull-batch-size: 5
4、配置類
在配置類中主要定義兩個Bean的加載,即 RocketMQTemplate
和 DefaultMQProducer
,主要是提供消息發(fā)送的能力,即生產消息;
@Configuration public class RocketMqConfig { @Value("${rocketmq.name-server}") private String nameServer; @Value("${rocketmq.producer.group}") private String producerGroup; @Value("${rocketmq.producer.send-message-timeout}") private Integer sendMsgTimeout; @Value("${rocketmq.producer.max-message-size}") private Integer maxMessageSize; @Value("${rocketmq.producer.retry-times-when-send-failed}") private Integer retryTimesWhenSendFailed ; @Value("${rocketmq.producer.retry-times-when-send-async-failed}") private Integer retryTimesWhenSendAsyncFailed ; @Bean public RocketMQTemplate rocketMqTemplate(){ RocketMQTemplate rocketMqTemplate = new RocketMQTemplate(); rocketMqTemplate.setProducer(defaultMqProducer()); return rocketMqTemplate; } @Bean public DefaultMQProducer defaultMqProducer() { DefaultMQProducer producer = new DefaultMQProducer(); producer.setNamesrvAddr(this.nameServer); producer.setProducerGroup(this.producerGroup); producer.setSendMsgTimeout(this.sendMsgTimeout); producer.setMaxMessageSize(this.maxMessageSize); producer.setRetryTimesWhenSendFailed(this.retryTimesWhenSendFailed); producer.setRetryTimesWhenSendAsyncFailed(this.retryTimesWhenSendAsyncFailed); return producer; } }
四、基礎用法
1、消息生產
編寫一個生產者接口類,分別使用 RocketMQTemplate
和 DefaultMQProducer
實現消息發(fā)送的功能,然后可以通過 Dashboard
控制面板查看消息詳情;
@RestController public class ProducerWeb { private static final Logger log = LoggerFactory.getLogger(ProducerWeb.class); @Autowired private RocketMQTemplate rocketMqTemplate; @GetMapping("/send/msg1") public String sendMsg1 (){ try { // 構建消息主體 JsonMapper jsonMapper = new JsonMapper(); String msgBody = jsonMapper.writeValueAsString(new MqMsg(1,"boot_mq_msg")); // 發(fā)送消息 rocketMqTemplate.convertAndSend("boot-mq-topic",msgBody); } catch (Exception e) { e.printStackTrace(); } return "OK" ; } @Autowired private DefaultMQProducer defaultMqProducer ; @GetMapping("/send/msg2") public String sendMsg2 (){ try { // 構建消息主體 JsonMapper jsonMapper = new JsonMapper(); String msgBody = jsonMapper.writeValueAsString(new MqMsg(2,"boot_mq_msg")); // 構建消息對象 Message message = new Message(); message.setTopic("boot-mq-topic"); message.setTags("boot-mq-tag"); message.setKeys("boot-mq-key"); message.setBody(msgBody.getBytes()); // 發(fā)送消息,打印日志 SendResult sendResult = defaultMqProducer.send(message); log.info("msgId:{},sendStatus:{}",sendResult.getMsgId(),sendResult.getSendStatus()); } catch (Exception e) { e.printStackTrace(); } return "OK" ; } }
2、消息消費
編寫消息監(jiān)聽類,實現 RocketMQListener
接口,通過 RocketMQMessageListener
注解控制監(jiān)聽的具體信息;
@Service @RocketMQMessageListener(consumerGroup = "boot_group_1",topic = "boot-mq-topic") public class ConsumerListener implements RocketMQListener<String> { private static final Logger log = LoggerFactory.getLogger(ConsumerListener.class); @Override public void onMessage(String message) { log.info("\n=====\n message:{} \n=====\n",message); } }
五、參考源碼
文檔倉庫: https://gitee.com/cicadasmile/butte-java-note
源碼倉庫: https://gitee.com/cicadasmile/butte-spring-parent
Gitee主頁: https://gitee.com/cicadasmile/butte-java-note
到此這篇關于SpringBoot3集成RocketMq的文章就介紹到這了,更多相關SpringBoot3集成RocketMq內容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!
相關文章
maven插件assembly使用及springboot啟動腳本start.sh和停止腳本 stop.sh
這篇文章主要介紹了maven插件assembly使用及springboot啟動腳本start.sh和停止腳本 stop.sh的相關資料,本文給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下2020-08-08教你如何監(jiān)控 Java 線程池運行狀態(tài)的操作(必看)
這篇文章主要介紹了教你如何監(jiān)控 Java 線程池運行狀態(tài)的操作,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2021-02-02elasticsearch?java客戶端action的實現簡單分析
這篇文章主要為大家介紹了elasticsearch?java客戶端action的實現簡單分析,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪2022-04-04