SpringBoot整合RocketMQ的詳細過程
今天我們來討論如何在項目開發(fā)中優(yōu)雅地使用RocketMQ。本文分為三部分,第一部分實現(xiàn)SpringBoot與RocketMQ的整合,第二部分解決在使用RocketMQ過程中可能遇到的一些問題并解決他們,第三部分介紹如何封裝RocketMQ以便更好地使用。
1. SpringBoot整合RocketMQ
在SpringBoot中集成RocketMQ,只需要簡單四步:
1.引入相關(guān)依賴
<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> </dependency>
2.添加RocketMQ的相關(guān)配置
rocketmq: consumer: group: springboot_consumer_group # 一次拉取消息最大值,注意是拉取消息的最大值而非消費最大值 pull-batch-size: 10 name-server: 10.5.103.6:9876 producer: # 發(fā)送同一類消息的設(shè)置為同一個group,保證唯一 group: springboot_producer_group # 發(fā)送消息超時時間,默認3000 sendMessageTimeout: 10000 # 發(fā)送消息失敗重試次數(shù),默認2 retryTimesWhenSendFailed: 2 # 異步消息重試此處,默認2 retryTimesWhenSendAsyncFailed: 2 # 消息最大長度,默認1024 * 1024 * 4(默認4M) maxMessageSize: 4096 # 壓縮消息閾值,默認4k(1024 * 4) compressMessageBodyThreshold: 4096 # 是否在內(nèi)部發(fā)送失敗時重試另一個broker,默認false retryNextServer: false
3.使用提供的模板工具類RocketMQTemplate發(fā)送消息
@RestController public class NormalProduceController { @Setter(onMethod_ = @Autowired) private RocketMQTemplate rocketmqTemplate; @GetMapping("/test") public SendResult test() { Message<String> msg = MessageBuilder.withPayload("Hello,RocketMQ").build(); SendResult sendResult = rocketmqTemplate.send(topic, msg); } }
4.實現(xiàn)RocketMQListener接口消費消息
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener; import org.springframework.stereotype.Component; @Component @RocketMQMessageListener(topic = "your_topic_name", consumerGroup = "your_consumer_group_name") public class MyConsumer implements RocketMQListener<String> { @Override public void onMessage(String message) { // 處理消息的邏輯 System.out.println("Received message: " + message); } }
以上4步即可實現(xiàn)SpringBoot與RocketMQ的整合,這部分屬于基礎(chǔ)知識,不做過多說明。
2 使用RocketMQ會遇到的問題
以下是一些在SpringBoot中使用RocketMQ時常遇到的問題,現(xiàn)在為您逐一解決。
2.1 WARN No appenders could be found for logger
啟動項目時會在日志中看到如下告警
RocketMQLog:WARN No appenders could be found for logger (io.netty.util.internal.InternalThreadLocalMap).
RocketMQLog:WARN Please initialize the logger system properly.
此時我們只需要在啟動類中設(shè)置環(huán)境變量 rocketmq.client.logUseSlf4j
為 true 明確指定RocketMQ的日志框架
@SpringBootApplication public class RocketDemoApplication { public static void main(String[] args) { /* * 指定使用的日志框架,否則將會告警 * RocketMQLog:WARN No appenders could be found for logger (io.netty.util.internal.InternalThreadLocalMap). * RocketMQLog:WARN Please initialize the logger system properly. */ System.setProperty("rocketmq.client.logUseSlf4j", "true"); SpringApplication.run(RocketDemoApplication.class, args); } }
同時還得在配置文件中調(diào)整日志級別,不然在控制臺會一直看到broker的日志信息
logging:
level:
RocketmqClient: ERROR
io:
netty: ERROR
2.2 不支持LocalDate 和 LocalDateTime
在使用Java8后經(jīng)常會使用LocalDate/LocalDateTime
這兩個時間類型字段,然而RocketMQ原始配置并不支持Java時間類型,當我們發(fā)送的實體消息中包含上述兩個字段時,消費端在消費時會出現(xiàn)如下所示的錯誤。
比如生產(chǎn)者的代碼如下:
@GetMapping("/test") public void test(){ //普通消息無返回值,只負責發(fā)送消息?不等待服務(wù)器回應(yīng)且沒有回調(diào)函數(shù)觸發(fā)。 RocketMessage rocketMessage = RocketMessage.builder(). id(1111L). message("hello,world") .localDate(LocalDate.now()) .localDateTime(LocalDateTime.now()) .build(); rocketmqTemplate.convertAndSend(destination,rocketMessage); }
消費者的代碼如下:
@Component @RocketMQMessageListener(consumerGroup = "springboot_consumer_group",topic = "consumer_topic") public class RocketMQConsumer implements RocketMQListener<RocketMessage> { @Override public void onMessage(RocketMessage message) { System.out.println("消費消息-" + message); } }
消費者開始消費時會出現(xiàn)類型轉(zhuǎn)換異常錯誤Cannot construct instance of java.time.LocalDate
,錯誤詳情如下:
原因:RocketMQ內(nèi)置使用的轉(zhuǎn)換器是RocketMQMessageConverter,轉(zhuǎn)換Json時使用的是MappingJackson2MessageConverter,但是這個轉(zhuǎn)換器不支持時間類型。
解決辦法:需要自定義消息轉(zhuǎn)換器,將MappingJackson2MessageConverter進行替換,并添加支持時間模塊
@Configuration public class RocketMQEnhanceConfig { /** * 解決RocketMQ Jackson不支持Java時間類型配置 * 源碼參考:{@link org.apache.rocketmq.spring.autoconfigure.MessageConverterConfiguration} */ @Bean @Primary public RocketMQMessageConverter enhanceRocketMQMessageConverter(){ RocketMQMessageConverter converter = new RocketMQMessageConverter(); CompositeMessageConverter compositeMessageConverter = (CompositeMessageConverter) converter.getMessageConverter(); List<MessageConverter> messageConverterList = compositeMessageConverter.getConverters(); for (MessageConverter messageConverter : messageConverterList) { if(messageConverter instanceof MappingJackson2MessageConverter){ MappingJackson2MessageConverter jackson2MessageConverter = (MappingJackson2MessageConverter) messageConverter; ObjectMapper objectMapper = jackson2MessageConverter.getObjectMapper(); objectMapper.registerModules(new JavaTimeModule()); } } return converter; } }
2.3 RockeMQ環(huán)境隔離
在使用RocketMQ時,通常會在代碼中直接指定消息主題(topic),而且開發(fā)環(huán)境和測試環(huán)境可能共用一個RocketMQ環(huán)境。如果沒有進行處理,在開發(fā)環(huán)境發(fā)送的消息就可能被測試環(huán)境的消費者消費,測試環(huán)境發(fā)送的消息也可能被開發(fā)環(huán)境的消費者消費,從而導致數(shù)據(jù)混亂的問題。
為了解決這個問題,我們可以根據(jù)不同的環(huán)境實現(xiàn)自動隔離。通過簡單配置一個選項,如dev、test、prod等不同環(huán)境,所有的消息都會被自動隔離。例如,當發(fā)送的消息主題為consumer_topic
時,可以自動在topic后面加上環(huán)境后綴,如consumer_topic_dev
。
那么,我們該如何實現(xiàn)呢?
可以編寫一個配置類實現(xiàn)BeanPostProcessor,并重寫postProcessBeforeInitialization方法,在監(jiān)聽器實例初始化前修改對應(yīng)的topic。
- BeanPostProcessor是Spring框架中的一個接口,它的作用是在Spring容器實例化、配置完bean之后,在bean初始化前后進行一些額外的處理工作。
- 具體來說,BeanPostProcessor接口定義了兩個方法:
- postProcessBeforeInitialization(Object bean, String beanName): 在bean初始化之前進行處理,可以對bean做一些修改等操作。
- postProcessAfterInitialization(Object bean, String beanName): 在bean初始化之后進行處理,可以進行一些清理或者其他操作。
- BeanPostProcessor可以在應(yīng)用程序中對Bean的創(chuàng)建和初始化過程進行攔截和修改,對Bean的生命周期進行干預和操作。它可以對所有的Bean類實例進行增強處理,使得開發(fā)人員可以在Bean初始化前后自定義一些操作,從而實現(xiàn)自己的業(yè)務(wù)需求。比如,可以通過BeanPostProcessor來實現(xiàn)注入某些必要的屬性值、加入某一個對象等等。
實現(xiàn)方案如下:
1.在配置文件中增加相關(guān)配置
rocketmq: enhance: # 啟動隔離,用于激活配置類EnvironmentIsolationConfig # 啟動后會自動在topic上拼接激活的配置文件,達到自動隔離的效果 enabledIsolation: true # 隔離環(huán)境名稱,拼接到topic后,topic_dev,默認空字符串 environment: dev
2.新增配置類,在實例化消息監(jiān)聽者之前把topic修改掉
@Configuration public class EnvironmentIsolationConfig implements BeanPostProcessor { @Value("${rocketmq.enhance.enabledIsolation:true}") private boolean enabledIsolation; @Value("${rocketmq.enhance.environment:''}") private String environmentName; /** * 在裝載Bean之前實現(xiàn)參數(shù)修改 */ @Override public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException { if(bean instanceof DefaultRocketMQListenerContainer){ DefaultRocketMQListenerContainer container = (DefaultRocketMQListenerContainer) bean; //拼接Topic if(enabledIsolation && StringUtils.hasText(environmentName)){ container.setTopic(String.join("_", container.getTopic(),environmentName)); } return container; } return bean; } }
啟動項目可以看到日志中消息監(jiān)聽的隊列已經(jīng)被修改了
2023-03-23 17:04:59.726 [main] INFO o.a.r.s.support.DefaultRocketMQListenerContainer:290 - running container: DefaultRocketMQListenerContainer{consumerGroup='springboot_consumer_group', nameServer='10.5.103.6:9876', topic='consumer_topic_dev', consumeMode=CONCURRENTLY, selectorType=TAG, selectorExpression='*', messageModel=CLUSTERING}
3. RocketMQ二次封裝
在解釋為什么要二次封裝之前先來看看RocketMQ官方文檔中推薦的最佳實踐
- 消息發(fā)送成功或者失敗要打印消息日志,用于業(yè)務(wù)排查問題。
- 如果消息量較少,建議在消費入口方法打印消息,消費耗時等,方便后續(xù)排查問題。
- RocketMQ 無法避免消息重復(Exactly-Once),所以如果業(yè)務(wù)對消費重復非常敏感,務(wù)必要在業(yè)務(wù)層面進行去重處理。可以借助關(guān)系數(shù)據(jù)庫進行去重。首先需要確定消息的唯一鍵,可以是msgId,也可以是消息內(nèi)容中的唯一標識字段,例如訂單Id等。
上面三個步驟基本每次發(fā)送消息或者消費消息都要實現(xiàn),屬于重復動作。
接下來討論的是在RocketMQ中發(fā)送消息時選擇何種消息類型最為合適。
在RocketMQ中有四種可選格式:
- 發(fā)送Json對象
- 發(fā)送轉(zhuǎn)Json后的String對象
- 根據(jù)業(yè)務(wù)封裝對應(yīng)實體類
- 直接使用原生MessageExt接收。
對于如何選擇消息類型,需要考慮到消費者在不查看消息發(fā)送者的情況下,如何獲取消息的含義。因此,在這種情況下,使用第三種方式即根據(jù)業(yè)務(wù)封裝對應(yīng)實體類的方式最為合適,也是大多數(shù)開發(fā)者在發(fā)送消息時的常用方式。
有了上面兩點結(jié)論以后我們來看看為什么要對RocketMQ二次封裝。
3.1 為什么要二次封裝
按照上述最佳實踐,一個完整的消息傳遞鏈路從生產(chǎn)到消費應(yīng)包括 準備消息、發(fā)送消息、記錄消息日志、處理發(fā)送失敗、記錄接收消息日志、處理業(yè)務(wù)邏輯、異常處理和異常重試 等步驟。
雖然使用原生RocketMQ可以完成這些動作,但每個生產(chǎn)者和消費者都需要編寫大量重復的代碼來完成相同的任務(wù),這就是需要進行二次封裝的原因。我們希望通過二次封裝,**生產(chǎn)者只需準備好消息實體并調(diào)用封裝后的工具類發(fā)送,而消費者只需處理核心業(yè)務(wù)邏輯,其他公共邏輯會得到統(tǒng)一處理。 **
在二次封裝中,關(guān)鍵是找出框架在日常使用中所涵蓋的許多操作,以及區(qū)分哪些操作是可變的,哪些是不變的。以上述例子為例,實際上只有生產(chǎn)者的消息準備和消費者的業(yè)務(wù)處理是可變的操作,需要根據(jù)需求進行處理,而其他步驟可以固定下來形成一個模板。
當然,本文提到的二次封裝不是指對源代碼進行封裝,而是針對工具的原始使用方式進行的封裝??梢詫⑵渑cMybatis和Mybatis-plus區(qū)分開來。這兩者都能完成任務(wù),只不過Mybatis-plus更為簡單便捷。
3.2 實現(xiàn)二次封裝
實現(xiàn)二次封裝需要創(chuàng)建一個自定義的starter,這樣其他項目只需要依賴此starter即可使用封裝功能。同時,在自定義starter中還需要解決文章第二部分中提到的一些問題。
代碼結(jié)構(gòu)如下所示:
3.2.1 消息實體類的封裝
/** * 消息實體,所有消息都需要繼承此類 * 公眾號:JAVA日知錄 */ @Data public abstract class BaseMessage { /** * 業(yè)務(wù)鍵,用于RocketMQ控制臺查看消費情況 */ protected String key; /** * 發(fā)送消息來源,用于排查問題 */ protected String source = ""; /** * 發(fā)送時間 */ protected LocalDateTime sendTime = LocalDateTime.now(); /** * 重試次數(shù),用于判斷重試次數(shù),超過重試次數(shù)發(fā)送異常警告 */ protected Integer retryTimes = 0; }
后面所有發(fā)送的消息實體都需要繼承此實體類。
3.2.2 消息發(fā)送工具類的封裝
@Slf4j @RequiredArgsConstructor(onConstructor = @__(@Autowired)) public class RocketMQEnhanceTemplate { private final RocketMQTemplate template; @Resource private RocketEnhanceProperties rocketEnhanceProperties; public RocketMQTemplate getTemplate() { return template; } /** * 根據(jù)系統(tǒng)上下文自動構(gòu)建隔離后的topic * 構(gòu)建目的地 */ public String buildDestination(String topic, String tag) { topic = reBuildTopic(topic); return topic + ":" + tag; } /** * 根據(jù)環(huán)境重新隔離topic * @param topic 原始topic */ private String reBuildTopic(String topic) { if(rocketEnhanceProperties.isEnabledIsolation() && StringUtils.hasText(rocketEnhanceProperties.getEnvironment())){ return topic +"_" + rocketEnhanceProperties.getEnvironment(); } return topic; } /** * 發(fā)送同步消息 */ public <T extends BaseMessage> SendResult send(String topic, String tag, T message) { // 注意分隔符 return send(buildDestination(topic,tag), message); } public <T extends BaseMessage> SendResult send(String destination, T message) { // 設(shè)置業(yè)務(wù)鍵,此處根據(jù)公共的參數(shù)進行處理 // 更多的其它基礎(chǔ)業(yè)務(wù)處理... Message<T> sendMessage = MessageBuilder.withPayload(message).setHeader(RocketMQHeaders.KEYS, message.getKey()).build(); SendResult sendResult = template.syncSend(destination, sendMessage); // 此處為了方便查看給日志轉(zhuǎn)了json,根據(jù)選擇選擇日志記錄方式,例如ELK采集 log.info("[{}]同步消息[{}]發(fā)送結(jié)果[{}]", destination, JSONObject.toJSON(message), JSONObject.toJSON(sendResult)); return sendResult; } /** * 發(fā)送延遲消息 */ public <T extends BaseMessage> SendResult send(String topic, String tag, T message, int delayLevel) { return send(buildDestination(topic,tag), message, delayLevel); } public <T extends BaseMessage> SendResult send(String destination, T message, int delayLevel) { Message<T> sendMessage = MessageBuilder.withPayload(message).setHeader(RocketMQHeaders.KEYS, message.getKey()).build(); SendResult sendResult = template.syncSend(destination, sendMessage, 3000, delayLevel); log.info("[{}]延遲等級[{}]消息[{}]發(fā)送結(jié)果[{}]", destination, delayLevel, JSONObject.toJSON(message), JSONObject.toJSON(sendResult)); return sendResult; } }
這里封裝了一個消息發(fā)送類,實現(xiàn)了日志記錄以及自動重建topic的功能(即生產(chǎn)者實現(xiàn)環(huán)境隔離),后面項目中只需要注入RocketMQEnhanceTemplate來實現(xiàn)消息的發(fā)送。
3.2.3 消費者的封裝
@Slf4j public abstract class EnhanceMessageHandler<T extends BaseMessage> { /** * 默認重試次數(shù) */ private static final int MAX_RETRY_TIMES = 3; /** * 延時等級 */ private static final int DELAY_LEVEL = EnhanceMessageConstant.FIVE_SECOND; @Resource private RocketMQEnhanceTemplate rocketMQEnhanceTemplate; /** * 消息處理 * * @param message 待處理消息 * @throws Exception 消費異常 */ protected abstract void handleMessage(T message) throws Exception; /** * 超過重試次數(shù)消息,需要啟用isRetry * * @param message 待處理消息 */ protected abstract void handleMaxRetriesExceeded(T message); /** * 是否需要根據(jù)業(yè)務(wù)規(guī)則過濾消息,去重邏輯可以在此處處理 * @param message 待處理消息 * @return true: 本次消息被過濾,false:不過濾 */ protected boolean filter(T message) { return false; } /** * 是否異常時重復發(fā)送 * * @return true: 消息重試,false:不重試 */ protected abstract boolean isRetry(); /** * 消費異常時是否拋出異常 * 返回true,則由rocketmq機制自動重試 * false:消費異常(如果沒有開啟重試則消息會被自動ack) */ protected abstract boolean throwException(); /** * 最大重試次數(shù) * * @return 最大重試次數(shù),默認5次 */ protected int getMaxRetryTimes() { return MAX_RETRY_TIMES; } /** * isRetry開啟時,重新入隊延遲時間 * @return -1:立即入隊重試 */ protected int getDelayLevel() { return DELAY_LEVEL; } /** * 使用模板模式構(gòu)建消息消費框架,可自由擴展或刪減 */ public void dispatchMessage(T message) { // 基礎(chǔ)日志記錄被父類處理了 log.info("消費者收到消息[{}]", JSONObject.toJSON(message)); if (filter(message)) { log.info("消息id{}不滿足消費條件,已過濾。",message.getKey()); return; } // 超過最大重試次數(shù)時調(diào)用子類方法處理 if (message.getRetryTimes() > getMaxRetryTimes()) { handleMaxRetriesExceeded(message); return; } try { long now = System.currentTimeMillis(); handleMessage(message); long costTime = System.currentTimeMillis() - now; log.info("消息{}消費成功,耗時[{}ms]", message.getKey(),costTime); } catch (Exception e) { log.error("消息{}消費異常", message.getKey(),e); // 是捕獲異常還是拋出,由子類決定 if (throwException()) { //拋出異常,由DefaultMessageListenerConcurrently類處理 throw new RuntimeException(e); } //此時如果不開啟重試機制,則默認ACK了 if (isRetry()) { handleRetry(message); } } } protected void handleRetry(T message) { // 獲取子類RocketMQMessageListener注解拿到topic和tag RocketMQMessageListener annotation = this.getClass().getAnnotation(RocketMQMessageListener.class); if (annotation == null) { return; } //重新構(gòu)建消息體 String messageSource = message.getSource(); if(!messageSource.startsWith(EnhanceMessageConstant.RETRY_PREFIX)){ message.setSource(EnhanceMessageConstant.RETRY_PREFIX + messageSource); } message.setRetryTimes(message.getRetryTimes() + 1); SendResult sendResult; try { // 如果消息發(fā)送不成功,則再次重新發(fā)送,如果發(fā)送異常則拋出由MQ再次處理(異常時不走延遲消息) sendResult = rocketMQEnhanceTemplate.send(annotation.topic(), annotation.selectorExpression(), message, getDelayLevel()); } catch (Exception ex) { // 此處捕獲之后,相當于此條消息被消息完成然后重新發(fā)送新的消息 //由生產(chǎn)者直接發(fā)送 throw new RuntimeException(ex); } // 發(fā)送失敗的處理就是不進行ACK,由RocketMQ重試 if (sendResult.getSendStatus() != SendStatus.SEND_OK) { throw new RuntimeException("重試消息發(fā)送失敗"); } } }
使用模版設(shè)計模式定義了消息消費的骨架,實現(xiàn)了日志打印,異常處理,異常重試等公共邏輯,消息過濾(查重)、業(yè)務(wù)處理則交由子類實現(xiàn)。
3.2.4 基礎(chǔ)配置類
@Configuration @EnableConfigurationProperties(RocketEnhanceProperties.class) public class RocketMQEnhanceAutoConfiguration { /** * 注入增強的RocketMQEnhanceTemplate */ @Bean public RocketMQEnhanceTemplate rocketMQEnhanceTemplate(RocketMQTemplate rocketMQTemplate){ return new RocketMQEnhanceTemplate(rocketMQTemplate); } /** * 解決RocketMQ Jackson不支持Java時間類型配置 * 源碼參考:{@link org.apache.rocketmq.spring.autoconfigure.MessageConverterConfiguration} */ @Bean @Primary public RocketMQMessageConverter enhanceRocketMQMessageConverter(){ RocketMQMessageConverter converter = new RocketMQMessageConverter(); CompositeMessageConverter compositeMessageConverter = (CompositeMessageConverter) converter.getMessageConverter(); List<MessageConverter> messageConverterList = compositeMessageConverter.getConverters(); for (MessageConverter messageConverter : messageConverterList) { if(messageConverter instanceof MappingJackson2MessageConverter){ MappingJackson2MessageConverter jackson2MessageConverter = (MappingJackson2MessageConverter) messageConverter; ObjectMapper objectMapper = jackson2MessageConverter.getObjectMapper(); objectMapper.registerModules(new JavaTimeModule()); } } return converter; } /** * 環(huán)境隔離配置 */ @Bean @ConditionalOnProperty(name="rocketmq.enhance.enabledIsolation", havingValue="true") public EnvironmentIsolationConfig environmentSetup(RocketEnhanceProperties rocketEnhanceProperties){ return new EnvironmentIsolationConfig(rocketEnhanceProperties); } }
public class EnvironmentIsolationConfig implements BeanPostProcessor { private RocketEnhanceProperties rocketEnhanceProperties; public EnvironmentIsolationConfig(RocketEnhanceProperties rocketEnhanceProperties) { this.rocketEnhanceProperties = rocketEnhanceProperties; } /** * 在裝載Bean之前實現(xiàn)參數(shù)修改 */ @Override public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException { if(bean instanceof DefaultRocketMQListenerContainer){ DefaultRocketMQListenerContainer container = (DefaultRocketMQListenerContainer) bean; if(rocketEnhanceProperties.isEnabledIsolation() && StringUtils.hasText(rocketEnhanceProperties.getEnvironment())){ container.setTopic(String.join("_", container.getTopic(),rocketEnhanceProperties.getEnvironment())); } return container; } return bean; } }
@ConfigurationProperties(prefix = "rocketmq.enhance") @Data public class RocketEnhanceProperties { private boolean enabledIsolation; private String environment; }
3.3 封裝后的使用
3.3.1 引入依賴
<dependency> <groupId>com.jianzh5</groupId> <artifactId>cloud-rocket-starter</artifactId> </dependency>
3.3.2 自定義配置
rocketmq: ... enhance: # 啟動隔離,用于激活配置類EnvironmentIsolationConfig # 啟動后會自動在topic上拼接激活的配置文件,達到自動隔離的效果 enabledIsolation: true # 隔離環(huán)境名稱,拼接到topic后,topic_dev,默認空字符串 environment: dev
3.3.3 發(fā)送消息
@RestController @RequestMapping("enhance") @Slf4j public class EnhanceProduceController { //注入增強后的模板,可以自動實現(xiàn)環(huán)境隔離,日志記錄 @Setter(onMethod_ = @Autowired) private RocketMQEnhanceTemplate rocketMQEnhanceTemplate; private static final String topic = "rocket_enhance"; private static final String tag = "member"; /** * 發(fā)送實體消息 */ @GetMapping("/member") public SendResult member() { String key = UUID.randomUUID().toString(); MemberMessage message = new MemberMessage(); // 設(shè)置業(yè)務(wù)key message.setKey(key); // 設(shè)置消息來源,便于查詢 message.setSource("MEMBER"); // 業(yè)務(wù)消息內(nèi)容 message.setUserName("Java日知錄"); message.setBirthday(LocalDate.now()); return rocketMQEnhanceTemplate.send(topic, tag, message); } }
注意這里使用的是封裝后的模板工具類,一旦在配置文件中啟動環(huán)境隔離,則生產(chǎn)者的消息也自動發(fā)送到隔離后的topic中。
3.3.4 消費者
@Slf4j @Component @RocketMQMessageListener( consumerGroup = "enhance_consumer_group", topic = "rocket_enhance", selectorExpression = "*", consumeThreadMax = 5 //默認是64個線程并發(fā)消息,配置 consumeThreadMax 參數(shù)指定并發(fā)消費線程數(shù),避免太大導致資源不夠 ) public class EnhanceMemberMessageListener extends EnhanceMessageHandler<MemberMessage> implements RocketMQListener<MemberMessage> { @Override protected void handleMessage(MemberMessage message) throws Exception { // 此時這里才是最終的業(yè)務(wù)處理,代碼只需要處理資源類關(guān)閉異常,其他的可以交給父類重試 System.out.println("業(yè)務(wù)消息處理:"+message.getUserName()); } @Override protected void handleMaxRetriesExceeded(MemberMessage message) { // 當超過指定重試次數(shù)消息時此處方法會被調(diào)用 // 生產(chǎn)中可以進行回退或其他業(yè)務(wù)操作 log.error("消息消費失敗,請執(zhí)行后續(xù)處理"); } /** * 是否執(zhí)行重試機制 */ @Override protected boolean isRetry() { return true; } @Override protected boolean throwException() { // 是否拋出異常,false搭配retry自行處理異常 return false; } @Override protected boolean filter() { // 消息過濾 return false; } /** * 監(jiān)聽消費消息,不需要執(zhí)行業(yè)務(wù)處理,委派給父類做基礎(chǔ)操作,父類做完基礎(chǔ)操作后會調(diào)用子類的實際處理類型 */ @Override public void onMessage(MemberMessage memberMessage) { super.dispatchMessage(memberMessage); } }
為了方便消費者對RocketMQ中的消息進行處理,我們可以使用EnhanceMessageHandler來進行消息的處理和邏輯的處理。
消費者實現(xiàn)了RocketMQListener的同時,可以繼承EnhanceMessageHandler來進行公共邏輯的處理,而核心業(yè)務(wù)邏輯需要自己實現(xiàn)handleMessage
方法。 如果需要對消息進行過濾或者去重的處理,則可以重寫父類的filter方法進行實現(xiàn)。這樣可以更加方便地對消息進行處理,減輕開發(fā)者的工作量。
到此這篇關(guān)于SpringBoot整合RocketMQ的詳細過程的文章就介紹到這了,更多相關(guān)SpringBoot整合RocketMQ內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
詳解SpringCloud新一代網(wǎng)關(guān)Gateway
SpringCloud Gateway是Spring Cloud的一個全新項目,Spring 5.0+ Spring Boot 2.0和Project Reactor等技術(shù)開發(fā)的網(wǎng)關(guān),它旨在為微服務(wù)架構(gòu)提供一種簡單有效的統(tǒng)一的API路由管理方式2021-06-06使用ShardingSphere-Proxy實現(xiàn)分表分庫
這篇文章介紹了使用ShardingSphere-Proxy實現(xiàn)分表分庫的方法,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧2022-02-02SpringMVC五大組件與執(zhí)行原理分析總結(jié)
這篇文章主要介紹了SpringMVC五大組件與執(zhí)行原理分析總結(jié),文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習吧2023-01-01永久解決 Intellij idea 報錯:Error :java 不支持發(fā)行版本5的問題
這篇文章主要介紹了永久解決 Intellij idea 報錯:Error :java 不支持發(fā)行版本5的問題,本文給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下2021-02-02Java編程實現(xiàn)提取文章中關(guān)鍵字的方法
這篇文章主要介紹了Java編程實現(xiàn)提取文章中關(guān)鍵字的方法,較為詳細的分析了Java提取文章關(guān)鍵字的原理與具體實現(xiàn)技巧,具有一定參考借鑒價值,需要的朋友可以參考下2015-11-11