Spring Boot 3 集成 RabbitMQ 實(shí)踐指南(原理解析)
Spring Boot 3 集成 RabbitMQ 實(shí)踐指南
1. RabbitMQ 核心原理
1.1 什么是RabbitMQ
RabbitMQ是一個(gè)開源的消息代理和隊(duì)列服務(wù)器,使用Erlang語言開發(fā),基于AMQP(Advanced Message Queuing Protocol)協(xié)議實(shí)現(xiàn)。它支持多種消息傳遞模式,具有高可用性、可擴(kuò)展性和可靠性等特點(diǎn)。
1.2 核心概念
1.2.1 基礎(chǔ)組件
Producer(生產(chǎn)者)
- 消息的發(fā)送者
- 負(fù)責(zé)創(chuàng)建消息并發(fā)布到RabbitMQ中
Consumer(消費(fèi)者)
- 消息的接收者
- 連接到RabbitMQ服務(wù)器并訂閱隊(duì)列
Exchange(交換機(jī))
- 接收生產(chǎn)者發(fā)送的消息并根據(jù)路由規(guī)則轉(zhuǎn)發(fā)到隊(duì)列
- 類型:
- Direct Exchange:根據(jù)routing key精確匹配
- Topic Exchange:根據(jù)routing key模式匹配
- Fanout Exchange:廣播到所有綁定隊(duì)列
- Headers Exchange:根據(jù)消息屬性匹配
Queue(隊(duì)列)
- 消息存儲(chǔ)的地方
- 支持持久化、臨時(shí)、自動(dòng)刪除等特性
Binding(綁定)
- 交換機(jī)和隊(duì)列之間的虛擬連接
- 定義消息路由規(guī)則
1.2.2 高級(jí)特性
消息持久化
- 交換機(jī)持久化:創(chuàng)建時(shí)設(shè)置durable=true
- 隊(duì)列持久化:創(chuàng)建時(shí)設(shè)置durable=true
- 消息持久化:設(shè)置delivery-mode=2
消息確認(rèn)機(jī)制
- 生產(chǎn)者確認(rèn):Publisher Confirm和Return機(jī)制
- 消費(fèi)者確認(rèn):自動(dòng)確認(rèn)、手動(dòng)確認(rèn)、批量確認(rèn)
死信隊(duì)列(DLX)
- 消息被拒絕且不重新入隊(duì)
- 消息過期(TTL)
- 隊(duì)列達(dá)到最大長度
1.3 應(yīng)用場(chǎng)景
異步處理
- 發(fā)送郵件、短信通知
- 日志處理、報(bào)表生成
- 文件處理、圖片處理
應(yīng)用解耦
- 系統(tǒng)間通信
- 服務(wù)解耦
- 流程分離
流量控制
- 削峰填谷
- 請(qǐng)求緩沖
- 流量整形
定時(shí)任務(wù)
- 延遲隊(duì)列
- 定時(shí)處理
- 任務(wù)調(diào)度
2. 環(huán)境搭建
2.1 基礎(chǔ)環(huán)境
- Spring Boot: 3.x
- Java: 17+
- RabbitMQ: 3.12+
- Maven/Gradle
2.2 依賴配置
<dependencies> <!-- Spring Boot Starter AMQP --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <!-- Spring Boot Starter Web --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <!-- Lombok --> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <optional>true</optional> </dependency> <!-- Jackson --> <dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-databind</artifactId> </dependency> </dependencies>
2.3 基礎(chǔ)配置
spring: rabbitmq: host: localhost port: 5672 username: guest password: guest virtual-host: / # 消息確認(rèn)配置 publisher-confirm-type: correlated # 開啟發(fā)布確認(rèn) publisher-returns: true # 開啟發(fā)布返回 template: mandatory: true # 消息路由失敗返回 # 消費(fèi)者配置 listener: simple: acknowledge-mode: manual # 手動(dòng)確認(rèn) prefetch: 1 # 每次獲取消息數(shù)量 retry: enabled: true # 開啟重試 initial-interval: 1000 # 重試間隔時(shí)間 max-attempts: 3 # 最大重試次數(shù) multiplier: 1.0 # 重試時(shí)間乘數(shù) # SSL配置(可選) ssl: enabled: false key-store: classpath:keystore.p12 key-store-password: password trust-store: classpath:truststore.p12 trust-store-password: password
3. 核心配置類
3.1 RabbitMQ配置類
@Configuration @EnableRabbit public class RabbitMQConfig { // 交換機(jī)名稱 public static final String BUSINESS_EXCHANGE = "business.exchange"; public static final String DEAD_LETTER_EXCHANGE = "dead.letter.exchange"; // 隊(duì)列名稱 public static final String BUSINESS_QUEUE = "business.queue"; public static final String DEAD_LETTER_QUEUE = "dead.letter.queue"; // 路由鍵 public static final String BUSINESS_KEY = "business.key"; public static final String DEAD_LETTER_KEY = "dead.letter.key"; // 業(yè)務(wù)交換機(jī) @Bean public DirectExchange businessExchange() { return ExchangeBuilder.directExchange(BUSINESS_EXCHANGE) .durable(true) .build(); } // 死信交換機(jī) @Bean public DirectExchange deadLetterExchange() { return ExchangeBuilder.directExchange(DEAD_LETTER_EXCHANGE) .durable(true) .build(); } // 業(yè)務(wù)隊(duì)列 @Bean public Queue businessQueue() { Map<String, Object> args = new HashMap<>(3); // 消息過期時(shí)間 args.put("x-message-ttl", 60000); // 隊(duì)列最大長度 args.put("x-max-length", 1000); // 死信交換機(jī) args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE); args.put("x-dead-letter-routing-key", DEAD_LETTER_KEY); return QueueBuilder.durable(BUSINESS_QUEUE) .withArguments(args) .build(); } // 死信隊(duì)列 @Bean public Queue deadLetterQueue() { return QueueBuilder.durable(DEAD_LETTER_QUEUE).build(); } // 業(yè)務(wù)綁定 @Bean public Binding businessBinding() { return BindingBuilder.bind(businessQueue()) .to(businessExchange()) .with(BUSINESS_KEY); } // 死信綁定 @Bean public Binding deadLetterBinding() { return BindingBuilder.bind(deadLetterQueue()) .to(deadLetterExchange()) .with(DEAD_LETTER_KEY); } // 消息轉(zhuǎn)換器 @Bean public MessageConverter messageConverter() { return new Jackson2JsonMessageConverter(); } // RabbitTemplate配置 @Bean public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) { RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); rabbitTemplate.setMessageConverter(messageConverter()); return rabbitTemplate; } }
3.2 消息確認(rèn)配置
@Configuration @Slf4j public class RabbitConfirmConfig implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback { @Autowired private RabbitTemplate rabbitTemplate; @PostConstruct public void init() { rabbitTemplate.setConfirmCallback(this); rabbitTemplate.setReturnsCallback(this); } @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { if (ack) { log.info("消息發(fā)送到交換機(jī)成功: correlationData={}", correlationData); } else { log.error("消息發(fā)送到交換機(jī)失敗: correlationData={}, cause={}", correlationData, cause); // 處理失敗邏輯,如重試、告警等 } } @Override public void returnedMessage(ReturnedMessage returned) { log.error("消息路由到隊(duì)列失敗: exchange={}, routingKey={}, replyCode={}, replyText={}, message={}", returned.getExchange(), returned.getRoutingKey(), returned.getReplyCode(), returned.getReplyText(), new String(returned.getMessage().getBody())); // 處理失敗邏輯,如重試、告警等 } }
4. 消息生產(chǎn)者
4.1 消息發(fā)送服務(wù)
@Service @Slf4j public class MessageProducer { @Autowired private RabbitTemplate rabbitTemplate; public void sendMessage(Object message, String exchange, String routingKey) { CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString()); try { rabbitTemplate.convertAndSend(exchange, routingKey, message, correlationData); log.info("消息發(fā)送成功: message={}, exchange={}, routingKey={}, correlationData={}", message, exchange, routingKey, correlationData); } catch (Exception e) { log.error("消息發(fā)送異常: message={}, exchange={}, routingKey={}, correlationData={}, error={}", message, exchange, routingKey, correlationData, e.getMessage()); throw new RuntimeException("消息發(fā)送失敗", e); } } public void sendDelayMessage(Object message, String exchange, String routingKey, long delayMillis) { CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString()); MessagePostProcessor messagePostProcessor = msg -> { msg.getMessageProperties().setDelay((int) delayMillis); return msg; }; try { rabbitTemplate.convertAndSend(exchange, routingKey, message, messagePostProcessor, correlationData); log.info("延遲消息發(fā)送成功: message={}, exchange={}, routingKey={}, delay={}, correlationData={}", message, exchange, routingKey, delayMillis, correlationData); } catch (Exception e) { log.error("延遲消息發(fā)送異常: message={}, exchange={}, routingKey={}, delay={}, correlationData={}, error={}", message, exchange, routingKey, delayMillis, correlationData, e.getMessage()); throw new RuntimeException("延遲消息發(fā)送失敗", e); } } }
5. 消息消費(fèi)者
5.1 消息處理服務(wù)
@Service @Slf4j public class MessageConsumer { @RabbitListener(queues = RabbitMQConfig.BUSINESS_QUEUE) public void handleMessage(Message message, Channel channel) throws IOException { long deliveryTag = message.getMessageProperties().getDeliveryTag(); try { // 獲取消息內(nèi)容 String messageBody = new String(message.getBody()); log.info("收到消息: message={}, deliveryTag={}", messageBody, deliveryTag); // 業(yè)務(wù)處理 processMessage(messageBody); // 手動(dòng)確認(rèn)消息 channel.basicAck(deliveryTag, false); log.info("消息處理成功: deliveryTag={}", deliveryTag); } catch (Exception e) { log.error("消息處理異常: deliveryTag={}, error={}", deliveryTag, e.getMessage()); // 判斷是否重新投遞 if (message.getMessageProperties().getRedelivered()) { log.error("消息已重試,拒絕消息: deliveryTag={}", deliveryTag); channel.basicReject(deliveryTag, false); } else { log.info("消息首次處理失敗,重新投遞: deliveryTag={}", deliveryTag); channel.basicNack(deliveryTag, false, true); } } } private void processMessage(String message) { // 實(shí)現(xiàn)具體的業(yè)務(wù)邏輯 log.info("處理消息: {}", message); } }
5.2 死信消息處理
@Service @Slf4j public class DeadLetterConsumer { @RabbitListener(queues = RabbitMQConfig.DEAD_LETTER_QUEUE) public void handleDeadLetter(Message message, Channel channel) throws IOException { long deliveryTag = message.getMessageProperties().getDeliveryTag(); try { String messageBody = new String(message.getBody()); log.info("收到死信消息: message={}, deliveryTag={}", messageBody, deliveryTag); // 死信消息處理邏輯 processDeadLetter(messageBody); channel.basicAck(deliveryTag, false); log.info("死信消息處理成功: deliveryTag={}", deliveryTag); } catch (Exception e) { log.error("死信消息處理異常: deliveryTag={}, error={}", deliveryTag, e.getMessage()); channel.basicReject(deliveryTag, false); } } private void processDeadLetter(String message) { // 實(shí)現(xiàn)死信消息處理邏輯 log.info("處理死信消息: {}", message); } }
6. 接口控制器
@RestController @RequestMapping("/api/mq") @Slf4j public class MessageController { @Autowired private MessageProducer messageProducer; @PostMapping("/send") public ResponseEntity<String> sendMessage(@RequestBody MessageDTO message) { try { messageProducer.sendMessage(message.getContent(), RabbitMQConfig.BUSINESS_EXCHANGE, RabbitMQConfig.BUSINESS_KEY); return ResponseEntity.ok("消息發(fā)送成功"); } catch (Exception e) { log.error("消息發(fā)送失敗: {}", e.getMessage()); return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR) .body("消息發(fā)送失敗: " + e.getMessage()); } } @PostMapping("/send/delay") public ResponseEntity<String> sendDelayMessage( @RequestBody MessageDTO message, @RequestParam long delayMillis) { try { messageProducer.sendDelayMessage(message.getContent(), RabbitMQConfig.BUSINESS_EXCHANGE, RabbitMQConfig.BUSINESS_KEY, delayMillis); return ResponseEntity.ok("延遲消息發(fā)送成功"); } catch (Exception e) { log.error("延遲消息發(fā)送失敗: {}", e.getMessage()); return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR) .body("延遲消息發(fā)送失敗: " + e.getMessage()); } } }
7. 監(jiān)控與運(yùn)維
7.1 RabbitMQ管理界面
- 訪問地址:http://localhost:15672
- 默認(rèn)賬號(hào):guest/guest
- 主要功能:
- 隊(duì)列監(jiān)控
- 交換機(jī)管理
- 連接狀態(tài)
- 消息追蹤
7.2 Prometheus + Grafana監(jiān)控
# prometheus.yml scrape_configs: - job_name: 'rabbitmq' static_configs: - targets: ['localhost:15692']
7.3 日志配置
logging: level: org.springframework.amqp: INFO com.your.package: DEBUG pattern: console: "%d{yyyy-MM-dd HH:mm:ss} [%thread] %-5level %logger{36} - %msg%n"
7.4 告警配置
@Configuration public class RabbitMQAlertConfig { @Value("${alert.dingtalk.webhook}") private String webhookUrl; @Bean public AlertService alertService() { return new DingTalkAlertService(webhookUrl); } }
8. 最佳實(shí)踐
8.1 消息冪等性處理
@Service public class MessageIdempotentHandler { @Autowired private RedisTemplate<String, String> redisTemplate; public boolean isProcessed(String messageId) { String key = "mq:processed:" + messageId; return Boolean.TRUE.equals(redisTemplate.opsForValue().setIfAbsent(key, "1", 24, TimeUnit.HOURS)); } }
8.2 消息重試策略
@Configuration public class RetryConfig { @Bean public RetryTemplate retryTemplate() { RetryTemplate retryTemplate = new RetryTemplate(); FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy(); backOffPolicy.setBackOffPeriod(1000); retryTemplate.setBackOffPolicy(backOffPolicy); SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy(); retryPolicy.setMaxAttempts(3); retryTemplate.setRetryPolicy(retryPolicy); return retryTemplate; } }
8.3 消息序列化
@Configuration public class MessageConverterConfig { @Bean public MessageConverter jsonMessageConverter() { Jackson2JsonMessageConverter converter = new Jackson2JsonMessageConverter(); converter.setCreateMessageIds(true); return converter; } }
8.4 消息追蹤
@Aspect @Component @Slf4j public class MessageTraceAspect { @Around("@annotation(org.springframework.amqp.rabbit.annotation.RabbitListener)") public Object traceMessage(ProceedingJoinPoint joinPoint) throws Throwable { String messageId = MDC.get("messageId"); log.info("開始處理消息: messageId={}", messageId); try { Object result = joinPoint.proceed(); log.info("消息處理完成: messageId={}", messageId); return result; } catch (Exception e) { log.error("消息處理異常: messageId={}, error={}", messageId, e.getMessage()); throw e; } } }
9. 常見問題與解決方案
9.1 消息丟失問題
- 生產(chǎn)者確認(rèn)機(jī)制
- 消息持久化
- 手動(dòng)確認(rèn)模式
- 集群高可用
9.2 消息重復(fù)消費(fèi)
- 冪等性處理
- 消息去重
- 業(yè)務(wù)檢查
9.3 消息堆積問題
- 增加消費(fèi)者數(shù)量
- 提高處理效率
- 隊(duì)列分片
- 死信隊(duì)列處理
9.4 性能優(yōu)化
- 合理設(shè)置預(yù)取數(shù)量
- 批量確認(rèn)消息
- 消息壓縮
- 連接池優(yōu)化
10. 高可用部署
10.1 集群配置
spring: rabbitmq: addresses: rabbit1:5672,rabbit2:5672,rabbit3:5672 username: admin password: password virtual-host: /
10.2 鏡像隊(duì)列
# 設(shè)置鏡像策略 rabbitmqctl set_policy ha-all "^" '{"ha-mode":"all"}'
10.3 負(fù)載均衡
# nginx.conf upstream rabbitmq_cluster { server rabbit1:15672; server rabbit2:15672; server rabbit3:15672; }
11. 參考資源
到此這篇關(guān)于Spring Boot 3 集成 RabbitMQ 實(shí)踐指南的文章就介紹到這了,更多相關(guān)Spring Boot 3 集成 RabbitMQ內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
- RabbitMQ 3.9.7 鏡像模式集群與Springboot 2.5.5 整合
- SpringAMQP消息隊(duì)列(SpringBoot集成RabbitMQ方式)
- 一文掌握Springboot集成RabbitMQ的方法
- springboot2.5.6集成RabbitMq實(shí)現(xiàn)Topic主題模式(推薦)
- Springboot集成RabbitMQ死信隊(duì)列的實(shí)現(xiàn)
- SpringBoot集成RabbitMQ的方法(死信隊(duì)列)
- springboot2.0集成rabbitmq的示例代碼
- Spring Boot系列教程之7步集成RabbitMQ的方法
- springboot集成rabbitMQ之對(duì)象傳輸?shù)姆椒?/a>
- spring boot集成rabbitmq的實(shí)例教程
- 詳解spring boot集成RabbitMQ
相關(guān)文章
Springboot使用put、delete請(qǐng)求報(bào)錯(cuò)405的處理
這篇文章主要介紹了Springboot使用put、delete請(qǐng)求報(bào)錯(cuò)405的處理方案,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2022-07-07淺談Java數(shù)據(jù)結(jié)構(gòu)之稀疏數(shù)組知識(shí)總結(jié)
今天帶大家了解一下Java稀疏數(shù)組的相關(guān)知識(shí),文中有非常詳細(xì)的介紹及代碼示例,對(duì)正在學(xué)習(xí)java的小伙伴們有很好地幫助,需要的朋友可以參考下2021-05-05SpringMVC使用MultipartResolver實(shí)現(xiàn)文件上傳
MultipartResolver 用于處理文件上傳,當(dāng)收到請(qǐng)求時(shí) DispatcherServlet 的 checkMultipart() 方法會(huì)調(diào)用 MultipartResolver 的 isMultipart() 方法判斷請(qǐng)求中是否包含文件2023-02-02springSecurity自定義登錄接口和JWT認(rèn)證過濾器的流程
這篇文章主要介紹了springSecurity自定義登陸接口和JWT認(rèn)證過濾器的相關(guān)資料,本文給大家介紹的非常詳細(xì),感興趣的朋友跟隨小編一起看看吧2024-12-12Java JDK動(dòng)態(tài)代理實(shí)現(xiàn)原理實(shí)例解析
這篇文章主要介紹了Java JDK動(dòng)態(tài)代理實(shí)現(xiàn)原理實(shí)例解析,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2020-06-06spring boot攔截器的使用場(chǎng)景示例詳解
這篇文章主要給大家介紹了關(guān)于spring boot攔截器的使用場(chǎng)景,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家學(xué)習(xí)或者使用Spring Boot具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面來一起學(xué)習(xí)學(xué)習(xí)吧2020-05-05SpringBoot中的maven插件spring-boot-maven-plugin使用
這篇文章主要介紹了SpringBoot中的maven插件spring-boot-maven-plugin使用方式,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2023-12-12靈活控制任務(wù)執(zhí)行時(shí)間的Cron表達(dá)式范例
這篇文章主要為大家介紹了靈活控制任務(wù)執(zhí)行時(shí)間的Cron表達(dá)式范例,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2023-10-10